Changeset View
Changeset View
Standalone View
Standalone View
ps/trunk/source/ps/Future.h
Show All 21 Lines | |||||
#include <atomic> | #include <atomic> | ||||
#include <condition_variable> | #include <condition_variable> | ||||
#include <functional> | #include <functional> | ||||
#include <mutex> | #include <mutex> | ||||
#include <optional> | #include <optional> | ||||
#include <type_traits> | #include <type_traits> | ||||
template<typename ResultType> | template<typename Callback> | ||||
class PackagedTask; | class PackagedTask; | ||||
namespace FutureSharedStateDetail | namespace FutureSharedStateDetail | ||||
{ | { | ||||
enum class Status | enum class Status | ||||
{ | { | ||||
PENDING, | PENDING, | ||||
STARTED, | STARTED, | ||||
DONE, | DONE, | ||||
CANCELED | CANCELED | ||||
}; | }; | ||||
template<typename T> | template<typename T> | ||||
using ResultHolder = std::conditional_t<std::is_void_v<T>, std::nullopt_t, std::optional<T>>; | using ResultHolder = std::conditional_t<std::is_void_v<T>, std::nullopt_t, std::optional<T>>; | ||||
/** | /** | ||||
* The shared state between futures and packaged state. | * The shared state between futures and packaged state. | ||||
* Holds all relevant data. | * Holds all relevant data. | ||||
*/ | */ | ||||
template<typename ResultType> | template<typename ResultType> | ||||
class SharedState : public ResultHolder<ResultType> | class Receiver : public ResultHolder<ResultType> | ||||
{ | { | ||||
static constexpr bool VoidResult = std::is_same_v<ResultType, void>; | static constexpr bool VoidResult = std::is_same_v<ResultType, void>; | ||||
public: | public: | ||||
SharedState(std::function<ResultType()>&& func) : | Receiver() : | ||||
ResultHolder<ResultType>{std::nullopt}, | ResultHolder<ResultType>{std::nullopt} | ||||
m_Func(std::move(func)) | |||||
{} | {} | ||||
~SharedState() | ~Receiver() | ||||
{ | { | ||||
// For safety, wait on started task completion, but not on pending ones (auto-cancelled). | // For safety, wait on started task completion, but not on pending ones (auto-cancelled). | ||||
if (!Cancel()) | if (!Cancel()) | ||||
{ | { | ||||
Wait(); | Wait(); | ||||
Cancel(); | Cancel(); | ||||
} | } | ||||
} | } | ||||
SharedState(const SharedState&) = delete; | Receiver(const Receiver&) = delete; | ||||
SharedState(SharedState&&) = delete; | Receiver(Receiver&&) = delete; | ||||
bool IsDoneOrCanceled() const | bool IsDoneOrCanceled() const | ||||
{ | { | ||||
return m_Status == Status::DONE || m_Status == Status::CANCELED; | return m_Status == Status::DONE || m_Status == Status::CANCELED; | ||||
} | } | ||||
void Wait() | void Wait() | ||||
{ | { | ||||
Show All 38 Lines | std::enable_if_t<!std::is_same_v<_ResultType, void>, ResultType> GetResult() | ||||
ResultType ret = std::move(**this); | ResultType ret = std::move(**this); | ||||
this->reset(); | this->reset(); | ||||
return ret; | return ret; | ||||
} | } | ||||
std::atomic<Status> m_Status = Status::PENDING; | std::atomic<Status> m_Status = Status::PENDING; | ||||
std::mutex m_Mutex; | std::mutex m_Mutex; | ||||
std::condition_variable m_ConditionVariable; | std::condition_variable m_ConditionVariable; | ||||
}; | |||||
template<typename Callback> | |||||
struct SharedState | |||||
{ | |||||
SharedState(Callback&& callbackFunc) : | |||||
callback{std::forward<Callback>(callbackFunc)} | |||||
{} | |||||
std::function<ResultType()> m_Func; | Callback callback; | ||||
Receiver<std::invoke_result_t<Callback>> receiver; | |||||
}; | }; | ||||
} // namespace FutureSharedStateDetail | } // namespace FutureSharedStateDetail | ||||
/** | /** | ||||
* Corresponds to std::future. | * Corresponds to std::future. | ||||
* Unlike std::future, Future can request the cancellation of the task that would produce the result. | * Unlike std::future, Future can request the cancellation of the task that would produce the result. | ||||
* This makes it more similar to Java's CancellableTask or C#'s Task. | * This makes it more similar to Java's CancellableTask or C#'s Task. | ||||
Show All 10 Lines | |||||
class Future | class Future | ||||
{ | { | ||||
template<typename T> | template<typename T> | ||||
friend class PackagedTask; | friend class PackagedTask; | ||||
static constexpr bool VoidResult = std::is_same_v<ResultType, void>; | static constexpr bool VoidResult = std::is_same_v<ResultType, void>; | ||||
using Status = FutureSharedStateDetail::Status; | using Status = FutureSharedStateDetail::Status; | ||||
using SharedState = FutureSharedStateDetail::SharedState<ResultType>; | |||||
public: | public: | ||||
Future() = default; | Future() = default; | ||||
Future(const Future& o) = delete; | Future(const Future& o) = delete; | ||||
Future(Future&&) = default; | Future(Future&&) = default; | ||||
Future& operator=(Future&&) = default; | Future& operator=(Future&&) = default; | ||||
~Future() = default; | ~Future() = default; | ||||
/** | /** | ||||
* Make the future wait for the result of @a func. | * Make the future wait for the result of @a func. | ||||
*/ | */ | ||||
template<typename T> | template<typename Callback> | ||||
PackagedTask<ResultType> Wrap(T&& func); | PackagedTask<Callback> Wrap(Callback&& callback); | ||||
/** | /** | ||||
* Move the result out of the future, and invalidate the future. | * Move the result out of the future, and invalidate the future. | ||||
* If the future is not complete, calls Wait(). | * If the future is not complete, calls Wait(). | ||||
* If the future is canceled, asserts. | * If the future is canceled, asserts. | ||||
*/ | */ | ||||
template<typename SfinaeType = ResultType> | template<typename SfinaeType = ResultType> | ||||
std::enable_if_t<!std::is_same_v<SfinaeType, void>, ResultType> Get() | std::enable_if_t<!std::is_same_v<SfinaeType, void>, ResultType> Get() | ||||
{ | { | ||||
ENSURE(!!m_SharedState); | ENSURE(!!m_Receiver); | ||||
Wait(); | Wait(); | ||||
if constexpr (VoidResult) | if constexpr (VoidResult) | ||||
return; | return; | ||||
else | else | ||||
{ | { | ||||
ENSURE(m_SharedState->m_Status != Status::CANCELED); | ENSURE(m_Receiver->m_Status != Status::CANCELED); | ||||
// This mark the state invalid - can't call Get again. | // This mark the state invalid - can't call Get again. | ||||
return m_SharedState->GetResult(); | return m_Receiver->GetResult(); | ||||
} | } | ||||
} | } | ||||
/** | /** | ||||
* @return true if the shared state is valid and has a result (i.e. Get can be called). | * @return true if the shared state is valid and has a result (i.e. Get can be called). | ||||
*/ | */ | ||||
bool IsReady() const | bool IsReady() const | ||||
{ | { | ||||
return !!m_SharedState && m_SharedState->m_Status == Status::DONE; | return !!m_Receiver && m_Receiver->m_Status == Status::DONE; | ||||
} | } | ||||
/** | /** | ||||
* @return true if the future has a shared state and it's not been invalidated, ie. pending, started or done. | * @return true if the future has a shared state and it's not been invalidated, ie. pending, started or done. | ||||
*/ | */ | ||||
bool Valid() const | bool Valid() const | ||||
{ | { | ||||
return !!m_SharedState && m_SharedState->m_Status != Status::CANCELED; | return !!m_Receiver && m_Receiver->m_Status != Status::CANCELED; | ||||
} | } | ||||
void Wait() | void Wait() | ||||
{ | { | ||||
if (Valid()) | if (Valid()) | ||||
m_SharedState->Wait(); | m_Receiver->Wait(); | ||||
} | } | ||||
/** | /** | ||||
* Cancels the task, waiting if the task is currently started. | * Cancels the task, waiting if the task is currently started. | ||||
* Use this function over Cancel() if you need to ensure determinism (i.e. in the simulation). | * Use this function over Cancel() if you need to ensure determinism (i.e. in the simulation). | ||||
* @see Cancel. | * @see Cancel. | ||||
*/ | */ | ||||
void CancelOrWait() | void CancelOrWait() | ||||
{ | { | ||||
if (!Valid()) | if (!Valid()) | ||||
return; | return; | ||||
if (!m_SharedState->Cancel()) | if (!m_Receiver->Cancel()) | ||||
m_SharedState->Wait(); | m_Receiver->Wait(); | ||||
m_SharedState.reset(); | m_Receiver.reset(); | ||||
} | } | ||||
protected: | protected: | ||||
std::shared_ptr<SharedState> m_SharedState; | std::shared_ptr<FutureSharedStateDetail::Receiver<ResultType>> m_Receiver; | ||||
}; | }; | ||||
/** | /** | ||||
* Corresponds somewhat to std::packaged_task. | * Corresponds somewhat to std::packaged_task. | ||||
* Like packaged_task, this holds a function acting as a promise. | * Like packaged_task, this holds a function acting as a promise. | ||||
* This type is mostly just the shared state and the call operator, | * This type is mostly just the shared state and the call operator, | ||||
* handling the promise & continuation logic. | * handling the promise & continuation logic. | ||||
*/ | */ | ||||
template<typename ResultType> | template<typename Callback> | ||||
class PackagedTask | class PackagedTask | ||||
{ | { | ||||
static constexpr bool VoidResult = std::is_same_v<ResultType, void>; | |||||
public: | public: | ||||
PackagedTask() = delete; | PackagedTask() = delete; | ||||
PackagedTask(std::shared_ptr<typename Future<ResultType>::SharedState> ss) : m_SharedState(std::move(ss)) {} | PackagedTask(std::shared_ptr<FutureSharedStateDetail::SharedState<Callback>> ss) : | ||||
m_SharedState(std::move(ss)) | |||||
{} | |||||
void operator()() | void operator()() | ||||
{ | { | ||||
typename Future<ResultType>::Status expected = Future<ResultType>::Status::PENDING; | FutureSharedStateDetail::Status expected = FutureSharedStateDetail::Status::PENDING; | ||||
if (!m_SharedState->m_Status.compare_exchange_strong(expected, Future<ResultType>::Status::STARTED)) | if (!m_SharedState->receiver.m_Status.compare_exchange_strong(expected, | ||||
FutureSharedStateDetail::Status::STARTED)) | |||||
{ | |||||
return; | return; | ||||
} | |||||
if constexpr (VoidResult) | if constexpr (std::is_void_v<std::invoke_result_t<Callback>>) | ||||
m_SharedState->m_Func(); | m_SharedState->callback(); | ||||
else | else | ||||
m_SharedState->emplace(m_SharedState->m_Func()); | m_SharedState->receiver.emplace(m_SharedState->callback()); | ||||
// Because we might have threads waiting on us, we need to make sure that they either: | // Because we might have threads waiting on us, we need to make sure that they either: | ||||
// - don't wait on our condition variable | // - don't wait on our condition variable | ||||
// - receive the notification when we're done. | // - receive the notification when we're done. | ||||
// This requires locking the mutex (@see Wait). | // This requires locking the mutex (@see Wait). | ||||
{ | { | ||||
std::lock_guard<std::mutex> lock(m_SharedState->m_Mutex); | std::lock_guard<std::mutex> lock(m_SharedState->receiver.m_Mutex); | ||||
m_SharedState->m_Status = Future<ResultType>::Status::DONE; | m_SharedState->receiver.m_Status = FutureSharedStateDetail::Status::DONE; | ||||
} | } | ||||
m_SharedState->m_ConditionVariable.notify_all(); | m_SharedState->receiver.m_ConditionVariable.notify_all(); | ||||
// We no longer need the shared state, drop it immediately. | // We no longer need the shared state, drop it immediately. | ||||
m_SharedState.reset(); | m_SharedState.reset(); | ||||
} | } | ||||
void Cancel() | void Cancel() | ||||
{ | { | ||||
m_SharedState->Cancel(); | m_SharedState->Cancel(); | ||||
m_SharedState.reset(); | m_SharedState.reset(); | ||||
} | } | ||||
protected: | private: | ||||
std::shared_ptr<typename Future<ResultType>::SharedState> m_SharedState; | std::shared_ptr<FutureSharedStateDetail::SharedState<Callback>> m_SharedState; | ||||
}; | }; | ||||
template<typename ResultType> | template<typename ResultType> | ||||
template<typename T> | template<typename Callback> | ||||
PackagedTask<ResultType> Future<ResultType>::Wrap(T&& func) | PackagedTask<Callback> Future<ResultType>::Wrap(Callback&& callback) | ||||
{ | { | ||||
static_assert(std::is_same_v<std::invoke_result_t<T>, ResultType>, | static_assert(std::is_same_v<std::invoke_result_t<Callback>, ResultType>, | ||||
"The return type of the wrapped function is not the same as the type the Future expects."); | "The return type of the wrapped function is not the same as the type the Future expects."); | ||||
m_SharedState = std::make_shared<SharedState>(std::move(func)); | auto temp = std::make_shared<FutureSharedStateDetail::SharedState<Callback>>(std::move(callback)); | ||||
return PackagedTask<ResultType>(m_SharedState); | m_Receiver = {temp, &temp->receiver}; | ||||
return PackagedTask<Callback>(std::move(temp)); | |||||
} | } | ||||
#endif // INCLUDED_FUTURE | #endif // INCLUDED_FUTURE |
Wildfire Games · Phabricator