Index: binaries/data/config/default.cfg =================================================================== --- binaries/data/config/default.cfg +++ binaries/data/config/default.cfg @@ -41,6 +41,9 @@ ; Default server name or IP to use in multiplayer multiplayerserver = "127.0.0.1" +; Number of thread pool worker threads: -1 picks automatically (one less than the number of hardware threads); 0-64 uses exactly that many, 0 meaning no worker threads. +thread_pool_threads = -1 + ; Force a particular resolution. (If these are 0, the default is ; to keep the current desktop resolution in fullscreen mode or to ; use 1024x768 in windowed mode.) Index: binaries/data/mods/public/gui/session/developer_overlay/DeveloperOverlayCheckboxes.js =================================================================== --- binaries/data/mods/public/gui/session/developer_overlay/DeveloperOverlayCheckboxes.js +++ binaries/data/mods/public/gui/session/developer_overlay/DeveloperOverlayCheckboxes.js @@ -262,6 +262,27 @@ } }; +DeveloperOverlayCheckboxes.prototype.DisableThreadPool = class +{ + label() + { + return translate("Disable Thread Pool"); + } + + onPress(checked) + { + if (checked) + Engine.ConfigDB_CreateValue("system", "thread_pool_threads", 0); + else + Engine.ConfigDB_CreateValue("system", "thread_pool_threads", -1); + } + + checked() + { + return +Engine.ConfigDB_GetValue("system", "thread_pool_threads") == 0; + } +}; + DeveloperOverlayCheckboxes.prototype.EnableCulling = class { label() Index: source/graphics/MapGenerator.h =================================================================== --- source/graphics/MapGenerator.h +++ source/graphics/MapGenerator.h @@ -21,6 +21,7 @@ #include "lib/posix/posix_pthread.h" #include "ps/FileIo.h" #include "ps/TemplateLoader.h" +#include "ps/ThreadPool.h" #include "scriptinterface/ScriptInterface.h" #include @@ -28,7 +29,6 @@ #include #include #include -#include class CMapGeneratorWorker; @@ -231,7 +231,7 @@ /** - * Holds the mapgeneration thread identifier. + * Holds the future of the map generation task.
*/ - std::thread m_WorkerThread; + CancellableFuture m_WorkerThread; /** * Avoids thread synchronization issues. Index: source/graphics/MapGenerator.cpp =================================================================== --- source/graphics/MapGenerator.cpp +++ source/graphics/MapGenerator.cpp @@ -30,7 +30,7 @@ #include "ps/CLogger.h" #include "ps/FileIo.h" #include "ps/Profile.h" -#include "ps/Threading.h" +#include "ps/ThreadPool.h" #include "ps/scripting/JSInterface_VFS.h" #include "scriptinterface/FunctionWrapper.h" #include "scriptinterface/ScriptContext.h" @@ -68,9 +68,8 @@ CMapGeneratorWorker::~CMapGeneratorWorker() { - // Wait for thread to end - if (m_WorkerThread.joinable()) - m_WorkerThread.join(); + // Cancel the task if it has not started yet, otherwise wait for it to end. + m_WorkerThread.Cancel(); } void CMapGeneratorWorker::Initialize(const VfsPath& scriptFile, const std::string& settings) @@ -83,13 +82,12 @@ m_Settings = settings; // Launch the worker thread - m_WorkerThread = std::thread(Threading::HandleExceptions::Wrapper, this); + m_WorkerThread = g_ThreadPool.GetWorker(true).Submit([this]() { RunThread(this); }); } void CMapGeneratorWorker::RunThread(CMapGeneratorWorker* self) { - debug_SetThreadName("MapGenerator"); - g_Profiler2.RegisterCurrentThread("MapGenerator"); + PROFILE2("Map Generation"); shared_ptr mapgenContext = ScriptContext::CreateContext(RMS_CONTEXT_SIZE); Index: source/main.cpp =================================================================== --- source/main.cpp +++ source/main.cpp @@ -58,6 +58,7 @@ #include "ps/UserReport.h" #include "ps/Util.h" #include "ps/VideoMode.h" +#include "ps/ThreadPool.h" #include "ps/World.h" #include "ps/GameSetup/GameSetup.h" #include "ps/GameSetup/Atlas.h" @@ -576,6 +577,9 @@ ScriptEngine scriptEngine; CXeromyces::Startup(); + // Initialise the thread pool threads at this point - we have Profiler2 and JS set up. ConfigDB is loaded later, which is when g_ThreadPool.ReadConfig() applies the configured thread count.
+ g_ThreadPool.InitThreads(); + if (ATLAS_RunIfOnCmdLine(args, false)) { CXeromyces::Terminate(); @@ -710,6 +714,7 @@ ATLAS_RunIfOnCmdLine(args, true); #endif + g_ThreadPool.Reset(); CXeromyces::Terminate(); } Index: source/ps/GameSetup/GameSetup.cpp =================================================================== --- source/ps/GameSetup/GameSetup.cpp +++ source/ps/GameSetup/GameSetup.cpp @@ -68,6 +68,7 @@ #include "ps/Profiler2.h" #include "ps/Pyrogenesis.h" // psSetLogDir #include "ps/scripting/JSInterface_Console.h" +#include "ps/ThreadPool.h" #include "ps/TouchInput.h" #include "ps/UserReport.h" #include "ps/Util.h" @@ -890,6 +891,9 @@ // g_ConfigDB, command line args, globals CONFIG_Init(args); + // Update the thread pool now that config is started. + g_ThreadPool.ReadConfig(); + // Using a global object for the context is a workaround until Simulation and AI use // their own threads and also their own contexts. const int contextSize = 384 * 1024 * 1024; Index: source/ps/ThreadPool.h =================================================================== --- /dev/null +++ source/ps/ThreadPool.h @@ -0,0 +1,169 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see .
*/ + +#ifndef INCLUDED_THREADPOOL +#define INCLUDED_THREADPOOL + +#include +#include + +class TestThreadPool; +class ThreadPoolTask; + +namespace ThreadPool +{ + class Pool; + class Thread; + class ThreadExecutor; +} + +class CancellableFuture +{ + friend class ThreadPool::ThreadExecutor; +public: + CancellableFuture() = default; + CancellableFuture(const std::shared_ptr& t) : m_Task(t) {}; + + /** + * Cancel the task if it has not started yet, otherwise wait for it to end. + */ + void Cancel(); + + /** + * Wait on the task to be done before returning. + */ + void Wait(); +protected: + std::shared_ptr m_Task; + // For temporary threads, this extends the lifetime so the client code + // can just store the future and not the thread executor. + std::shared_ptr m_Thread = nullptr; +}; + +namespace ThreadPool +{ +class PoolExecutor +{ +public: + PoolExecutor(Pool& p) : m_Pool(p) {} + + void Execute(std::function&& task); + CancellableFuture Submit(std::function&& task); + +protected: + Pool& m_Pool; +}; + +class ThreadExecutor +{ + friend class Pool; + friend class EachThreadExecutor; +public: + void Execute(std::function&& task); + CancellableFuture Submit(std::function&& task); + + ThreadExecutor(ThreadExecutor&&); + ~ThreadExecutor(); +protected: + // Reference constructor.
+ ThreadExecutor(Thread& thread); + // Owning constructor + struct CreateThread {}; + ThreadExecutor(CreateThread); + ThreadExecutor(const ThreadExecutor&) = delete; + + union MaybeOwnedThread + { + MaybeOwnedThread() : ptr(nullptr) {}; + ~MaybeOwnedThread() {}; + Thread* ptr; + std::shared_ptr ownedPtr; + } m_Thread; + bool m_Owns; +}; + +class EachThreadExecutor +{ +public: + EachThreadExecutor(std::vector& e) : m_Executors(e) {} + + std::vector::iterator begin() { return m_Executors.begin(); } + std::vector::iterator end() { return m_Executors.end(); } + +private: + std::vector& m_Executors; +}; + +class Pool +{ + friend class ::TestThreadPool; + friend class PoolExecutor; + friend class Thread; +public: + Pool(); + Pool(const Pool&) = delete; + Pool(Pool&&) = delete; + ~Pool(); + + /** + * Create worker threads. + */ + void InitThreads(); + + /** + * The thread pool's threads are initialised before ConfigDB. This is called when the latter is. + */ + void ReadConfig(); + +public: + void Reset(); + + /** + * The Pool executor assigns work to the pool, not a specific worker. + * Tasks are not guaranteed to be run in a separate thread: without worker threads, they run synchronously. + * Prefer this executor if you will have work to do asynchronously, + * but starting it immediately is not required. + */ + PoolExecutor GetExecutor(); + + /** + * The thread executor assigns work to a specific worker. + * The underlying thread is guaranteed to live at least as long as the executor, + * unless the thread pool is reset. + * @param mustBeAsync - if true, a temporary thread may be started. + */ + ThreadExecutor GetWorker(bool mustBeAsync = false); + + /** + * Returns an executor that can be used to start (optionally different) work on (optionally all) threads. + * Lifetime guarantee: until Reset() is called.
+ */ + EachThreadExecutor GetAllWorkers(); + + size_t GetNbOfThreads(); + +protected: + void InitThreads(size_t n); + + class Impl; + std::unique_ptr m; +}; +} // ThreadPool + +extern ThreadPool::Pool g_ThreadPool; + +#endif // INCLUDED_THREADPOOL Index: source/ps/ThreadPool.cpp =================================================================== --- /dev/null +++ source/ps/ThreadPool.cpp @@ -0,0 +1,482 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#include "precompiled.h" + +#include "ThreadPool.h" + +#include "lib/debug.h" +#include "maths/MathUtil.h" +#include "ps/ConfigDB.h" +#include "ps/Threading.h" +#include "ps/ThreadUtil.h" +#include "ps/Profiler2.h" + +#include +#include +#include +#include +#include +#include + +#define FORCE_SINGLE_THREAD 0 + +ThreadPool::Pool g_ThreadPool; + +class ThreadPoolTask +{ +public: + ThreadPoolTask(std::function&& func) : m_Func(std::move(func)) {} + ThreadPoolTask(ThreadPoolTask&&) = delete; + ThreadPoolTask(const ThreadPoolTask&) = delete; + ~ThreadPoolTask() + { + // Wait on started task completion, but not on pending ones (auto-cancelled). + if (m_State != STARTED) + return; + Wait(); + } + + void operator()() + { + m_Func(); + // m_State becomes DONE in Done(), under m_Mutex, so that waiters are reliably notified. + } + + void Wait() + { + // Fast path: we're already done. + if (m_State == DONE) + return; + // Slow path: we aren't done when we run the above check.
Lock and wait until we are. + std::unique_lock lock(m_Mutex); + m_ConditionVariable.wait(lock, [this]() -> bool { return m_State == DONE; }); + } + + /** + * @return true if the task was indeed started. False if already started or already done. + */ + bool Start() + { + ThreadPoolTask::State expected = ThreadPoolTask::PENDING; + return m_State.compare_exchange_strong(expected, ThreadPoolTask::STARTED); + } + + void Done() + { + // Because we might have threads waiting on us, we need to make sure that they either: + // - don't wait on our condition variable + // - receive the notification when we're done. + // This requires locking the mutex (@see Wait). + { + std::unique_lock lock(m_Mutex); + m_State = DONE; + } + m_ConditionVariable.notify_all(); + } + + void Cancel() + { + State expected = PENDING; + // If we're pending, try atomically setting to done. + if (m_State.compare_exchange_strong(expected, DONE)) + return; + // That failed - we'll need to wait until we're done. + Wait(); + } + + enum State + { + PENDING, + STARTED, + DONE + }; +private: + std::function m_Func; + std::atomic m_State = PENDING; + std::mutex m_Mutex; + std::condition_variable m_ConditionVariable; +}; + +void CancellableFuture::Cancel() +{ + if (m_Task) + m_Task->Cancel(); +} + +void CancellableFuture::Wait() +{ + if (m_Task) + m_Task->Wait(); +} + +/** + * Wraps an std::thread, a work queue and some convenience variables. + * Definitions are provided below. + */ +class ThreadPool::Thread +{ + friend Pool; +public: + Thread(bool threaded = true, ThreadPool::Pool::Impl* pool = nullptr); + ~Thread(); + Thread(const Thread&) = delete; + Thread(Thread&&) = delete; + + // Called from main thread. + void PushTask(const std::shared_ptr& task); +protected: + static void Start(Thread* self); + void RunUntilDeath(); + + // May be called from any thread.
+ void ExecuteTask(const std::shared_ptr& task); + + std::deque> m_Queue; + std::atomic m_Kill = false; + + // If true, we own an actual thread - otherwise this is a 'fake' thread and must run synchronously. + bool m_Threaded = true; + // If non-null, the thread tries to pop work from this pool's global queue. + ThreadPool::Pool::Impl* m_Pool = nullptr; + + std::thread m_Thread; + std::mutex m_Mutex; + std::condition_variable m_ConditionVariable; +}; + +/** + * PImpl-ed implementation of the thread pool. + */ +class ThreadPool::Pool::Impl +{ +public: + static void PushTask(Pool& pool, const std::shared_ptr& task); + + std::mutex m_GlobalQueueMutex; + std::deque> m_GlobalQueue; + + std::unique_ptr m_FakeThread; + // TODO: when vector supports nonmovable types, use that instead. + std::list m_Threads; + std::vector m_Executors; +}; + +ThreadPool::Pool::Pool() : m(std::make_unique()) +{ + Reset(); +} + +ThreadPool::Pool::~Pool() +{ +} + +namespace +{ +/** + * @return the automatic number of worker threads: one less than the number of hardware threads, + * or 0 if std::thread::hardware_concurrency() cannot tell (it then returns 0). + */ +size_t DefaultNbOfThreads() +{ + const unsigned int hardwareThreads = std::thread::hardware_concurrency(); + return hardwareThreads > 1 ? hardwareThreads - 1 : 0; +} +} // anonymous namespace + +void ThreadPool::Pool::InitThreads() +{ +#if FORCE_SINGLE_THREAD + InitThreads(0); +#else + InitThreads(DefaultNbOfThreads()); +#endif +} + +void ThreadPool::Pool::InitThreads(size_t n) +{ + debug_printf("ThreadPool: setting up %lu threads\n", static_cast<unsigned long>(n)); + ENSURE(m->m_FakeThread && m->m_Threads.empty()); + // If we don't have/want/need actual threads, just keep using the fake one we created on init. + if (n < 1) + return; + m->m_FakeThread.reset(); + m->m_Threads.clear(); + m->m_Executors.clear(); + for (size_t i = 0; i < n; ++i) + { + Thread& thread = m->m_Threads.emplace_back(true, m.get()); + m->m_Executors.emplace_back(ThreadExecutor(thread)); + m->m_Executors.back().Execute([]() { + // The profiler does better if the names are unique.
+ static std::atomic n = 0; + std::string name = "ThreadPool #" + std::to_string(n++); + debug_SetThreadName(name.c_str()); + g_Profiler2.RegisterCurrentThread(name); + }); + } +} + +void ThreadPool::Pool::ReadConfig() +{ + g_ConfigDB.RegisterHookAndCall("thread_pool_threads", [this]() { + int n_threads = -1; + CFG_GET_VAL("thread_pool_threads", n_threads); + if (n_threads == -1) + n_threads = static_cast<int>(DefaultNbOfThreads()); + else + n_threads = Clamp(n_threads, 0, 64); + if (n_threads != static_cast(m->m_Threads.size())) + { + // This is a bit wasteful but that shouldn't matter. + Reset(); + InitThreads(n_threads); + } + }); +} + +void ThreadPool::Pool::Reset() +{ + m->m_Threads.clear(); + m->m_Executors.clear(); + // Create a 'fake' thread that runs on the main thread + // so client code doesn't have to care too much about the state of the pool. + m->m_FakeThread = std::make_unique(false, m.get()); + m->m_Executors.emplace_back(ThreadExecutor(*m->m_FakeThread)); +} + +size_t ThreadPool::Pool::GetNbOfThreads() +{ + return m->m_FakeThread ? 1 : m->m_Threads.size(); +} + +ThreadPool::PoolExecutor ThreadPool::Pool::GetExecutor() +{ + return PoolExecutor(*this); +} + +ThreadPool::EachThreadExecutor ThreadPool::Pool::GetAllWorkers() +{ + return m->m_Executors; +} + +ThreadPool::ThreadExecutor ThreadPool::Pool::GetWorker(bool mustBeAsync) +{ + if (m->m_FakeThread) + { + if (mustBeAsync) + // Create a new thread, its lifetime tied to the ThreadExecutor. + return ThreadExecutor(ThreadExecutor::CreateThread{}); + else + return *m->m_FakeThread; + } + // Return an appropriate thread. + // TODO: implement this. + ENSURE(!m->m_Threads.empty()); + return m->m_Threads.front(); +} + +// There is a starvation risk here if the main thread pushes too many tasks.
+void ThreadPool::Pool::Impl::PushTask(Pool& pool, const std::shared_ptr& task) +{ + // Without worker threads nothing would ever pop the global queue, so run the task synchronously. + if (pool.m->m_FakeThread) + { + pool.m->m_FakeThread->PushTask(task); + return; + } + { + std::lock_guard lock(pool.m->m_GlobalQueueMutex); + pool.m->m_GlobalQueue.push_back(task); + } + // Lock each worker's mutex before notifying it: a worker that has just found the global queue empty, + // but has not started waiting yet, would otherwise miss the notification and sleep with work pending. + for (Thread& thread : pool.m->m_Threads) + { + std::lock_guard<std::mutex> lock(thread.m_Mutex); + thread.m_ConditionVariable.notify_all(); + } +} + +// Thread definition + +ThreadPool::Thread::Thread(bool threaded, ThreadPool::Pool::Impl* pool) : m_Threaded(threaded), m_Pool(pool) +{ + if (threaded) + m_Thread = std::thread(Threading::HandleExceptions::Wrapper, this); +} + +ThreadPool::Thread::~Thread() +{ + m_Kill = true; + { + std::unique_lock lock(m_Mutex); + m_Queue.clear(); + } + if (m_Thread.joinable()) + { + m_ConditionVariable.notify_all(); + m_Thread.join(); + } +} + +void ThreadPool::Thread::PushTask(const std::shared_ptr& task) +{ + if (!m_Threaded) + ExecuteTask(task); + else + { + { + std::unique_lock lock(m_Mutex); + m_Queue.push_back(task); + } + m_ConditionVariable.notify_all(); + } +} + +void ThreadPool::Thread::Start(Thread* self) +{ + self->RunUntilDeath(); +} + +void ThreadPool::Thread::RunUntilDeath() +{ + std::unique_lock lock(m_Mutex, std::defer_lock); + while (!m_Kill) + { + if (!lock.owns_lock()) + lock.lock(); + m_ConditionVariable.wait(lock, [this]() -> bool { + if (!m_Queue.empty() || m_Kill) + return true; + if (!m_Pool) + return false; + // TODO: should we try to pop more? Is the lock contention fine? + std::lock_guard lock(m_Pool->m_GlobalQueueMutex); + return !m_Pool->m_GlobalQueue.empty(); + }); + if (!m_Kill && !m_Queue.empty()) + { + std::shared_ptr task = std::move(m_Queue.front()); + m_Queue.pop_front(); + lock.unlock(); + ExecuteTask(task); + } + if (!m_Kill && m_Pool) + { + // Check the global pool queue. + // TODO: should we try to pop more? Is the lock contention fine? + // m_Queue is guarded by m_Mutex, which was released above if a task was just executed. + if (!lock.owns_lock()) + lock.lock();
+ std::lock_guard<std::mutex> globalLock(m_Pool->m_GlobalQueueMutex); + if (!m_Pool->m_GlobalQueue.empty()) + { + m_Queue.push_back(std::move(m_Pool->m_GlobalQueue.front())); + m_Pool->m_GlobalQueue.pop_front(); + } + } + } +} + +void ThreadPool::Thread::ExecuteTask(const std::shared_ptr& task) +{ + // Skips cancelled tasks atomically. + if (task->Start()) + { + (*task)(); + task->Done(); + } +} + + +// Executor implementation below + +ThreadPool::ThreadExecutor::ThreadExecutor(Thread& thread) +{ + m_Owns = false; + m_Thread.ptr = &thread; +} + +ThreadPool::ThreadExecutor::ThreadExecutor(CreateThread) +{ + m_Owns = true; + // ownedPtr is not the active union member yet: construct it in place rather than assigning to it. + new (&m_Thread.ownedPtr) std::shared_ptr<Thread>(std::make_shared<Thread>()); + // Name the temporary thread and register it with the profiler, as is done for the pool's workers. + Execute([]() { + // The profiler does better if the names are unique. + static std::atomic<int> n = 0; + std::string name = "ThreadPool temporary thread #" + std::to_string(n++); + debug_SetThreadName(name.c_str()); + g_Profiler2.RegisterCurrentThread(name); + }); +} + +ThreadPool::ThreadExecutor::ThreadExecutor(ThreadExecutor&& o) +{ + m_Owns = o.m_Owns; + if (m_Owns) + { + // Construct in place (ownedPtr is not our active union member yet), then end the lifetime of + // o's now-empty shared_ptr before o switches back to its raw pointer member below. + new (&m_Thread.ownedPtr) std::shared_ptr<Thread>(std::move(o.m_Thread.ownedPtr)); + o.m_Thread.ownedPtr.~shared_ptr(); + } + else + m_Thread.ptr = o.m_Thread.ptr; + o.m_Thread.ptr = nullptr; + o.m_Owns = false; +} + +ThreadPool::ThreadExecutor::~ThreadExecutor() +{ + if (m_Owns) + m_Thread.ownedPtr.~shared_ptr(); +} + +void ThreadPool::ThreadExecutor::Execute(std::function&& task) +{ + Thread& thread = m_Owns ? *m_Thread.ownedPtr : *m_Thread.ptr; + thread.PushTask(std::make_shared(std::move(task))); +} + +CancellableFuture ThreadPool::ThreadExecutor::Submit(std::function&& task) +{ + std::shared_ptr threadTask = std::make_shared(std::move(task)); + CancellableFuture ret(threadTask); + Thread& thread = m_Owns ? *m_Thread.ownedPtr : *m_Thread.ptr; + thread.PushTask(threadTask); + + // Extend the lifetime of the thread.
+ if (m_Owns) + ret.m_Thread = m_Thread.ownedPtr; + return ret; +} + +void ThreadPool::PoolExecutor::Execute(std::function&& task) +{ + Pool::Impl::PushTask(m_Pool, std::make_shared(std::move(task))); +} + +CancellableFuture ThreadPool::PoolExecutor::Submit(std::function&& task) +{ + std::shared_ptr threadTask = std::make_shared(std::move(task)); + CancellableFuture ret(threadTask); + Pool::Impl::PushTask(m_Pool, threadTask); + return ret; +} Index: source/ps/tests/test_ThreadPool.h =================================================================== --- /dev/null +++ source/ps/tests/test_ThreadPool.h @@ -0,0 +1,101 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#include "lib/self_test.h" + +#include "ps/ThreadPool.h" + +#include +#include +#include + +class TestThreadPool : public CxxTest::TestSuite +{ +public: + void test_basic() + { + ThreadPool::Pool pool; + std::atomic tasks_run = 0; + auto increment_run = [&tasks_run]() { tasks_run++; }; + // Task is run on the synchronous thread. + pool.GetWorker().Execute(increment_run); + TS_ASSERT_EQUALS(tasks_run.load(), 1); + + // Force one actual thread. + pool.InitThreads(1); + CancellableFuture future = pool.GetWorker().Submit(increment_run); + future.Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), 2); + + // Test that a submitted task runs concurrently with the caller.
+ std::atomic go = false; + future = pool.GetWorker().Submit([&go, &increment_run]() { + while (!go) + std::this_thread::yield(); + increment_run(); + go = false; + }); + TS_ASSERT_EQUALS(tasks_run.load(), 2); + go = true; + while (go) + std::this_thread::yield(); + TS_ASSERT_EQUALS(tasks_run.load(), 3); + + pool.Reset(); + pool.GetWorker().Execute(increment_run); + TS_ASSERT_EQUALS(tasks_run.load(), 4); + } + + void test_PoolExecutor() + { + ThreadPool::Pool pool; + pool.InitThreads(1); + // Push a blocked task on the worker so we can actually test the global queue. + std::condition_variable cv; + std::mutex mutex; + bool go = false; + pool.GetAllWorkers().begin()->Execute([&cv, &mutex, &go]() { + std::unique_lock lock(mutex); + cv.wait(lock, [&go]() { return go; }); + }); + // Then push a general task. + std::atomic tasks_run = 0; + auto increment_run = [&tasks_run]() { tasks_run++; }; + CancellableFuture future = pool.GetExecutor().Submit(increment_run); + // The worker reads go under the mutex: write it under the mutex too, so the wakeup cannot be missed. + { + std::lock_guard<std::mutex> lock(mutex); + go = true; + } + cv.notify_all(); + future.Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), 1); + // Also check with no waiting expected. + pool.GetExecutor().Submit(increment_run).Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), 2); + } + + void test_PoolExecutor_NoThreads() + { + // Without worker threads, pool tasks must still run (synchronously, on the calling thread). + ThreadPool::Pool pool; + std::atomic<int> tasks_run = 0; + pool.GetExecutor().Submit([&tasks_run]() { tasks_run++; }).Wait(); + TS_ASSERT_EQUALS(tasks_run.load(), 1); + } + +};