Changeset View
Standalone View
source/ps/Future.h
Show All 20 Lines | |||||||||||||||||||
#include "ps/FutureForward.h" | #include "ps/FutureForward.h" | ||||||||||||||||||
#include <atomic> | #include <atomic> | ||||||||||||||||||
#include <condition_variable> | #include <condition_variable> | ||||||||||||||||||
#include <functional> | #include <functional> | ||||||||||||||||||
#include <mutex> | #include <mutex> | ||||||||||||||||||
#include <type_traits> | #include <type_traits> | ||||||||||||||||||
template<typename ResultType> | |||||||||||||||||||
class PackagedTask; | class PackagedTask; | ||||||||||||||||||
namespace FutureSharedStateDetail | namespace FutureSharedStateDetail | ||||||||||||||||||
{ | { | ||||||||||||||||||
enum class Status | enum class Status | ||||||||||||||||||
{ | { | ||||||||||||||||||
PENDING, | PENDING, | ||||||||||||||||||
STARTED, | STARTED, | ||||||||||||||||||
DONE, | DONE, | ||||||||||||||||||
CANCELED | CANCELED | ||||||||||||||||||
}; | }; | ||||||||||||||||||
template<typename ResultType> | template<typename ResultType> | ||||||||||||||||||
class SharedStateResult | class SharedStateResult | ||||||||||||||||||
{ | { | ||||||||||||||||||
jprahman: I've seen these sort of metaprogramming template helpers in other recent diffs as well. It may… | |||||||||||||||||||
Done Inline ActionsI'll put it in ps/Utils.h phosit: I'll put it in ps/Utils.h | |||||||||||||||||||
public: | public: | ||||||||||||||||||
void ResetResult() | void ResetResult() | ||||||||||||||||||
{ | { | ||||||||||||||||||
if (m_HasResult) | if (m_HasResult) | ||||||||||||||||||
m_Result.m_Result.~ResultType(); | m_Result.m_Result.~ResultType(); | ||||||||||||||||||
m_HasResult = false; | m_HasResult = false; | ||||||||||||||||||
} | } | ||||||||||||||||||
Not Done Inline ActionsDo we ever expect it to be well defined for operator()() to be called twice for the same packaged task? I wonder if we should it a programming error to invoke it twice and assert? jprahman: Do we ever expect it to be well defined for operator()() to be called twice for the same… | |||||||||||||||||||
Done Inline ActionsThis check is not about calling it twice. It checks if the task is already canceled. phosit: This check is not about calling it twice. It checks if the task is already canceled. | |||||||||||||||||||
union Result | union Result | ||||||||||||||||||
{ | { | ||||||||||||||||||
std::aligned_storage_t<sizeof(ResultType), alignof(ResultType)> m_Bytes; | std::aligned_storage_t<sizeof(ResultType), alignof(ResultType)> m_Bytes; | ||||||||||||||||||
ResultType m_Result; | ResultType m_Result; | ||||||||||||||||||
Result() : m_Bytes() {}; | Result() : m_Bytes() {}; | ||||||||||||||||||
~Result() {}; | ~Result() {}; | ||||||||||||||||||
}; | }; | ||||||||||||||||||
// We don't use Result directly so the result doesn't have to be default constructible. | // We don't use Result directly so the result doesn't have to be default constructible. | ||||||||||||||||||
Result m_Result; | Result m_Result; | ||||||||||||||||||
jprahmanUnsubmitted Not Done Inline Actions
Unrelated, but probably worth changing this to something like this. The double m_Result.m_Result sequence below is rather confusing, and m_Storage.m_Result is more precise. jprahman: Unrelated, but probably worth changing this to something like this. The double m_Result. | |||||||||||||||||||
bool m_HasResult = false; | bool m_HasResult = false; | ||||||||||||||||||
}; | }; | ||||||||||||||||||
// Don't have m_Result for void ReturnType | // Don't have m_Result for void ReturnType | ||||||||||||||||||
template<> | template<> | ||||||||||||||||||
class SharedStateResult<void> | class SharedStateResult<void> | ||||||||||||||||||
{ | { | ||||||||||||||||||
}; | }; | ||||||||||||||||||
/** | /** | ||||||||||||||||||
* Abstract base-class of SharedState only exposes the interface for PackagedTask: The function call | |||||||||||||||||||
* operator. | |||||||||||||||||||
*/ | |||||||||||||||||||
class ErasedSharedState | |||||||||||||||||||
{ | |||||||||||||||||||
protected: | |||||||||||||||||||
ErasedSharedState() = default; | |||||||||||||||||||
~ErasedSharedState() = default; | |||||||||||||||||||
ErasedSharedState(const ErasedSharedState&) = delete; | |||||||||||||||||||
ErasedSharedState& operator = (const ErasedSharedState&) = delete; | |||||||||||||||||||
public: | |||||||||||||||||||
virtual void operator()() = 0; | |||||||||||||||||||
}; | |||||||||||||||||||
/** | |||||||||||||||||||
* The shared state between futures and packaged state. | * The shared state between futures and packaged state. | ||||||||||||||||||
* Holds all relevant data. | * Holds all relevant data. | ||||||||||||||||||
*/ | */ | ||||||||||||||||||
template<typename ResultType> | template<typename ResultType> | ||||||||||||||||||
class SharedState : public SharedStateResult<ResultType> | class SharedState final : public SharedStateResult<ResultType>, public ErasedSharedState | ||||||||||||||||||
{ | { | ||||||||||||||||||
static constexpr bool VoidResult = std::is_same_v<ResultType, void>; | static constexpr bool VoidResult = std::is_same_v<ResultType, void>; | ||||||||||||||||||
public: | public: | ||||||||||||||||||
SharedState(std::function<ResultType()>&& func) : m_Func(std::move(func)) {} | SharedState(std::function<ResultType()>&& func) : m_Func(std::move(func)) {} | ||||||||||||||||||
~SharedState() | ~SharedState() | ||||||||||||||||||
{ | { | ||||||||||||||||||
// For safety, wait on started task completion, but not on pending ones (auto-cancelled). | // For safety, wait on started task completion, but not on pending ones (auto-cancelled). | ||||||||||||||||||
if (!Cancel()) | if (!Cancel()) | ||||||||||||||||||
{ | { | ||||||||||||||||||
Wait(); | Wait(); | ||||||||||||||||||
Cancel(); | Cancel(); | ||||||||||||||||||
} | } | ||||||||||||||||||
if constexpr (!VoidResult) | if constexpr (!VoidResult) | ||||||||||||||||||
SharedStateResult<ResultType>::ResetResult(); | SharedStateResult<ResultType>::ResetResult(); | ||||||||||||||||||
} | } | ||||||||||||||||||
SharedState(const SharedState&) = delete; | SharedState(const SharedState&) = delete; | ||||||||||||||||||
SharedState(SharedState&&) = delete; | SharedState(SharedState&&) = delete; | ||||||||||||||||||
Done Inline ActionsSharedState should really be non moveable phosit: SharedState should really be non moveable | |||||||||||||||||||
void operator()() final | |||||||||||||||||||
{ | |||||||||||||||||||
Status expected = Status::PENDING; | |||||||||||||||||||
if (!m_Status.compare_exchange_strong(expected, Status::STARTED)) | |||||||||||||||||||
return; | |||||||||||||||||||
if constexpr (VoidResult) | |||||||||||||||||||
m_Func(); | |||||||||||||||||||
else | |||||||||||||||||||
{ | |||||||||||||||||||
// To avoid UB, explicitly placement-new the value. | |||||||||||||||||||
new (&SharedStateResult<ResultType>::m_Result.m_Result) ResultType{m_Func()}; | |||||||||||||||||||
jprahmanUnsubmitted Not Done Inline ActionsThis assumes ResultType has a move-ctor or copy-ctor, which is probably a reasonable assumption here. jprahman: This assumes ResultType has a move-ctor or copy-ctor, which is probably a reasonable assumption… | |||||||||||||||||||
SharedStateResult<ResultType>::m_HasResult = true; | |||||||||||||||||||
} | |||||||||||||||||||
// Because we might have threads waiting on us, we need to make sure that they either: | |||||||||||||||||||
// - don't wait on our condition variable | |||||||||||||||||||
// - receive the notification when we're done. | |||||||||||||||||||
// This requires locking the mutex (@see Wait). | |||||||||||||||||||
{ | |||||||||||||||||||
std::lock_guard<std::mutex> lock(m_Mutex); | |||||||||||||||||||
m_Status = Status::DONE; | |||||||||||||||||||
} | |||||||||||||||||||
m_ConditionVariable.notify_all(); | |||||||||||||||||||
} | |||||||||||||||||||
bool IsDoneOrCanceled() const | bool IsDoneOrCanceled() const | ||||||||||||||||||
{ | { | ||||||||||||||||||
return m_Status == Status::DONE || m_Status == Status::CANCELED; | return m_Status == Status::DONE || m_Status == Status::CANCELED; | ||||||||||||||||||
} | } | ||||||||||||||||||
void Wait() | void Wait() | ||||||||||||||||||
{ | { | ||||||||||||||||||
// Fast path: we're already done. | // Fast path: we're already done. | ||||||||||||||||||
if (IsDoneOrCanceled()) | if (IsDoneOrCanceled()) | ||||||||||||||||||
return; | return; | ||||||||||||||||||
// Slow path: we aren't done when we run the above check. Lock and wait until we are. | // Slow path: we aren't done when we run the above check. Lock and wait until we are. | ||||||||||||||||||
std::unique_lock<std::mutex> lock(m_Mutex); | std::unique_lock<std::mutex> lock(m_Mutex); | ||||||||||||||||||
m_ConditionVariable.wait(lock, [this]() -> bool { return IsDoneOrCanceled(); }); | m_ConditionVariable.wait(lock, [this]() -> bool { return IsDoneOrCanceled(); }); | ||||||||||||||||||
} | } | ||||||||||||||||||
/** | /** | ||||||||||||||||||
* If the task is pending, cancel it: the status becomes CANCELED and if the task was completed, the result is destroyed. | * If the task is pending, cancel it: the status becomes CANCELED and if the task was completed, the result is destroyed. | ||||||||||||||||||
* @return true if the task was indeed cancelled, false otherwise (the task is running or already done). | * @return true if the task was indeed cancelled, false otherwise (the task is running or already done). | ||||||||||||||||||
*/ | */ | ||||||||||||||||||
bool Cancel() | bool Cancel() | ||||||||||||||||||
{ | { | ||||||||||||||||||
Status expected = Status::PENDING; | Status expected = Status::PENDING; | ||||||||||||||||||
bool cancelled = m_Status.compare_exchange_strong(expected, Status::CANCELED); | bool cancelled = m_Status.compare_exchange_strong(expected, Status::CANCELED); | ||||||||||||||||||
// If we're done, invalidate, if we're pending, atomically cancel, otherwise fail. | // If we're done, invalidate, if we're pending, atomically cancel, otherwise fail. | ||||||||||||||||||
if (cancelled || m_Status == Status::DONE) | if (cancelled || m_Status == Status::DONE) | ||||||||||||||||||
{ | { | ||||||||||||||||||
if (m_Status == Status::DONE) | if (m_Status == Status::DONE) | ||||||||||||||||||
m_Status = Status::CANCELED; | m_Status = Status::CANCELED; | ||||||||||||||||||
jprahmanUnsubmitted Not Done Inline Actions
Another unrelated nit, avoid a pair of std::atomic::loads jprahman: Another unrelated nit, avoid a pair of std::atomic::loads | |||||||||||||||||||
if constexpr (!VoidResult) | if constexpr (!VoidResult) | ||||||||||||||||||
SharedStateResult<ResultType>::ResetResult(); | SharedStateResult<ResultType>::ResetResult(); | ||||||||||||||||||
m_ConditionVariable.notify_all(); | m_ConditionVariable.notify_all(); | ||||||||||||||||||
return cancelled; | return cancelled; | ||||||||||||||||||
} | } | ||||||||||||||||||
return false; | return false; | ||||||||||||||||||
} | } | ||||||||||||||||||
Show All 9 Lines | std::enable_if_t<!std::is_same_v<_ResultType, void>, ResultType> GetResult() | ||||||||||||||||||
SharedStateResult<ResultType>::m_HasResult = false; | SharedStateResult<ResultType>::m_HasResult = false; | ||||||||||||||||||
return std::move(SharedStateResult<ResultType>::m_Result.m_Result); | return std::move(SharedStateResult<ResultType>::m_Result.m_Result); | ||||||||||||||||||
} | } | ||||||||||||||||||
std::atomic<Status> m_Status = Status::PENDING; | std::atomic<Status> m_Status = Status::PENDING; | ||||||||||||||||||
std::mutex m_Mutex; | std::mutex m_Mutex; | ||||||||||||||||||
std::condition_variable m_ConditionVariable; | std::condition_variable m_ConditionVariable; | ||||||||||||||||||
std::function<ResultType()> m_Func; | std::function<ResultType()> m_Func; | ||||||||||||||||||
Done Inline ActionsI forgot to remove this x) phosit: I forgot to remove this x) | |||||||||||||||||||
}; | }; | ||||||||||||||||||
} // namespace FutureSharedStateDetail | } // namespace FutureSharedStateDetail | ||||||||||||||||||
/** | /** | ||||||||||||||||||
* Corresponds to std::future. | * Corresponds to std::future. | ||||||||||||||||||
* Unlike std::future, Future can request the cancellation of the task that would produce the result. | * Unlike std::future, Future can request the cancellation of the task that would produce the result. | ||||||||||||||||||
* This makes it more similar to Java's CancellableTask or C#'s Task. | * This makes it more similar to Java's CancellableTask or C#'s Task. | ||||||||||||||||||
* The name Future was kept over Task so it would be more familiar to C++ users, | * The name Future was kept over Task so it would be more familiar to C++ users, | ||||||||||||||||||
* but this all should be revised once Concurrency TS wraps up. | * but this all should be revised once Concurrency TS wraps up. | ||||||||||||||||||
* | * | ||||||||||||||||||
* Future is _not_ thread-safe. Call it from a single thread or ensure synchronization externally. | * Future is _not_ thread-safe. Call it from a single thread or ensure synchronization externally. | ||||||||||||||||||
* | * | ||||||||||||||||||
* The destructor is never blocking. The promise may still be running on destruction. | * The destructor is never blocking. The promise may still be running on destruction. | ||||||||||||||||||
* TODO: | * TODO: | ||||||||||||||||||
* - Handle exceptions. | * - Handle exceptions. | ||||||||||||||||||
*/ | */ | ||||||||||||||||||
template<typename ResultType> | template<typename ResultType> | ||||||||||||||||||
class Future | class Future | ||||||||||||||||||
Not Done Inline ActionsI'm not convinced that templating Future<> on the type of the function rather than the result type makes the most sense. Most future implementations I've seen template on the result type, and then type erase the function type used to produce the value/consume the result as a callback. jprahman: I'm not convinced that templating Future<> on the type of the function rather than the result… | |||||||||||||||||||
{ | { | ||||||||||||||||||
template<typename T> | |||||||||||||||||||
friend class PackagedTask; | |||||||||||||||||||
static constexpr bool VoidResult = std::is_same_v<ResultType, void>; | static constexpr bool VoidResult = std::is_same_v<ResultType, void>; | ||||||||||||||||||
Not Done Inline ActionsIf we limit sharedState to only be owned by at most two entities (Future holder and Task holder), we could drop the std::shared_ptr<> and associated reference counting, and instead use terminal states of the SharedState state machine to indicate when the SharedState should be destructed. As of today, there are two levels of std::atomic operations:
The ref count in 2. is implicit in the state machine from 1 if only up to two copies of the reference can be held (one in the Future, and another in the Promise/Task used to fulfill the future). Ie. there are terminal states in the state machine which indicate the object can/should be deallocated. jprahman: If we limit sharedState to only be owned by at most two entities (Future holder and Task… | |||||||||||||||||||
Done Inline ActionsI don't get what you try to say. What do you mean by "terminal state"? The FutureSharedStateDetail::Status? phosit: I don't get what you try to say. What do you mean by "terminal state"? The… | |||||||||||||||||||
Not Done Inline ActionsMy idea here is spelled out in more detail here: https://github.com/facebook/folly/blob/main/folly/futures/detail/Core.h#L250-L338. Basic idea is that for a future/promise pair, there are up to two different owners of the shared state:
As the future and promise are constructed/destructed/callbacks are set/result is set, the state of the FSM is updated. Certain states and branches in the FSM correspond to certain combinations of the future and/or promise still being alive. The final state of the FSM occurs when the associated future and promise have both been destructed and a final transition is made to a terminal state to indicate this. jprahman: My idea here is spelled out in more detail here: https://github. | |||||||||||||||||||
Done Inline ActionsThe idea is promissing... I leave that for another patch. phosit: The idea is promissing... I leave that for another patch. | |||||||||||||||||||
using Status = FutureSharedStateDetail::Status; | using Status = FutureSharedStateDetail::Status; | ||||||||||||||||||
using SharedState = FutureSharedStateDetail::SharedState<ResultType>; | using SharedState = FutureSharedStateDetail::SharedState<ResultType>; | ||||||||||||||||||
public: | public: | ||||||||||||||||||
Future() = default; | Future() = default; | ||||||||||||||||||
Future(const Future& o) = delete; | Future(const Future& o) = delete; | ||||||||||||||||||
Future(Future&&) = default; | Future(Future&&) = default; | ||||||||||||||||||
Future& operator=(Future&&) = default; | Future& operator=(Future&&) = default; | ||||||||||||||||||
~Future() = default; | ~Future() = default; | ||||||||||||||||||
/** | /** | ||||||||||||||||||
* Make the future wait for the result of @a func. | * Make the future wait for the result of @a func. | ||||||||||||||||||
*/ | */ | ||||||||||||||||||
template<typename T> | template<typename T> | ||||||||||||||||||
Not Done Inline Actions
IMO probably best to move templatization on the functor type to the constructor itself, and perform type erasure inside SharedState<Result> jprahman: IMO probably best to move templatization on the functor type to the constructor itself, and… | |||||||||||||||||||
PackagedTask<ResultType> Wrap(T&& func); | PackagedTask Wrap(T&& func); | ||||||||||||||||||
/** | /** | ||||||||||||||||||
* Move the result out of the future, and invalidate the future. | * Move the result out of the future, and invalidate the future. | ||||||||||||||||||
* If the future is not complete, calls Wait(). | * If the future is not complete, calls Wait(). | ||||||||||||||||||
* If the future is canceled, asserts. | * If the future is canceled, asserts. | ||||||||||||||||||
*/ | */ | ||||||||||||||||||
template<typename SfinaeType = ResultType> | template<typename SfinaeType = ResultType> | ||||||||||||||||||
std::enable_if_t<!std::is_same_v<SfinaeType, void>, ResultType> Get() | std::enable_if_t<!std::is_same_v<SfinaeType, void>, ResultType> Get() | ||||||||||||||||||
▲ Show 20 Lines • Show All 59 Lines • ▼ Show 20 Lines | if (m_SharedState) | ||||||||||||||||||
m_SharedState->Cancel(); | m_SharedState->Cancel(); | ||||||||||||||||||
m_SharedState.reset(); | m_SharedState.reset(); | ||||||||||||||||||
} | } | ||||||||||||||||||
protected: | protected: | ||||||||||||||||||
std::shared_ptr<SharedState> m_SharedState; | std::shared_ptr<SharedState> m_SharedState; | ||||||||||||||||||
}; | }; | ||||||||||||||||||
/** | /** | ||||||||||||||||||
* Corresponds somewhat to std::packaged_task. | * A lightweight handle to a task can be used for storing in a task queue. | ||||||||||||||||||
* Like packaged_task, this holds a function acting as a promise. | |||||||||||||||||||
* This type is mostly just the shared state and the call operator, | |||||||||||||||||||
* handling the promise & continuation logic. | |||||||||||||||||||
*/ | */ | ||||||||||||||||||
template<typename ResultType> | |||||||||||||||||||
class PackagedTask | class PackagedTask | ||||||||||||||||||
{ | { | ||||||||||||||||||
static constexpr bool VoidResult = std::is_same_v<ResultType, void>; | |||||||||||||||||||
public: | public: | ||||||||||||||||||
PackagedTask() = delete; | PackagedTask() = default; | ||||||||||||||||||
PackagedTask(std::shared_ptr<typename Future<ResultType>::SharedState> ss) : m_SharedState(std::move(ss)) {} | explicit PackagedTask(std::shared_ptr<FutureSharedStateDetail::ErasedSharedState> ss) : | ||||||||||||||||||
m_SharedState(std::move(ss)) | |||||||||||||||||||
{} | |||||||||||||||||||
void operator()() | void operator()() | ||||||||||||||||||
{ | { | ||||||||||||||||||
typename Future<ResultType>::Status expected = Future<ResultType>::Status::PENDING; | (*m_SharedState)(); | ||||||||||||||||||
if (!m_SharedState->m_Status.compare_exchange_strong(expected, Future<ResultType>::Status::STARTED)) | |||||||||||||||||||
return; | |||||||||||||||||||
if constexpr (VoidResult) | |||||||||||||||||||
m_SharedState->m_Func(); | |||||||||||||||||||
else | |||||||||||||||||||
{ | |||||||||||||||||||
// To avoid UB, explicitly placement-new the value. | |||||||||||||||||||
new (&m_SharedState->m_Result) ResultType{std::move(m_SharedState->m_Func())}; | |||||||||||||||||||
m_SharedState->m_HasResult = true; | |||||||||||||||||||
} | |||||||||||||||||||
// Because we might have threads waiting on us, we need to make sure that they either: | |||||||||||||||||||
// - don't wait on our condition variable | |||||||||||||||||||
// - receive the notification when we're done. | |||||||||||||||||||
// This requires locking the mutex (@see Wait). | |||||||||||||||||||
{ | |||||||||||||||||||
std::lock_guard<std::mutex> lock(m_SharedState->m_Mutex); | |||||||||||||||||||
m_SharedState->m_Status = Future<ResultType>::Status::DONE; | |||||||||||||||||||
} | |||||||||||||||||||
m_SharedState->m_ConditionVariable.notify_all(); | |||||||||||||||||||
// We no longer need the shared state, drop it immediately. | // We no longer need the shared state, drop it immediately. | ||||||||||||||||||
m_SharedState.reset(); | m_SharedState.reset(); | ||||||||||||||||||
} | } | ||||||||||||||||||
void Cancel() | |||||||||||||||||||
{ | |||||||||||||||||||
m_SharedState->Cancel(); | |||||||||||||||||||
m_SharedState.reset(); | |||||||||||||||||||
} | |||||||||||||||||||
protected: | protected: | ||||||||||||||||||
std::shared_ptr<typename Future<ResultType>::SharedState> m_SharedState; | std::shared_ptr<FutureSharedStateDetail::ErasedSharedState> m_SharedState; | ||||||||||||||||||
}; | }; | ||||||||||||||||||
template<typename ResultType> | template<typename ResultType> | ||||||||||||||||||
template<typename T> | template<typename T> | ||||||||||||||||||
PackagedTask<ResultType> Future<ResultType>::Wrap(T&& func) | PackagedTask Future<ResultType>::Wrap(T&& func) | ||||||||||||||||||
{ | { | ||||||||||||||||||
static_assert(std::is_convertible_v<std::invoke_result_t<T>, ResultType>, "The return type of the wrapped function cannot be converted to the type of the Future."); | static_assert(std::is_convertible_v<std::invoke_result_t<T>, ResultType>, "The return type of the wrapped function cannot be converted to the type of the Future."); | ||||||||||||||||||
m_SharedState = std::make_shared<SharedState>(std::move(func)); | m_SharedState = std::make_shared<SharedState>(std::move(func)); | ||||||||||||||||||
return PackagedTask<ResultType>(m_SharedState); | return PackagedTask{m_SharedState}; | ||||||||||||||||||
} | } | ||||||||||||||||||
#endif // INCLUDED_FUTURE | #endif // INCLUDED_FUTURE |
I've seen these sort of metaprogramming template helpers in other recent diffs as well. It may be worthwhile to pull them out into a TMP helper library of type traits, wrapper type, and such vs. reinventing them in different headers (where they may or may not collide depending on the namespaces they're defined in).