Index: ps/trunk/source/graphics/MapGenerator.h =================================================================== --- ps/trunk/source/graphics/MapGenerator.h +++ ps/trunk/source/graphics/MapGenerator.h @@ -19,6 +19,7 @@ #define INCLUDED_MAPGENERATOR #include "ps/FileIo.h" +#include "ps/Future.h" #include "ps/TemplateLoader.h" #include "scriptinterface/StructuredClone.h" @@ -26,7 +27,6 @@ #include #include #include -#include class CMapGeneratorWorker; @@ -177,11 +177,6 @@ std::vector FindActorTemplates(const std::string& path, bool includeSubdirectories); /** - * Perform map generation in an independent thread. - */ - static void RunThread(CMapGeneratorWorker* self); - - /** * Perform the map generation. */ bool Run(); @@ -227,9 +222,10 @@ CTemplateLoader m_TemplateLoader; /** - * Holds the mapgeneration thread identifier. + * Holds the completion result of the asynchronous map generation. + * TODO: this whole class could really be a future on its own. */ - std::thread m_WorkerThread; + Future m_WorkerThread; /** * Avoids thread synchronization issues. Index: ps/trunk/source/graphics/MapGenerator.cpp =================================================================== --- ps/trunk/source/graphics/MapGenerator.cpp +++ ps/trunk/source/graphics/MapGenerator.cpp @@ -30,7 +30,7 @@ #include "ps/CLogger.h" #include "ps/FileIo.h" #include "ps/Profile.h" -#include "ps/Threading.h" +#include "ps/TaskManager.h" #include "ps/scripting/JSInterface_VFS.h" #include "scriptinterface/FunctionWrapper.h" #include "scriptinterface/ScriptContext.h" @@ -69,9 +69,8 @@ CMapGeneratorWorker::~CMapGeneratorWorker() { - // Wait for thread to end - if (m_WorkerThread.joinable()) - m_WorkerThread.join(); + // Cancel or wait for the task to end. + m_WorkerThread.CancelOrWait(); } void CMapGeneratorWorker::Initialize(const VfsPath& scriptFile, const std::string& settings) @@ -83,35 +82,31 @@ m_ScriptPath = scriptFile; m_Settings = settings; - // Launch the worker thread - m_WorkerThread = std::thread(Threading::HandleExceptions::Wrapper, this); -} - -void CMapGeneratorWorker::RunThread(CMapGeneratorWorker* self) -{ - debug_SetThreadName("MapGenerator"); - g_Profiler2.RegisterCurrentThread("MapGenerator"); + // Start generating the map asynchronously. + m_WorkerThread = Threading::TaskManager::Instance().PushTask([this]() { + PROFILE2("Map Generation"); - std::shared_ptr mapgenContext = ScriptContext::CreateContext(RMS_CONTEXT_SIZE); + std::shared_ptr mapgenContext = ScriptContext::CreateContext(RMS_CONTEXT_SIZE); - // Enable the script to be aborted - JS_AddInterruptCallback(mapgenContext->GetGeneralJSContext(), MapGeneratorInterruptCallback); + // Enable the script to be aborted + JS_AddInterruptCallback(mapgenContext->GetGeneralJSContext(), MapGeneratorInterruptCallback); - self->m_ScriptInterface = new ScriptInterface("Engine", "MapGenerator", mapgenContext); + m_ScriptInterface = new ScriptInterface("Engine", "MapGenerator", mapgenContext); - // Run map generation scripts - if (!self->Run() || self->m_Progress > 0) - { - // Don't leave progress in an unknown state, if generator failed, set it to -1 - std::lock_guard lock(self->m_WorkerMutex); - self->m_Progress = -1; - } + // Run map generation scripts + if (!Run() || m_Progress > 0) + { + // Don't leave progress in an unknown state, if generator failed, set it to -1 + std::lock_guard lock(m_WorkerMutex); + m_Progress = -1; + } - SAFE_DELETE(self->m_ScriptInterface); + SAFE_DELETE(m_ScriptInterface); - // At this point the random map scripts are done running, so the thread has no further purpose - // and can die. The data will be stored in m_MapData already if successful, or m_Progress - // will contain an error value on failure. + // At this point the random map scripts are done running, so the thread has no further purpose + // and can die. The data will be stored in m_MapData already if successful, or m_Progress + // will contain an error value on failure. + }); } bool CMapGeneratorWorker::Run() Index: ps/trunk/source/lib/sysdep/os/osx/odbg.cpp =================================================================== --- ps/trunk/source/lib/sysdep/os/osx/odbg.cpp +++ ps/trunk/source/lib/sysdep/os/osx/odbg.cpp @@ -1,4 +1,4 @@ -/* Copyright (C) 2010 Wildfire Games. +/* Copyright (C) 2021 Wildfire Games. * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the @@ -28,6 +28,8 @@ #include "lib/sysdep/sysdep.h" #include "lib/debug.h" +#include + void* debug_GetCaller(void* UNUSED(context), const wchar_t* UNUSED(lastFuncToSkip)) { return NULL; @@ -43,7 +45,7 @@ return ERR::NOT_SUPPORTED; } -void debug_SetThreadName(char const* UNUSED(name)) +void debug_SetThreadName(char const* name) { - // Currently unimplemented + pthread_setname_np(name); } Index: ps/trunk/source/main.cpp =================================================================== --- ps/trunk/source/main.cpp +++ ps/trunk/source/main.cpp @@ -59,6 +59,7 @@ #include "ps/UserReport.h" #include "ps/Util.h" #include "ps/VideoMode.h" +#include "ps/TaskManager.h" #include "ps/World.h" #include "ps/GameSetup/GameSetup.h" #include "ps/GameSetup/Atlas.h" @@ -578,6 +579,9 @@ ScriptEngine scriptEngine; CXeromyces::Startup(); + // Initialise the global task manager at this point (JS & Profiler2 are set up). + Threading::TaskManager::Initialise(); + if (ATLAS_RunIfOnCmdLine(args, false)) { CXeromyces::Terminate(); @@ -704,6 +708,7 @@ ATLAS_RunIfOnCmdLine(args, true); #endif + Threading::TaskManager::Instance().ClearQueue(); CXeromyces::Terminate(); } Index: ps/trunk/source/ps/Future.h =================================================================== --- ps/trunk/source/ps/Future.h +++ ps/trunk/source/ps/Future.h @@ -0,0 +1,326 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#ifndef INCLUDED_FUTURE +#define INCLUDED_FUTURE + +#include "ps/FutureForward.h" + +#include +#include +#include +#include +#include + +template +class PackagedTask; + +namespace FutureSharedStateDetail +{ +enum class Status +{ + PENDING, + STARTED, + DONE, + CANCELED +}; + +template +class SharedStateResult +{ +public: + void ResetResult() + { + if (m_HasResult) + m_Result.m_Result.~ResultType(); + m_HasResult = false; + } + + union Result + { + std::aligned_storage_t m_Bytes; + ResultType m_Result; + Result() : m_Bytes() {}; + ~Result() {}; + }; + // We don't use Result directly so the result doesn't have to be default constructible. + Result m_Result; + bool m_HasResult = false; +}; + +// Don't have m_Result for void ReturnType +template<> +class SharedStateResult +{ +}; + +/** + * The shared state between futures and packaged state. + * Holds all relevant data. + */ +template +class SharedState : public SharedStateResult +{ + static constexpr bool VoidResult = std::is_same_v; +public: + SharedState(std::function&& func) : m_Func(func) {} + ~SharedState() + { + // For safety, wait on started task completion, but not on pending ones (auto-cancelled). + if (!Cancel()) + { + Wait(); + Cancel(); + } + if constexpr (!VoidResult) + SharedStateResult::ResetResult(); + } + + SharedState(const SharedState&) = delete; + SharedState(SharedState&&) = delete; + + bool IsDoneOrCanceled() const + { + return m_Status == Status::DONE || m_Status == Status::CANCELED; + } + + void Wait() + { + // Fast path: we're already done. + if (IsDoneOrCanceled()) + return; + // Slow path: we aren't done when we run the above check. Lock and wait until we are. + std::unique_lock lock(m_Mutex); + m_ConditionVariable.wait(lock, [this]() -> bool { return IsDoneOrCanceled(); }); + } + + /** + * If the task is pending, cancel it: the status becomes CANCELED and if the task was completed, the result is destroyed. + * @return true if the task was indeed cancelled, false otherwise (the task is running or already done). + */ + bool Cancel() + { + Status expected = Status::PENDING; + bool cancelled = m_Status.compare_exchange_strong(expected, Status::CANCELED); + // If we're done, invalidate, if we're pending, atomically cancel, otherwise fail. + if (cancelled || m_Status == Status::DONE) + { + if (m_Status == Status::DONE) + m_Status = Status::CANCELED; + if constexpr (!VoidResult) + SharedStateResult::ResetResult(); + m_ConditionVariable.notify_all(); + return cancelled; + } + return false; + } + + /** + * Move the result away from the shared state, mark the future invalid. + */ + template + std::enable_if_t, ResultType> GetResult() + { + // The caller must ensure that this is only called if we have a result. + ENSURE(SharedStateResult::m_HasResult); + m_Status = Status::CANCELED; + SharedStateResult::m_HasResult = false; + return std::move(SharedStateResult::m_Result.m_Result); + } + + std::atomic m_Status = Status::PENDING; + std::mutex m_Mutex; + std::condition_variable m_ConditionVariable; + + std::function m_Func; +}; + +} // namespace FutureSharedStateDetail + +/** + * Corresponds to std::future. + * Unlike std::future, Future can request the cancellation of the task that would produce the result. + * This makes it more similar to Java's CancellableTask or C#'s Task. + * The name Future was kept over Task so it would be more familiar to C++ users, + * but this all should be revised once Concurrency TS wraps up. + * + * Future is _not_ thread-safe. Call it from a single thread or ensure synchronization externally. + * + * The destructor is never blocking. The promise may still be running on destruction. + * TODO: + * - Handle exceptions. + */ +template +class Future +{ + template + friend class PackagedTask; + + static constexpr bool VoidResult = std::is_same_v; + + using Status = FutureSharedStateDetail::Status; + using SharedState = FutureSharedStateDetail::SharedState; +public: + Future() {}; + Future(const Future& o) = delete; + Future(Future&&) = default; + Future& operator=(Future&&) = default; + ~Future() {} + + /** + * Make the future wait for the result of @a func. + */ + template + PackagedTask Wrap(T&& func); + + /** + * Move the result out of the future, and invalidate the future. + * If the future is not complete, calls Wait(). + * If the future is canceled, asserts. + */ + template + std::enable_if_t, ResultType> Get() + { + ENSURE(!!m_SharedState); + + Wait(); + if constexpr (VoidResult) + return; + else + { + ENSURE(m_SharedState->m_Status != Status::CANCELED); + + // This mark the state invalid - can't call Get again. + return m_SharedState->GetResult(); + } + } + + /** + * @return true if the shared state is valid and has a result (i.e. Get can be called). + */ + bool IsReady() const + { + return !!m_SharedState && m_SharedState->m_Status == Status::DONE; + } + + /** + * @return true if the future has a shared state and it's not been invalidated, ie. pending, started or done. + */ + bool Valid() const + { + return !!m_SharedState && m_SharedState->m_Status != Status::CANCELED; + } + + void Wait() + { + if (Valid()) + m_SharedState->Wait(); + } + + /** + * Cancels the task, waiting if the task is currently started. + * Use this function over Cancel() if you need to ensure determinism (i.e. in the simulation). + * @see Cancel. + */ + void CancelOrWait() + { + if (!Valid()) + return; + if (!m_SharedState->Cancel()) + m_SharedState->Wait(); + m_SharedState.reset(); + } + + /** + * Cancels the task (without waiting). + * The result is always invalid, even if the task had completed before. + * Note that this cannot stop started tasks. + */ + void Cancel() + { + if (m_SharedState) + m_SharedState->Cancel(); + m_SharedState.reset(); + } +protected: + std::shared_ptr m_SharedState; +}; + +/** + * Corresponds somewhat to std::packaged_task. + * Like packaged_task, this holds a function acting as a promise. + * This type is mostly just the shared state and the call operator, + * handling the promise & continuation logic. + */ +template +class PackagedTask +{ + static constexpr bool VoidResult = std::is_same_v; +public: + PackagedTask() = delete; + PackagedTask(const std::shared_ptr::SharedState>& ss) : m_SharedState(ss) {} + + void operator()() + { + typename Future::Status expected = Future::Status::PENDING; + if (!m_SharedState->m_Status.compare_exchange_strong(expected, Future::Status::STARTED)) + return; + + if constexpr (VoidResult) + m_SharedState->m_Func(); + else + { + // To avoid UB, explicitly placement-new the value. + new (&m_SharedState->m_Result) ResultType{std::move(m_SharedState->m_Func())}; + m_SharedState->m_HasResult = true; + } + + // Because we might have threads waiting on us, we need to make sure that they either: + // - don't wait on our condition variable + // - receive the notification when we're done. + // This requires locking the mutex (@see Wait). + { + std::lock_guard lock(m_SharedState->m_Mutex); + m_SharedState->m_Status = Future::Status::DONE; + } + + m_SharedState->m_ConditionVariable.notify_all(); + + // We no longer need the shared state, drop it immediately. + m_SharedState.reset(); + } + + void Cancel() + { + m_SharedState->Cancel(); + m_SharedState.reset(); + } + +protected: + std::shared_ptr::SharedState> m_SharedState; +}; + +template +template +PackagedTask Future::Wrap(T&& func) +{ + static_assert(std::is_convertible_v, ResultType>, "The return type of the wrapped function cannot be converted to the type of the Future."); + m_SharedState = std::make_shared(std::move(func)); + return PackagedTask(m_SharedState); +} + +#endif // INCLUDED_FUTURE Index: ps/trunk/source/ps/FutureForward.h =================================================================== --- ps/trunk/source/ps/FutureForward.h +++ ps/trunk/source/ps/FutureForward.h @@ -0,0 +1,29 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#ifndef INCLUDED_FUTURE_FORWARD +#define INCLUDED_FUTURE_FORWARD + +/** + * Lightweight header / forward declarations for Future. + * Include this in your header files where possible. + */ + +template +class Future; + +#endif // INCLUDED_FUTURE_FORWARD Index: ps/trunk/source/ps/TaskManager.h =================================================================== --- ps/trunk/source/ps/TaskManager.h +++ ps/trunk/source/ps/TaskManager.h @@ -0,0 +1,90 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#ifndef INCLUDED_THREADING_TASKMANAGER +#define INCLUDED_THREADING_TASKMANAGER + +#include "ps/Future.h" + +#include +#include + +class TestTaskManager; +class CConfigDB; + +namespace Threading +{ +class TaskManager; +class WorkerThread; + +enum class TaskPriority +{ + NORMAL, + LOW +}; + +/** + * The task manager creates all worker threads on initialisation, + * and manages the task queues. + * See implementation for additional comments. + */ +class TaskManager +{ + friend class WorkerThread; +public: + TaskManager(); + ~TaskManager(); + TaskManager(const TaskManager&) = delete; + TaskManager(TaskManager&&) = delete; + TaskManager& operator=(const TaskManager&) = delete; + TaskManager& operator=(TaskManager&&) = delete; + + static void Initialise(); + static TaskManager& Instance(); + + /** + * Clears all tasks from the queue. This blocks on started tasks. + */ + void ClearQueue(); + + /** + * @return the number of threaded workers. + */ + size_t GetNumberOfWorkers() const; + + /** + * Push a task to be executed. + */ + template + Future> PushTask(T&& func, TaskPriority priority = TaskPriority::NORMAL) + { + Future> ret; + DoPushTask(std::move(ret.Wrap(std::move(func))), priority); + return ret; + } + +private: + TaskManager(size_t numberOfWorkers); + + void DoPushTask(std::function&& task, TaskPriority priority); + + class Impl; + std::unique_ptr m; +}; +} // namespace Threading + +#endif // INCLUDED_THREADING_TASKMANAGER Index: ps/trunk/source/ps/TaskManager.cpp =================================================================== --- ps/trunk/source/ps/TaskManager.cpp +++ ps/trunk/source/ps/TaskManager.cpp @@ -0,0 +1,310 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#include "precompiled.h" + +#include "TaskManager.h" + +#include "lib/debug.h" +#include "maths/MathUtil.h" +#include "ps/CLogger.h" +#include "ps/ConfigDB.h" +#include "ps/Threading.h" +#include "ps/ThreadUtil.h" +#include "ps/Profiler2.h" + +#include +#include +#include +#include +#include +#include + +namespace Threading +{ +/** + * Minimum number of TaskManager workers. + */ +static constexpr size_t MIN_THREADS = 3; + +/** + * Maximum number of TaskManager workers. + */ +static constexpr size_t MAX_THREADS = 32; + +std::unique_ptr g_TaskManager; + +class Thread; + +using QueueItem = std::function; +} + +/** + * Light wrapper around std::thread. Ensures Join has been called. + */ +class Threading::Thread +{ +public: + Thread() = default; + Thread(const Thread&) = delete; + Thread(Thread&&) = delete; + + template + void Start(T* object) + { + m_Thread = std::thread(Threading::HandleExceptions>::Wrapper, object); + } + template + static void DoStart(T* object); + +protected: + ~Thread() + { + ENSURE(!m_Thread.joinable()); + } + + std::thread m_Thread; + std::atomic m_Kill = false; +}; + +/** + * Worker thread: process the taskManager queues until killed. + */ +class Threading::WorkerThread : public Thread +{ +public: + WorkerThread(Threading::TaskManager::Impl& taskManager); + ~WorkerThread(); + + /** + * Wake the worker. + */ + void Wake(); + +protected: + void RunUntilDeath(); + + std::mutex m_Mutex; + std::condition_variable m_ConditionVariable; + + Threading::TaskManager::Impl& m_TaskManager; +}; + +/** + * PImpl-ed implementation of the Task manager. + * + * The normal priority queue is processed first, the low priority only if there are no higher-priority tasks + */ +class Threading::TaskManager::Impl +{ + friend class TaskManager; + friend class WorkerThread; +public: + Impl(TaskManager& backref); + ~Impl() + { + ClearQueue(); + m_Workers.clear(); + } + + /** + * 2-phase init to avoid having to think too hard about the order of class members. + */ + void SetupWorkers(size_t numberOfWorkers); + + /** + * Push a task on the global queue. + * Takes ownership of @a task. + * May be called from any thread. + */ + void PushTask(std::function&& task, TaskPriority priority); + +protected: + void ClearQueue(); + + template + bool PopTask(std::function& taskOut); + + // Back reference (keep this first). + TaskManager& m_TaskManager; + + std::atomic m_HasWork; + std::atomic m_HasLowPriorityWork; + std::mutex m_GlobalMutex; + std::mutex m_GlobalLowPriorityMutex; + std::deque m_GlobalQueue; + std::deque m_GlobalLowPriorityQueue; + + // Ideally this would be a vector, since it does get iterated, but that requires movable types. + std::deque m_Workers; + + // Round-robin counter for GetWorker. + mutable size_t m_RoundRobinIdx = 0; +}; + +Threading::TaskManager::TaskManager() : TaskManager(std::thread::hardware_concurrency() - 1) +{ +} + +Threading::TaskManager::TaskManager(size_t numberOfWorkers) +{ + m = std::make_unique(*this); + numberOfWorkers = Clamp(numberOfWorkers, MIN_THREADS, MAX_THREADS); + m->SetupWorkers(numberOfWorkers); +} + +Threading::TaskManager::~TaskManager() {} + +Threading::TaskManager::Impl::Impl(TaskManager& backref) + : m_TaskManager(backref) +{ +} + +void Threading::TaskManager::Impl::SetupWorkers(size_t numberOfWorkers) +{ + for (size_t i = 0; i < numberOfWorkers; ++i) + m_Workers.emplace_back(*this); +} + +void Threading::TaskManager::ClearQueue() { m->ClearQueue(); } +void Threading::TaskManager::Impl::ClearQueue() +{ + { + std::lock_guard lock(m_GlobalMutex); + m_GlobalQueue.clear(); + } + { + std::lock_guard lock(m_GlobalLowPriorityMutex); + m_GlobalLowPriorityQueue.clear(); + } +} + +size_t Threading::TaskManager::GetNumberOfWorkers() const +{ + return m->m_Workers.size(); +} + +void Threading::TaskManager::DoPushTask(std::function&& task, TaskPriority priority) +{ + m->PushTask(std::move(task), priority); +} + +void Threading::TaskManager::Impl::PushTask(std::function&& task, TaskPriority priority) +{ + std::mutex& mutex = priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex; + std::deque& queue = priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue; + std::atomic& hasWork = priority == TaskPriority::NORMAL ? m_HasWork : m_HasLowPriorityWork; + { + std::lock_guard lock(mutex); + queue.emplace_back(std::move(task)); + hasWork = true; + } + + for (WorkerThread& worker : m_Workers) + worker.Wake(); +} + +template +bool Threading::TaskManager::Impl::PopTask(std::function& taskOut) +{ + std::mutex& mutex = Priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex; + std::deque& queue = Priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue; + std::atomic& hasWork = Priority == TaskPriority::NORMAL ? m_HasWork : m_HasLowPriorityWork; + + // Particularly critical section since we're locking the global queue. + std::lock_guard globalLock(mutex); + if (!queue.empty()) + { + taskOut = std::move(queue.front()); + queue.pop_front(); + hasWork = !queue.empty(); + return true; + } + return false; +} + +void Threading::TaskManager::Initialise() +{ + if (!g_TaskManager) + g_TaskManager = std::make_unique(); +} + +Threading::TaskManager& Threading::TaskManager::Instance() +{ + ENSURE(g_TaskManager); + return *g_TaskManager; +} + +// Thread definition + +Threading::WorkerThread::WorkerThread(Threading::TaskManager::Impl& taskManager) + : m_TaskManager(taskManager) +{ + Start(this); +} + +Threading::WorkerThread::~WorkerThread() +{ + m_Kill = true; + m_ConditionVariable.notify_one(); + if (m_Thread.joinable()) + m_Thread.join(); +} + +void Threading::WorkerThread::Wake() +{ + m_ConditionVariable.notify_one(); +} + +void Threading::WorkerThread::RunUntilDeath() +{ + // The profiler does better if the names are unique. + static std::atomic n = 0; + std::string name = "Task Mgr #" + std::to_string(n++); + debug_SetThreadName(name.c_str()); + g_Profiler2.RegisterCurrentThread(name); + + + std::function task; + bool hasTask = false; + std::unique_lock lock(m_Mutex, std::defer_lock); + while (!m_Kill) + { + lock.lock(); + m_ConditionVariable.wait(lock, [this](){ + return m_Kill || m_TaskManager.m_HasWork || m_TaskManager.m_HasLowPriorityWork; + }); + lock.unlock(); + + if (m_Kill) + break; + + // Fetch work from the global queues. + hasTask = m_TaskManager.PopTask(task); + if (!hasTask) + hasTask = m_TaskManager.PopTask(task); + if (hasTask) + task(); + } +} + +// Defined here - needs access to derived types. +template +void Threading::Thread::DoStart(T* object) +{ + std::invoke(callable, object); +} Index: ps/trunk/source/ps/tests/test_Future.h =================================================================== --- ps/trunk/source/ps/tests/test_Future.h +++ ps/trunk/source/ps/tests/test_Future.h @@ -0,0 +1,136 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#include "lib/self_test.h" + +#include "ps/Future.h" + +#include +#include + +class TestFuture : public CxxTest::TestSuite +{ +public: + void test_future_basic() + { + int counter = 0; + { + Future noret; + std::function task = noret.Wrap([&counter]() mutable { counter++; }); + task(); + TS_ASSERT_EQUALS(counter, 1); + } + + { + Future noret; + { + std::function task = noret.Wrap([&counter]() mutable { counter++; }); + // Auto-cancels the task. + } + } + TS_ASSERT_EQUALS(counter, 1); + } + + void test_future_return() + { + { + Future future; + std::function task = future.Wrap([]() { return 1; }); + task(); + TS_ASSERT_EQUALS(future.Get(), 1); + } + + // Convertible type. + { + Future future; + std::function task = future.Wrap([]() -> u8 { return 1; }); + task(); + TS_ASSERT_EQUALS(future.Get(), 1); + } + + static int destroyed = 0; + // No trivial constructor or destructor. Also not copiable. + struct NonDef + { + NonDef() = delete; + NonDef(int input) : value(input) {}; + NonDef(const NonDef&) = delete; + NonDef(NonDef&& o) + { + value = o.value; + o.value = 0; + } + ~NonDef() { if (value != 0) destroyed++; } + int value = 0; + }; + TS_ASSERT_EQUALS(destroyed, 0); + { + Future future; + std::function task = future.Wrap([]() { return 1; }); + task(); + TS_ASSERT_EQUALS(future.Get().value, 1); + } + TS_ASSERT_EQUALS(destroyed, 1); + { + Future future; + std::function task = future.Wrap([]() { return 1; }); + } + TS_ASSERT_EQUALS(destroyed, 1); + /** + * TODO: find a way to test this + { + Future future; + std::function task = future.Wrap([]() { return 1; }); + future.Cancel(); + future.Wait(); + TS_ASSERT_THROWS(future.Get(), const Future::BadFutureAccess&); + } + */ + TS_ASSERT_EQUALS(destroyed, 1); + } + + void test_future_moving() + { + Future future; + std::function function; + + // Set things up so all temporaries passed into the futures will be reset to obviously invalid memory. + std::aligned_storage_t), alignof(Future)> futureStorage; + std::aligned_storage_t), alignof(std::function)> functionStorage; + Future* f = &future; // CppCheck doesn't read placement new correctly, do this to silence errors. + std::function* c = &function; + f = new (&futureStorage) Future{}; + c = new (&functionStorage) std::function{}; + + *c = []() { return 7; }; + std::function task = f->Wrap(std::move(*c)); + + future = std::move(*f); + function = std::move(*c); + + // Destroy and clear the memory + f->~Future(); + c->~function(); + memset(&futureStorage, 0xFF, sizeof(decltype(futureStorage))); + memset(&functionStorage, 0xFF, sizeof(decltype(functionStorage))); + + // Let's move the packaged task while at it. + std::function task2 = std::move(task); + task2(); + TS_ASSERT_EQUALS(future.Get(), 7); + } +}; Index: ps/trunk/source/ps/tests/test_TaskManager.h =================================================================== --- ps/trunk/source/ps/tests/test_TaskManager.h +++ ps/trunk/source/ps/tests/test_TaskManager.h @@ -0,0 +1,118 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#include "lib/self_test.h" + +#include "ps/Future.h" +#include "ps/TaskManager.h" + +#include +#include +#include + +class TestTaskManager : public CxxTest::TestSuite +{ +public: + void test_basic() + { + Threading::TaskManager& taskManager = Threading::TaskManager::Instance(); + // There is a minimum of 3. + TS_ASSERT(taskManager.GetNumberOfWorkers() >= 3); + + std::atomic tasks_run = 0; + auto increment_run = [&tasks_run]() { tasks_run++; }; + Future future = taskManager.PushTask(increment_run); + future.Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), 1); + + // Test Execute. + std::condition_variable cv; + std::mutex mutex; + std::atomic go = false; + future = taskManager.PushTask([&]() { + std::unique_lock lock(mutex); + cv.wait(lock, [&go]() -> bool { return go; }); + lock.unlock(); + increment_run(); + lock.lock(); + go = false; + lock.unlock(); + cv.notify_all(); + }); + TS_ASSERT_EQUALS(tasks_run.load(), 1); + std::unique_lock lock(mutex); + go = true; + lock.unlock(); + cv.notify_all(); + lock.lock(); + cv.wait(lock, [&go]() -> bool { return !go; }); + TS_ASSERT_EQUALS(tasks_run.load(), 2); + // Wait on the future before the mutex/cv go out of scope. + future.Wait(); + } + + void test_Priority() + { + Threading::TaskManager& taskManager = Threading::TaskManager::Instance(); + std::atomic tasks_run = 0; + // Push general tasks + auto increment_run = [&tasks_run]() { tasks_run++; }; + Future future = taskManager.PushTask(increment_run); + Future futureLow = taskManager.PushTask(increment_run, Threading::TaskPriority::LOW); + future.Wait(); + futureLow.Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), 2); + // Also check with no waiting expected. + taskManager.PushTask(increment_run).Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), 3); + taskManager.PushTask(increment_run, Threading::TaskPriority::LOW).Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), 4); + } + + void test_Load() + { + Threading::TaskManager& taskManager = Threading::TaskManager::Instance(); + +#define ITERATIONS 100000 + std::vector> futures; + futures.resize(ITERATIONS); + std::vector values(ITERATIONS); + + auto f1 = taskManager.PushTask([&taskManager, &futures]() { + for (u32 i = 0; i < ITERATIONS; i+=3) + futures[i] = taskManager.PushTask([]() { return 5; }); + }); + + auto f2 = taskManager.PushTask([&taskManager, &futures]() { + for (u32 i = 1; i < ITERATIONS; i+=3) + futures[i] = taskManager.PushTask([]() { return 5; }, Threading::TaskPriority::LOW); + }); + + auto f3 = taskManager.PushTask([&taskManager, &futures]() { + for (u32 i = 2; i < ITERATIONS; i+=3) + futures[i] = taskManager.PushTask([]() { return 5; }); + }); + + f1.Wait(); + f2.Wait(); + f3.Wait(); + + for (size_t i = 0; i < ITERATIONS; ++i) + TS_ASSERT_EQUALS(futures[i].Get(), 5); +#undef ITERATIONS + } +}; Index: ps/trunk/source/test_setup.cpp =================================================================== --- ps/trunk/source/test_setup.cpp +++ ps/trunk/source/test_setup.cpp @@ -36,6 +36,7 @@ #include "lib/timer.h" #include "lib/sysdep/sysdep.h" #include "ps/Profiler2.h" +#include "ps/TaskManager.h" #include "scriptinterface/FunctionWrapper.h" #include "scriptinterface/ScriptEngine.h" #include "scriptinterface/ScriptContext.h" @@ -84,11 +85,14 @@ m_ScriptEngine = new ScriptEngine; g_ScriptContext = ScriptContext::CreateContext(); + Threading::TaskManager::Initialise(); + return true; } virtual bool tearDownWorld() { + Threading::TaskManager::Instance().ClearQueue(); g_ScriptContext.reset(); SAFE_DELETE(m_ScriptEngine); g_Profiler2.Shutdown();