Changeset View
Changeset View
Standalone View
Standalone View
source/ps/Execution/WhenAll.h
- This file was added.
/* Copyright (C) 2022 Wildfire Games. | |||||||||||||
* This file is part of 0 A.D. | |||||||||||||
* | |||||||||||||
* 0 A.D. is free software: you can redistribute it and/or modify | |||||||||||||
* it under the terms of the GNU General Public License as published by | |||||||||||||
* the Free Software Foundation, either version 2 of the License, or | |||||||||||||
* (at your option) any later version. | |||||||||||||
* | |||||||||||||
* 0 A.D. is distributed in the hope that it will be useful, | |||||||||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||||||||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |||||||||||||
* GNU General Public License for more details. | |||||||||||||
* | |||||||||||||
* You should have received a copy of the GNU General Public License | |||||||||||||
* along with 0 A.D. If not, see <http://www.gnu.org/licenses/>. | |||||||||||||
*/ | |||||||||||||
#ifndef INCLUDED_EXECUTION_WHEN_ALL | |||||||||||||
#define INCLUDED_EXECUTION_WHEN_ALL | |||||||||||||
#include "ps/Execution/Execution.h" | |||||||||||||
#include <boost/optional.hpp> | |||||||||||||
#include <condition_variable> | |||||||||||||
#include <mutex> | |||||||||||||
StanUnsubmitted Not Done Inline Actions
Stan: | |||||||||||||
namespace PS | |||||||||||||
{ | |||||||||||||
namespace Execution | |||||||||||||
{ | |||||||||||||
namespace WhenAll | |||||||||||||
{ | |||||||||||||
template<typename Wrapped, typename... Results> | |||||||||||||
class CollectorReceiver | |||||||||||||
{ | |||||||||||||
public: | |||||||||||||
CollectorReceiver(Wrapped next) : | |||||||||||||
m_Next{next} | |||||||||||||
{} | |||||||||||||
// Not moveable because SingleReceiver hold references. | |||||||||||||
CollectorReceiver(const CollectorReceiver&) = delete; | |||||||||||||
CollectorReceiver& operator = (const CollectorReceiver&) = delete; | |||||||||||||
CollectorReceiver(CollectorReceiver&&) = delete; | |||||||||||||
CollectorReceiver& operator = (CollectorReceiver&&) = delete; | |||||||||||||
template<size_t idx, typename... Res> | |||||||||||||
void SetValue(std::integral_constant<size_t, idx>, Res&&... result) | |||||||||||||
{ | |||||||||||||
if constexpr (sizeof...(Res) == 0) | |||||||||||||
{ | |||||||||||||
static_assert(std::is_same_v< | |||||||||||||
std::tuple_element_t<idx, std::tuple<Results...>>, VoidTag>); | |||||||||||||
std::get<idx>(m_Results).emplace(VoidTag{}); | |||||||||||||
} | |||||||||||||
else | |||||||||||||
{ | |||||||||||||
static_assert(sizeof...(Res) == 1 && | |||||||||||||
std::is_same_v<Res..., std::tuple_element_t<idx, std::tuple<Results...>>>); | |||||||||||||
std::get<idx>(m_Results).emplace(std::forward<Res...>(result...)); | |||||||||||||
} | |||||||||||||
Continue(); | |||||||||||||
} | |||||||||||||
void SetError(std::exception_ptr ex) | |||||||||||||
{ | |||||||||||||
if (!m_HoldsException.exchange(true)) | |||||||||||||
{ | |||||||||||||
m_Exception = ex; | |||||||||||||
m_StopSource.request_stop(); | |||||||||||||
} | |||||||||||||
Continue(); | |||||||||||||
} | |||||||||||||
void SetDone() | |||||||||||||
{ | |||||||||||||
Continue(); | |||||||||||||
} | |||||||||||||
private: | |||||||||||||
void Continue() | |||||||||||||
{ | |||||||||||||
if (--m_StillRunning == 0) | |||||||||||||
{ | |||||||||||||
if (m_Exception) | |||||||||||||
{ | |||||||||||||
m_Next.SetError(m_Exception); | |||||||||||||
} | |||||||||||||
else if (m_StopSource.stop_requested()) | |||||||||||||
m_Next.SetDone(); | |||||||||||||
else | |||||||||||||
m_Next.SetValue(GetValues()); | |||||||||||||
} | |||||||||||||
} | |||||||||||||
std::tuple<Results...> GetValues() | |||||||||||||
{ | |||||||||||||
return GetValuesImpl(std::index_sequence_for<Results...>{}); | |||||||||||||
} | |||||||||||||
template<size_t... idx> | |||||||||||||
std::tuple<Results...> GetValuesImpl(std::index_sequence<idx...>) | |||||||||||||
{ | |||||||||||||
return std::tuple<Results...> | |||||||||||||
{ | |||||||||||||
std::forward<Results>(std::get<idx>(m_Results).value())... | |||||||||||||
}; | |||||||||||||
} | |||||||||||||
Wrapped m_Next; | |||||||||||||
std::atomic<size_t> m_StillRunning{sizeof...(Results)}; | |||||||||||||
stop_source m_StopSource; | |||||||||||||
std::tuple<boost::optional<Results>...> m_Results; | |||||||||||||
std::atomic<bool> m_HoldsException = false; | |||||||||||||
std::exception_ptr m_Exception; | |||||||||||||
}; | |||||||||||||
// SingleReceiver forwards the result to the collector | |||||||||||||
template<size_t idx, typename Collector> | |||||||||||||
class SingleReceiver | |||||||||||||
{ | |||||||||||||
public: | |||||||||||||
explicit SingleReceiver(std::integral_constant<size_t, idx>, Collector& collector) : | |||||||||||||
m_Collector{collector} | |||||||||||||
{} | |||||||||||||
template<typename... Value> | |||||||||||||
void SetValue(Value&&... result) | |||||||||||||
{ | |||||||||||||
m_Collector.SetValue(std::integral_constant<size_t, idx>{}, std::forward<Value>(result)...); | |||||||||||||
} | |||||||||||||
void SetError(std::exception_ptr ex) | |||||||||||||
{ | |||||||||||||
m_Collector.SetError(std::move(ex)); | |||||||||||||
} | |||||||||||||
void SetDone() | |||||||||||||
{ | |||||||||||||
m_Collector.SetDone(); | |||||||||||||
} | |||||||||||||
private: | |||||||||||||
Collector& m_Collector; | |||||||||||||
}; | |||||||||||||
template<typename NextRecv, typename... Senders> | |||||||||||||
class OperationState | |||||||||||||
{ | |||||||||||||
public: | |||||||||||||
OperationState(NextRecv next, std::tuple<Senders...> previousSender) : | |||||||||||||
m_Continuation{next}, | |||||||||||||
m_ToBeStarted(connectEverything(m_Continuation, previousSender, | |||||||||||||
jprahmanUnsubmitted Not Done Inline ActionsThis is doing a full copy of previousSender? jprahman: This is doing a full copy of previousSender? | |||||||||||||
std::index_sequence_for<Senders...>{})) | |||||||||||||
{} | |||||||||||||
void Start() | |||||||||||||
{ | |||||||||||||
StartImpl(std::make_index_sequence<std::tuple_size_v<Prev>>{}); | |||||||||||||
} | |||||||||||||
private: | |||||||||||||
template<size_t... idx> | |||||||||||||
void StartImpl(std::index_sequence<idx...>) | |||||||||||||
{ | |||||||||||||
(std::get<idx>(m_ToBeStarted).Start(),...); | |||||||||||||
} | |||||||||||||
using Collector = CollectorReceiver<NextRecv, HandleVoid<ResultTypeOf<Senders>>...>; | |||||||||||||
// Hack so it is usable in std::invoke_result_t | |||||||||||||
struct ConnectEverythingFn | |||||||||||||
{ | |||||||||||||
template<size_t... idx> | |||||||||||||
auto operator ()(Collector& continuation, | |||||||||||||
std::tuple<Senders...> previousSenders, std::index_sequence<idx...>) const | |||||||||||||
{ | |||||||||||||
return std::tuple | |||||||||||||
{ | |||||||||||||
std::get<idx>(previousSenders).Connect( | |||||||||||||
SingleReceiver{std::integral_constant<size_t, idx>{}, | |||||||||||||
continuation})... | |||||||||||||
}; | |||||||||||||
} | |||||||||||||
}; | |||||||||||||
static constexpr ConnectEverythingFn connectEverything{}; | |||||||||||||
using Prev = std::invoke_result_t<ConnectEverythingFn, Collector&, | |||||||||||||
std::tuple<Senders...>, std::index_sequence_for<Senders...>>; | |||||||||||||
Collector m_Continuation; | |||||||||||||
Prev m_ToBeStarted; | |||||||||||||
}; | |||||||||||||
template<typename... Prevs> | |||||||||||||
class Sender | |||||||||||||
{ | |||||||||||||
public: | |||||||||||||
using ResultType = std::tuple<HandleVoid<ResultTypeOf<Prevs>>...>; | |||||||||||||
Sender(Prevs... prevs) : | |||||||||||||
m_Prevs{std::forward<Prevs>(prevs)...} | |||||||||||||
{} | |||||||||||||
template<typename Next> | |||||||||||||
auto Connect(Next&& next) const& | |||||||||||||
{ | |||||||||||||
return OperationState(std::forward<Next>(next), m_Prevs); | |||||||||||||
} | |||||||||||||
template<typename Next> | |||||||||||||
auto Connect(Next&& next) && | |||||||||||||
{ | |||||||||||||
return OperationState(std::forward<Next>(next), std::move(m_Prevs)); | |||||||||||||
} | |||||||||||||
private: | |||||||||||||
std::tuple<Prevs...> m_Prevs; | |||||||||||||
}; | |||||||||||||
struct Fn | |||||||||||||
{ | |||||||||||||
template<typename... Prevs> | |||||||||||||
WhenAll::Sender<Prevs...> operator ()(Prevs&&... prevs) const | |||||||||||||
{ | |||||||||||||
static_assert(sizeof...(Prevs) > 0); | |||||||||||||
return WhenAll::Sender<Prevs...>{std::forward<Prevs>(prevs)...}; | |||||||||||||
} | |||||||||||||
}; | |||||||||||||
} // WhenAll | |||||||||||||
constexpr WhenAll::Fn whenAll; | |||||||||||||
} | |||||||||||||
} | |||||||||||||
#endif // INCLUDED_EXECUTION_WHEN_ALL |
Wildfire Games · Phabricator