Changeset View
Standalone View
source/ps/ThreadPool.cpp
- This file was added.
/* Copyright (C) 2021 Wildfire Games. | ||||||||||||||||||||
* This file is part of 0 A.D. | ||||||||||||||||||||
* | ||||||||||||||||||||
* 0 A.D. is free software: you can redistribute it and/or modify | ||||||||||||||||||||
* it under the terms of the GNU General Public License as published by | ||||||||||||||||||||
* the Free Software Foundation, either version 2 of the License, or | ||||||||||||||||||||
* (at your option) any later version. | ||||||||||||||||||||
* | ||||||||||||||||||||
* 0 A.D. is distributed in the hope that it will be useful, | ||||||||||||||||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||||||||||||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||||||||||||||||||
* GNU General Public License for more details. | ||||||||||||||||||||
* | ||||||||||||||||||||
* You should have received a copy of the GNU General Public License | ||||||||||||||||||||
* along with 0 A.D. If not, see <http://www.gnu.org/licenses/>. | ||||||||||||||||||||
*/ | ||||||||||||||||||||
#include "precompiled.h" | ||||||||||||||||||||
#include "ThreadPool.h" | ||||||||||||||||||||
#include "lib/debug.h" | ||||||||||||||||||||
#include "maths/MathUtil.h" | ||||||||||||||||||||
#include "ps/CLogger.h" | ||||||||||||||||||||
#include "ps/ConfigDB.h" | ||||||||||||||||||||
#include "ps/Threading.h" | ||||||||||||||||||||
#include "ps/ThreadUtil.h" | ||||||||||||||||||||
#include "ps/Profiler2.h" | ||||||||||||||||||||
#include <condition_variable> | ||||||||||||||||||||
StanUnsubmitted Not Done Inline Actions
Stan: | ||||||||||||||||||||
#include <deque> | ||||||||||||||||||||
#include <functional> | ||||||||||||||||||||
#include <memory> | ||||||||||||||||||||
#include <mutex> | ||||||||||||||||||||
#include <list> | ||||||||||||||||||||
Not Done Inline Actions
Stan: | ||||||||||||||||||||
#include <thread> | ||||||||||||||||||||
Not Done Inline ActionsShouldn't that be detected at runtime when host has one core?. Constexpr? Stan: Shouldn't that be detected at runtime when host has one core?. Constexpr? | ||||||||||||||||||||
Done Inline ActionsThis is intended for debugging only, I might drop it in the final diff. wraitii: This is intended for debugging only, I might drop it in the final diff. | ||||||||||||||||||||
Not Done Inline ActionsStill there :P Stan: Still there :P | ||||||||||||||||||||
namespace ThreadPool | ||||||||||||||||||||
{ | ||||||||||||||||||||
/** | ||||||||||||||||||||
* Minimum number of thread pool workers. | ||||||||||||||||||||
*/ | ||||||||||||||||||||
static constexpr size_t MIN_THREADS = 3; | ||||||||||||||||||||
/** | ||||||||||||||||||||
* Maximum number of thread pool workers. | ||||||||||||||||||||
*/ | ||||||||||||||||||||
static constexpr size_t MAX_THREADS = 32; | ||||||||||||||||||||
/** | ||||||||||||||||||||
* Returns the # of 'reversed priority' workers to use. | ||||||||||||||||||||
* See comment on avoiding starvation in TaskManager. | ||||||||||||||||||||
* At least one such worker will always exist. | ||||||||||||||||||||
*/ | ||||||||||||||||||||
constexpr size_t GetNbOfReversedPriorityWorker(size_t numberOfWorkers) | ||||||||||||||||||||
Not Done Inline ActionsIt's not IThread, it's ThreadOwner or ThreadHolder according to its responsibility. vladislavbelov: It's not `IThread`, it's `ThreadOwner` or `ThreadHolder` according to its responsibility. | ||||||||||||||||||||
{ | ||||||||||||||||||||
size_t ret = numberOfWorkers * 0.25; | ||||||||||||||||||||
return ret > 1 ? ret : 1; | ||||||||||||||||||||
Not Done Inline Actionsenum class? Stan: enum class? | ||||||||||||||||||||
Done Inline ActionsAnnoying for bitfields. wraitii: Annoying for bitfields. | ||||||||||||||||||||
Not Done Inline ActionsCould be clamp? Stan: Could be clamp? | ||||||||||||||||||||
} | ||||||||||||||||||||
std::unique_ptr<TaskManager> g_TaskManager; | ||||||||||||||||||||
class Thread; | ||||||||||||||||||||
} | ||||||||||||||||||||
using QueueItem = std::function<void()>; | ||||||||||||||||||||
/** | ||||||||||||||||||||
* Light wrapper around std::thread. Ensures Join has been called. | ||||||||||||||||||||
*/ | ||||||||||||||||||||
class ThreadPool::Thread | ||||||||||||||||||||
Done Inline ActionsI think the predicate version actually checks the predicate before waiting, making this a bit redundant - need to check. wraitii: I think the predicate version actually checks the predicate before waiting, making this a bit… | ||||||||||||||||||||
Not Done Inline Actionsvirtual see msvc output. Stan: virtual see msvc output. | ||||||||||||||||||||
Done Inline ActionsMsvc is wrong here and it annoys me :P I think I'll just hide the warning. wraitii: Msvc is wrong here and it annoys me :P
I think I'll just hide the warning. | ||||||||||||||||||||
Done Inline ActionsCare to explain for me? :) Stan: Care to explain for me? :) | ||||||||||||||||||||
Done Inline ActionsSince the destructor is protected, it doesn't need to be virtual - you won't ever call the destructor of a pointer-to-the-base-class-polymorphically-pointing-to-a-derived-class, which is why virtual would be needed (so it's actually the derived class' destructor that's called). See https://docs.microsoft.com/en-us/cpp/code-quality/c26436?view=msvc-160 I suspect VS2019 wouldn't warn, like gcc and clang. wraitii: Since the destructor is protected, it doesn't need to be virtual - you won't ever call the… | ||||||||||||||||||||
{ | ||||||||||||||||||||
public: | ||||||||||||||||||||
Thread() = default; | ||||||||||||||||||||
Thread(const Thread&) = delete; | ||||||||||||||||||||
Thread(Thread&&) = delete; | ||||||||||||||||||||
template<typename T, void(T::* callable)()> | ||||||||||||||||||||
void Start(T* object) | ||||||||||||||||||||
{ | ||||||||||||||||||||
m_Thread = std::thread(Threading::HandleExceptions<DoStart<T, callable>>::Wrapper, object); | ||||||||||||||||||||
} | ||||||||||||||||||||
template<typename T, void(T::* callable)()> | ||||||||||||||||||||
static void DoStart(T* object); | ||||||||||||||||||||
protected: | ||||||||||||||||||||
~Thread() | ||||||||||||||||||||
{ | ||||||||||||||||||||
ENSURE(!m_Thread.joinable()); | ||||||||||||||||||||
} | ||||||||||||||||||||
std::thread m_Thread; | ||||||||||||||||||||
std::atomic<bool> m_Kill = false; | ||||||||||||||||||||
}; | ||||||||||||||||||||
/** | ||||||||||||||||||||
* Thread with a work queue that it processes until it's destroyed. | ||||||||||||||||||||
Not Done Inline ActionsWhat's the point with only one value? Stan: What's the point with only one value? | ||||||||||||||||||||
Done Inline ActionsI had another value earlier... I guess I could make it a boolean again. wraitii: I had another value earlier... I guess I could make it a boolean again. | ||||||||||||||||||||
*/ | ||||||||||||||||||||
class ThreadPool::WorkerThread : public Thread | ||||||||||||||||||||
{ | ||||||||||||||||||||
public: | ||||||||||||||||||||
WorkerThread(ThreadPool::TaskManager::Impl& taskManager, bool reversePriority = false); | ||||||||||||||||||||
~WorkerThread(); | ||||||||||||||||||||
void ClearQueue() | ||||||||||||||||||||
Not Done Inline ActionsShould not be public. vladislavbelov: Should not be public. | ||||||||||||||||||||
{ | ||||||||||||||||||||
std::lock_guard<std::mutex> lock(m_Mutex); | ||||||||||||||||||||
m_Queue.clear(); | ||||||||||||||||||||
} | ||||||||||||||||||||
/** | ||||||||||||||||||||
* Add a task to this worker's queue. | ||||||||||||||||||||
* Takes ownership of the task. | ||||||||||||||||||||
* May be called from any thread. | ||||||||||||||||||||
*/ | ||||||||||||||||||||
void PushTask(QueueItem&& task) | ||||||||||||||||||||
Not Done Inline ActionsTechnically it's not the Xth thread pool, it's the xth thread in the thread pool :P Stan: Technically it's not the Xth thread pool, it's the xth thread in the thread pool :P | ||||||||||||||||||||
Done Inline ActionsThe fact it's a thread name is obvious from context, and on linux I only have 16 characters. wraitii: The fact it's a thread name is obvious from context, and on linux I only have 16 characters. | ||||||||||||||||||||
{ | ||||||||||||||||||||
{ | ||||||||||||||||||||
Not Done Inline ActionsDoes this have any value? I mean literally anything could be going on in that thread and we already have the PROFILE2 macro used for the calling code. Stan: Does this have any value? I mean literally anything could be going on in that thread and we… | ||||||||||||||||||||
Done Inline Actionsliterally crashes profiler2 to call without registering the thread. wraitii: literally crashes profiler2 to call without registering the thread. | ||||||||||||||||||||
std::lock_guard<std::mutex> lock(m_Mutex); | ||||||||||||||||||||
m_Queue.push_back(std::move(task)); | ||||||||||||||||||||
} | ||||||||||||||||||||
Wake(); | ||||||||||||||||||||
} | ||||||||||||||||||||
void Wake() | ||||||||||||||||||||
{ | ||||||||||||||||||||
m_ConditionVariable.notify_one(); | ||||||||||||||||||||
} | ||||||||||||||||||||
protected: | ||||||||||||||||||||
template<bool reversePriority> | ||||||||||||||||||||
void RunUntilDeath(); | ||||||||||||||||||||
std::deque<QueueItem> m_Queue; | ||||||||||||||||||||
std::mutex m_Mutex; | ||||||||||||||||||||
std::condition_variable m_ConditionVariable; | ||||||||||||||||||||
ThreadPool::TaskManager::Impl& m_TaskManager; | ||||||||||||||||||||
}; | ||||||||||||||||||||
/** | ||||||||||||||||||||
* PImpl-ed implementation of the threadpool's task manager. | ||||||||||||||||||||
* | ||||||||||||||||||||
* Avoiding starvation: | ||||||||||||||||||||
* To keep things simple while avoiding starvation, | ||||||||||||||||||||
* some workers will process the global queue before their own. | ||||||||||||||||||||
* These workers won't be returned when asking for a single-worker executor. | ||||||||||||||||||||
* The low-priority queue is not guaranteed to run. | ||||||||||||||||||||
*/ | ||||||||||||||||||||
class ThreadPool::TaskManager::Impl | ||||||||||||||||||||
{ | ||||||||||||||||||||
friend class TaskManager; | ||||||||||||||||||||
friend class WorkerThread; | ||||||||||||||||||||
public: | ||||||||||||||||||||
Impl(TaskManager& backref, size_t numberOfWorkers); | ||||||||||||||||||||
~Impl() | ||||||||||||||||||||
{ | ||||||||||||||||||||
ClearQueue(); | ||||||||||||||||||||
m_Workers.clear(); | ||||||||||||||||||||
} | ||||||||||||||||||||
Not Done Inline ActionsInstead of duplication and a bit of performance loose you could have a condition variable in each thread. vladislavbelov: Instead of duplication and a bit of performance loose you could have a condition variable in… | ||||||||||||||||||||
/** | ||||||||||||||||||||
* Push a task on the global queue. | ||||||||||||||||||||
Not Done Inline ActionsMight be deque. vladislavbelov: Might be `deque`. | ||||||||||||||||||||
* Takes ownership of @a task. | ||||||||||||||||||||
* May be called from any thread. | ||||||||||||||||||||
*/ | ||||||||||||||||||||
template<Priority Priority> | ||||||||||||||||||||
static void PushTask(TaskManager::Impl& taskManager, QueueItem&& task); | ||||||||||||||||||||
protected: | ||||||||||||||||||||
void ClearQueue(); | ||||||||||||||||||||
ThreadExecutor GetWorker(); | ||||||||||||||||||||
// Back reference (keep this first). | ||||||||||||||||||||
TaskManager& m_TaskManager; | ||||||||||||||||||||
std::mutex m_GlobalMutex; | ||||||||||||||||||||
std::mutex m_GlobalLowPriorityMutex; | ||||||||||||||||||||
std::deque<QueueItem> m_GlobalQueue; | ||||||||||||||||||||
std::deque<QueueItem> m_GlobalLowPriorityQueue; | ||||||||||||||||||||
std::atomic<bool> m_HasWork = false; | ||||||||||||||||||||
std::atomic<bool> m_HasLowPriorityWork = false; | ||||||||||||||||||||
// Ideally this would be a vector, since it does get iterated, but that requires movable types, | ||||||||||||||||||||
// and we want the executors to be long-lived, thus these need to not move. | ||||||||||||||||||||
std::list<WorkerThread> m_Workers; | ||||||||||||||||||||
// This does not contain all workers, see comment on avoiding starvation above. | ||||||||||||||||||||
std::vector<ThreadExecutor> m_DedicatedExecutors; | ||||||||||||||||||||
EachThreadExecutor m_EachThreadExecutor; | ||||||||||||||||||||
// Round-robin counter for GetWorker. | ||||||||||||||||||||
size_t m_RoundRobinIdx = 0; | ||||||||||||||||||||
size_t m_FirstReversedPriorityIdx; | ||||||||||||||||||||
}; | ||||||||||||||||||||
Not Done Inline ActionsDuplication, you need to select (from i) the worker affinity only once. vladislavbelov: Duplication, you need to select (from `i`) the worker affinity only once. | ||||||||||||||||||||
ThreadPool::TaskManager::TaskManager() : TaskManager(std::thread::hardware_concurrency() - 1) | ||||||||||||||||||||
{ | ||||||||||||||||||||
} | ||||||||||||||||||||
ThreadPool::TaskManager::TaskManager(size_t numberOfWorkers) | ||||||||||||||||||||
{ | ||||||||||||||||||||
Not Done Inline ActionsIt doesn't make sense to have a shorter name, the full numberOfWorkers is pretty short already and more clear. vladislavbelov: It doesn't make sense to have a shorter name, the full `numberOfWorkers` is pretty short… | ||||||||||||||||||||
Not Done Inline ActionsMaybe you could CStrIntern that first part ? Stan: Maybe you could CStrIntern that first part ? | ||||||||||||||||||||
numberOfWorkers = Clamp<size_t>(numberOfWorkers, MIN_THREADS, MAX_THREADS); | ||||||||||||||||||||
m = std::make_unique<Impl>(*this, numberOfWorkers); | ||||||||||||||||||||
Not Done Inline Actionsm->m_Executors.resize(n) ? same for the list and the condition variables? Stan: m->m_Executors.resize(n) ? same for the list and the condition variables? | ||||||||||||||||||||
Not Done Inline ActionsWhy limited by a simple constant and not std::thread::hardware_concurrency() * constant? vladislavbelov: Why limited by a simple constant and not `std::thread::hardware_concurrency() * constant`? | ||||||||||||||||||||
Done Inline ActionsThe use of a condition variable per-worker makes it potentially inefficient with too many workers. That being said, if I switch to the task manager holding all tasks, it could be changed. wraitii: The use of a condition variable per-worker makes it potentially inefficient with too many… | ||||||||||||||||||||
Not Done Inline ActionsTaskManager holds all tasks in its queue. vladislavbelov: `TaskManager` holds all tasks in its queue. | ||||||||||||||||||||
} | ||||||||||||||||||||
ThreadPool::TaskManager::~TaskManager() {} | ||||||||||||||||||||
ThreadPool::TaskManager::Impl::Impl(TaskManager& backref, size_t numberOfWorkers) | ||||||||||||||||||||
: m_TaskManager(backref) | ||||||||||||||||||||
{ | ||||||||||||||||||||
m_FirstReversedPriorityIdx = numberOfWorkers - GetNbOfReversedPriorityWorker(numberOfWorkers); | ||||||||||||||||||||
for (size_t i = 0; i < numberOfWorkers; ++i) | ||||||||||||||||||||
{ | ||||||||||||||||||||
bool reversePriority = i >= m_FirstReversedPriorityIdx; | ||||||||||||||||||||
WorkerThread& worker = m_Workers.emplace_back(*this, reversePriority); | ||||||||||||||||||||
if (!reversePriority) | ||||||||||||||||||||
m_DedicatedExecutors.emplace_back(ThreadExecutor(worker)); | ||||||||||||||||||||
m_EachThreadExecutor.m_Executors.emplace_back(ThreadExecutor(worker)); | ||||||||||||||||||||
// Register the thread on Profiler2 and name it, for convenience. | ||||||||||||||||||||
// (This is called here because RunUntilDeath is templated and the static value won't work as expected then). | ||||||||||||||||||||
m_EachThreadExecutor.m_Executors.back().Submit([]() { | ||||||||||||||||||||
// The profiler does better if the names are unique. | ||||||||||||||||||||
static std::atomic<int> n = 0; | ||||||||||||||||||||
Done Inline ActionsThis becomes increasingly suboptimal as users have more cores available. I'm not sure it's a huge problem event at 64 cores though. wraitii: This becomes increasingly suboptimal as users have more cores available. I'm not sure it's a… | ||||||||||||||||||||
std::string name = "ThreadPool #" + std::to_string(n++); | ||||||||||||||||||||
debug_SetThreadName(name.c_str()); | ||||||||||||||||||||
g_Profiler2.RegisterCurrentThread(name); | ||||||||||||||||||||
}).Wait(); | ||||||||||||||||||||
Done Inline ActionsThis can register too many hooks, but that doesn't really matter since it doesn't re-create the threads. wraitii: This can register too many hooks, but that doesn't really matter since it doesn't re-create the… | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
void ThreadPool::TaskManager::ClearQueue() { m->ClearQueue(); } | ||||||||||||||||||||
void ThreadPool::TaskManager::Impl::ClearQueue() | ||||||||||||||||||||
{ | ||||||||||||||||||||
for (WorkerThread& worker : m_Workers) | ||||||||||||||||||||
worker.ClearQueue(); | ||||||||||||||||||||
} | ||||||||||||||||||||
size_t ThreadPool::TaskManager::GetNbOfWorkers() const | ||||||||||||||||||||
{ | ||||||||||||||||||||
return m->m_Workers.size(); | ||||||||||||||||||||
} | ||||||||||||||||||||
ThreadPool::GlobalExecutor<ThreadPool::Priority::NORMAL> ThreadPool::TaskManager::GetExecutor() | ||||||||||||||||||||
{ | ||||||||||||||||||||
return *this; | ||||||||||||||||||||
} | ||||||||||||||||||||
ThreadPool::GlobalExecutor<ThreadPool::Priority::LOW> ThreadPool::TaskManager::GetLowPriorityExecutor() | ||||||||||||||||||||
{ | ||||||||||||||||||||
return *this; | ||||||||||||||||||||
} | ||||||||||||||||||||
ThreadPool::EachThreadExecutor& ThreadPool::TaskManager::GetAllWorkers() | ||||||||||||||||||||
{ | ||||||||||||||||||||
return m->m_EachThreadExecutor; | ||||||||||||||||||||
} | ||||||||||||||||||||
ThreadPool::ThreadExecutor ThreadPool::TaskManager::GetWorker() { return m->GetWorker(); } | ||||||||||||||||||||
ThreadPool::ThreadExecutor ThreadPool::TaskManager::Impl::GetWorker() | ||||||||||||||||||||
{ | ||||||||||||||||||||
if (m_RoundRobinIdx >= m_FirstReversedPriorityIdx) | ||||||||||||||||||||
m_RoundRobinIdx = 0; | ||||||||||||||||||||
return m_DedicatedExecutors[m_RoundRobinIdx++]; | ||||||||||||||||||||
} | ||||||||||||||||||||
template<ThreadPool::Priority Priority> | ||||||||||||||||||||
void ThreadPool::TaskManager::Impl::PushTask(TaskManager::Impl& taskManager, QueueItem&& task) | ||||||||||||||||||||
{ | ||||||||||||||||||||
std::atomic<bool>& hasWork = Priority == Priority::NORMAL ? taskManager.m_HasWork : taskManager.m_HasLowPriorityWork; | ||||||||||||||||||||
if constexpr (Priority == Priority::NORMAL) | ||||||||||||||||||||
{ | ||||||||||||||||||||
std::lock_guard<std::mutex> lock(taskManager.m_GlobalMutex); | ||||||||||||||||||||
taskManager.m_GlobalQueue.emplace_back(std::move(task)); | ||||||||||||||||||||
hasWork = true; | ||||||||||||||||||||
} | ||||||||||||||||||||
else | ||||||||||||||||||||
{ | ||||||||||||||||||||
std::lock_guard<std::mutex> lock(taskManager.m_GlobalLowPriorityMutex); | ||||||||||||||||||||
taskManager.m_GlobalLowPriorityQueue.emplace_back(std::move(task)); | ||||||||||||||||||||
hasWork = true; | ||||||||||||||||||||
} | ||||||||||||||||||||
// Wake idle workers. This has increasingly worse behaviour as the # of workers increases, | ||||||||||||||||||||
// so having a sane upper limit makes sense. | ||||||||||||||||||||
for (WorkerThread& worker : taskManager.m_Workers) | ||||||||||||||||||||
{ | ||||||||||||||||||||
// Check if we still have work in case the earlier wakeups have accepted the item already. | ||||||||||||||||||||
if (!hasWork) | ||||||||||||||||||||
break; | ||||||||||||||||||||
worker.Wake(); | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
void ThreadPool::TaskManager::Initialise() | ||||||||||||||||||||
{ | ||||||||||||||||||||
if (!g_TaskManager) | ||||||||||||||||||||
g_TaskManager = std::make_unique<TaskManager>(); | ||||||||||||||||||||
} | ||||||||||||||||||||
ThreadPool::TaskManager& ThreadPool::TaskManager::Instance() | ||||||||||||||||||||
{ | ||||||||||||||||||||
ENSURE(g_TaskManager); | ||||||||||||||||||||
return *g_TaskManager; | ||||||||||||||||||||
} | ||||||||||||||||||||
// Executor definitions | ||||||||||||||||||||
template<ThreadPool::Priority Priority> | ||||||||||||||||||||
Not Done Inline Actionsdetach doesn't sound good here, it means you make the wait implicit. vladislavbelov: `detach` doesn't sound good here, it means you make the wait implicit. | ||||||||||||||||||||
void ThreadPool::GlobalExecutor<Priority>::ExecuteTask(QueueItem&& task) | ||||||||||||||||||||
{ | ||||||||||||||||||||
TaskManager::Impl::PushTask<Priority>(*m_TaskManager.m, std::move(task)); | ||||||||||||||||||||
} | ||||||||||||||||||||
void ThreadPool::ThreadExecutor::ExecuteTask(QueueItem&& task) | ||||||||||||||||||||
{ | ||||||||||||||||||||
m_Worker.PushTask(std::move(task)); | ||||||||||||||||||||
} | ||||||||||||||||||||
// Thread definition | ||||||||||||||||||||
ThreadPool::WorkerThread::WorkerThread(ThreadPool::TaskManager::Impl& taskManager, bool reversePriority) | ||||||||||||||||||||
: m_TaskManager(taskManager) | ||||||||||||||||||||
{ | ||||||||||||||||||||
// Explicitness is required or the compiler gets confused about types. | ||||||||||||||||||||
if (reversePriority) | ||||||||||||||||||||
Start<WorkerThread, &WorkerThread::RunUntilDeath<true>>(this); | ||||||||||||||||||||
else | ||||||||||||||||||||
Start<WorkerThread, &WorkerThread::RunUntilDeath<false>>(this); | ||||||||||||||||||||
} | ||||||||||||||||||||
ThreadPool::WorkerThread::~WorkerThread() | ||||||||||||||||||||
{ | ||||||||||||||||||||
Not Done Inline ActionsMaybe we could pass the name? Stan: Maybe we could pass the name? | ||||||||||||||||||||
ClearQueue(); | ||||||||||||||||||||
m_Kill = true; | ||||||||||||||||||||
m_ConditionVariable.notify_all(); | ||||||||||||||||||||
if (m_Thread.joinable()) | ||||||||||||||||||||
m_Thread.join(); | ||||||||||||||||||||
} | ||||||||||||||||||||
Not Done Inline ActionsThat logic should be hidden inside TaskManager. vladislavbelov: That logic should be hidden inside `TaskManager`. | ||||||||||||||||||||
template<bool reversePriority> | ||||||||||||||||||||
void ThreadPool::WorkerThread::RunUntilDeath() | ||||||||||||||||||||
{ | ||||||||||||||||||||
Not Done Inline Actionsmerge the m_Kill if? Stan: merge the m_Kill if? | ||||||||||||||||||||
Done Inline ActionsI'd rather not, since those are relatively unrelated blocks, and in a concurrent environment, it's plausible the first one would return false and the second true. wraitii: I'd rather not, since those are relatively unrelated blocks, and in a concurrent environment… | ||||||||||||||||||||
std::unique_lock<std::mutex> lock(m_Mutex); | ||||||||||||||||||||
while (!m_Kill) | ||||||||||||||||||||
{ | ||||||||||||||||||||
m_ConditionVariable.wait(lock, [this]() -> bool { | ||||||||||||||||||||
if (!m_Queue.empty() || m_Kill) | ||||||||||||||||||||
return true; | ||||||||||||||||||||
return m_TaskManager.m_HasWork || m_TaskManager.m_HasLowPriorityWork; | ||||||||||||||||||||
}); | ||||||||||||||||||||
Not Done Inline ActionsCode duplication. vladislavbelov: Code duplication. | ||||||||||||||||||||
// Fetch work from the global queues if necessary. | ||||||||||||||||||||
if ((reversePriority || m_Queue.empty()) && !m_Kill && m_TaskManager.m_HasWork) | ||||||||||||||||||||
{ | ||||||||||||||||||||
// Particularly critical section since we're locking the global queue. | ||||||||||||||||||||
std::unique_lock<std::mutex> globalLock(m_TaskManager.m_GlobalMutex); | ||||||||||||||||||||
// Check again for emptiness. | ||||||||||||||||||||
if (!m_TaskManager.m_GlobalQueue.empty()) | ||||||||||||||||||||
{ | ||||||||||||||||||||
m_Queue.emplace_front(std::move(m_TaskManager.m_GlobalQueue.front())); | ||||||||||||||||||||
m_TaskManager.m_GlobalQueue.pop_front(); | ||||||||||||||||||||
if (m_TaskManager.m_GlobalQueue.empty()) | ||||||||||||||||||||
m_TaskManager.m_HasWork = false; | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
else if (m_Queue.empty() && !m_Kill && m_TaskManager.m_HasLowPriorityWork) | ||||||||||||||||||||
{ | ||||||||||||||||||||
std::unique_lock<std::mutex> globalLock(m_TaskManager.m_GlobalLowPriorityMutex); | ||||||||||||||||||||
if (!m_TaskManager.m_GlobalLowPriorityQueue.empty()) | ||||||||||||||||||||
{ | ||||||||||||||||||||
m_Queue.emplace_front(std::move(m_TaskManager.m_GlobalLowPriorityQueue.front())); | ||||||||||||||||||||
m_TaskManager.m_GlobalLowPriorityQueue.pop_front(); | ||||||||||||||||||||
if (m_TaskManager.m_GlobalLowPriorityQueue.empty()) | ||||||||||||||||||||
m_TaskManager.m_HasLowPriorityWork = false; | ||||||||||||||||||||
} | ||||||||||||||||||||
Not Done Inline ActionsThe conditions are a bit hard to read since they are overlapping a bit. Maybe there is another way to test ? Stan: The conditions are a bit hard to read since they are overlapping a bit. Maybe there is another… | ||||||||||||||||||||
} | ||||||||||||||||||||
if (m_Queue.empty()) | ||||||||||||||||||||
continue; | ||||||||||||||||||||
QueueItem task = std::move(m_Queue.front()); | ||||||||||||||||||||
m_Queue.pop_front(); | ||||||||||||||||||||
lock.unlock(); | ||||||||||||||||||||
task(); | ||||||||||||||||||||
lock.lock(); | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
// Defined here - needs access to derived types. | ||||||||||||||||||||
template<typename T, void(T::* callable)()> | ||||||||||||||||||||
void ThreadPool::Thread::DoStart(T* object) | ||||||||||||||||||||
Not Done Inline ActionsShouldn't it be the responsibility of the thread to give the correct pointer ? Stan: Shouldn't it be the responsibility of the thread to give the correct pointer ? | ||||||||||||||||||||
Done Inline Actions? wraitii: ? | ||||||||||||||||||||
Not Done Inline ActionsUsing an accesor or something Stan: Using an accesor or something | ||||||||||||||||||||
Done Inline ActionsNo, this is a member of the executor - just a trick to make a temporary thread without showing it to the client code. wraitii: No, this is a member of the executor - just a trick to make a temporary thread without showing… | ||||||||||||||||||||
Done Inline ActionsIs there a way to do it in a non tricky way? Stan: Is there a way to do it in a non tricky way? | ||||||||||||||||||||
Done Inline ActionsIt's not that tricky, & no without forcing the client code to store a reference to the ThreadExecutor, which is ugly IMO. wraitii: It's not that tricky, & no without forcing the client code to store a reference to the… | ||||||||||||||||||||
{ | ||||||||||||||||||||
std::invoke(callable, object); | ||||||||||||||||||||
} |