Changeset View
Standalone View
source/ps/TaskManager.cpp
Show First 20 Lines • Show All 71 Lines • ▼ Show 20 Lines | |||||
protected: | protected: | ||||
~Thread() | ~Thread() | ||||
{ | { | ||||
ENSURE(!m_Thread.joinable()); | ENSURE(!m_Thread.joinable()); | ||||
} | } | ||||
std::thread m_Thread; | std::thread m_Thread; | ||||
std::atomic<bool> m_Kill = false; | |||||
}; | }; | ||||
/** | /** | ||||
* Worker thread: process the taskManager queues until killed. | * Worker thread: process the taskManager queues until killed. | ||||
*/ | */ | ||||
class WorkerThread : public Thread | class WorkerThread : public Thread | ||||
{ | { | ||||
public: | public: | ||||
WorkerThread(TaskManager::Impl& taskManager); | WorkerThread(TaskManager::Impl& taskManager); | ||||
~WorkerThread(); | ~WorkerThread(); | ||||
/** | |||||
* Wake the worker. | |||||
*/ | |||||
void Wake(); | |||||
protected: | protected: | ||||
void RunUntilDeath(); | void RunUntilDeath(); | ||||
std::mutex m_Mutex; | |||||
std::condition_variable m_ConditionVariable; | |||||
TaskManager::Impl& m_TaskManager; | TaskManager::Impl& m_TaskManager; | ||||
}; | }; | ||||
/** | /** | ||||
* PImpl-ed implementation of the Task manager. | * PImpl-ed implementation of the Task manager. | ||||
* | * | ||||
* The normal priority queue is processed first, the low priority only if there are no higher-priority tasks | * The normal priority queue is processed first, the low priority only if there are no higher-priority tasks | ||||
*/ | */ | ||||
class TaskManager::Impl | class TaskManager::Impl | ||||
{ | { | ||||
friend class TaskManager; | friend class TaskManager; | ||||
friend class WorkerThread; | friend class WorkerThread; | ||||
public: | public: | ||||
Impl(TaskManager& backref); | Impl() = default; | ||||
~Impl() | ~Impl(); | ||||
{ | |||||
ClearQueue(); | |||||
m_Workers.clear(); | |||||
} | |||||
/** | /** | ||||
* 2-phase init to avoid having to think too hard about the order of class members. | * 2-phase init to avoid having to think too hard about the order of class members. | ||||
*/ | */ | ||||
void SetupWorkers(size_t numberOfWorkers); | void SetupWorkers(size_t numberOfWorkers); | ||||
/** | /** | ||||
* Push a task on the global queue. | * Push a task on the global queue. | ||||
* Takes ownership of @a task. | * Takes ownership of @a task. | ||||
* May be called from any thread. | * May be called from any thread. | ||||
*/ | */ | ||||
void PushTask(std::function<void()>&& task, TaskPriority priority); | void PushTask(std::function<void()>&& task, TaskPriority priority); | ||||
protected: | protected: | ||||
void ClearQueue(); | void ClearQueue(); | ||||
template<TaskPriority Priority> | |||||
bool PopTask(std::function<void()>& taskOut); | bool PopTask(std::function<void()>& taskOut); | ||||
// Back reference (keep this first). | std::condition_variable m_HasWork; | ||||
TaskManager& m_TaskManager; | |||||
std::atomic<bool> m_HasWork = false; | |||||
std::atomic<bool> m_HasLowPriorityWork = false; | |||||
std::mutex m_GlobalMutex; | std::mutex m_GlobalMutex; | ||||
std::mutex m_GlobalLowPriorityMutex; | |||||
std::deque<QueueItem> m_GlobalQueue; | std::deque<QueueItem> m_GlobalQueue; | ||||
std::mutex m_GlobalLowPriorityMutex; | |||||
std::deque<QueueItem> m_GlobalLowPriorityQueue; | std::deque<QueueItem> m_GlobalLowPriorityQueue; | ||||
std::atomic<bool> m_Kill = false; | |||||
// Ideally this would be a vector, since it does get iterated, but that requires movable types. | // Ideally this would be a vector, since it does get iterated, but that requires movable types. | ||||
std::deque<WorkerThread> m_Workers; | std::deque<WorkerThread> m_Workers; | ||||
// Round-robin counter for GetWorker. | |||||
mutable size_t m_RoundRobinIdx = 0; | |||||
}; | }; | ||||
jprahman: This appears unused in TaskManager.cpp, can it be removed? | |||||
Not Done Inline ActionsIf it's unused, yes, it's just a left-over from earlier diffs. wraitii: If it's unused, yes, it's just a left-over from earlier diffs. | |||||
Not Done Inline Actionsthe backreference m_TaskManager is also unused. phosit: the backreference `m_TaskManager` is also unused. | |||||
TaskManager::TaskManager() : TaskManager(std::thread::hardware_concurrency() - 1) | TaskManager::TaskManager() : TaskManager(std::thread::hardware_concurrency() - 1) | ||||
{ | { | ||||
} | } | ||||
TaskManager::TaskManager(size_t numberOfWorkers) | TaskManager::TaskManager(size_t numberOfWorkers) | ||||
{ | { | ||||
m = std::make_unique<Impl>(*this); | m = std::make_unique<Impl>(); | ||||
numberOfWorkers = Clamp<size_t>(numberOfWorkers, MIN_THREADS, MAX_THREADS); | numberOfWorkers = Clamp<size_t>(numberOfWorkers, MIN_THREADS, MAX_THREADS); | ||||
m->SetupWorkers(numberOfWorkers); | m->SetupWorkers(numberOfWorkers); | ||||
} | } | ||||
TaskManager::~TaskManager() = default; | TaskManager::~TaskManager() = default; | ||||
TaskManager::Impl::Impl(TaskManager& backref) | TaskManager::Impl::~Impl() | ||||
: m_TaskManager(backref) | |||||
{ | { | ||||
ClearQueue(); | |||||
m_Kill = true; | |||||
m_HasWork.notify_all(); | |||||
// Clear it before the other members get destructed. | |||||
m_Workers.clear(); | |||||
} | } | ||||
void TaskManager::Impl::SetupWorkers(size_t numberOfWorkers) | void TaskManager::Impl::SetupWorkers(size_t numberOfWorkers) | ||||
{ | { | ||||
for (size_t i = 0; i < numberOfWorkers; ++i) | for (size_t i = 0; i < numberOfWorkers; ++i) | ||||
m_Workers.emplace_back(*this); | m_Workers.emplace_back(*this); | ||||
} | } | ||||
Show All 19 Lines | |||||
{ | { | ||||
m->PushTask(std::move(task), priority); | m->PushTask(std::move(task), priority); | ||||
} | } | ||||
void TaskManager::Impl::PushTask(std::function<void()>&& task, TaskPriority priority) | void TaskManager::Impl::PushTask(std::function<void()>&& task, TaskPriority priority) | ||||
{ | { | ||||
std::mutex& mutex = priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex; | std::mutex& mutex = priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex; | ||||
std::deque<QueueItem>& queue = priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue; | std::deque<QueueItem>& queue = priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue; | ||||
std::atomic<bool>& hasWork = priority == TaskPriority::NORMAL ? m_HasWork : m_HasLowPriorityWork; | |||||
{ | { | ||||
std::lock_guard<std::mutex> lock(mutex); | std::lock_guard<std::mutex> lock(mutex); | ||||
queue.emplace_back(std::move(task)); | queue.emplace_back(std::move(task)); | ||||
hasWork = true; | |||||
} | } | ||||
for (WorkerThread& worker : m_Workers) | m_HasWork.notify_one(); | ||||
worker.Wake(); | |||||
} | } | ||||
template<TaskPriority Priority> | |||||
bool TaskManager::Impl::PopTask(std::function<void()>& taskOut) | bool TaskManager::Impl::PopTask(std::function<void()>& taskOut) | ||||
{ | { | ||||
std::mutex& mutex = Priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex; | { | ||||
std::deque<QueueItem>& queue = Priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue; | std::unique_lock<std::mutex> normalPriLock(m_GlobalMutex); | ||||
std::atomic<bool>& hasWork = Priority == TaskPriority::NORMAL ? m_HasWork : m_HasLowPriorityWork; | m_HasWork.wait(normalPriLock, [this]() | ||||
Not Done Inline ActionsFor a large number of small, lightweight, tasks, I wonder about the scalability here. Each lock will force a cache line to bounce across to a different core, hurting scalability across more than a few worker threads. I wonder if a lockless MPMC queue would be viable here. jprahman: For a large number of small, lightweight, tasks, I wonder about the scalability here. Each lock… | |||||
Not Done Inline ActionsIn general, I think we should expect larger tasks on the task manager. Sending small tasks is inefficient in several ways, including the bookkeeping overhead. That being said, this does change the semantics of the performance from PushTask being a little slow (need to wake all threads) to PopTask being a little slow potentially. wraitii: In general, I think we should expect larger tasks on the task manager. Sending small tasks is… | |||||
{ | |||||
// Particularly critical section since we're locking the global queue. | if(!m_GlobalQueue.empty()) | ||||
std::lock_guard<std::mutex> globalLock(mutex); | return true; | ||||
if (!queue.empty()) | std::lock_guard lock(m_GlobalLowPriorityMutex); | ||||
{ | return !m_GlobalLowPriorityQueue.empty() || m_Kill; | ||||
taskOut = std::move(queue.front()); | }); | ||||
queue.pop_front(); | if (!m_GlobalQueue.empty()) | ||||
hasWork = !queue.empty(); | { | ||||
taskOut = std::move(m_GlobalQueue.front()); | |||||
m_GlobalQueue.pop_front(); | |||||
return true; | |||||
} | |||||
} | |||||
{ | |||||
std::lock_guard<std::mutex> lowPriLock(m_GlobalLowPriorityMutex); | |||||
if (!m_GlobalLowPriorityQueue.empty()) | |||||
{ | |||||
taskOut = std::move(m_GlobalLowPriorityQueue.front()); | |||||
m_GlobalLowPriorityQueue.pop_front(); | |||||
return true; | return true; | ||||
} | } | ||||
Not Done Inline ActionsThat comment feels kind of redundant. I think it was relevant when the function was in the worker Run loop, but should be removed now. wraitii: That comment feels kind of redundant. I think it was relevant when the function was in the… | |||||
} | |||||
return false; | return false; | ||||
} | } | ||||
void TaskManager::Initialise() | void TaskManager::Initialise() | ||||
{ | { | ||||
if (!g_TaskManager) | if (!g_TaskManager) | ||||
g_TaskManager = std::make_unique<TaskManager>(); | g_TaskManager = std::make_unique<TaskManager>(); | ||||
} | } | ||||
Show All 9 Lines | |||||
WorkerThread::WorkerThread(TaskManager::Impl& taskManager) | WorkerThread::WorkerThread(TaskManager::Impl& taskManager) | ||||
: m_TaskManager(taskManager) | : m_TaskManager(taskManager) | ||||
{ | { | ||||
Start<WorkerThread, &WorkerThread::RunUntilDeath>(this); | Start<WorkerThread, &WorkerThread::RunUntilDeath>(this); | ||||
} | } | ||||
WorkerThread::~WorkerThread() | WorkerThread::~WorkerThread() | ||||
{ | { | ||||
m_Kill = true; | |||||
m_ConditionVariable.notify_one(); | |||||
if (m_Thread.joinable()) | if (m_Thread.joinable()) | ||||
m_Thread.join(); | m_Thread.join(); | ||||
} | } | ||||
void WorkerThread::Wake() | |||||
{ | |||||
m_ConditionVariable.notify_one(); | |||||
} | |||||
void WorkerThread::RunUntilDeath() | void WorkerThread::RunUntilDeath() | ||||
{ | { | ||||
// The profiler does better if the names are unique. | // The profiler does better if the names are unique. | ||||
static std::atomic<int> n = 0; | static std::atomic<int> n = 0; | ||||
std::string name = "Task Mgr #" + std::to_string(n++); | std::string name = "Task Mgr #" + std::to_string(n++); | ||||
debug_SetThreadName(name.c_str()); | debug_SetThreadName(name.c_str()); | ||||
g_Profiler2.RegisterCurrentThread(name); | g_Profiler2.RegisterCurrentThread(name); | ||||
std::function<void()> task; | std::function<void()> task; | ||||
bool hasTask = false; | while (!m_TaskManager.m_Kill) | ||||
std::unique_lock<std::mutex> lock(m_Mutex, std::defer_lock); | { | ||||
while (!m_Kill) | |||||
{ | |||||
lock.lock(); | |||||
m_ConditionVariable.wait(lock, [this](){ | |||||
return m_Kill || m_TaskManager.m_HasWork || m_TaskManager.m_HasLowPriorityWork; | |||||
}); | |||||
lock.unlock(); | |||||
if (m_Kill) | |||||
break; | |||||
// Fetch work from the global queues. | // Fetch work from the global queues. | ||||
hasTask = m_TaskManager.PopTask<TaskPriority::NORMAL>(task); | if (m_TaskManager.PopTask(task)) | ||||
if (!hasTask) | |||||
hasTask = m_TaskManager.PopTask<TaskPriority::LOW>(task); | |||||
if (hasTask) | |||||
task(); | task(); | ||||
} | } | ||||
} | } | ||||
// Defined here - needs access to derived types. | // Defined here - needs access to derived types. | ||||
template<typename T, void(T::* callable)()> | template<typename T, void(T::* callable)()> | ||||
void Thread::DoStart(T* object) | void Thread::DoStart(T* object) | ||||
{ | { | ||||
std::invoke(callable, object); | std::invoke(callable, object); | ||||
} | } | ||||
} // namespace Threading | } // namespace Threading |
This appears unused in TaskManager.cpp, can it be removed?