Index: source/ps/Future.h =================================================================== --- source/ps/Future.h +++ source/ps/Future.h @@ -118,7 +118,36 @@ return ret; } + template + void SetValue(Args&&... args) + { + // these restrictions could be loosened if desired. + static_assert(sizeof...(Args) == 0 || sizeof...(Args) == 1); + static_assert(std::is_void_v || sizeof...(Args) != 0, + "If `ResultType` is `void` there has to be no arguments."); + static_assert((std::is_same_v && ...), + "The argument has to be of type `ResultType`"); + + if constexpr (sizeof...(Args) != 0) + this->emplace(std::forward(args)...); + + // Because we might have threads waiting on us, we need to make sure that they either: + // - don't wait on our condition variable + // - receive the notification when we're done. + // This requires locking the mutex (@see Wait). + { + std::lock_guard lock(m_Mutex); + m_Status = FutureSharedStateDetail::Status::DONE; + } + + m_ConditionVariable.notify_all(); + } + + // The `SharedState`s use this member directly, so it's simpler to make it + // public. std::atomic m_Status = Status::PENDING; + +private: std::mutex m_Mutex; std::condition_variable m_ConditionVariable; }; @@ -165,6 +194,37 @@ Future& operator=(Future&&) = default; ~Future() = default; + /** + * A constructor for any `LazyTaskRepresentation`. (the name might change) + * + * `LazyTaskRepresentation` does (as the name suggests) represent a + * not-started task. Meaning not pushed to a task manager. It is not held + * alive during the asynchronous computation. It has to have a public + * member type template `SharedState` which fulfills that purpose. The + * template parameter of `SharedState` is a task-manager type. + * + * `LazyTaskRepresentation::SharedState` has to have a public + * non-static-member variable `receiver`. The type of `receiver` has to be + * an instantiation of `FutureSharedStateDetail::Receiver`. 
+ * + * When `Start` is called on the `SharedState` it has to "start itself" + * e.g. push something to a task-manager. + * + * Currently only `Execution::Bulk` fulfills this concept. + */ + template + Future(TaskManager& taskManager, LazyTaskRepresentation representation) + { + auto temp = std::make_shared>( + taskManager, std::move(representation)); + + temp->Start(); + + using ReceiverType = decltype(temp->receiver); + + m_SharedState = std::shared_ptr{temp, &temp->receiver}; + } + /** * Make the future wait for the result of @a func. */ @@ -258,20 +318,12 @@ } if constexpr (std::is_void_v>) - m_SharedState->callback(); - else - m_SharedState->receiver.emplace(m_SharedState->callback()); - - // Because we might have threads waiting on us, we need to make sure that they either: - // - don't wait on our condition variable - // - receive the notification when we're done. - // This requires locking the mutex (@see Wait). { - std::lock_guard lock(m_SharedState->receiver.m_Mutex); - m_SharedState->receiver.m_Status = FutureSharedStateDetail::Status::DONE; + m_SharedState->callback(); + m_SharedState->receiver.SetValue(); } - - m_SharedState->receiver.m_ConditionVariable.notify_all(); + else + m_SharedState->receiver.SetValue(m_SharedState->callback()); // We no longer need the shared state, drop it immediately. m_SharedState.reset(); @@ -298,4 +350,85 @@ return PackagedTask(std::move(temp)); } +// TODO: Move this to its own file. +namespace Execution +{ +/** + * Invokes the function-object with a range of values. The return value of the + * call operator is ignored. If the call operator does throw, `std::terminate` + * is called (implicitly). (We should store the exception in the receiver or + * check that the call operator is `noexcept`) + * + * The range and the callback are copied. If the range or the callback is + * "borrowed" like `PS::span` or `std::reference_wrapper` the underlying variable + * has to be kept alive until the task completes. 
+ */ +template +class Bulk +{ + using RangeIteratorTrait = std::iterator_traits; + using RangeIteratorCategory = typename RangeIteratorTrait::iterator_category; +public: + static_assert(std::is_base_of_v, + "The range has to be a forward range."); + static_assert(std::is_invocable_v); + + template + class SharedState : public std::enable_shared_from_this> + { + public: + SharedState(TaskManager& sheduler, Bulk constructionArg) : + range{std::move(constructionArg.range)}, + callback{std::move(constructionArg.callback)}, + taskManager{sheduler} + {} + + void Start() + { + // Set `STARTED` so that we don't get canceled. + receiver.m_Status.store(FutureSharedStateDetail::Status::STARTED); + + std::vector> tasksToPush; + tasksToPush.reserve(range.size()); + + for(auto& argument : range) + // It is safe to reference `argument` since it's a reference itself. + tasksToPush.emplace_back([&argument, callback = callback, + sharedThis = this->shared_from_this()]() mutable noexcept + { + // This could be changed to `std::invoke` if desired. + std::move(callback)(argument); + + const auto amountRunning = sharedThis->runningBranches.fetch_sub(1); + if (amountRunning == 1) + // If we are the last branch running, inform the receiver. NOTE(review): an empty range pushes no branch, so the receiver is never informed -- confirm callers never pass one. + sharedThis->receiver.SetValue(); + }); + + taskManager.BulkPushPackagedTasks(std::move(tasksToPush)); + } + + private: + // Adopt the data from `Bulk` + Range range; + Callback callback; + + // Additional members needed to run the task. 
+ TaskManager& taskManager; + std::atomic runningBranches{range.size()}; + + public: + FutureSharedStateDetail::Receiver receiver; + }; + + Bulk(Range argumentRange, Callback callbackFunc) : + range{std::move(argumentRange)}, + callback{std::move(callbackFunc)} + {} + + Range range; + Callback callback; +}; +} // Namespace Execution + #endif // INCLUDED_FUTURE Index: source/ps/TaskManager.h =================================================================== --- source/ps/TaskManager.h +++ source/ps/TaskManager.h @@ -71,6 +71,15 @@ return ret; } + /** + * Packaged tasks can be pushed without getting a `Future` back. + * The result (return and exception) should be communicated through a channel + * packaged within the task. Hence the call operator doesn't return + * anything (`void`) and it doesn't throw. + */ + void BulkPushPackagedTasks(std::vector> packagedTasks, + TaskPriority priority = TaskPriority::NORMAL); + private: TaskManager(size_t numberOfWorkers); Index: source/ps/TaskManager.cpp =================================================================== --- source/ps/TaskManager.cpp +++ source/ps/TaskManager.cpp @@ -143,6 +143,8 @@ */ void PushTask(std::function&& task, TaskPriority priority); + void BulkPushTasks(std::vector>, TaskPriority priority); + protected: void ClearQueue(); @@ -197,6 +199,29 @@ return m->m_Workers.size(); } +void TaskManager::BulkPushPackagedTasks(std::vector> packagedTasks, + TaskPriority priority) +{ + m->BulkPushTasks(std::move(packagedTasks), priority); +} + +void TaskManager::Impl::BulkPushTasks(std::vector> packagedTasks, + TaskPriority priority) +{ + std::mutex& mutex = priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex; + std::deque& queue = priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue; + std::atomic& hasWork = priority == TaskPriority::NORMAL ? 
m_HasWork : m_HasLowPriorityWork; + { + std::lock_guard lock(mutex); + queue.insert(queue.end(), + std::move_iterator(packagedTasks.begin()), std::move_iterator(packagedTasks.end())); + hasWork = true; + } + + for (WorkerThread& worker : m_Workers) + worker.Wake(); +} + void TaskManager::DoPushTask(std::function&& task, TaskPriority priority) { m->PushTask(std::move(task), priority); Index: source/simulation2/components/CCmpPathfinder.cpp =================================================================== --- source/simulation2/components/CCmpPathfinder.cpp +++ source/simulation2/components/CCmpPathfinder.cpp @@ -40,6 +40,7 @@ #include "ps/CLogger.h" #include "ps/CStr.h" #include "ps/Profile.h" +#include "ps/TaskManager.h" #include "ps/XML/Xeromyces.h" #include "renderer/Scene.h" @@ -66,9 +67,6 @@ m_LongPathfinder = std::make_unique(); m_PathfinderHier = std::make_unique(); - // Set up one future for each worker thread. - m_Futures.resize(workerThreads); - // Register Relax NG validator CXeromyces::AddValidator(g_VFS, "pathfinder", "simulation/data/pathfinder.rng"); @@ -105,10 +103,7 @@ { SetDebugOverlay(false); // cleans up memory - // Wait on all pathfinding tasks. - for (Future& future : m_Futures) - future.CancelOrWait(); - m_Futures.clear(); + m_Future.CancelOrWait(); SAFE_DELETE(m_AtlasOverlay); @@ -802,8 +797,7 @@ } // We're done, clear futures. // Use CancelOrWait instead of just Cancel to ensure determinism. - for (Future& future : m_Futures) - future.CancelOrWait(); + m_Future.CancelOrWait(); { PROFILE2("PostMessages"); @@ -829,17 +823,18 @@ m_LongPathRequests.PrepareForComputation(useMax ? m_MaxSameTurnMoves : 0); Threading::TaskManager& taskManager = Threading::TaskManager::Instance(); - for (size_t i = 0; i < m_Futures.size(); ++i) - { - ENSURE(!m_Futures[i].Valid()); - // Pass the i+1th vertex pathfinder to keep the first for the main thread, - // each thread get its own instance to avoid conflicts in cached data. 
- m_Futures[i] = taskManager.PushTask([&pathfinder=*this, &vertexPfr=m_VertexPathfinders[i + 1]]() { - PROFILE2("Async pathfinding"); - pathfinder.m_ShortPathRequests.Compute(pathfinder, vertexPfr); - pathfinder.m_LongPathRequests.Compute(pathfinder, *pathfinder.m_LongPathfinder); - }); - } + + ENSURE(!m_Future.Valid()); + // Keep the first pathfinder for the main thread, each task-branch gets its + // own instance to avoid conflicts in cached data. + m_Future = {taskManager, Execution::Bulk{ + PS::span{m_VertexPathfinders.data() + 1, m_VertexPathfinders.size() - 1}, + [&pathfinder = *this](VertexPathfinder& vertexPfr) + { + PROFILE2("Async pathfinding"); + pathfinder.m_ShortPathRequests.Compute(pathfinder, vertexPfr); + pathfinder.m_LongPathRequests.Compute(pathfinder, *pathfinder.m_LongPathfinder); + }}}; } Index: source/simulation2/components/CCmpPathfinder_Common.h =================================================================== --- source/simulation2/components/CCmpPathfinder_Common.h +++ source/simulation2/components/CCmpPathfinder_Common.h @@ -35,6 +35,7 @@ #include "graphics/Terrain.h" #include "maths/MathUtil.h" #include "ps/CLogger.h" +#include "ps/Future.h" #include "ps/TaskManager.h" #include "renderer/TerrainOverlay.h" #include "simulation2/components/ICmpObstructionManager.h" @@ -100,7 +101,7 @@ std::unique_ptr m_LongPathfinder; // One per live asynchronous path computing task. - std::vector> m_Futures; + Future m_Future; template class PathRequests {