Index: binaries/data/config/default.cfg =================================================================== --- binaries/data/config/default.cfg +++ binaries/data/config/default.cfg @@ -41,6 +41,9 @@ ; Default server name or IP to use in multiplayer multiplayerserver = "127.0.0.1" +; # of threads for the thread pool. -1 means "pick automatically", 0-32 start a number of auxiliary threads. +thread_pool_threads = -1; + ; Force a particular resolution. (If these are 0, the default is ; to keep the current desktop resolution in fullscreen mode or to ; use 1024x768 in windowed mode.) Index: binaries/data/mods/public/gui/session/developer_overlay/DeveloperOverlayCheckboxes.js =================================================================== --- binaries/data/mods/public/gui/session/developer_overlay/DeveloperOverlayCheckboxes.js +++ binaries/data/mods/public/gui/session/developer_overlay/DeveloperOverlayCheckboxes.js @@ -262,6 +262,27 @@ } }; +DeveloperOverlayCheckboxes.prototype.DisableThreadPool = class +{ + label() + { + return translate("Disable Thread Pool"); + } + + onPress(checked) + { + if (checked) + Engine.ConfigDB_CreateValue("system", "thread_pool_threads", 0); + else + Engine.ConfigDB_CreateValue("system", "thread_pool_threads", -1); + } + + checked() + { + return +Engine.ConfigDB_GetValue("system", "thread_pool_threads") == 0; + } +}; + DeveloperOverlayCheckboxes.prototype.EnableCulling = class { label() Index: source/graphics/MapGenerator.h =================================================================== --- source/graphics/MapGenerator.h +++ source/graphics/MapGenerator.h @@ -21,6 +21,7 @@ #include "lib/posix/posix_pthread.h" #include "ps/FileIo.h" #include "ps/TemplateLoader.h" +#include "ps/ThreadPool.h" #include "scriptinterface/ScriptInterface.h" #include @@ -28,7 +29,6 @@ #include #include #include -#include class CMapGeneratorWorker; @@ -231,7 +231,7 @@ /** * Holds the mapgeneration thread identifier. 
*/ - std::thread m_WorkerThread; + Future m_WorkerThread; /** * Avoids thread synchronization issues. Index: source/graphics/MapGenerator.cpp =================================================================== --- source/graphics/MapGenerator.cpp +++ source/graphics/MapGenerator.cpp @@ -30,7 +30,7 @@ #include "ps/CLogger.h" #include "ps/FileIo.h" #include "ps/Profile.h" -#include "ps/Threading.h" +#include "ps/ThreadPool.h" #include "ps/scripting/JSInterface_VFS.h" #include "scriptinterface/FunctionWrapper.h" #include "scriptinterface/ScriptContext.h" @@ -68,9 +68,8 @@ CMapGeneratorWorker::~CMapGeneratorWorker() { - // Wait for thread to end - if (m_WorkerThread.joinable()) - m_WorkerThread.join(); + // Cancel or wait for the task to end. + m_WorkerThread.Cancel(); } void CMapGeneratorWorker::Initialize(const VfsPath& scriptFile, const std::string& settings) @@ -83,13 +82,12 @@ m_Settings = settings; // Launch the worker thread - m_WorkerThread = std::thread(Threading::HandleExceptions::Wrapper, this); + m_WorkerThread = g_ThreadPool.GetWorker(true).Submit([this]() { RunThread(this); }); } void CMapGeneratorWorker::RunThread(CMapGeneratorWorker* self) { - debug_SetThreadName("MapGenerator"); - g_Profiler2.RegisterCurrentThread("MapGenerator"); + PROFILE2("Map Generation"); shared_ptr mapgenContext = ScriptContext::CreateContext(RMS_CONTEXT_SIZE); Index: source/lib/sysdep/os/osx/odbg.cpp =================================================================== --- source/lib/sysdep/os/osx/odbg.cpp +++ source/lib/sysdep/os/osx/odbg.cpp @@ -1,4 +1,4 @@ -/* Copyright (C) 2010 Wildfire Games. +/* Copyright (C) 2021 Wildfire Games. 
* * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the @@ -28,6 +28,8 @@ #include "lib/sysdep/sysdep.h" #include "lib/debug.h" +#include + void* debug_GetCaller(void* UNUSED(context), const wchar_t* UNUSED(lastFuncToSkip)) { return NULL; @@ -43,7 +45,7 @@ return ERR::NOT_SUPPORTED; } -void debug_SetThreadName(char const* UNUSED(name)) +void debug_SetThreadName(char const* name) { - // Currently unimplemented + pthread_setname_np(name); } Index: source/main.cpp =================================================================== --- source/main.cpp +++ source/main.cpp @@ -58,6 +58,7 @@ #include "ps/UserReport.h" #include "ps/Util.h" #include "ps/VideoMode.h" +#include "ps/ThreadPool.h" #include "ps/World.h" #include "ps/GameSetup/GameSetup.h" #include "ps/GameSetup/Atlas.h" @@ -576,6 +577,9 @@ ScriptEngine scriptEngine; CXeromyces::Startup(); + // Initialise the thread pool threads at this point - we have Profiler2 & JS set up. + g_ThreadPool.InitThreads(); + if (ATLAS_RunIfOnCmdLine(args, false)) { CXeromyces::Terminate(); @@ -710,6 +714,7 @@ ATLAS_RunIfOnCmdLine(args, true); #endif + g_ThreadPool.Reset(); CXeromyces::Terminate(); } Index: source/ps/Future.h =================================================================== --- /dev/null +++ source/ps/Future.h @@ -0,0 +1,289 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#ifndef INCLUDED_FUTURE +#define INCLUDED_FUTURE + +#include +#include +#include +#include +#include + +template +class Future; + +/** + * Corresponds to std::packaged_task. + * Like packaged_task, this holds a function acting as a promise. + * Unlike packaged_task, this can be cancelled. + * It also can be created from a Future (so this can be untemplated, @see Future). + * which is convenient to store it in a queue (typically in the ThreadPool::Pool queue). + * PackagedTask is blocking if the wrapped function has started processing + * when it is destroyed, non-blocking otherwise. + */ +class PackagedTask +{ + template + friend class Future; +protected: + /** + * Status and SharedState are shared with Future, but it's more convenient header-wise to define them here. + */ + enum Status + { + PENDING, + STARTED, + DONE + }; + + /** + * Shared state between Future and PackagedTask. + * Equivalent to what the c++ standard refers to as "shared state" between an std::future, std::promise and std::packaged_task. + */ + class SharedState + { + public: + bool IsReady() const + { + return m_Status == DONE; + } + + void Wait() + { + // Fast path: we're already done. + if (m_Status == DONE) + return; + // Slow path: we aren't done when we run the above check. Lock and wait until we are. + std::unique_lock lock(m_Mutex); + m_ConditionVariable.wait(lock, [this]() -> bool { return m_Status == DONE; }); + } + + /** + * Mark the task as done. + * @return true if the task was indeed cancelled, false if Cancel() had to wait on it to end or it was already done/cancelled. + */ + bool Cancel() + { + Status expected = PENDING; + // If we're pending, try atomically setting to done. + if (m_Status.compare_exchange_strong(expected, DONE)) + return true; + // That failed - we'll need to wait until we're done. 
+ Wait(); + return false; + } + + std::atomic m_Status = PENDING; + std::mutex m_Mutex; + std::condition_variable m_ConditionVariable; + }; +public: + /** + * Standalone constructor. Wraps the function, taking ownership of @a func (it is moved from, not copied). + */ + PackagedTask(std::function&& func) + { + m_SharedState = std::make_shared(); + m_Func = std::move(func); + } + /** + * Constructor from an existing future - @a func will + * set the value of the future (@a func is moved from, not copied). + */ + template + PackagedTask(const Future& future, std::function&& func) + { + m_SharedState = future.m_SharedState; + m_Func = std::move(func); + } + PackagedTask(PackagedTask&&) = default; + PackagedTask(const PackagedTask&) = delete; + ~PackagedTask() + { + // Wait on started task completion, but not on pending ones (auto-cancelled). + if (m_SharedState) + m_SharedState->Cancel(); + } + + /** + * Start the task - if it was cancelled, do nothing. + * (this also prevents starting a task twice). + */ + void operator()() + { + PackagedTask::Status expected = PENDING; + if (m_SharedState->m_Status.compare_exchange_strong(expected, PackagedTask::STARTED)) + { + m_Func(); + + // Because we might have threads waiting on us, we need to make sure that they either: + // - don't wait on our condition variable + // - receive the notification when we're done. + // This requires locking the mutex (@see Wait). + { + std::unique_lock lock(m_SharedState->m_Mutex); + m_SharedState->m_Status = DONE; + } + m_SharedState->m_ConditionVariable.notify_all(); + } + } + +protected: + std::function m_Func; + std::shared_ptr m_SharedState; +}; + +/** + * Corresponds to std::future. + * Unlike std::future, Future can request the cancellation of the task that would produce the result. + * This makes it more similar to Java's CancellableTask or C#'s Task. + * The name Future was kept over Task so it would be more familiar to C++ users. + * + * Note that currently Future auto-cancels the wrapped task on destruction. + * TODO: + * - Make the above behaviour optional? + * - Handle exceptions. 
+ */ +template +class Future +{ + friend class PackagedTask; + static constexpr bool VoidReturn = std::is_same_v; +public: + Future() {} + /** + * Only copyable before wrapping (and a no-op): copying an already-wrapped future is a programming error. + */ + Future(const Future& o) + { + ENSURE(!o.Valid()); + } + Future(Future&& o) = default; + ~Future() + { + // Since the future holds the result, + // we need to make sure the function doesn't outlive it + // in a non-cancelled state anyways. + Cancel(); + } + + Future& operator=(Future&& o) + { + Cancel(); + m_Result = std::move(o.m_Result); + m_SharedState = std::move(o.m_SharedState); + return *this; + } + + /** + * Make the future wait for the result of @a func (perfectly forwarded: lvalues are copied, rvalues are moved). + * @return a (cancellable) PackagedTask. + * This is the other way around from C++, where std::packaged_task outputs the std::future, + * because that lets PackagedTask be untemplated (convenient for storage). + */ + template + PackagedTask Wrap(T&& func) + { + static_assert(std::is_convertible_v, Ret>, "The return type of the wrapped function cannot be converted to the type of the Future."); + m_SharedState = std::make_shared(); + if constexpr (VoidReturn) + return PackagedTask(*this, std::forward<T>(func)); + else + { + // Allocate the memory space for the result (via the bytes member). + m_Result = std::make_unique(); + return PackagedTask(*this, [resultPtr = &m_Result->m_Result, func=std::forward<T>(func)]() { + // This is safe - the function passed to PackagedTask + // is not executed if the task was cancelled, + // and Future cancels the task when it is destroyed, + // so m_Result will exist at this point. + // To avoid UB, explicitly placement-new the value. 
+ new (resultPtr) Ret{func()}; + }); + } + } + + class BadFutureAccess : public std::exception + { + virtual const char* what() const noexcept + { + return "Tried to access the result of a future that was never wrapped or a cancelled future."; + } + }; + + Ret Get() + { + Wait(); + if constexpr (VoidReturn) + return; + else + { + if (!m_Result) + { + // We were never wrapped/cancelled, throw. + throw BadFutureAccess(); + } + return m_Result->m_Result; + } + } + + bool IsReady() const + { + return !!m_SharedState && m_SharedState->IsReady(); + } + + bool Valid() const + { + return !!m_SharedState; + } + + void Wait() + { + if (Valid()) + m_SharedState->Wait(); + } + + void Cancel() + { + if (Valid() && m_SharedState->Cancel()) + if constexpr (!VoidReturn) + // If we've managed to cancel the task, we can delete the result storage. + // This acts as the boolean to know, in DONE state, if we actually ran or were cancelled. + m_Result.reset(); + } +protected: + std::shared_ptr m_SharedState; + + struct Empty {}; + union Result + { + std::aligned_storage_t m_Bytes; + Ret m_Result; + Result() : m_Bytes() {}; + }; + // TODO C++20: [[no_unique_address]] this for a minor memory optimisation. + // We could use std::optional, except that the value needs a stable address + // or the wrapped function won't know where to write, + // so m_Result can't be moved, and the Future ought be movable + // (or using it becomes un-necessarily annoying). + // And we _could_ just use unique_ptr, but then Ret needs to be + // default constructible, which is also annoying. 
+ std::conditional_t> m_Result; +}; + +#endif // INCLUDED_FUTURE Index: source/ps/GameSetup/GameSetup.cpp =================================================================== --- source/ps/GameSetup/GameSetup.cpp +++ source/ps/GameSetup/GameSetup.cpp @@ -68,6 +68,7 @@ #include "ps/Profiler2.h" #include "ps/Pyrogenesis.h" // psSetLogDir #include "ps/scripting/JSInterface_Console.h" +#include "ps/ThreadPool.h" #include "ps/TouchInput.h" #include "ps/UserReport.h" #include "ps/Util.h" @@ -890,6 +891,9 @@ // g_ConfigDB, command line args, globals CONFIG_Init(args); + // Update the thread pool now that config is started. + g_ThreadPool.ReadConfig(); + // Using a global object for the context is a workaround until Simulation and AI use // their own threads and also their own contexts. const int contextSize = 384 * 1024 * 1024; Index: source/ps/ThreadPool.h =================================================================== --- /dev/null +++ source/ps/ThreadPool.h @@ -0,0 +1,218 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . 
+ */ + +#ifndef INCLUDED_THREADPOOL +#define INCLUDED_THREADPOOL + +#include "ps/Future.h" + +#include +#include + +class TestThreadPool; + +namespace ThreadPool +{ +class Pool; +class Thread; +class ThreadExecutor; + +class IExecutor +{ +protected: + // This is purely an interface to reduce duplication, + // and not meant to be used polymorphically, so it has a non-virtual, protected destructor. + ~IExecutor() = default; +public: + /** + * Submit a task in a fire-and-forget manner. The task is not guaranteed to run before program exit. + */ + void Execute(std::function&& func) + { + ExecuteTask(std::move(func)); + } + /** + * Submit a task and get a future that will hold its result. + * The task is not guaranteed to run before program exit. + * Note that if you discard the future, the task is auto-cancelled, + * so use Execute for fire-and-forget tasks. + */ + template + [[nodiscard]] Future> Submit(T&& func) + { + Future> ret; + ExecuteTask(ret.Wrap(std::forward<T>(func))); // Forward, don't move: func may be bound to a caller's lvalue. + return ret; + } +protected: + // Do the actual work. + virtual void ExecuteTask(PackagedTask&& task) = 0; +}; + +/** + * Execute work on the pool's global queue. + */ +class PoolExecutor final : public IExecutor +{ +public: + PoolExecutor(Pool& p) : m_Pool(p) {} +protected: + virtual void ExecuteTask(PackagedTask&& task) override; + + Pool& m_Pool; +}; + +/** + * Execute work directly on a thread's queue. + * This thread may be temporary, in which case its lifetime + * is extended by pending promises (and thus Future objects in a waiting state) and the ThreadExecutor itself. + */ +class ThreadExecutor final : public IExecutor +{ + friend class Pool; + friend class EachThreadExecutor; +public: + + /** + * Shadow the IExecutor one as we may need to rewrap the function + * to extend the lifetime of the thread for futures. + */ + template + [[nodiscard]] Future> Submit(T&& func) + { + Future> ret; + if (m_Owns) + // Extend the lifetime of the thread by packaging it in the task. 
+ ExecuteTask(ret.Wrap([thread=m_Thread.ownedPtr, func=std::forward<T>(func)]() { + return func(); // Propagate the result, otherwise only void-returning functions can be wrapped here. + })); + else + ExecuteTask(ret.Wrap(std::forward<T>(func))); + return ret; + } + + + ThreadExecutor(ThreadExecutor&&); + ~ThreadExecutor(); +protected: + // Reference constructor. + ThreadExecutor(Thread& thread); + // Owning constructor + struct CreateThread {}; + ThreadExecutor(CreateThread); + ThreadExecutor(const ThreadExecutor&); + + virtual void ExecuteTask(PackagedTask&& task) override; +protected: + union MaybeOwnedThread + { + MaybeOwnedThread(Thread* p); + MaybeOwnedThread(CreateThread); + MaybeOwnedThread(bool owns, MaybeOwnedThread&&); + MaybeOwnedThread(bool owns, const MaybeOwnedThread&); + ~MaybeOwnedThread() {}; + Thread* ptr; + std::shared_ptr ownedPtr; + } m_Thread; + bool m_Owns; +}; + +/** + * Provides a convenient interface to iterating all worker threads. + */ +class EachThreadExecutor +{ +public: + EachThreadExecutor(std::vector& e) : m_Executors(e) {} + + std::vector::iterator begin() { return m_Executors.begin(); } + std::vector::iterator end() { return m_Executors.end(); } + +private: + std::vector& m_Executors; +}; + +/** + * ThreadPool::Pool is the actual thread pool. + * It is ready to accept work from the moment it's created, + * but will only create actual threads once InitThreads is called. + */ +class Pool +{ + friend class ::TestThreadPool; + friend class PoolExecutor; + friend class Thread; +public: + Pool(); + Pool(const Pool&) = delete; + Pool(Pool&&) = delete; + /** + * On destruction Reset is called (@see Reset) and all threads are joined. + */ + ~Pool(); + + /** + * Create worker threads. Their number is initially automatically determined. + */ + void InitThreads(); + + /** + * The thread pool's threads are initialised before ConfigDB. This is called when the latter is. + */ + void ReadConfig(); + + /** + * Clears all tasks from the thread pool & its threads, waiting for started tasks. + * Threads are not deleted. 
+ */ + void Reset(); + + /** + * The Pool executor assigns work to the pool, not a specific worker. + * Tasks are not guaranteed to be run in a separate thread. + * Prefer this executor if you will have work to do asynchronously, + * but starting it immediately is not required. + */ + PoolExecutor GetExecutor(); + + /** + * The thread executor assigns work to a specific worker. + * The underlying thread is guaranteed to live at least as long as the executor, + * unless the thread pool is reset. + * @param mustBeAsync - if true, a temporary thread may be started. + */ + ThreadExecutor GetWorker(bool mustBeAsync = false); + + /** + * Returns an executor that can be used to start (optionally different) work on (optionally all) threads. + * Lifetime guarantee: until Reset() is called. + */ + EachThreadExecutor GetAllWorkers(); + + size_t GetNbOfThreads(); + +protected: + void InitThreads(size_t n); + + class Impl; + std::unique_ptr m; +}; +} // ThreadPool + +extern ThreadPool::Pool g_ThreadPool; + +#endif // INCLUDED_THREADPOOL Index: source/ps/ThreadPool.cpp =================================================================== --- /dev/null +++ source/ps/ThreadPool.cpp @@ -0,0 +1,431 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . 
+ */ + +#include "precompiled.h" + +#include "ThreadPool.h" + +#include "lib/debug.h" +#include "maths/MathUtil.h" +#include "ps/ConfigDB.h" +#include "ps/Threading.h" +#include "ps/ThreadUtil.h" +#include "ps/Profiler2.h" + +#include +#include +#include +#include +#include +#include + +#define FORCE_SINGLE_THREAD 0 + +ThreadPool::Pool g_ThreadPool; + +namespace ThreadPool +{ +/** + * A reasonable maximum number of thread pool workers. + * The current design may lose some efficiency with too many threads (@see Pool::PushTask), + * and we are unlikely to max them out anyways. + */ +static constexpr size_t MAX_THREADS = 32; + +class IThread; +} + +/** + * Wraps an std::thread and some convenience variables. + */ +class ThreadPool::IThread +{ +public: + IThread() = default; + IThread(const IThread&) = delete; + IThread(IThread&&) = delete; + + static void Start(IThread* self) + { + self->RunUntilDeath(); + } + + virtual void RunUntilDeath() = 0; +protected: + // Pure interface type. + // Joins as a fail-safe but derived classes probably want to join or detach themselves. + ~IThread() + { + m_Kill = true; + if (m_Thread.joinable()) + { + m_ConditionVariable.notify_all(); + m_Thread.join(); + } + } + + std::atomic m_Kill = false; + + std::thread m_Thread; + std::mutex m_Mutex; + std::condition_variable m_ConditionVariable; +}; + +/** + * Thread with a work queue that it processes until it's destroyed. + */ +class ThreadPool::Thread final : public IThread +{ + friend Pool; +public: + enum + { + THREADED = 1 << 0, + DETACH_ON_DESTRUCTION = 1 << 1, + }; + + Thread(u16 flags = THREADED, ThreadPool::Pool::Impl* pool = nullptr); + ~Thread(); + + /** + * Clear the task queue. + */ + void Clear(); + + /** + * Set the thread name and register it with Profiler2. + */ + void RegisterThread() + { + PushTask(PackagedTask([]() { + // The profiler does better if the names are unique. 
+ static std::atomic n = 0; + std::string name = "ThreadPool #" + std::to_string(n++); + debug_SetThreadName(name.c_str()); + g_Profiler2.RegisterCurrentThread(name); + })); + } + + /** + * Add a task to this worker's queue. + * Takes ownership of the task. + * May be called from any thread. + */ + void PushTask(PackagedTask&& task); + +protected: + virtual void RunUntilDeath() override; + + std::deque m_Queue; + + // If true, we own an actual thread - otherwise this is a 'fake' thread and must run synchronously. + bool m_Threaded = true; + // If true, detach instead of joining on destruction. + bool m_DetachOnDestruction = false; + // If non-null, the thread tries to pop work from this pool's global queue. + ThreadPool::Pool::Impl* m_Pool = nullptr; +}; + +/** + * PImpl-ed implementation of the thread pool. + */ +class ThreadPool::Pool::Impl +{ +public: + /** + * Push a task on the global queue. + * Takes ownership of @a task. + * May be called from any thread. + */ + static void PushTask(Pool& pool, PackagedTask&& task); + + std::mutex m_GlobalQueueMutex; + std::deque m_GlobalQueue; + + // Round-robin counter for GetWorker. + size_t m_RoundRobinIdx = 0; + + // Holds the non-threaded worker created on construction, before InitThreads is called. + std::unique_ptr m_FakeThread; + + // TODO: when vector supports nonmovable types, use that instead. + std::list m_Threads; + std::vector m_Executors; + // Since the threads can be executed on individually, it seems more efficient to have one for each. 
+ std::vector m_ConditionVariables; +}; + +ThreadPool::Pool::Pool() : m(std::make_unique()) +{ + InitThreads(0); +} + +ThreadPool::Pool::~Pool() +{ +} + +void ThreadPool::Pool::InitThreads() +{ + InitThreads(Clamp<size_t>(std::thread::hardware_concurrency(), 1, MAX_THREADS + 1) - 1); // hardware_concurrency() may return 0: clamp before subtracting so it cannot wrap around. +} + +void ThreadPool::Pool::InitThreads(size_t n) +{ +#if FORCE_SINGLE_THREAD + n = 0; +#endif + m->m_FakeThread.reset(); + m->m_Threads.clear(); + m->m_Executors.clear(); + m->m_ConditionVariables.clear(); + m->m_RoundRobinIdx = 0; + + // If we don't have/want/need actual threads, just keep using the fake one we created on init. + if (n < 1) + { + // Create a 'fake' thread that runs on the main thread + // so client code doesn't have to care too much about the state of the pool. + m->m_FakeThread = std::make_unique(false, m.get()); + m->m_Executors.emplace_back(ThreadExecutor(*m->m_FakeThread)); + m->m_ConditionVariables.emplace_back(&m->m_FakeThread->m_ConditionVariable); + return; + } + for (size_t i = 0; i < n; ++i) + { + Thread& thread = m->m_Threads.emplace_back(true, m.get()); + m->m_Executors.emplace_back(ThreadExecutor(thread)); + m->m_ConditionVariables.emplace_back(&thread.m_ConditionVariable); + thread.RegisterThread(); + } +} + +void ThreadPool::Pool::ReadConfig() +{ + g_ConfigDB.RegisterHookAndCall("thread_pool_threads", [this]() { + int n_threads = -1; + CFG_GET_VAL("thread_pool_threads", n_threads); + if (n_threads == -1) + n_threads = static_cast<int>(Clamp<size_t>(std::thread::hardware_concurrency(), 1, MAX_THREADS + 1) - 1); // Same automatic count as InitThreads(): [0, MAX_THREADS], safe when the core count is unknown (0). + else + n_threads = Clamp(n_threads, 0, MAX_THREADS); + if (n_threads != static_cast(m->m_Threads.size())) + { + // This is a bit wasteful but that shouldn't matter. + InitThreads(n_threads); + } + }); + // Use the opportunity to log our # of threads - earlier CLogger isn't up. 
+ LOGMESSAGE("ThreadPool: setting up %li threads\n", m->m_Threads.size()); +} + +void ThreadPool::Pool::Reset() +{ + if (m->m_FakeThread) + m->m_FakeThread->Clear(); + for (Thread& thread : m->m_Threads) + thread.Clear(); +} + +size_t ThreadPool::Pool::GetNbOfThreads() +{ + return m->m_FakeThread ? 1 : m->m_Threads.size(); +} + +ThreadPool::PoolExecutor ThreadPool::Pool::GetExecutor() +{ + return PoolExecutor(*this); +} + +ThreadPool::EachThreadExecutor ThreadPool::Pool::GetAllWorkers() +{ + return m->m_Executors; +} + +ThreadPool::ThreadExecutor ThreadPool::Pool::GetWorker(bool mustBeAsync) +{ + // If the required thread must be async, check if we have enough workers to assign one, + // otherwise create a custom thread. + if (mustBeAsync && m->m_Threads.size() < 2) + // Create a new thread, its lifetime tied to the ThreadExecutor. + return ThreadExecutor(ThreadExecutor::CreateThread{}); + + if (m->m_FakeThread) + return *m->m_FakeThread; + + // Return a thread in a round-robin fashion. + ENSURE(!m->m_Threads.empty()); + if (m->m_RoundRobinIdx >= m->m_Executors.size()) + m->m_RoundRobinIdx = 0; + return m->m_Executors[m->m_RoundRobinIdx++]; +} + +// There is a starvation risk here if the main thread pushes too many tasks. +void ThreadPool::Pool::Impl::PushTask(Pool& pool, PackagedTask&& task) +{ + if (pool.m->m_FakeThread) + { + pool.m->m_FakeThread->PushTask(std::move(task)); + return; + } + + { + std::lock_guard lock(pool.m->m_GlobalQueueMutex); + pool.m->m_GlobalQueue.push_back(std::move(task)); + } + // Since each thread has their own condition variable, we can't just notify_one. + // This should be OK for low # of threads, though it gets increasingly inefficient. + for (std::condition_variable* var : pool.m->m_ConditionVariables) + // So far as I can tell, the performance of notify_one and notify_all will be basically equivalent + // if only one thread is waiting (and that's the case here). + // _all seems to fit my semantics better, so I use that. 
+ var->notify_all(); +} + +// Thread definition + +ThreadPool::Thread::Thread(u16 flags, ThreadPool::Pool::Impl* pool) + : m_Threaded(flags & THREADED), m_DetachOnDestruction(flags & DETACH_ON_DESTRUCTION), m_Pool(pool) +{ + if (m_Threaded) + m_Thread = std::thread(Threading::HandleExceptions::Wrapper, this); +} + +ThreadPool::Thread::~Thread() +{ + m_Kill = true; + Clear(); + if (m_Thread.joinable()) + { + m_ConditionVariable.notify_all(); + if (m_DetachOnDestruction) + m_Thread.detach(); + else + m_Thread.join(); + } +} + +void ThreadPool::Thread::Clear() +{ + std::unique_lock lock(m_Mutex); + m_Queue.clear(); +} + +void ThreadPool::Thread::PushTask(PackagedTask&& task) +{ + if (!m_Threaded) + task(); + else + { + { + std::unique_lock lock(m_Mutex); + m_Queue.push_back(std::move(task)); + } + m_ConditionVariable.notify_all(); + } +} + +void ThreadPool::Thread::RunUntilDeath() +{ + std::unique_lock lock(m_Mutex); // Held whenever we wait or touch m_Queue; only released while a task runs. + while (!m_Kill) + { + m_ConditionVariable.wait(lock, [this]() -> bool { + if (!m_Queue.empty() || m_Kill) + return true; + if (!m_Pool) + return false; + // TODO: should we try to pop more? Is the lock contention fine? + std::lock_guard lock(m_Pool->m_GlobalQueueMutex); + return !m_Pool->m_GlobalQueue.empty(); + }); + if (!m_Kill && !m_Queue.empty()) + { + PackagedTask task = std::move(m_Queue.front()); + m_Queue.pop_front(); + lock.unlock(); + task(); + // Check if we need to kill the thread - we might end up in an invalid state + // once task is destroyed, so we ought no longer access 'this' + if (m_Kill) + break; + } + if (!m_Kill && !lock.owns_lock()) + lock.lock(); // Re-take m_Mutex (released to run the task, and only after the task is destroyed): m_Queue is modified below and by PushTask/Clear. + if (!m_Kill && m_Pool) + { + // Check the global pool queue. + // TODO: should we try to pop more? Is the lock contention fine? 
+ std::lock_guard lock(m_Pool->m_GlobalQueueMutex); + if (!m_Pool->m_GlobalQueue.empty()) + { + m_Queue.push_back(std::move(m_Pool->m_GlobalQueue.front())); + m_Pool->m_GlobalQueue.pop_front(); + } + } + } +} + +// Executor implementation below (member initializers are listed in declaration order: m_Thread, then m_Owns). + +ThreadPool::ThreadExecutor::ThreadExecutor(Thread& thread) : m_Thread(&thread), m_Owns(false) {} +ThreadPool::ThreadExecutor::ThreadExecutor(CreateThread ct) : m_Thread(ct), m_Owns(true) +{ + m_Thread.ownedPtr->RegisterThread(); +} + +ThreadPool::ThreadExecutor::ThreadExecutor(const ThreadExecutor& o) : m_Thread(o.m_Owns, o.m_Thread), m_Owns(o.m_Owns) {} +ThreadPool::ThreadExecutor::ThreadExecutor(ThreadExecutor&& o) : m_Thread(o.m_Owns, std::move(o.m_Thread)), m_Owns(o.m_Owns) {} + +ThreadPool::ThreadExecutor::MaybeOwnedThread::MaybeOwnedThread(Thread* t) : ptr(t) {} +ThreadPool::ThreadExecutor::MaybeOwnedThread::MaybeOwnedThread(CreateThread) + : ownedPtr(std::make_shared(Thread::THREADED | Thread::DETACH_ON_DESTRUCTION, nullptr)) +{ +} +ThreadPool::ThreadExecutor::MaybeOwnedThread::MaybeOwnedThread(bool owns, const MaybeOwnedThread& o) +{ + if (owns) + new (&ownedPtr) decltype(ownedPtr)(o.ownedPtr); // No union member is alive yet: construct in place, assigning to ownedPtr would be UB. + else + ptr = o.ptr; +} + +ThreadPool::ThreadExecutor::MaybeOwnedThread::MaybeOwnedThread(bool owns, MaybeOwnedThread&& o) +{ + if (owns) + new (&ownedPtr) decltype(ownedPtr)(std::move(o.ownedPtr)); // Construct in place (see the copy overload); o keeps a null ownedPtr that is still safe to destroy. + else + ptr = o.ptr; +} + +ThreadPool::ThreadExecutor::~ThreadExecutor() +{ + if (m_Owns) + m_Thread.ownedPtr.~shared_ptr(); +} + +void ThreadPool::PoolExecutor::ExecuteTask(PackagedTask&& task) +{ + Pool::Impl::PushTask(m_Pool, std::move(task)); +} + +void ThreadPool::ThreadExecutor::ExecuteTask(PackagedTask&& task) +{ + if (m_Owns) + m_Thread.ownedPtr->PushTask(std::move(task)); + else + m_Thread.ptr->PushTask(std::move(task)); +} Index: source/ps/tests/test_Future.h =================================================================== --- /dev/null +++ source/ps/tests/test_Future.h @@ -0,0 +1,134 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. 
is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#include "lib/self_test.h" + +#include "ps/Future.h" + +#include + +class TestFuture : public CxxTest::TestSuite +{ +public: + void test_packaged_task() + { + int counter = 0; + // This is legal per defect LWG 2420 -> the return type is ignored (as one would expect). + PackagedTask task([&counter]() mutable -> int { return counter++; }); + task(); + TS_ASSERT_EQUALS(counter, 1); + { + // Auto-cancelled on destructions. + PackagedTask task([&counter]() mutable { return counter++; }); + } + TS_ASSERT_EQUALS(counter, 1); + + } + + void test_future_basic() + { + int counter = 0; + { + Future noret; + PackagedTask task = noret.Wrap([&counter]() mutable { counter++; }); + task(); + TS_ASSERT_EQUALS(counter, 1); + } + + { + Future noret; + { + PackagedTask task = noret.Wrap([&counter]() mutable { counter++; }); + // Auto-cancels the task. + } + } + TS_ASSERT_EQUALS(counter, 1); + } + + void test_future_return() + { + { + Future future; + PackagedTask task = future.Wrap([]() { return 1; }); + task(); + TS_ASSERT_EQUALS(future.Get(), 1); + } + + // Convertible type. + { + Future future; + PackagedTask task = future.Wrap([]() -> u8 { return 1; }); + task(); + TS_ASSERT_EQUALS(future.Get(), 1); + } + + // Complex, non-default constructible type. 
+ struct NonDef + { + NonDef() = delete; + NonDef(int input) : value(input) {}; + int value; + }; + { + Future future; + PackagedTask task = future.Wrap([]() { return 1; }); + task(); + TS_ASSERT_EQUALS(future.Get().value, 1); + } + { + Future future; + PackagedTask task = future.Wrap([]() { return 1; }); + } + { + Future future; + PackagedTask task = future.Wrap([]() { return 1; }); + future.Cancel(); + future.Wait(); + TS_ASSERT_THROWS(future.Get(), const Future::BadFutureAccess&); + } + } + + void test_future_moving() + { + Future future; + std::function function; + + // Set things up so all temporaries passed into the futures will be reset to obviously invalid memory. + std::aligned_storage_t), alignof(Future)> futureStorage; + std::aligned_storage_t), alignof(std::function)> functionStorage; + Future* f = new (&futureStorage) Future{}; + std::function* c = new (&functionStorage) std::function{}; + + *c = []() { return 7; }; + PackagedTask task = f->Wrap(std::move(*c)); + + future = std::move(*f); + function = std::move(*c); + + // Destroy and clear the memory + f->~Future(); + c->~function(); + memset(&futureStorage, 0xFF, sizeof(decltype(futureStorage))); + memset(&functionStorage, 0xFF, sizeof(decltype(functionStorage))); + + // Let's move the packaged task while at it. + PackagedTask task2 = std::move(task); + task2(); + TS_ASSERT_EQUALS(future.Get(), 7); + } + +}; Index: source/ps/tests/test_ThreadPool.h =================================================================== --- /dev/null +++ source/ps/tests/test_ThreadPool.h @@ -0,0 +1,123 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. 
is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#include "lib/self_test.h" + +#include "ps/ThreadPool.h" + +#include +#include +#include + +class TestThreadPool : public CxxTest::TestSuite +{ +public: + void test_basic() + { + ThreadPool::Pool pool; + std::atomic tasks_run = 0; + auto increment_run = [&tasks_run]() { tasks_run++; }; + // Task is run on the synchronous thread. + pool.GetWorker().Execute(increment_run); + TS_ASSERT_EQUALS(tasks_run.load(), 1); + TS_ASSERT_EQUALS(pool.GetNbOfThreads(), 1); + + // Force one actual thread. + pool.InitThreads(1); + Future future = pool.GetWorker().Submit(increment_run); + future.Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), 2); + TS_ASSERT_EQUALS(pool.GetNbOfThreads(), 1); + + // Test Execute. + std::condition_variable cv; + std::mutex mutex; + std::atomic go = false; + future = pool.GetWorker().Submit([&]() { + std::unique_lock lock(mutex); + cv.wait(lock, [&go]() -> bool { return go; }); + lock.unlock(); + increment_run(); + go = false; + cv.notify_all(); + }); + TS_ASSERT_EQUALS(tasks_run.load(), 2); + go = true; + cv.notify_all(); + std::unique_lock lock(mutex); + cv.wait(lock, [&go]() -> bool { return !go; }); + TS_ASSERT_EQUALS(tasks_run.load(), 3); + } + + void test_PoolExecutor() + { + ThreadPool::Pool pool; + pool.InitThreads(1); + // Push a blocked task on the worker so we can actually test the global queue. + std::condition_variable cv; + std::mutex mutex; + bool go = false; + pool.GetAllWorkers().begin()->Execute([&cv, &mutex, &go]() { + std::unique_lock lock(mutex); + cv.wait(lock, [&go]() { return go; }); + }); + // Then push a general task. 
+ std::atomic tasks_run = 0; + auto increment_run = [&tasks_run]() { tasks_run++; }; + Future future = pool.GetExecutor().Submit(increment_run); + go = true; + cv.notify_all(); + future.Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), 1); + // Also check with no waiting expected. + pool.GetExecutor().Submit(increment_run).Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), 2); + } + + void test_ThreadExecutor_owning() + { + ThreadPool::Pool pool; + + { + // Request an async thread - this will start a temporary. + ThreadPool::ThreadExecutor exec = pool.GetWorker(true); + TS_ASSERT_EQUALS(pool.GetNbOfThreads(), 1); // should not increment. + int counter = 0; + exec.Execute([&counter]() mutable { counter++; }); + exec.Execute([&counter]() mutable { counter++; }); + exec.Submit([&counter]() mutable { counter++; }).Wait(); + TS_ASSERT_EQUALS(counter, 3); + } + + + { + // Test lifetime extension via futures. + int counter = 0; + Future future; + { + ThreadPool::ThreadExecutor exec = pool.GetWorker(true); + exec.Execute([&counter]() mutable { counter++; }); + future = exec.Submit([&counter]() mutable { counter++; }); + // This task will never run: the future, when done, will auto-cancel it. + exec.Execute([&counter]() mutable { counter++; }); + } + future.Wait(); + TS_ASSERT_EQUALS(counter, 2); + } + } + +};