// Review preamble. This text sits ahead of the first "Index:" header and is
// not part of any hunk; the patch text below is unchanged.
//
// Summary of what the visible hunks do to source/ps/TaskManager.cpp:
//  - The m_Kill member declared next to m_Thread is removed, and
//    TaskManager::Impl gains its own m_Kill member instead.
//  - WorkerThread loses Wake(), m_Mutex and m_ConditionVariable; its
//    destructor now only joins m_Thread when it is joinable.
//  - Impl::m_HasWork becomes a std::condition_variable (it was an atomic
//    flag); m_HasLowPriorityWork, m_RoundRobinIdx and the TaskManager back
//    reference (with the constructor that took it) are removed.
//  - The task-push code (its signature is outside the hunk) calls
//    m_HasWork.notify_one() instead of calling Wake() on every worker.
//  - PopTask is no longer a template. It waits on m_HasWork while holding
//    m_GlobalMutex, takes from m_GlobalQueue first, then from
//    m_GlobalLowPriorityQueue, and returns false when both are empty.
//  - ~Impl() calls ClearQueue(), sets m_Kill, calls m_HasWork.notify_all()
//    and then clears m_Workers.
//  - RunUntilDeath() loops while m_TaskManager.m_Kill is false, running every
//    task that PopTask hands back.
//
// NOTE(review): the wait predicate in PopTask is evaluated under
// m_GlobalMutex, but a low-priority push holds only m_GlobalLowPriorityMutex
// before notify_one(), and ~Impl() sets m_Kill and calls notify_all() with no
// mutex held. A worker that has evaluated the predicate but has not yet
// blocked could presumably miss either notification and sleep forever (which
// would also hang the join in ~WorkerThread()) -- confirm, and consider
// taking m_GlobalMutex around both updates before notifying.
// NOTE(review): the context line "std::thread::hardware_concurrency() - 1"
// wraps around if hardware_concurrency() returns 0; the result is then only
// bounded by the Clamp to MAX_THREADS -- pre-existing, confirm it is intended.
// NOTE(review): this copy of the patch looks extraction-damaged (line breaks
// collapsed, template argument lists apparently missing, e.g. on std::atomic
// and std::make_unique) and presumably will not apply as-is -- verify against
// the original patch.
Index: source/ps/TaskManager.cpp =================================================================== --- source/ps/TaskManager.cpp +++ source/ps/TaskManager.cpp @@ -77,7 +77,6 @@ } std::thread m_Thread; - std::atomic m_Kill = false; }; /** @@ -89,17 +88,9 @@ WorkerThread(TaskManager::Impl& taskManager); ~WorkerThread(); - /** - * Wake the worker. - */ - void Wake(); - protected: void RunUntilDeath(); - std::mutex m_Mutex; - std::condition_variable m_ConditionVariable; - TaskManager::Impl& m_TaskManager; }; @@ -113,12 +104,8 @@ friend class TaskManager; friend class WorkerThread; public: - Impl(TaskManager& backref); - ~Impl() - { - ClearQueue(); - m_Workers.clear(); - } + Impl() = default; + ~Impl(); /** * 2-phase init to avoid having to think too hard about the order of class members. @@ -135,24 +122,20 @@ protected: void ClearQueue(); - template bool PopTask(std::function& taskOut); - // Back reference (keep this first). - TaskManager& m_TaskManager; + std::condition_variable m_HasWork; - std::atomic m_HasWork = false; - std::atomic m_HasLowPriorityWork = false; std::mutex m_GlobalMutex; - std::mutex m_GlobalLowPriorityMutex; std::deque m_GlobalQueue; + + std::mutex m_GlobalLowPriorityMutex; std::deque m_GlobalLowPriorityQueue; + std::atomic m_Kill = false; + // Ideally this would be a vector, since it does get iterated, but that requires movable types. std::deque m_Workers; - - // Round-robin counter for GetWorker. 
- mutable size_t m_RoundRobinIdx = 0; }; TaskManager::TaskManager() : TaskManager(std::thread::hardware_concurrency() - 1) @@ -161,16 +144,20 @@ TaskManager::TaskManager(size_t numberOfWorkers) { - m = std::make_unique(*this); + m = std::make_unique(); numberOfWorkers = Clamp(numberOfWorkers, MIN_THREADS, MAX_THREADS); m->SetupWorkers(numberOfWorkers); } TaskManager::~TaskManager() = default; -TaskManager::Impl::Impl(TaskManager& backref) - : m_TaskManager(backref) +TaskManager::Impl::~Impl() { + ClearQueue(); + m_Kill = true; + m_HasWork.notify_all(); + // Clear it before the other members get destructed. + m_Workers.clear(); } void TaskManager::Impl::SetupWorkers(size_t numberOfWorkers) @@ -206,32 +193,40 @@ { std::mutex& mutex = priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex; std::deque& queue = priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue; - std::atomic& hasWork = priority == TaskPriority::NORMAL ? m_HasWork : m_HasLowPriorityWork; { std::lock_guard lock(mutex); queue.emplace_back(std::move(task)); - hasWork = true; } - for (WorkerThread& worker : m_Workers) - worker.Wake(); + m_HasWork.notify_one(); } -template bool TaskManager::Impl::PopTask(std::function& taskOut) { - std::mutex& mutex = Priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex; - std::deque& queue = Priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue; - std::atomic& hasWork = Priority == TaskPriority::NORMAL ? m_HasWork : m_HasLowPriorityWork; - - // Particularly critical section since we're locking the global queue. 
- std::lock_guard globalLock(mutex); - if (!queue.empty()) { - taskOut = std::move(queue.front()); - queue.pop_front(); - hasWork = !queue.empty(); - return true; + std::unique_lock normalPriLock(m_GlobalMutex); + m_HasWork.wait(normalPriLock, [this]() + { + if(!m_GlobalQueue.empty()) + return true; + std::lock_guard lock(m_GlobalLowPriorityMutex); + return !m_GlobalLowPriorityQueue.empty() || m_Kill; + }); + if (!m_GlobalQueue.empty()) + { + taskOut = std::move(m_GlobalQueue.front()); + m_GlobalQueue.pop_front(); + return true; + } + } + { + std::lock_guard lowPriLock(m_GlobalLowPriorityMutex); + if (!m_GlobalLowPriorityQueue.empty()) + { + taskOut = std::move(m_GlobalLowPriorityQueue.front()); + m_GlobalLowPriorityQueue.pop_front(); + return true; + } } return false; } @@ -258,17 +253,10 @@ WorkerThread::~WorkerThread() { - m_Kill = true; - m_ConditionVariable.notify_one(); if (m_Thread.joinable()) m_Thread.join(); } -void WorkerThread::Wake() -{ - m_ConditionVariable.notify_one(); -} - void WorkerThread::RunUntilDeath() { // The profiler does better if the names are unique. @@ -279,24 +267,10 @@ std::function task; - bool hasTask = false; - std::unique_lock lock(m_Mutex, std::defer_lock); - while (!m_Kill) + while (!m_TaskManager.m_Kill) { - lock.lock(); - m_ConditionVariable.wait(lock, [this](){ - return m_Kill || m_TaskManager.m_HasWork || m_TaskManager.m_HasLowPriorityWork; - }); - lock.unlock(); - - if (m_Kill) - break; - // Fetch work from the global queues. - hasTask = m_TaskManager.PopTask(task); - if (!hasTask) - hasTask = m_TaskManager.PopTask(task); - if (hasTask) + if (m_TaskManager.PopTask(task)) task(); } }