Index: source/graphics/MapGenerator.h =================================================================== --- source/graphics/MapGenerator.h +++ source/graphics/MapGenerator.h @@ -20,6 +20,7 @@ #include "lib/posix/posix_pthread.h" #include "ps/FileIo.h" +#include "ps/Future.h" #include "ps/TemplateLoader.h" #include "scriptinterface/StructuredClone.h" @@ -28,7 +29,6 @@ #include #include #include -#include class CMapGeneratorWorker; @@ -178,11 +178,6 @@ */ std::vector FindActorTemplates(const std::string& path, bool includeSubdirectories); - /** - * Perform map generation in an independent thread. - */ - static void RunThread(CMapGeneratorWorker* self); - /** * Perform the map generation. */ @@ -229,9 +224,10 @@ CTemplateLoader m_TemplateLoader; /** - * Holds the mapgeneration thread identifier. + * Holds the completion result of the asynchronous map generation. + * TODO: this whole class could really be a future on its own. */ - std::thread m_WorkerThread; + Future m_WorkerThread; /** * Avoids thread synchronization issues. Index: source/graphics/MapGenerator.cpp =================================================================== --- source/graphics/MapGenerator.cpp +++ source/graphics/MapGenerator.cpp @@ -30,7 +30,7 @@ #include "ps/CLogger.h" #include "ps/FileIo.h" #include "ps/Profile.h" -#include "ps/Threading.h" +#include "ps/ThreadPool.h" #include "ps/scripting/JSInterface_VFS.h" #include "scriptinterface/FunctionWrapper.h" #include "scriptinterface/ScriptContext.h" @@ -68,9 +68,8 @@ CMapGeneratorWorker::~CMapGeneratorWorker() { - // Wait for thread to end - if (m_WorkerThread.joinable()) - m_WorkerThread.join(); + // Cancel or wait for the task to end. 
+ m_WorkerThread.CancelOrWait(); } void CMapGeneratorWorker::Initialize(const VfsPath& scriptFile, const std::string& settings) @@ -82,35 +81,31 @@ m_ScriptPath = scriptFile; m_Settings = settings; - // Launch the worker thread - m_WorkerThread = std::thread(Threading::HandleExceptions::Wrapper, this); -} + // Start generating the map asynchronously. + m_WorkerThread = ThreadPool::TaskManager::Instance().PushTask([this]() { + PROFILE2("Map Generation"); -void CMapGeneratorWorker::RunThread(CMapGeneratorWorker* self) -{ - debug_SetThreadName("MapGenerator"); - g_Profiler2.RegisterCurrentThread("MapGenerator"); + shared_ptr mapgenContext = ScriptContext::CreateContext(RMS_CONTEXT_SIZE); - shared_ptr mapgenContext = ScriptContext::CreateContext(RMS_CONTEXT_SIZE); + // Enable the script to be aborted + JS_AddInterruptCallback(mapgenContext->GetGeneralJSContext(), MapGeneratorInterruptCallback); - // Enable the script to be aborted - JS_AddInterruptCallback(mapgenContext->GetGeneralJSContext(), MapGeneratorInterruptCallback); + m_ScriptInterface = new ScriptInterface("Engine", "MapGenerator", mapgenContext); - self->m_ScriptInterface = new ScriptInterface("Engine", "MapGenerator", mapgenContext); + // Run map generation scripts + if (!Run() || m_Progress > 0) + { + // Don't leave progress in an unknown state, if generator failed, set it to -1 + std::lock_guard lock(m_WorkerMutex); + m_Progress = -1; + } - // Run map generation scripts - if (!self->Run() || self->m_Progress > 0) - { - // Don't leave progress in an unknown state, if generator failed, set it to -1 - std::lock_guard lock(self->m_WorkerMutex); - self->m_Progress = -1; - } + SAFE_DELETE(m_ScriptInterface); - SAFE_DELETE(self->m_ScriptInterface); - - // At this point the random map scripts are done running, so the thread has no further purpose - // and can die. The data will be stored in m_MapData already if successful, or m_Progress - // will contain an error value on failure. 
+ // At this point the random map scripts are done running, so this task has no further purpose + // and simply ends (the pool's worker thread keeps running). The data will be stored in m_MapData already if successful, or m_Progress + // will contain an error value on failure. + }); } bool CMapGeneratorWorker::Run() Index: source/lib/sysdep/os/osx/odbg.cpp =================================================================== --- source/lib/sysdep/os/osx/odbg.cpp +++ source/lib/sysdep/os/osx/odbg.cpp @@ -1,4 +1,4 @@ -/* Copyright (C) 2010 Wildfire Games. +/* Copyright (C) 2021 Wildfire Games. * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the @@ -28,6 +28,8 @@ #include "lib/sysdep/sysdep.h" #include "lib/debug.h" +#include + void* debug_GetCaller(void* UNUSED(context), const wchar_t* UNUSED(lastFuncToSkip)) { return NULL; @@ -43,7 +45,7 @@ return ERR::NOT_SUPPORTED; } -void debug_SetThreadName(char const* UNUSED(name)) +void debug_SetThreadName(char const* name) { - // Currently unimplemented + pthread_setname_np(name); } Index: source/main.cpp =================================================================== --- source/main.cpp +++ source/main.cpp @@ -59,6 +59,7 @@ #include "ps/UserReport.h" #include "ps/Util.h" #include "ps/VideoMode.h" +#include "ps/ThreadPool.h" #include "ps/World.h" #include "ps/GameSetup/GameSetup.h" #include "ps/GameSetup/Atlas.h" @@ -577,6 +578,9 @@ ScriptEngine scriptEngine; CXeromyces::Startup(); + // Initialise the global thread pool at this point (JS & Profiler2 are set up). 
+ ThreadPool::TaskManager::Initialise(); + if (ATLAS_RunIfOnCmdLine(args, false)) { CXeromyces::Terminate(); @@ -712,6 +716,7 @@ ATLAS_RunIfOnCmdLine(args, true); #endif + ThreadPool::TaskManager::Instance().ClearQueue(); CXeromyces::Terminate(); } Index: source/ps/Future.h =================================================================== --- /dev/null +++ source/ps/Future.h @@ -0,0 +1,326 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#ifndef INCLUDED_FUTURE +#define INCLUDED_FUTURE + +#include "ps/FutureForward.h" + +#include +#include +#include +#include +#include + +template +class PackagedTask; + +/** + * Corresponds to std::future. + * Unlike std::future, Future can request the cancellation of the task that would produce the result. + * This makes it more similar to Java's CancellableTask or C#'s Task. + * The name Future was kept over Task so it would be more familiar to C++ users, + * but this all should be revised once Concurrency TS wraps up. + * + * Future is _not_ thread-safe. Call it from a single thread or ensure synchronization externally. + * + * The destructor is never blocking. The promise may still be running on destruction. + * TODO: + * - Handle exceptions. 
+ */ +template +class Future +{ + template + friend class PackagedTask; + + static constexpr bool VoidResult = std::is_same_v; + + enum class Status + { + PENDING, + STARTED, + DONE, + CANCELED + }; + + template + class SharedStateResult + { + public: + void ResetResult() + { + if (m_HasResult) + m_Result.m_Result.~ResultType(); + m_HasResult = false; + } + + union Result + { + std::aligned_storage_t m_Bytes; + ResultType m_Result; + Result() : m_Bytes() {}; + ~Result() {}; + }; + // We don't use Result directly so the result doesn't have to be default constructible. + Result m_Result; + bool m_HasResult = false; + }; + + // Don't have m_Result for void ReturnType + template<> + class SharedStateResult + { + }; + + class SharedState : public SharedStateResult + { + public: + SharedState(std::function&& func) : m_Func(func) {} + ~SharedState() + { + // For safety, wait on started task completion, but not on pending ones (auto-cancelled). + if (!Cancel()) + { + Wait(); + Cancel(); + } + if constexpr (!VoidResult) + SharedStateResult::ResetResult(); + } + + SharedState(const SharedState&) = delete; + SharedState(SharedState&&) = delete; + + bool IsDoneOrCanceled() const + { + return m_Status == Status::DONE || m_Status == Status::CANCELED; + } + + void Wait() + { + // Fast path: we're already done. + if (IsDoneOrCanceled()) + return; + // Slow path: we aren't done when we run the above check. Lock and wait until we are. + std::unique_lock lock(m_Mutex); + m_ConditionVariable.wait(lock, [this]() -> bool { return IsDoneOrCanceled(); }); + } + + /** + * If the task is pending, cancel it by marking it as done. + * @return true if the task was indeed cancelled, false otherwise (the task is running or already done). + */ + bool Cancel() + { + Status expected = Status::PENDING; + bool cancelled = m_Status.compare_exchange_strong(expected, Status::CANCELED); + // If we're done, invalidate, if we're pending, atomically cancel, otherwise fail. 
+ if (cancelled || m_Status == Status::DONE) + { + if (m_Status == Status::DONE) + m_Status = Status::CANCELED; + if constexpr (!VoidResult) + SharedStateResult::ResetResult(); + m_ConditionVariable.notify_all(); + return cancelled; + } + return false; + } + + /** + * Move the result away from the shared state, mark the future invalid. + */ + template + std::enable_if_t, ResultType> GetResult() + { + // The caller must ensure that this is only called if we have a result. + ENSURE(SharedStateResult::m_HasResult); + m_Status = Status::CANCELED; + SharedStateResult::m_HasResult = false; + if constexpr (!VoidResult) + return std::move(SharedStateResult::m_Result.m_Result); + else + return; + } + + std::atomic m_Status = Status::PENDING; + std::mutex m_Mutex; + std::condition_variable m_ConditionVariable; + + std::function m_Func; + }; + +public: + Future() {}; + Future(const Future& o) = delete; + Future(Future&&) = default; + Future& operator=(Future&&) = default; + ~Future() {} + + /** + * Make the future wait for the result of @a func. + */ + template + PackagedTask Wrap(T&& func); + + class BadFutureAccess : public std::exception + { + virtual const char* what() const noexcept + { + return "Tried to access the result of a future that was never wrapped or a cancelled future."; + } + }; + + /** + * Move the result out of the future, and invalidate the future. + */ + template + std::enable_if_t, ResultType> Get() + { + if (!m_SharedState) + throw BadFutureAccess(); + Wait(); + if constexpr (VoidResult) + return; + else + { + if (m_SharedState->m_Status == Status::CANCELED) + { + // We were cancelled or never wrapped, throw. + throw BadFutureAccess(); + } + // This mark the state invalid - can't call Get again. + return m_SharedState->GetResult(); + } + } + + /** + * @return true if the shared state is valid and has a result (i.e. Get can be called). 
+ */ + bool IsReady() const + { + return !!m_SharedState && m_SharedState->m_Status == Status::DONE; + } + + /** + * @return true if the future has a shared state and it's not been invalidated, ie. pending, started or done. + */ + bool Valid() const + { + return !!m_SharedState && m_SharedState->m_Status != Status::CANCELED; + } + + void Wait() + { + if (Valid()) + m_SharedState->Wait(); + } + + /** + * Cancels the task, waiting if the task is currently started. + * Use this function over Cancel() if you need to ensure determinism (i.e. in the simulation). + * @see Cancel. + */ + void CancelOrWait() + { + if (!Valid()) + return; + if (!m_SharedState->Cancel()) + Wait(); + m_SharedState.reset(); + } + + /** + * Cancels the task (without waiting). + * The result is always invalid, even if the task had completed before. + * Note that this cannot stop started tasks. + */ + void Cancel() + { + if (m_SharedState) + m_SharedState->Cancel(); + m_SharedState.reset(); + } +protected: + std::shared_ptr m_SharedState; +}; + +/** + * Corresponds somewhat to std::packaged_task. + * Like packaged_task, this holds a function acting as a promise. + * This type is mostly just the shared state and the call operator, + * handling the promise & continuation logic. + */ +template +class PackagedTask +{ + static constexpr bool VoidResult = std::is_same_v; +public: + PackagedTask() = delete; + PackagedTask(const std::shared_ptr::SharedState>& ss) : m_SharedState(ss) {} + + void operator()() + { + typename Future::Status expected = Future::Status::PENDING; + if (!m_SharedState->m_Status.compare_exchange_strong(expected, Future::Status::STARTED)) + return; + + if constexpr (VoidResult) + m_SharedState->m_Func(); + else + { + // To avoid UB, explicitly placement-new the value. 
+ new (&m_SharedState->m_Result) ResultType{std::move(m_SharedState->m_Func())}; + m_SharedState->m_HasResult = true; + } + + // Because we might have threads waiting on us, we need to make sure that they either: + // - don't wait on our condition variable + // - receive the notification when we're done. + // This requires locking the mutex (@see Wait). + { + std::lock_guard lock(m_SharedState->m_Mutex); + m_SharedState->m_Status = Future::Status::DONE; + } + + m_SharedState->m_ConditionVariable.notify_all(); + + // We no longer need the shared state, drop it immediately. + m_SharedState.reset(); + } + + void Cancel() + { + m_SharedState->Cancel(); + m_SharedState.reset(); + } + +protected: + std::shared_ptr::SharedState> m_SharedState; +}; + +template +template +PackagedTask Future::Wrap(T&& func) +{ + static_assert(std::is_convertible_v, ResultType>, "The return type of the wrapped function cannot be converted to the type of the Future."); + m_SharedState = std::make_shared(std::move(func)); + return PackagedTask(m_SharedState); +} + +#endif // INCLUDED_FUTURE Index: source/ps/FutureForward.h =================================================================== --- /dev/null +++ source/ps/FutureForward.h @@ -0,0 +1,29 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . 
+ */ + +#ifndef INCLUDED_FUTURE_FORWARD +#define INCLUDED_FUTURE_FORWARD + +/** + * Lightweight header / forward declarations for Future. + * Include this in your header files where possible. + */ + +template +class Future; + +#endif // INCLUDED_FUTURE_FORWARD Index: source/ps/GameSetup/GameSetup.cpp =================================================================== --- source/ps/GameSetup/GameSetup.cpp +++ source/ps/GameSetup/GameSetup.cpp @@ -68,6 +68,7 @@ #include "ps/Profiler2.h" #include "ps/Pyrogenesis.h" // psSetLogDir #include "ps/scripting/JSInterface_Console.h" +#include "ps/ThreadPool.h" #include "ps/TouchInput.h" #include "ps/UserReport.h" #include "ps/Util.h" Index: source/ps/ThreadPool.h =================================================================== --- /dev/null +++ source/ps/ThreadPool.h @@ -0,0 +1,132 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see <http://www.gnu.org/licenses/>. 
+ */ +struct WorkerAffinity +{ + friend class TaskManager; +public: + bool operator==(const WorkerAffinity& o) const { return m_Index == o.m_Index; } +protected: + constexpr WorkerAffinity() = default; + constexpr WorkerAffinity(u16 i) : m_Index(i) {} + u16 m_Index = -1; +}; + +/** + * The task manager class is the meat of the thread pool. + * It creates all worker threads on construction. + * To push work, query the appropriate executor. + * See implementation for additional comments. + */ +class TaskManager +{ + friend class WorkerThread; + template + friend class GlobalExecutor; +public: + TaskManager(); + TaskManager(size_t numberOfWorkers); + ~TaskManager(); + TaskManager(const TaskManager&) = delete; + TaskManager(TaskManager&&) = delete; + + static void Initialise(); + static TaskManager& Instance(); + + /** + * Clears all tasks from the queue. This blocks on started tasks. + */ + void ClearQueue(); + + /** + * @return the number of threaded workers available. + */ + size_t GetNbOfWorkers() const; + + static inline constexpr WorkerAffinity NoAffinity = WorkerAffinity(); + + /** + * Return a value usable to set affinity to a worker. + */ + WorkerAffinity GetAffinity() const; + + /** + * Return all distinct worker affinities. + */ + const std::vector& WorkerAffinities() const; + + /** + * Push a task to be executed. + */ + template + Future> PushTask(T&& func, Priority priority = Priority::NORMAL) + { + Future> ret; + DoPushTask(std::move(ret.Wrap(std::move(func))), NoAffinity, priority); + return ret; + } + + /** + * Push a task with worker affinity. Only that worker will execute that task. + */ + template + Future> PushTask(T&& func, WorkerAffinity affinity, Priority priority = Priority::NORMAL) + { + Future> ret; + DoPushTask(std::move(ret.Wrap(std::move(func))), affinity, priority); + return ret; + } + +private: + void DoPushTask(std::function&& task, WorkerAffinity affinity, Priority priority); + // Preserves encapsulation of WorkerAffinity for ::Impl. 
+ static WorkerAffinity CreateWorkerAffinity(u16 i) + { + return WorkerAffinity(i); + } + + class Impl; + std::unique_ptr m; +}; +} // namespace ThreadPool + +#endif // INCLUDED_THREADPOOL Index: source/ps/ThreadPool.cpp =================================================================== --- /dev/null +++ source/ps/ThreadPool.cpp @@ -0,0 +1,368 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#include "precompiled.h" + +#include "ThreadPool.h" + +#include "lib/debug.h" +#include "maths/MathUtil.h" +#include "ps/CLogger.h" +#include "ps/ConfigDB.h" +#include "ps/Threading.h" +#include "ps/ThreadUtil.h" +#include "ps/Profiler2.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace ThreadPool +{ +/** + * Minimum number of thread pool workers. + */ +static constexpr size_t MIN_THREADS = 3; + +/** + * Maximum number of thread pool workers. + */ +static constexpr size_t MAX_THREADS = 32; + +std::unique_ptr g_TaskManager; + +class Thread; + +struct QueueItem +{ + QueueItem(std::function&& f, WorkerAffinity a) : m_Task(std::move(f)), m_Affinity(a) {} + std::function m_Task; + WorkerAffinity m_Affinity; +}; +} + +/** + * Light wrapper around std::thread. Ensures Join has been called. 
+ */ +class ThreadPool::Thread +{ +public: + Thread() = default; + Thread(const Thread&) = delete; + Thread(Thread&&) = delete; + + template + void Start(T* object) + { + m_Thread = std::thread(Threading::HandleExceptions>::Wrapper, object); + } + template + static void DoStart(T* object); + +protected: + ~Thread() + { + ENSURE(!m_Thread.joinable()); + } + + std::thread m_Thread; + std::atomic m_Kill = false; +}; + +/** + * Worker thread: process the taskManager queues until killed. + */ +class ThreadPool::WorkerThread : public Thread +{ +public: + WorkerThread(ThreadPool::TaskManager::Impl& taskManager, WorkerAffinity affinity); + ~WorkerThread(); + + /** + * Mark the worker for destruction. May need to be woken up. + */ + void Stop(); + + std::atomic m_HasWork = false; +protected: + void RunUntilDeath(); + + WorkerAffinity m_Affinity; + ThreadPool::TaskManager::Impl& m_TaskManager; +}; + +/** + * PImpl-ed implementation of the threadpool's task manager. + * + * The normal priority queue is processed first, the low priority only if there are no higher-priority tasks + */ +class ThreadPool::TaskManager::Impl +{ + friend class TaskManager; + friend class WorkerThread; +public: + Impl(TaskManager& backref); + ~Impl() + { + ClearQueue(); + for (WorkerThread& worker : m_Workers) + worker.Stop(); + m_ConditionVariable.notify_all(); + m_Workers.clear(); + } + + /** + * 2-phase init to avoid having to think too hard about the order of class members. + */ + void SetupWorkers(size_t numberOfWorkers); + + /** + * Push a task on the global queue. + * Takes ownership of @a task. + * May be called from any thread. + */ + void PushTask(std::function&& task, WorkerAffinity affinity, Priority priority); + + WorkerAffinity GetAffinity() const; + +protected: + void ClearQueue(); + + // Back reference (keep this first). 
+ TaskManager& m_TaskManager; + + std::mutex m_GlobalMutex; + std::mutex m_GlobalLowPriorityMutex; + std::deque m_GlobalQueue; + std::deque m_GlobalLowPriorityQueue; + + // All workers share a mutex & a condition variable. + std::mutex m_WorkersMutex; + std::condition_variable m_ConditionVariable; + + // Ideally this would be a vector, since it does get iterated, but that requires movable types. + std::list m_Workers; + // TODO: won't be necessary when m_Workers can be a vector + std::vector*> m_WorkersWorkToggle; + // Cached. + std::vector m_WorkerAffinities; + + // Round-robin counter for GetWorker. + mutable size_t m_RoundRobinIdx = 0; +}; + +ThreadPool::TaskManager::TaskManager() : TaskManager(std::thread::hardware_concurrency() - 1) +{ +} + +ThreadPool::TaskManager::TaskManager(size_t numberOfWorkers) +{ + m = std::make_unique(*this); + numberOfWorkers = Clamp(numberOfWorkers, MIN_THREADS, MAX_THREADS); + m->SetupWorkers(numberOfWorkers); +} + +ThreadPool::TaskManager::~TaskManager() {} + +ThreadPool::TaskManager::Impl::Impl(TaskManager& backref) + : m_TaskManager(backref) +{ +} + +void ThreadPool::TaskManager::Impl::SetupWorkers(size_t numberOfWorkers) +{ + for (size_t i = 0; i < numberOfWorkers; ++i) + { + m_Workers.emplace_back(*this, WorkerAffinity(i)); + m_WorkersWorkToggle.emplace_back(&m_Workers.back().m_HasWork); + m_WorkerAffinities.emplace_back(ThreadPool::TaskManager::CreateWorkerAffinity(i)); + // Register the thread on Profiler2 and name it, for convenience. + // (This is called here because RunUntilDeath is templated and the static value won't work as expected then). + m_TaskManager.PushTask([]() { + // The profiler does better if the names are unique. 
+ static std::atomic n = 0; + std::string name = "ThreadPool #" + std::to_string(n++); + debug_SetThreadName(name.c_str()); + g_Profiler2.RegisterCurrentThread(name); + }, WorkerAffinity(i), Priority::NORMAL).Wait(); + } +} + +void ThreadPool::TaskManager::ClearQueue() { m->ClearQueue(); } +void ThreadPool::TaskManager::Impl::ClearQueue() +{ + { + std::lock_guard lock(m_GlobalMutex); + m_GlobalQueue.clear(); + } + { + std::lock_guard lock(m_GlobalLowPriorityMutex); + m_GlobalLowPriorityQueue.clear(); + } + for (std::atomic* hasWork : m_WorkersWorkToggle) + *hasWork = false; +} + +size_t ThreadPool::TaskManager::GetNbOfWorkers() const +{ + return m->m_Workers.size(); +} + +ThreadPool::WorkerAffinity ThreadPool::TaskManager::GetAffinity() const +{ + return m->GetAffinity(); +} + +ThreadPool::WorkerAffinity ThreadPool::TaskManager::Impl::GetAffinity() const +{ + if (m_RoundRobinIdx >= m_Workers.size()) + m_RoundRobinIdx = 0; + return WorkerAffinity(m_RoundRobinIdx++); +} + +const std::vector& ThreadPool::TaskManager::WorkerAffinities() const +{ + return m->m_WorkerAffinities; +} + +void ThreadPool::TaskManager::DoPushTask(std::function&& task, WorkerAffinity affinity, Priority priority) +{ + m->PushTask(std::move(task), affinity, priority); +} + +void ThreadPool::TaskManager::Impl::PushTask(std::function&& task, WorkerAffinity affinity, Priority priority) +{ + std::mutex& mutex = priority == Priority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex; + std::deque& queue = priority == Priority::NORMAL ? 
m_GlobalQueue : m_GlobalLowPriorityQueue; + + { + std::lock_guard lock(mutex); + queue.emplace_back(std::move(task), affinity); + } + + if (affinity == NoAffinity) + { + for (std::atomic* hasWork : m_WorkersWorkToggle) + *hasWork = true; + m_ConditionVariable.notify_one(); + } + else + { + *m_WorkersWorkToggle[affinity.m_Index] = true; + m_ConditionVariable.notify_all(); + } +} + +void ThreadPool::TaskManager::Initialise() +{ + if (!g_TaskManager) + g_TaskManager = std::make_unique(); +} + +ThreadPool::TaskManager& ThreadPool::TaskManager::Instance() +{ + ENSURE(g_TaskManager); + return *g_TaskManager; +} + +// Thread definition + +ThreadPool::WorkerThread::WorkerThread(ThreadPool::TaskManager::Impl& taskManager, WorkerAffinity affinity) + : m_TaskManager(taskManager), m_Affinity(affinity) +{ + Start(this); +} + +ThreadPool::WorkerThread::~WorkerThread() +{ + m_Kill = true; + if (m_Thread.joinable()) + m_Thread.join(); +} + +void ThreadPool::WorkerThread::Stop() +{ + m_Kill = true; +} + +void ThreadPool::WorkerThread::RunUntilDeath() +{ + std::function task; + bool hasTask = false; + std::unique_lock lock(m_TaskManager.m_WorkersMutex); + while (!m_Kill) + { + m_TaskManager.m_ConditionVariable.wait(lock, [this]() -> bool { + return m_HasWork || m_Kill; + }); + lock.unlock(); + + if (m_Kill) + break; + + // Reset to false, we'll set it to true if we actually found work. + // This makes us do one 'bonus' iteration at the end, but simplifies book-keeping. + m_HasWork = false; + hasTask = false; + + // Fetch work from the global queues if necessary. + { + // Particularly critical section since we're locking the global queue. 
+ std::unique_lock globalLock(m_TaskManager.m_GlobalMutex); + for (std::deque::const_iterator it = m_TaskManager.m_GlobalQueue.begin(); it != m_TaskManager.m_GlobalQueue.end(); ++it) + if (it->m_Affinity == TaskManager::NoAffinity || it->m_Affinity == m_Affinity) + { + task = std::move(it->m_Task); + hasTask = true; + m_TaskManager.m_GlobalQueue.erase(it); + break; + } + } + if (!hasTask) + { + std::unique_lock globalLock(m_TaskManager.m_GlobalLowPriorityMutex); + for (std::deque::const_iterator it = m_TaskManager.m_GlobalLowPriorityQueue.begin(); it != m_TaskManager.m_GlobalLowPriorityQueue.end(); ++it) + if (it->m_Affinity == TaskManager::NoAffinity || it->m_Affinity == m_Affinity) + { + task = std::move(it->m_Task); + hasTask = true; + m_TaskManager.m_GlobalLowPriorityQueue.erase(it); + break; + } + } + if (!hasTask) + { + lock.lock(); + continue; + } + + m_HasWork = true; + task(); + lock.lock(); + } +} + +// Defined here - needs access to derived types. +template +void ThreadPool::Thread::DoStart(T* object) +{ + std::invoke(callable, object); +} Index: source/ps/tests/test_Future.h =================================================================== --- /dev/null +++ source/ps/tests/test_Future.h @@ -0,0 +1,133 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . 
+ */ + +#include "lib/self_test.h" + +#include "ps/Future.h" +#include "ps/ThreadPool.h" + +#include + +class TestFuture : public CxxTest::TestSuite +{ +public: + void test_future_basic() + { + int counter = 0; + { + Future noret; + std::function task = noret.Wrap([&counter]() mutable { counter++; }); + task(); + TS_ASSERT_EQUALS(counter, 1); + } + + { + Future noret; + { + std::function task = noret.Wrap([&counter]() mutable { counter++; }); + // Auto-cancels the task. + } + } + TS_ASSERT_EQUALS(counter, 1); + } + + void test_future_return() + { + { + Future future; + std::function task = future.Wrap([]() { return 1; }); + task(); + TS_ASSERT_EQUALS(future.Get(), 1); + } + + // Convertible type. + { + Future future; + std::function task = future.Wrap([]() -> u8 { return 1; }); + task(); + TS_ASSERT_EQUALS(future.Get(), 1); + } + + static int destroyed = 0; + // No trivial constructor or destructor. Also not copiable. + struct NonDef + { + NonDef() = delete; + NonDef(int input) : value(input) {}; + NonDef(const NonDef&) = delete; + NonDef(NonDef&& o) + { + value = o.value; + o.value = 0; + } + ~NonDef() { if (value != 0) destroyed++; } + int value = 0; + }; + TS_ASSERT_EQUALS(destroyed, 0); + { + Future future; + std::function task = future.Wrap([]() { return 1; }); + task(); + TS_ASSERT_EQUALS(future.Get().value, 1); + } + TS_ASSERT_EQUALS(destroyed, 1); + { + Future future; + std::function task = future.Wrap([]() { return 1; }); + } + TS_ASSERT_EQUALS(destroyed, 1); + { + Future future; + std::function task = future.Wrap([]() { return 1; }); + future.Cancel(); + future.Wait(); + TS_ASSERT_THROWS(future.Get(), const Future::BadFutureAccess&); + } + TS_ASSERT_EQUALS(destroyed, 1); + } + + void test_future_moving() + { + Future future; + std::function function; + + // Set things up so all temporaries passed into the futures will be reset to obviously invalid memory. 
+ std::aligned_storage_t), alignof(Future)> futureStorage; + std::aligned_storage_t), alignof(std::function)> functionStorage; + Future* f = &future; // CppCheck doesn't read placement new correctly, do this to silence errors. + std::function* c = &function; + f = new (&futureStorage) Future{}; + c = new (&functionStorage) std::function{}; + + *c = []() { return 7; }; + std::function task = f->Wrap(std::move(*c)); + + future = std::move(*f); + function = std::move(*c); + + // Destroy and clear the memory + f->~Future(); + c->~function(); + memset(&futureStorage, 0xFF, sizeof(decltype(futureStorage))); + memset(&functionStorage, 0xFF, sizeof(decltype(functionStorage))); + + // Let's move the packaged task while at it. + std::function task2 = std::move(task); + task2(); + TS_ASSERT_EQUALS(future.Get(), 7); + } +}; Index: source/ps/tests/test_ThreadPool.h =================================================================== --- /dev/null +++ source/ps/tests/test_ThreadPool.h @@ -0,0 +1,137 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . 
+ */ + +#include "lib/self_test.h" + +#include "ps/Future.h" +#include "ps/ThreadPool.h" + +#include +#include +#include + +/* CxxTest suite for ThreadPool::TaskManager: every test fetches the manager through Instance() and pushes work with PushTask(). NOTE(review): the include targets and the template arguments (for example on std::atomic, Future and std::vector) look stripped in this copy of the patch; confirm against the original revision. */ class TestThreadPool : public CxxTest::TestSuite +{ +public: + /* Pushes one counting task and waits on its future, then pushes a task that blocks on a condition variable until go is set. tasks_run must still be 1 while that task is held back, and 2 once the task has cleared go again. */ void test_basic() + { + ThreadPool::TaskManager& taskManager = ThreadPool::TaskManager::Instance(); + // There is a minimum of 3. + TS_ASSERT(taskManager.GetNbOfWorkers() >= 3); + + std::atomic tasks_run = 0; + auto increment_run = [&tasks_run]() { tasks_run++; }; + Future future = taskManager.PushTask(increment_run); + future.Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), 1); + + // Test a task that has to wait for a signal from this thread before it does its work. + std::condition_variable cv; + std::mutex mutex; + std::atomic go = false; + future = taskManager.PushTask([&]() { + std::unique_lock lock(mutex); + cv.wait(lock, [&go]() -> bool { return go; }); + lock.unlock(); + increment_run(); + lock.lock(); + go = false; + lock.unlock(); + cv.notify_all(); + }); + TS_ASSERT_EQUALS(tasks_run.load(), 1); + std::unique_lock lock(mutex); + go = true; + lock.unlock(); + cv.notify_all(); + lock.lock(); + cv.wait(lock, [&go]() -> bool { return !go; }); + TS_ASSERT_EQUALS(tasks_run.load(), 2); + // Wait on the future before the mutex/cv go out of scope. + future.Wait(); + } + + /* Pushes one blocked task for each entry of WorkerAffinities(), then one task without a priority argument and one with Priority::LOW. After go is set and every future has been waited on, tasks_run must equal the number of pushed tasks; two further pushes are then waited on one at a time. */ void test_PoolExecutor() + { + ThreadPool::TaskManager& taskManager = ThreadPool::TaskManager::Instance(); + // Push a blocked task on all workers. 
+ std::condition_variable cv; + std::mutex mutex; + bool go = false; + std::atomic tasks_run = 0; + std::vector> preTasks; + for (ThreadPool::WorkerAffinity affinity : taskManager.WorkerAffinities()) + preTasks.emplace_back(taskManager.PushTask([&tasks_run, &cv, &mutex, &go]() { + std::unique_lock lock(mutex); + cv.wait(lock, [&go]() { return go; }); + tasks_run++; + }, affinity)); + // Then push general tasks + auto increment_run = [&tasks_run]() { tasks_run++; }; + Future future = taskManager.PushTask(increment_run); + Future futureLow = taskManager.PushTask(increment_run, ThreadPool::Priority::LOW); + // Then start the pre-tasks, which ought to start all of them. + { + std::unique_lock lock(mutex); + go = true; + } + cv.notify_all(); + future.Wait(); + futureLow.Wait(); + for (Future& task : preTasks) + task.Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), preTasks.size() + 2); + // Also check with no waiting expected. + taskManager.PushTask(increment_run).Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), preTasks.size() + 3); + taskManager.PushTask(increment_run, ThreadPool::Priority::LOW).Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), preTasks.size() + 4); + } + + /* Three pusher tasks each fill every third slot of futures (starting at index 0, 1 and 2) with a pushed task returning 5, the second pusher using Priority::LOW. Once the three pushers have been waited on, every stored future must yield 5. */ void test_Load() + { + ThreadPool::TaskManager& taskManager = ThreadPool::TaskManager::Instance(); + +#define ITERATIONS 100000 + std::vector> futures; + futures.resize(ITERATIONS); + std::vector values(ITERATIONS); /* NOTE(review): values is not read anywhere else in this test. */ + + auto f1 = taskManager.PushTask([&taskManager, &futures]() { + for (u32 i = 0; i < ITERATIONS; i+=3) + futures[i] = taskManager.PushTask([]() { return 5; }); + }); + + auto f2 = taskManager.PushTask([&taskManager, &futures]() { + for (u32 i = 1; i < ITERATIONS; i+=3) + futures[i] = taskManager.PushTask([]() { return 5; }, ThreadPool::Priority::LOW); + }); + + auto f3 = taskManager.PushTask([&taskManager, &futures]() { + for (u32 i = 2; i < ITERATIONS; i+=3) + futures[i] = taskManager.PushTask([]() { return 5; }); + }); + + f1.Wait(); + f2.Wait(); + f3.Wait(); + + for (size_t i = 0; i < ITERATIONS; 
++i) + TS_ASSERT_EQUALS(futures[i].Get(), 5); +#undef ITERATIONS + } +}; Index: source/test_setup.cpp =================================================================== --- source/test_setup.cpp +++ source/test_setup.cpp @@ -36,6 +36,7 @@ #include "lib/timer.h" #include "lib/sysdep/sysdep.h" #include "ps/Profiler2.h" +#include "ps/ThreadPool.h" #include "scriptinterface/FunctionWrapper.h" #include "scriptinterface/ScriptEngine.h" #include "scriptinterface/ScriptContext.h" @@ -84,11 +85,14 @@ m_ScriptEngine = new ScriptEngine; g_ScriptContext = ScriptContext::CreateContext(); + ThreadPool::TaskManager::Initialise(); + return true; } virtual bool tearDownWorld() { + ThreadPool::TaskManager::Instance().ClearQueue(); g_ScriptContext.reset(); SAFE_DELETE(m_ScriptEngine); g_Profiler2.Shutdown();