Index: binaries/data/config/default.cfg =================================================================== --- binaries/data/config/default.cfg +++ binaries/data/config/default.cfg @@ -41,6 +41,9 @@ ; Default server name or IP to use in multiplayer multiplayerserver = "127.0.0.1" +; # of threads for the thread pool. -1 means "pick automatically", 0-32 start a number of auxiliary threads. +thread_pool_threads = -1 + ; Force a particular resolution. (If these are 0, the default is ; to keep the current desktop resolution in fullscreen mode or to ; use 1024x768 in windowed mode.) Index: source/graphics/MapGenerator.h =================================================================== --- source/graphics/MapGenerator.h +++ source/graphics/MapGenerator.h @@ -20,6 +20,7 @@ #include "lib/posix/posix_pthread.h" #include "ps/FileIo.h" +#include "ps/Future.h" #include "ps/TemplateLoader.h" #include "scriptinterface/ScriptInterface.h" @@ -28,7 +29,6 @@ #include #include #include -#include class CMapGeneratorWorker; @@ -231,7 +231,7 @@ /** * Holds the mapgeneration thread identifier. */ - std::thread m_WorkerThread; + Future m_WorkerThread; /** * Avoids thread synchronization issues. Index: source/graphics/MapGenerator.cpp =================================================================== --- source/graphics/MapGenerator.cpp +++ source/graphics/MapGenerator.cpp @@ -30,7 +30,7 @@ #include "ps/CLogger.h" #include "ps/FileIo.h" #include "ps/Profile.h" -#include "ps/Threading.h" +#include "ps/ThreadPool.h" #include "ps/scripting/JSInterface_VFS.h" #include "scriptinterface/FunctionWrapper.h" #include "scriptinterface/ScriptContext.h" @@ -68,9 +68,8 @@ CMapGeneratorWorker::~CMapGeneratorWorker() { - // Wait for thread to end - if (m_WorkerThread.joinable()) - m_WorkerThread.join(); + // Cancel or wait for the task to end.
+ m_WorkerThread.CancelOrWait(); } void CMapGeneratorWorker::Initialize(const VfsPath& scriptFile, const std::string& settings) @@ -83,13 +82,12 @@ m_Settings = settings; // Launch the worker thread - m_WorkerThread = std::thread(Threading::HandleExceptions::Wrapper, this); + m_WorkerThread = ThreadPool::TaskManager::Instance().GetWorker().Submit([this]() { RunThread(this); }); } void CMapGeneratorWorker::RunThread(CMapGeneratorWorker* self) { - debug_SetThreadName("MapGenerator"); - g_Profiler2.RegisterCurrentThread("MapGenerator"); + PROFILE2("Map Generation"); shared_ptr mapgenContext = ScriptContext::CreateContext(RMS_CONTEXT_SIZE); Index: source/lib/sysdep/os/osx/odbg.cpp =================================================================== --- source/lib/sysdep/os/osx/odbg.cpp +++ source/lib/sysdep/os/osx/odbg.cpp @@ -1,4 +1,4 @@ -/* Copyright (C) 2010 Wildfire Games. +/* Copyright (C) 2021 Wildfire Games. * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the @@ -28,6 +28,8 @@ #include "lib/sysdep/sysdep.h" #include "lib/debug.h" +#include + void* debug_GetCaller(void* UNUSED(context), const wchar_t* UNUSED(lastFuncToSkip)) { return NULL; @@ -43,7 +45,7 @@ return ERR::NOT_SUPPORTED; } -void debug_SetThreadName(char const* UNUSED(name)) +void debug_SetThreadName(char const* name) { - // Currently unimplemented + pthread_setname_np(name); } Index: source/main.cpp =================================================================== --- source/main.cpp +++ source/main.cpp @@ -58,6 +58,7 @@ #include "ps/UserReport.h" #include "ps/Util.h" #include "ps/VideoMode.h" +#include "ps/ThreadPool.h" #include "ps/World.h" #include "ps/GameSetup/GameSetup.h" #include "ps/GameSetup/Atlas.h" @@ -576,6 +577,9 @@ ScriptEngine scriptEngine; CXeromyces::Startup(); + // Initialise the global thread pool at this point (JS & Profiler2 are set up).
+ ThreadPool::TaskManager::Initialise(); + if (ATLAS_RunIfOnCmdLine(args, false)) { CXeromyces::Terminate(); @@ -710,6 +714,7 @@ ATLAS_RunIfOnCmdLine(args, true); #endif + ThreadPool::TaskManager::Instance().Clear(); CXeromyces::Terminate(); } Index: source/ps/Future.h =================================================================== --- /dev/null +++ source/ps/Future.h @@ -0,0 +1,350 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#ifndef INCLUDED_FUTURE +#define INCLUDED_FUTURE + +#include "ps/FutureForward.h" + +#include +#include +#include +#include +#include + +class IExecutor; + +/** + * Corresponds somewhat to std::packaged_task. + * Like packaged_task, this holds a function acting as a promise. + * This type is mostly just the shared state and the call operator, + * handling the promise & continuation logic. + */ +template +class PackagedTask +{ + friend class Future; + static constexpr bool VoidReturn = std::is_same_v; + + enum class Status + { + PENDING, + STARTED, + DONE, + INVALID + }; + + class SharedState + { + public: + SharedState(std::function&& func) : m_Func(std::move(func)) {} + ~SharedState() + { + // For safety, wait on started task completion, but not on pending ones (auto-cancelled).
+ if (!Cancel()) + Wait(); + } + + bool IsReady() const + { + return m_Status == Status::DONE || m_Status == Status::INVALID; + } + + void Wait() + { + // Fast path: we're already done. + if (IsReady()) + return; + // Slow path: we aren't done when we run the above check. Lock and wait until we are. + // Wake up on INVALID as well as DONE (cancelled, or handed over to a continuation by Continue()), or we would never return. + std::unique_lock lock(m_Mutex); + m_ConditionVariable.wait(lock, [this]() -> bool { return IsReady(); }); + } + + /** + * If the task is pending, cancel it by marking it as invalid. + * @return true if the task was indeed cancelled, false otherwise (the task is running or already done). + */ + bool Cancel() + { + if (IsReady()) + return false; + Status expected = Status::PENDING; + { + // Transition under the lock so that a thread blocked in Wait() cannot miss the notification below. + std::unique_lock lock(m_Mutex); + // If we're pending, try atomically setting to invalid. + if (!m_Status.compare_exchange_strong(expected, Status::INVALID)) + return false; + } + m_ConditionVariable.notify_all(); + return true; + } + + // Implemented below, accesses into IExecutor. + void Continue(); + + std::shared_ptr m_Executor; + std::atomic m_Status = Status::PENDING; + std::mutex m_Mutex; + std::condition_variable m_ConditionVariable; + + std::function m_Func; + std::unique_ptr> m_Continuation; + + struct Empty {}; + union Result + { + std::aligned_storage_t m_Bytes; + Ret m_Result; + Result() : m_Bytes() {}; + }; + // TODO C++20: [[no_unique_address]] this for a minor memory optimisation. + // We don't use Result directly so the result doesn't have to be default constructible. + // NOTE: the stored result is never destroyed, so Ret should be trivially destructible (otherwise Result's destructor is deleted). + std::conditional_t m_Result; + }; + +public: + PackagedTask(std::shared_ptr ss) : m_SharedState(ss) {} + PackagedTask(const PackagedTask&) = default; + PackagedTask(PackagedTask&&) = default; + + ~PackagedTask() {} + + void operator()() + { + Status expected = Status::PENDING; + if (!m_SharedState->m_Status.compare_exchange_strong(expected, Status::STARTED)) + return; + + if constexpr (VoidReturn) + m_SharedState->m_Func(); + else + // To avoid UB, explicitly placement-new the value.
+ new (&m_SharedState->m_Result) Ret{m_SharedState->m_Func()}; + + // Decide between finishing and continuing under the lock: Then() installs the continuation under the same lock, + // so an unlocked check could miss a continuation added concurrently, which would then never run. + // Locking also makes sure threads waiting on us either don't wait on our condition variable + // or receive the notification when we're done (@see Wait). + bool hasContinuation; + { + std::unique_lock lock(m_SharedState->m_Mutex); + hasContinuation = !!m_SharedState->m_Continuation; + if (!hasContinuation) + m_SharedState->m_Status = Status::DONE; + } + if (hasContinuation) + { + m_SharedState->Continue(); + return; + } + + m_SharedState->m_ConditionVariable.notify_all(); + } + +protected: + std::shared_ptr m_SharedState; +}; + +/** + * Corresponds to std::future (or more aptly the P1054r0 "ContinuableFuture" concept). + * Unlike std::future, Future can request the cancellation of the task that would produce the result. + * This makes it more similar to Java's CancellableTask or C#'s Task. + * The name Future was kept over Task so it would be more familiar to C++ users, + * but this all should be revised once Concurrency TS wraps up. + * + * The destructor is never blocking. The promise may still be running on destruction. + * TODO: + * - Handle exceptions. + */ +template +class Future +{ + friend class PackagedTask; + static constexpr bool VoidReturn = std::is_same_v; +public: + Future() = default; + /** + * Only copyable before wrapping (and a no-op). + */ + Future(const Future& o) + { + // Check the source: the object being constructed is never valid, so checking *this could never fire. + ENSURE(!o.Valid()); + } + Future(Future&& o) = default; + ~Future() = default; + + Future& operator=(Future&& o) = default; + + /** + * Make the future wait for the result of @a func. + */ + template + PackagedTask Wrap(const std::shared_ptr& exec, T&& func) + { + static_assert(std::is_convertible_v, Ret>, "The return type of the wrapped function cannot be converted to the type of the Future."); + m_SharedState = std::make_shared::SharedState>(std::move(func)); + m_SharedState->m_Executor = exec; + return PackagedTask(m_SharedState); + } + + /** + * Wrap @a func without an executor - meant for testing, the resulting future cannot be continued.
+ */ + template + PackagedTask Wrap(T&& func) + { + static_assert(std::is_convertible_v, Ret>, "The return type of the wrapped function cannot be converted to the type of the Future."); + m_SharedState = std::make_shared::SharedState>(std::move(func)); + return PackagedTask(m_SharedState); + } + + // Quick template to get the result type of a lambda call, possibly taking no argument. + template + struct ReturnType { using type = std::invoke_result_t; }; + template + struct ReturnType { using type = std::invoke_result_t; }; + /** + * Add a continuation to this future, taking the result of the future as input. + */ + template + auto Then(T&& func) && -> Future::type> + { + ENSURE(Valid() && m_SharedState->m_Executor); + using ReturnType = typename ReturnType::type; + Future fut; + PackagedTask task = fut.Wrap(std::move(m_SharedState->m_Executor), [func=func, input=m_SharedState]() mutable -> ReturnType { + if constexpr (!VoidReturn) + { + // Move the result into a local: a reference into the shared state would dangle once input.reset() releases it (it may be the last owner). + auto val = std::move(input->m_Result.m_Result); + // Clear our link to the 'continued' shared state here, so it gets deleted before we run. + input.reset(); + return func(std::move(val)); + } + else + return func(); + }); + bool alreadyDone; + { + std::unique_lock lock(m_SharedState->m_Mutex); + m_SharedState->m_Continuation = std::make_unique>(std::move(task)); + // Read the status under the lock, so this cannot race with PackagedTask::operator() deciding whether to continue. + alreadyDone = m_SharedState->m_Status == PackagedTask::Status::DONE; + } + if (alreadyDone) + m_SharedState->Continue(); + return fut; + } + + class BadFutureAccess : public std::exception + { + virtual const char* what() const noexcept + { + return "Tried to access the result of a future that was never wrapped or a cancelled future."; + } + }; + + Ret Get() + { + Wait(); + if constexpr (VoidReturn) + return; + else + { + if (!Valid() || m_SharedState->m_Status == PackagedTask::Status::INVALID) + { + // We were cancelled or never wrapped, throw.
+ throw BadFutureAccess(); + } + return m_SharedState->m_Result.m_Result; + } + } + + bool IsReady() const + { + return !!m_SharedState && m_SharedState->IsReady(); + } + + bool Valid() const + { + return !!m_SharedState; + } + + void Wait() + { + if (Valid()) + m_SharedState->Wait(); + } + + void CancelOrWait() + { + if (!Valid()) + return; + if (!Cancel()) + Wait(); + } + + /** + * Cancels the task if it has not started yet; the result is then invalid. + * Note that this cannot stop started tasks, and a task that already completed keeps its result. + * @return True if the task was cancelled, false otherwise. + */ + bool Cancel() + { + if (!Valid()) + return false; + return m_SharedState->Cancel(); + } +protected: + std::shared_ptr::SharedState> m_SharedState; +}; + +/** + * Executor interface. An executor is "something that schedules work". + * TODO C++23: probably replaceable with standard versions when that lands in C++23 or later. + */ +class IExecutor +{ +public: + virtual ~IExecutor() = default; + /** + * Submit a task in a fire-and-forget manner. The task is not guaranteed to run before program exit. + */ + void Execute(std::function&& func) + { + ExecuteTask(std::move(func)); + } + /** + * Submit a task and get a future that will hold its result. + * The task is not guaranteed to run before program exit. + * Note that discarding the future does not cancel the task: the queued task keeps the shared state alive. + * Use Execute for fire-and-forget tasks, which skips the future's shared state altogether. + */ + template + [[nodiscard]] Future> Submit(T&& func) + { + Future> ret; + ExecuteTask(ret.Wrap(Clone(), std::move(func))); + return ret; + } + +protected: + // Do the actual work.
+ virtual void ExecuteTask(std::function&& task) = 0; + virtual std::shared_ptr Clone() const = 0; +}; + +template +void PackagedTask::SharedState::Continue() +{ + { + std::unique_lock lock(m_Mutex); + m_Status = PackagedTask::Status::INVALID; + m_Executor->Execute(std::move(*m_Continuation)); + } + // Any thread that was waiting on the result of this shared state is likely in an invalid state -> wake them up to fail. + m_ConditionVariable.notify_all(); +} + +#endif // INCLUDED_FUTURE Index: source/ps/FutureForward.h =================================================================== --- /dev/null +++ source/ps/FutureForward.h @@ -0,0 +1,29 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#ifndef INCLUDED_FUTURE_FORWARD +#define INCLUDED_FUTURE_FORWARD + +/** + * Lightweight header / forward declarations for Future. + * Include this in your header files where possible.
+ */ + +template +class Future; + +#endif // INCLUDED_FUTURE_FORWARD Index: source/ps/GameSetup/GameSetup.cpp =================================================================== --- source/ps/GameSetup/GameSetup.cpp +++ source/ps/GameSetup/GameSetup.cpp @@ -68,6 +68,7 @@ #include "ps/Profiler2.h" #include "ps/Pyrogenesis.h" // psSetLogDir #include "ps/scripting/JSInterface_Console.h" +#include "ps/ThreadPool.h" #include "ps/TouchInput.h" #include "ps/UserReport.h" #include "ps/Util.h" Index: source/ps/ThreadPool.h =================================================================== --- /dev/null +++ source/ps/ThreadPool.h @@ -0,0 +1,133 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#ifndef INCLUDED_THREADPOOL +#define INCLUDED_THREADPOOL + +#include "ps/Future.h" + +#include +#include + +class TestThreadPool; +class CConfigDB; + +namespace ThreadPool +{ +class TaskManager; +class WorkerThread; + +enum class Priority +{ + NORMAL, + LOW +}; + +/** + * Execute work on the global queue. + * The low-priority queue has a differently-typed executor to highlight the different + * starvation guarantees.
+ */ +template +class GlobalExecutor final : public IExecutor +{ + friend class TaskManager; +protected: + GlobalExecutor(TaskManager& tm) : m_TaskManager(tm) {} + + virtual void ExecuteTask(std::function&& task) override; + virtual std::shared_ptr Clone() const override { return std::make_shared(*this); }; + + TaskManager& m_TaskManager; +}; + +/** + * Execute work directly on a thread's queue. + */ +class ThreadExecutor final : public IExecutor +{ + friend class EachThreadExecutor; + friend class TaskManager; +protected: + ThreadExecutor(WorkerThread& worker) : m_Worker(worker) {} + + virtual void ExecuteTask(std::function&& task) override; + virtual std::shared_ptr Clone() const override { return std::make_shared(*this); }; + + WorkerThread& m_Worker; +}; + +/** + * Provides a convenient interface to iterating all worker threads. + */ +class EachThreadExecutor +{ + friend class TaskManager; +public: + std::vector::iterator begin() { return m_Executors.begin(); } + std::vector::iterator end() { return m_Executors.end(); } +private: + std::vector m_Executors; +}; + +/** + * TaskManager: owns the pool's worker threads and hands out executors to submit work to them. + */ +class TaskManager +{ + friend class WorkerThread; + template + friend class GlobalExecutor; +public: + /** + * @param nbWorkers Number of worker threads; values below the minimum mean "pick automatically". + */ + explicit TaskManager(size_t nbWorkers = 0); + ~TaskManager(); + TaskManager(const TaskManager&) = delete; + TaskManager(TaskManager&&) = delete; + + static void Initialise(); + static TaskManager& Instance(); + + /** + * Drops queued tasks. This does not wait for tasks that have already started. + */ + void Clear(); + + size_t GetNbOfWorkers() const; + + /** + * The global executor assigns work to the global queue, not a specific worker. + */ + GlobalExecutor GetExecutor(); + GlobalExecutor GetLowPriorityExecutor(); + + /** + * Assigns work to a specific worker. + */ + ThreadExecutor GetWorker(); + + /** + * Returns an executor that can be used to start (optionally different) work on (optionally all) threads.
+ */ + EachThreadExecutor& GetAllWorkers(); + +private: + class Impl; + std::unique_ptr m; +}; +} // ThreadPool + +#endif // INCLUDED_THREADPOOL Index: source/ps/ThreadPool.cpp =================================================================== --- /dev/null +++ source/ps/ThreadPool.cpp @@ -0,0 +1,399 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#include "precompiled.h" + +#include "ThreadPool.h" + +#include "lib/debug.h" +#include "maths/MathUtil.h" +#include "ps/CLogger.h" +#include "ps/ConfigDB.h" +#include "ps/Threading.h" +#include "ps/ThreadUtil.h" +#include "ps/Profiler2.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace ThreadPool +{ +/** + * Minimum number of thread pool workers. + */ +static constexpr size_t MIN_THREADS = 3; + +/** + * Maximum number of thread pool workers. + */ +static constexpr size_t MAX_THREADS = 32; + +/** + * Returns the # of 'reversed priority' workers to use. + * See comment on avoiding starvation in TaskManager. + * At least one such worker will always exist. + */ +constexpr size_t GetNbOfReversedPriorityWorker(size_t nbWorkers) +{ + // A quarter of the workers (integer division, no floating-point round trip), but at least one. + const size_t ret = nbWorkers / 4; + return ret > 1 ? ret : 1; +} + +std::unique_ptr g_TaskManager; + +class Thread; +} + +using QueueItem = std::function; + +/** + * Light wrapper around std::thread. Ensures Join has been called.
+ */ +class ThreadPool::Thread +{ +public: + Thread() = default; + Thread(const Thread&) = delete; + Thread(Thread&&) = delete; + + template + void Start(T* object) + { + m_Thread = std::thread(Threading::HandleExceptions>::Wrapper, object); + } + template + static void DoStart(T* object); + +protected: + ~Thread() + { + ENSURE(!m_Thread.joinable()); + } + + std::thread m_Thread; + std::atomic m_Kill = false; +}; + +/** + * Thread with a work queue that it processes until it's destroyed. + */ +class ThreadPool::WorkerThread : public Thread +{ +public: + WorkerThread(ThreadPool::TaskManager::Impl& taskManager, bool reversePriority = false); + ~WorkerThread(); + + /** + * Remove every task currently in this worker's queue (under the worker's mutex). + */ + void Clear() + { + std::lock_guard lock(m_Mutex); + m_Queue.clear(); + } + + /** + * Add a task to this worker's queue. + * Takes ownership of the task. + * May be called from any thread. + */ + void PushTask(QueueItem&& task) + { + { + std::lock_guard lock(m_Mutex); + m_Queue.push_back(std::move(task)); + } + Wake(); + } + + /** + * Notify the worker only if its queue is empty, i.e. when it may be idle and could pick up global work. + * Returns nothing: the caller cannot tell whether a waiting thread was actually woken. + */ + void MaybeWake() + { + std::lock_guard lock(m_Mutex); + if (m_Queue.empty()) + Wake(); + } + + void Wake() + { + m_ConditionVariable.notify_one(); + } + +protected: + template + void RunUntilDeath(); + + std::deque m_Queue; + + std::mutex m_Mutex; + std::condition_variable m_ConditionVariable; + ThreadPool::TaskManager::Impl& m_TaskManager; +}; + +/** + * PImpl-ed implementation of the threadpool's task manager. + * + * Avoiding starvation: + * To keep things simple while avoiding starvation, + * some workers will process the global queue before their own. + * These workers won't be returned when asking for a single-worker executor. + * The low-priority queue is not guaranteed to run.
+ */ +class ThreadPool::TaskManager::Impl +{ + friend class TaskManager; + friend class WorkerThread; +public: + Impl(TaskManager& backref, size_t nbWorkers); + ~Impl() + { + Clear(); + m_Workers.clear(); + } + + /** + * Push a task on the global queue. + * Takes ownership of @a task. + * May be called from any thread. + */ + template + static void PushTask(TaskManager::Impl& taskManager, QueueItem&& task); + +protected: + void Clear(); + ThreadExecutor GetWorker(); + + // Back reference (keep this first). + TaskManager& m_TaskManager; + + std::mutex m_GlobalMutex; + std::mutex m_GlobalLowPriorityMutex; + std::deque m_GlobalQueue; + std::deque m_GlobalLowPriorityQueue; + std::atomic m_HasWork = false; + std::atomic m_HasLowPriorityWork = false; + + // Ideally this would be a vector, since it does get iterated, but that requires movable types, + // and we want the executors to be long-lived, thus these need to not move. + std::list m_Workers; + + // This does not contain all workers, see comment on avoiding starvation above. + std::vector m_DedicatedExecutors; + EachThreadExecutor m_EachThreadExecutor; + + // Round-robin counter for GetWorker.
+ size_t m_RoundRobinIdx = 0; + size_t m_FirstReversedIdx; +}; + +ThreadPool::TaskManager::TaskManager(size_t nbWorkers) +{ + if (nbWorkers < MIN_THREADS) + { + // hardware_concurrency() may return 0 when the value is unknown: don't let "- 1" wrap around to the maximum. + const size_t hardwareThreads = std::thread::hardware_concurrency(); + nbWorkers = Clamp(hardwareThreads > 0 ? hardwareThreads - 1 : MIN_THREADS, MIN_THREADS, MAX_THREADS); + } + + m = std::make_unique(*this, nbWorkers); +} + +ThreadPool::TaskManager::~TaskManager() {} + +ThreadPool::TaskManager::Impl::Impl(TaskManager& backref, size_t nbWorkers) + : m_TaskManager(backref) +{ + m_FirstReversedIdx = nbWorkers - GetNbOfReversedPriorityWorker(nbWorkers); + for (size_t i = 0; i < nbWorkers; ++i) + { + bool reversePriority = i >= m_FirstReversedIdx; + WorkerThread& worker = m_Workers.emplace_back(*this, reversePriority); + if (!reversePriority) + m_DedicatedExecutors.emplace_back(ThreadExecutor(worker)); + m_EachThreadExecutor.m_Executors.emplace_back(ThreadExecutor(worker)); + // Register the thread on Profiler2 and name it, for convenience. + // (This is called here because RunUntilDeath is templated and the static value won't work as expected then). + m_EachThreadExecutor.m_Executors.back().Submit([]() { + // The profiler does better if the names are unique.
+ static std::atomic n = 0; + std::string name = "ThreadPool #" + std::to_string(n++); + debug_SetThreadName(name.c_str()); + g_Profiler2.RegisterCurrentThread(name); + }).Wait(); + } +} + +void ThreadPool::TaskManager::Clear() { m->Clear(); } +void ThreadPool::TaskManager::Impl::Clear() +{ + // Drop pending global work too, not just the workers' own queues. + { + std::lock_guard lock(m_GlobalMutex); + m_GlobalQueue.clear(); + m_HasWork = false; + } + { + std::lock_guard lock(m_GlobalLowPriorityMutex); + m_GlobalLowPriorityQueue.clear(); + m_HasLowPriorityWork = false; + } + for (WorkerThread& worker : m_Workers) + worker.Clear(); +} + +size_t ThreadPool::TaskManager::GetNbOfWorkers() const +{ + return m->m_Workers.size(); +} + +ThreadPool::GlobalExecutor ThreadPool::TaskManager::GetExecutor() +{ + return *this; +} + +ThreadPool::GlobalExecutor ThreadPool::TaskManager::GetLowPriorityExecutor() +{ + return *this; +} + +ThreadPool::EachThreadExecutor& ThreadPool::TaskManager::GetAllWorkers() +{ + return m->m_EachThreadExecutor; +} + +ThreadPool::ThreadExecutor ThreadPool::TaskManager::GetWorker() { return m->GetWorker(); } +ThreadPool::ThreadExecutor ThreadPool::TaskManager::Impl::GetWorker() +{ + // NOTE(review): m_RoundRobinIdx is not synchronised, so concurrent callers may race on it. + if (m_RoundRobinIdx >= m_FirstReversedIdx) + m_RoundRobinIdx = 0; + // Advance the counter so that successive calls are spread over the dedicated workers. + return m_DedicatedExecutors[m_RoundRobinIdx++]; +} + +template +void ThreadPool::TaskManager::Impl::PushTask(TaskManager::Impl& taskManager, QueueItem&& task) +{ + std::atomic& hasWork = Priority == Priority::NORMAL ? taskManager.m_HasWork : taskManager.m_HasLowPriorityWork; + if constexpr (Priority == Priority::NORMAL) + { + std::lock_guard lock(taskManager.m_GlobalMutex); + hasWork = true; + taskManager.m_GlobalQueue.push_back(std::move(task)); + } + else + { + std::lock_guard lock(taskManager.m_GlobalLowPriorityMutex); + hasWork = true; + taskManager.m_GlobalLowPriorityQueue.push_back(std::move(task)); + } + // Wake idle workers. This has increasingly worse behaviour as the # of workers increases, + // so having a sane upper limit makes sense. + for (WorkerThread& worker : taskManager.m_Workers) + { + // Check if we still have work in case the earlier wakeups have accepted the item already.
+ if (!hasWork) + break; + worker.MaybeWake(); + } +} + +void ThreadPool::TaskManager::Initialise() +{ + if (!g_TaskManager) + g_TaskManager = std::make_unique(); +} + +ThreadPool::TaskManager& ThreadPool::TaskManager::Instance() +{ + ENSURE(g_TaskManager); + return *g_TaskManager; +} + +// Executor definitions + +template +void ThreadPool::GlobalExecutor::ExecuteTask(QueueItem&& task) +{ + TaskManager::Impl::PushTask(*m_TaskManager.m, std::move(task)); +} + +void ThreadPool::ThreadExecutor::ExecuteTask(QueueItem&& task) +{ + m_Worker.PushTask(std::move(task)); +} + +// Thread definition + +ThreadPool::WorkerThread::WorkerThread(ThreadPool::TaskManager::Impl& taskManager, bool reversePriority) + : m_TaskManager(taskManager) +{ + // Explicitness is required or the compiler gets confused about types. + if (reversePriority) + Start>(this); + else + Start>(this); +} + +ThreadPool::WorkerThread::~WorkerThread() +{ + Clear(); + { + // Set the flag while holding the worker mutex: RunUntilDeath evaluates it in its wait predicate under that mutex, + // so an unlocked store could land between the check and the wait, losing the notification and hanging join(). + std::lock_guard lock(m_Mutex); + m_Kill = true; + } + m_ConditionVariable.notify_all(); + if (m_Thread.joinable()) + m_Thread.join(); +} + +template +void ThreadPool::WorkerThread::RunUntilDeath() +{ + std::unique_lock lock(m_Mutex); + while (!m_Kill) + { + m_ConditionVariable.wait(lock, [this]() -> bool { + if (!m_Queue.empty() || m_Kill) + return true; + return m_TaskManager.m_HasWork || m_TaskManager.m_HasLowPriorityWork; + }); + + // Fetch work from the global queues if necessary. + if ((reversePriority || m_Queue.empty()) && !m_Kill && m_TaskManager.m_HasWork) + { + // Particularly critical section since we're locking the global queue. + std::unique_lock globalLock(m_TaskManager.m_GlobalMutex); + // Check again for emptiness.
+ if (!m_TaskManager.m_GlobalQueue.empty()) + { + m_Queue.emplace_front(std::move(m_TaskManager.m_GlobalQueue.front())); + m_TaskManager.m_GlobalQueue.pop_front(); + if (m_TaskManager.m_GlobalQueue.empty()) + m_TaskManager.m_HasWork = false; + } + } + else if (m_Queue.empty() && !m_Kill && m_TaskManager.m_HasLowPriorityWork) + { + std::unique_lock globalLock(m_TaskManager.m_GlobalLowPriorityMutex); + if (!m_TaskManager.m_GlobalLowPriorityQueue.empty()) + { + m_Queue.emplace_front(std::move(m_TaskManager.m_GlobalLowPriorityQueue.front())); + m_TaskManager.m_GlobalLowPriorityQueue.pop_front(); + if (m_TaskManager.m_GlobalLowPriorityQueue.empty()) + m_TaskManager.m_HasLowPriorityWork = false; + } + } + if (m_Queue.empty()) + continue; + { + // Take the task out of the queue before releasing the lock: PushTask/Clear may modify the deque meanwhile, + // and a concurrent Clear() would otherwise destroy the running task and leave pop_front() on an empty deque. + QueueItem task = std::move(m_Queue.front()); + m_Queue.pop_front(); + lock.unlock(); + // Run (and destroy) the task without holding the worker mutex. + task(); + } + lock.lock(); + } +} + +// Defined here - needs access to derived types. +template +void ThreadPool::Thread::DoStart(T* object) +{ + std::invoke(callable, object); +} Index: source/ps/tests/test_Future.h =================================================================== --- /dev/null +++ source/ps/tests/test_Future.h @@ -0,0 +1,168 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see .
+ */ + +#include "lib/self_test.h" + +#include "ps/Future.h" +#include "ps/ThreadPool.h" + +#include + +struct TestExecutor final : public IExecutor +{ + virtual std::shared_ptr Clone() const override { return std::make_shared(*this); }; + virtual void ExecuteTask(std::function&& task) override + { + task(); + } +}; + +class TestFuture : public CxxTest::TestSuite +{ +public: + void test_future_basic() + { + int counter = 0; + { + Future noret; + std::function task = noret.Wrap([&counter]() mutable { counter++; }); + task(); + TS_ASSERT_EQUALS(counter, 1); + } + + { + Future noret; + { + std::function task = noret.Wrap([&counter]() mutable { counter++; }); + // Auto-cancels the task. + } + } + TS_ASSERT_EQUALS(counter, 1); + } + + void test_future_return() + { + { + Future future; + std::function task = future.Wrap([]() { return 1; }); + task(); + TS_ASSERT_EQUALS(future.Get(), 1); + } + + // Convertible type. + { + Future future; + std::function task = future.Wrap([]() -> u8 { return 1; }); + task(); + TS_ASSERT_EQUALS(future.Get(), 1); + } + + // Complex, non-default constructible type. + struct NonDef + { + NonDef() = delete; + NonDef(int input) : value(input) {}; + int value; + }; + { + Future future; + std::function task = future.Wrap([]() { return 1; }); + task(); + TS_ASSERT_EQUALS(future.Get().value, 1); + } + { + Future future; + std::function task = future.Wrap([]() { return 1; }); + } + { + Future future; + std::function task = future.Wrap([]() { return 1; }); + future.Cancel(); + future.Wait(); + TS_ASSERT_THROWS(future.Get(), const Future::BadFutureAccess&); + } + } + + void test_future_moving() + { + Future future; + std::function function; + + // Set things up so all temporaries passed into the futures will be reset to obviously invalid memory.
+ std::aligned_storage_t), alignof(Future)> futureStorage; + std::aligned_storage_t), alignof(std::function)> functionStorage; + Future* f = &future; // CppCheck doesn't read placement new correctly, do this to silence errors. + std::function* c = &function; + f = new (&futureStorage) Future{}; + c = new (&functionStorage) std::function{}; + + *c = []() { return 7; }; + std::function task = f->Wrap(std::move(*c)); + + future = std::move(*f); + function = std::move(*c); + + // Destroy and clear the memory + f->~Future(); + c->~function(); + memset(&futureStorage, 0xFF, sizeof(decltype(futureStorage))); + memset(&functionStorage, 0xFF, sizeof(decltype(functionStorage))); + + // Let's move the packaged task while at it. + std::function task2 = std::move(task); + task2(); + TS_ASSERT_EQUALS(future.Get(), 7); + } + + void test_future_continuation_basic_taskfirst() + { + TestExecutor exec; + Future future; + PackagedTask task = future.Wrap(exec.Clone(), [](){ return 2; }); + // Run the task before adding continuations. + task(); + TS_ASSERT_EQUALS(future.Get(), 2); + future = std::move(future).Then([](int&& res) -> int { return res * 2; }); + TS_ASSERT_EQUALS(future.Get(), 4); + future = std::move(future).Then([](int&& res) -> int { return res * 2; }); + TS_ASSERT_EQUALS(future.Get(), 8); + } + + void test_future_continuation_basic_taskafter() + { + TestExecutor exec; + Future future; + PackagedTask task = future.Wrap(exec.Clone(), [](){ return 2; }); + future = std::move(future).Then([](int&& res) -> int { return res * 2; }); + future = std::move(future).Then([](int&& res) -> int { return res * 2; }); + // Run the task after adding continuations.
+ task(); + TS_ASSERT_EQUALS(future.Get(), 8); + } + + void test_future_continuation_different_types() + { + TestExecutor exec; + Future future; + PackagedTask task = future.Wrap(exec.Clone(), [](){ return; }); + Future f2 = std::move(future).Then([]() -> int { return 2; }); + task(); + TS_ASSERT_EQUALS(f2.Get(), 2); + Future f3 = std::move(f2).Then([](int&& input) -> bool { return input == 2; }); + TS_ASSERT_EQUALS(f3.Get(), true); + } +}; Index: source/ps/tests/test_ThreadPool.h =================================================================== --- /dev/null +++ source/ps/tests/test_ThreadPool.h @@ -0,0 +1,120 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#include "lib/self_test.h" + +#include "ps/Future.h" +#include "ps/ThreadPool.h" + +#include + +#include + +#include + +class TestThreadPool : public CxxTest::TestSuite +{ +public: + void test_basic() + { + ThreadPool::TaskManager& taskManager = ThreadPool::TaskManager::Instance(); + // There is a minimum of 3. + TS_ASSERT(taskManager.GetNbOfWorkers() >= 3); + + std::atomic tasks_run = 0; + auto increment_run = [&tasks_run]() { tasks_run++; }; + Future future = taskManager.GetWorker().Submit(increment_run); + future.Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), 1); + + // Test Execute.
+ std::condition_variable cv; + std::mutex mutex; + std::atomic go = false; + future = taskManager.GetWorker().Submit([&]() { + std::unique_lock lock(mutex); + cv.wait(lock, [&go]() -> bool { return go; }); + lock.unlock(); + increment_run(); + lock.lock(); + go = false; + lock.unlock(); + cv.notify_all(); + }); + TS_ASSERT_EQUALS(tasks_run.load(), 1); + std::unique_lock lock(mutex); + go = true; + lock.unlock(); + cv.notify_all(); + lock.lock(); + cv.wait(lock, [&go]() -> bool { return !go; }); + // The task still calls cv.notify_all() after releasing the lock, and cv/mutex/go are destroyed before 'future': wait for the task to fully return first.
+ future.Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), 2); + } + + void test_PoolExecutor() + { + ThreadPool::TaskManager& taskManager = ThreadPool::TaskManager::Instance(); + // Push a blocked task on all workers. + std::condition_variable cv; + std::mutex mutex; + bool go = false; + std::atomic tasks_run = 0; + std::vector> preTasks; + for (ThreadPool::ThreadExecutor& exec : taskManager.GetAllWorkers()) + preTasks.emplace_back(exec.Submit([&tasks_run, &cv, &mutex, &go]() { + std::unique_lock lock(mutex); + cv.wait(lock, [&go]() { return go; }); + tasks_run++; + })); + // Then push a general task. + auto increment_run = [&tasks_run]() { tasks_run++; }; + Future future = taskManager.GetExecutor().Submit(increment_run); + // Then start the pre-tasks, which ought to start all of them. + { + std::unique_lock lock(mutex); + go = true; + } + cv.notify_all(); + future.Wait(); + for (Future& task : preTasks) + task.Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), preTasks.size() + 1); + // Also check with no waiting expected.
+ taskManager.GetExecutor().Submit(increment_run).Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), preTasks.size() + 2); + taskManager.GetLowPriorityExecutor().Submit(increment_run).Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), preTasks.size() + 3); + } + + void test_Load() + { + ThreadPool::TaskManager& taskManager = ThreadPool::TaskManager::Instance(); + std::vector> futures; + for (size_t i = 0; i < 100000; ++i) + { + if (i % 3 == 0) + futures.emplace_back(taskManager.GetExecutor().Submit([]() { return 7; })); + else if (i % 3 == 1) + futures.emplace_back(taskManager.GetLowPriorityExecutor().Submit([]() { return 7; })); + else if (i % 3 == 2) + futures.emplace_back(taskManager.GetWorker().Submit([]() { return 7; })); + } + + for (Future& f : futures) + TS_ASSERT_EQUALS(f.Get(), 7); + } +}; Index: source/test_setup.cpp =================================================================== --- source/test_setup.cpp +++ source/test_setup.cpp @@ -36,6 +36,7 @@ #include "lib/timer.h" #include "lib/sysdep/sysdep.h" #include "ps/Profiler2.h" +#include "ps/ThreadPool.h" #include "scriptinterface/FunctionWrapper.h" #include "scriptinterface/ScriptEngine.h" #include "scriptinterface/ScriptContext.h" @@ -84,6 +85,8 @@ m_ScriptEngine = new ScriptEngine; g_ScriptContext = ScriptContext::CreateContext(); + ThreadPool::TaskManager::Initialise(); + return true; }