Changeset View
Changeset View
Standalone View
Standalone View
source/ps/ThreadPool.cpp
Show All 22 Lines | |||||
#include "maths/MathUtil.h" | #include "maths/MathUtil.h" | ||||
#include "ps/CLogger.h" | #include "ps/CLogger.h" | ||||
#include "ps/ConfigDB.h" | #include "ps/ConfigDB.h" | ||||
#include "ps/Threading.h" | #include "ps/Threading.h" | ||||
#include "ps/ThreadUtil.h" | #include "ps/ThreadUtil.h" | ||||
#include "ps/Profiler2.h" | #include "ps/Profiler2.h" | ||||
#include <condition_variable> | #include <condition_variable> | ||||
#include <chrono> | |||||
#include <deque> | #include <deque> | ||||
#include <functional> | #include <functional> | ||||
#include <memory> | #include <memory> | ||||
#include <mutex> | #include <mutex> | ||||
#include <list> | #include <list> | ||||
#include <thread> | #include <thread> | ||||
namespace ThreadPool | namespace ThreadPool | ||||
Show All 16 Lines | |||||
constexpr size_t GetNbOfReversedPriorityWorker(size_t nbWorkers) | constexpr size_t GetNbOfReversedPriorityWorker(size_t nbWorkers) | ||||
{ | { | ||||
size_t ret = nbWorkers * 0.25; | size_t ret = nbWorkers * 0.25; | ||||
return ret > 1 ? ret : 1; | return ret > 1 ? ret : 1; | ||||
} | } | ||||
std::unique_ptr<TaskManager> g_TaskManager; | std::unique_ptr<TaskManager> g_TaskManager; | ||||
/** | |||||
* Run the timer thread every X. | |||||
* NB: this is currently allowed to drift over time, as that's presumed not too important. | |||||
* This value should be high enough that tasks can execute somewhat real-time, | |||||
* but low enough that the thread is largely idle & won't have negative scheduling effects. | |||||
*/ | |||||
static constexpr std::chrono::microseconds TIMER_INTERVAL = std::chrono::microseconds(30000); | |||||
/** | |||||
* The timer thread runs recurrent tasks on its own thread to avoid context switching. | |||||
* This means these tasks must be fast. | |||||
* When the wakup takes more than this much time, print a warning. | |||||
* This is conservative, it's meant to detect code errors. | |||||
*/ | |||||
static constexpr std::chrono::microseconds MAX_TIMER_WAKUP_TIME = std::chrono::microseconds(1000); | |||||
class Thread; | class Thread; | ||||
class TimerThread; | |||||
} | } | ||||
using QueueItem = std::function<void()>; | using QueueItem = std::function<void()>; | ||||
/** | /** | ||||
* Light wrapper around std::thread. Ensures Join has been called. | * Light wrapper around std::thread. Ensures Join has been called. | ||||
*/ | */ | ||||
class ThreadPool::Thread | class ThreadPool::Thread | ||||
▲ Show 20 Lines • Show All 71 Lines • ▼ Show 20 Lines | protected: | ||||
std::deque<QueueItem> m_Queue; | std::deque<QueueItem> m_Queue; | ||||
std::mutex m_Mutex; | std::mutex m_Mutex; | ||||
std::condition_variable m_ConditionVariable; | std::condition_variable m_ConditionVariable; | ||||
ThreadPool::TaskManager::Impl& m_TaskManager; | ThreadPool::TaskManager::Impl& m_TaskManager; | ||||
}; | }; | ||||
class ThreadPool::TimerThread final : public Thread | |||||
{ | |||||
public: | |||||
TimerThread(const GlobalExecutor<Priority::NORMAL>& exec) : m_Executor(exec) | |||||
{ | |||||
Start<TimerThread, &TimerThread::RunUntilDeath>(this); | |||||
}; | |||||
~TimerThread() | |||||
{ | |||||
m_Kill = true; | |||||
Clear(); | |||||
if (m_Thread.joinable()) | |||||
m_Thread.join(); | |||||
} | |||||
/** | |||||
* Clear recurrent & one-off tasks. | |||||
*/ | |||||
void Clear() | |||||
{ | |||||
std::lock_guard<std::mutex> lock(m_Mutex); | |||||
m_RecurrentTasks.clear(); | |||||
} | |||||
/** | |||||
* Add a recurrent task. The task will be run on the next timer call. | |||||
* @param repeat - How often to run this task. 0 is every time, 1 is every 2, 2 every 3, and so on. | |||||
* @param func - Task to run. | |||||
*/ | |||||
void PushRecurrentTask(u32 repeat, std::function<RecurrentTaskStatus(GlobalExecutor<Priority::NORMAL>&)>&& func) | |||||
{ | |||||
m_RecurrentTasks.emplace_back(repeat, std::move(func)); | |||||
} | |||||
protected: | |||||
void RunUntilDeath(); | |||||
std::mutex m_Mutex; | |||||
std::condition_variable m_ConditionVariable; | |||||
GlobalExecutor<Priority::NORMAL> m_Executor; | |||||
struct RecurrentTask | |||||
{ | |||||
RecurrentTask(u32 repeat, std::function<RecurrentTaskStatus(GlobalExecutor<Priority::NORMAL>&)>&& func) : m_Repeat(repeat), m_Func(std::move(func)) {} | |||||
u32 m_Repeat = 0; | |||||
u32 m_Cooldown = 0; | |||||
std::function<RecurrentTaskStatus(GlobalExecutor<Priority::NORMAL>&)> m_Func; | |||||
}; | |||||
std::vector<RecurrentTask> m_RecurrentTasks; | |||||
}; | |||||
/** | /** | ||||
* PImpl-ed implementation of the threadpool's task manager. | * PImpl-ed implementation of the threadpool's task manager. | ||||
* | * | ||||
* Avoiding starvation: | * Avoiding starvation: | ||||
* To keep things simple while avoiding starvation, | * To keep things simple while avoiding starvation, | ||||
* some workers will process the global queue before their own. | * some workers will process the global queue before their own. | ||||
* These workers won't be returned when asking for a single-worker executor. | * These workers won't be returned when asking for a single-worker executor. | ||||
* The low-priority queue is not guaranteed to run. | * The low-priority queue is not guaranteed to run. | ||||
Show All 30 Lines | protected: | ||||
std::deque<QueueItem> m_GlobalQueue; | std::deque<QueueItem> m_GlobalQueue; | ||||
std::deque<QueueItem> m_GlobalLowPriorityQueue; | std::deque<QueueItem> m_GlobalLowPriorityQueue; | ||||
std::atomic<bool> m_HasWork; | std::atomic<bool> m_HasWork; | ||||
std::atomic<bool> m_HasLowPriorityWork; | std::atomic<bool> m_HasLowPriorityWork; | ||||
// Ideally this would be a vector, since it does get iterated, but that requires movable types, | // Ideally this would be a vector, since it does get iterated, but that requires movable types, | ||||
// and we want the executors to be long-lived, thus these need to not move. | // and we want the executors to be long-lived, thus these need to not move. | ||||
std::list<WorkerThread> m_Workers; | std::list<WorkerThread> m_Workers; | ||||
TimerThread m_TimerWorker; | |||||
// This does not contain all workers, see comment on avoiding starvation above. | // This does not contain all workers, see comment on avoiding starvation above. | ||||
std::vector<ThreadExecutor> m_DedicatedExecutors; | std::vector<ThreadExecutor> m_DedicatedExecutors; | ||||
EachThreadExecutor m_EachThreadExecutor; | EachThreadExecutor m_EachThreadExecutor; | ||||
// Round-robin counter for GetWorker. | // Round-robin counter for GetWorker. | ||||
size_t m_RoundRobinIdx = 0; | size_t m_RoundRobinIdx = 0; | ||||
size_t m_FirstReversedIdx; | size_t m_FirstReversedIdx; | ||||
}; | }; | ||||
ThreadPool::TaskManager::TaskManager(size_t nbWorkers) | ThreadPool::TaskManager::TaskManager(size_t nbWorkers) | ||||
{ | { | ||||
if (nbWorkers < MIN_THREADS) | if (nbWorkers < MIN_THREADS) | ||||
nbWorkers = Clamp<size_t>(std::thread::hardware_concurrency() - 1, MIN_THREADS, MAX_THREADS); | nbWorkers = Clamp<size_t>(std::thread::hardware_concurrency() - 1, MIN_THREADS, MAX_THREADS); | ||||
m = std::make_unique<Impl>(*this, nbWorkers); | m = std::make_unique<Impl>(*this, nbWorkers); | ||||
} | } | ||||
ThreadPool::TaskManager::~TaskManager() {} | ThreadPool::TaskManager::~TaskManager() {} | ||||
ThreadPool::TaskManager::Impl::Impl(TaskManager& backref, size_t nbWorkers) | ThreadPool::TaskManager::Impl::Impl(TaskManager& backref, size_t nbWorkers) | ||||
: m_TaskManager(backref) | : m_TaskManager(backref), m_TimerWorker(m_TaskManager.GetExecutor()) | ||||
{ | { | ||||
m_FirstReversedIdx = nbWorkers - GetNbOfReversedPriorityWorker(nbWorkers); | m_FirstReversedIdx = nbWorkers - GetNbOfReversedPriorityWorker(nbWorkers); | ||||
for (size_t i = 0; i < nbWorkers; ++i) | for (size_t i = 0; i < nbWorkers; ++i) | ||||
{ | { | ||||
bool reversePriority = i >= m_FirstReversedIdx; | bool reversePriority = i >= m_FirstReversedIdx; | ||||
WorkerThread& worker = m_Workers.emplace_back(*this, reversePriority); | WorkerThread& worker = m_Workers.emplace_back(*this, reversePriority); | ||||
if (!reversePriority) | if (!reversePriority) | ||||
m_DedicatedExecutors.emplace_back(ThreadExecutor(worker)); | m_DedicatedExecutors.emplace_back(ThreadExecutor(worker)); | ||||
m_EachThreadExecutor.m_Executors.emplace_back(ThreadExecutor(worker)); | m_EachThreadExecutor.m_Executors.emplace_back(ThreadExecutor(worker)); | ||||
// Register the thread on Profiler2 and name it, for convenience. | // Register the thread on Profiler2 and name it, for convenience. | ||||
// (This is called here because RunUntilDeath is templated and the static value won't work as expected then). | // (This is called here because RunUntilDeath is templated and the static value won't work as expected then). | ||||
m_EachThreadExecutor.m_Executors.back().Submit([]() { | m_EachThreadExecutor.m_Executors.back().Submit([]() { | ||||
// The profiler does better if the names are unique. | // The profiler does better if the names are unique. | ||||
static std::atomic<int> n = 0; | static std::atomic<int> n = 0; | ||||
std::string name = "ThreadPool #" + std::to_string(n++); | std::string name = "ThreadPool #" + std::to_string(n++); | ||||
debug_SetThreadName(name.c_str()); | debug_SetThreadName(name.c_str()); | ||||
g_Profiler2.RegisterCurrentThread(name); | g_Profiler2.RegisterCurrentThread(name); | ||||
}).Wait(); | }).Wait(); | ||||
} | } | ||||
} | } | ||||
void ThreadPool::TaskManager::Clear() { m->Clear(); } | void ThreadPool::TaskManager::Clear() { m->Clear(); } | ||||
void ThreadPool::TaskManager::Impl::Clear() | void ThreadPool::TaskManager::Impl::Clear() | ||||
{ | { | ||||
m_TimerWorker.Clear(); | |||||
for (WorkerThread& worker : m_Workers) | for (WorkerThread& worker : m_Workers) | ||||
worker.Clear(); | worker.Clear(); | ||||
} | } | ||||
size_t ThreadPool::TaskManager::GetNbOfWorkers() const | size_t ThreadPool::TaskManager::GetNbOfWorkers() const | ||||
{ | { | ||||
return m->m_Workers.size(); | return m->m_Workers.size(); | ||||
} | } | ||||
▲ Show 20 Lines • Show All 55 Lines • ▼ Show 20 Lines | |||||
} | } | ||||
ThreadPool::TaskManager& ThreadPool::TaskManager::Instance() | ThreadPool::TaskManager& ThreadPool::TaskManager::Instance() | ||||
{ | { | ||||
ENSURE(g_TaskManager); | ENSURE(g_TaskManager); | ||||
return *g_TaskManager; | return *g_TaskManager; | ||||
} | } | ||||
void ThreadPool::TaskManager::AddRecurrentTask(u32 repeatms, std::function<RecurrentTaskStatus(GlobalExecutor<Priority::NORMAL>&)>&& func) | |||||
{ | |||||
u32 repeat = round(std::chrono::duration<double, std::milli>(repeatms) / TIMER_INTERVAL); | |||||
// Need to remove one additional tick as 0 means "every timer" | |||||
if (repeat > 0) | |||||
--repeat; | |||||
m->m_TimerWorker.PushRecurrentTask(repeat, std::move(func)); | |||||
} | |||||
// Executor definitions | // Executor definitions | ||||
template<ThreadPool::Priority Priority> | template<ThreadPool::Priority Priority> | ||||
void ThreadPool::GlobalExecutor<Priority>::ExecuteTask(QueueItem&& task) | void ThreadPool::GlobalExecutor<Priority>::ExecuteTask(QueueItem&& task) | ||||
{ | { | ||||
TaskManager::Impl::PushTask<Priority>(*m_TaskManager.m, std::move(task)); | TaskManager::Impl::PushTask<Priority>(*m_TaskManager.m, std::move(task)); | ||||
} | } | ||||
▲ Show 20 Lines • Show All 64 Lines • ▼ Show 20 Lines | if (m_Queue.empty()) | ||||
continue; | continue; | ||||
lock.unlock(); | lock.unlock(); | ||||
m_Queue.front()(); | m_Queue.front()(); | ||||
lock.lock(); | lock.lock(); | ||||
m_Queue.pop_front(); | m_Queue.pop_front(); | ||||
} | } | ||||
} | } | ||||
void ThreadPool::TimerThread::RunUntilDeath() | |||||
{ | |||||
std::string name("ThreadPool Timer"); | |||||
g_Profiler2.RegisterCurrentThread(name); | |||||
debug_SetThreadName(name.c_str()); | |||||
std::chrono::microseconds sleepTime = TIMER_INTERVAL; | |||||
std::chrono::steady_clock clock; | |||||
std::chrono::time_point t0 = clock.now(); | |||||
Stan: Use timer_time() instead of std::chrono as it was made for it, and I think there are some… | |||||
wraitiiAuthorUnsubmitted Done Inline ActionsCan you source that? I have to include <chrono> for condition_variable anyways, so I'd rather use it in general. wraitii: Can you source that?
I have to include <chrono> for condition_variable anyways, so I'd rather… | |||||
StanUnsubmitted Not Done Inline Actionscondition_variable is in <condition_variable> no? Stan: condition_variable is in <condition_variable> no?
https://gcc-help.gcc.gnu.narkive. | |||||
wraitiiAuthorUnsubmitted Done Inline ActionsI'm using wait_for which takes chrono types. These 3 links seem outdated / not relevant for our compilers. wraitii: I'm using `wait_for` which takes `chrono` types.
These 3 links seem outdated / not relevant… | |||||
u32 warningCounter = 0; | |||||
std::vector<size_t> indicesToDrop; | |||||
std::unique_lock<std::mutex> lock(m_Mutex); | |||||
while (!m_Kill) | |||||
{ | |||||
// Tolerate spurious wake ups. | |||||
m_ConditionVariable.wait_for(lock, sleepTime); | |||||
if (m_Kill) | |||||
break; | |||||
// We give no guarantees on actual timing (because that's much easier), | |||||
// so just fire tasks. | |||||
t0 = clock.now(); | |||||
indicesToDrop.clear(); | |||||
for (size_t i = 0; i < m_RecurrentTasks.size(); ++i) | |||||
{ | |||||
RecurrentTask& task = m_RecurrentTasks[i]; | |||||
if (task.m_Cooldown == 0) | |||||
{ | |||||
RecurrentTaskStatus status = task.m_Func(m_Executor); | |||||
if (status == RecurrentTaskStatus::OK) | |||||
task.m_Cooldown = task.m_Repeat; | |||||
else if (status == RecurrentTaskStatus::STOP) | |||||
indicesToDrop.push_back(i); | |||||
// Else keep as-is for retry. | |||||
} | |||||
else | |||||
--task.m_Cooldown; | |||||
} | |||||
for (size_t idx : indicesToDrop) | |||||
{ | |||||
m_RecurrentTasks[idx] = std::move(m_RecurrentTasks.back()); | |||||
m_RecurrentTasks.pop_back(); | |||||
} | |||||
indicesToDrop.clear(); | |||||
auto wakeTime = clock.now() - t0; | |||||
// Since it's measured, correct for run time, but this won't prevent | |||||
// actual drift over time. | |||||
sleepTime = std::chrono::duration_cast<std::chrono::microseconds>(TIMER_INTERVAL - wakeTime); | |||||
// Warning logic - we want to warn if the timer is too often too slow | |||||
// since that probably indicates a mistake somewhere (e.g. the wrong function is run). | |||||
// However we don't want spurious warnings, so add 2 for each overrun, and remove 1 otherwise. | |||||
// This will warn if enough runs are slow in a short time period. | |||||
if (wakeTime > MAX_TIMER_WAKUP_TIME) | |||||
{ | |||||
warningCounter += 2; | |||||
if (warningCounter > 50) | |||||
{ | |||||
warningCounter = 0; | |||||
LOGWARNING("ThreadPool Timer: starting tasks took %ims, which is more than the limit of %ims", | |||||
std::chrono::duration_cast<std::chrono::milliseconds>(sleepTime).count(), | |||||
std::chrono::duration_cast<std::chrono::milliseconds>(MAX_TIMER_WAKUP_TIME).count()); | |||||
} | |||||
} | |||||
else if (warningCounter > 0) | |||||
--warningCounter; | |||||
} | |||||
} | |||||
// Defined here - needs access to derived types. | // Defined here - needs access to derived types. | ||||
template<typename T, void(T::* callable)()> | template<typename T, void(T::* callable)()> | ||||
void ThreadPool::Thread::DoStart(T* object) | void ThreadPool::Thread::DoStart(T* object) | ||||
{ | { | ||||
std::invoke(callable, object); | std::invoke(callable, object); | ||||
} | } |
Wildfire Games · Phabricator
Use timer_time() instead of std::chrono as it was made for it, and I think there are some implem issues with regards to performance with it on windows.