Index: source/ps/Future.h =================================================================== --- source/ps/Future.h +++ source/ps/Future.h @@ -21,11 +21,14 @@ #include "ps/FutureForward.h" #include +#include #include #include +#include #include #include #include +#include template class PackagedTask; @@ -136,6 +139,90 @@ } // namespace FutureSharedStateDetail +namespace Repeating +{ +struct Delay +{ + std::chrono::steady_clock::duration time; +}; + +template +struct SharedState +{ + SharedState(Scheduler& taskManager, const Delay& duration, Callback callbackFunc) : + scheduler{taskManager}, + delay{duration}, + callback{std::move(callbackFunc)} + {} + + Scheduler& scheduler; + const Delay delay; + Callback callback; + FutureSharedStateDetail::Receiver receiver; +}; + +enum class Result +{ + Repeat, + Stop +}; + +template +class PackagedTask +{ +public: + PackagedTask() = delete; + PackagedTask(std::shared_ptr> ss) : + m_SharedState(std::move(ss)) + {} + + void operator()() + { + { + FutureSharedStateDetail::Status expected = FutureSharedStateDetail::Status::PENDING; + if (!m_SharedState->receiver.m_Status.compare_exchange_strong(expected, + FutureSharedStateDetail::Status::STARTED)) + { + return; + } + } + + const auto result = m_SharedState->callback(); + + if (result == Result::Repeat) + { + // Reschedule if the task wasn't canceled in the meantime. + FutureSharedStateDetail::Status expected = FutureSharedStateDetail::Status::STARTED; + if (m_SharedState->receiver.m_Status.compare_exchange_strong(expected, + FutureSharedStateDetail::Status::PENDING)) + { + m_SharedState->scheduler.PushDelayedTask(m_SharedState->delay.time, *this); + } + return; + } + + // The task finished. + + // Because we might have threads waiting on us, we need to make sure that they either: + // - don't wait on our condition variable + // - receive the notification when we're done. + // This requires locking the mutex (@see Wait). + { + std::lock_guard lock(m_SharedState->receiver.m_Mutex); + m_SharedState->receiver.m_Status = FutureSharedStateDetail::Status::DONE; + } + + m_SharedState->receiver.m_ConditionVariable.notify_all(); + + // We no longer need the shared state, drop it immediately. + m_SharedState.reset(); + } + +private: + std::shared_ptr> m_SharedState; +}; +} + /** * Corresponds to std::future. * Unlike std::future, Future can request the cancellation of the task that would produce the result. @@ -165,6 +252,19 @@ Future& operator=(Future&&) = default; ~Future() = default; + template + Future(Scheduler& scheduler, const Repeating::Delay& delay, Callback callback) + { + static_assert(std::is_same_v, Repeating::Result>); + + auto temp = std::make_shared>(scheduler, delay, + std::move(callback)); + m_SharedState = std::shared_ptr>{temp, &temp->receiver}; + + using namespace std::literals; + scheduler.PushDelayedTask(0ms, Repeating::PackagedTask{std::move(temp)}); + } + /** * Make the future wait for the result of @a func. */ @@ -233,6 +333,9 @@ std::shared_ptr> m_SharedState; }; +template +Future(Scheduler&, const Repeating::Delay&, Func) -> Future; + /** * Corresponds somewhat to std::packaged_task. * Like packaged_task, this holds a function acting as a promise. Index: source/ps/TaskManager.h =================================================================== --- source/ps/TaskManager.h +++ source/ps/TaskManager.h @@ -20,6 +20,8 @@ #include "ps/Future.h" +#include +#include #include #include @@ -71,6 +73,8 @@ return ret; } + void PushDelayedTask(std::chrono::steady_clock::duration delay, std::function task); + private: TaskManager(size_t numberOfWorkers); Index: source/ps/TaskManager.cpp =================================================================== --- source/ps/TaskManager.cpp +++ source/ps/TaskManager.cpp @@ -27,12 +27,17 @@ #include "ps/ThreadUtil.h" #include "ps/Profiler2.h" +#include #include #include #include #include #include +#include #include +#include + +using namespace std::literals; namespace Threading { @@ -49,6 +54,12 @@ */ constexpr size_t MAX_WORKERS = 32; +/** + * The worker threads are every `MAX_SLEEPTIME` woken up even if there is no + * task scheduled. + */ +constexpr auto MAX_SLEEPTIME = 30ms; + size_t GetDefaultNumberOfWorkers() { const size_t hardware_concurrency = std::thread::hardware_concurrency(); @@ -142,12 +153,30 @@ * May be called from any thread. */ void PushTask(std::function&& task, TaskPriority priority); + void PushDelayedTask(std::chrono::system_clock::duration delay, QueueItem task) + { + const auto now = std::chrono::steady_clock::now(); + std::lock_guard lock{m_DelayedTasksMutex}; + m_DelayedTasks.push({now + delay, std::move(task)}); + } protected: void ClearQueue(); template bool PopTask(std::function& taskOut); + bool PopDelayedTask(QueueItem& outTask) + { + const auto now = std::chrono::steady_clock::now(); + std::lock_guard lock{m_DelayedTasksMutex}; + if (!m_DelayedTasks.empty() && m_DelayedTasks.top().timeToExecute <= now) + { + outTask = m_DelayedTasks.top().whatToExecute; + m_DelayedTasks.pop(); + return true; + } + return false; + } std::atomic m_HasWork = false; std::atomic m_HasLowPriorityWork = false; @@ -156,6 +185,20 @@ std::deque m_GlobalQueue; std::deque m_GlobalLowPriorityQueue; + struct Instruction + { + std::chrono::steady_clock::time_point timeToExecute; + QueueItem whatToExecute; + + bool operator>(const Instruction& other) const + { + return timeToExecute > other.timeToExecute; + } + }; + + std::priority_queue, std::greater<>> m_DelayedTasks; + std::mutex m_DelayedTasksMutex; + // Ideally this would be a vector, since it does get iterated, but that requires movable types. std::deque m_Workers; }; @@ -217,6 +260,11 @@ worker.Wake(); } +void TaskManager::PushDelayedTask(std::chrono::system_clock::duration delay, QueueItem task) +{ + m->PushDelayedTask(delay, task); +} + template bool TaskManager::Impl::PopTask(std::function& taskOut) { @@ -284,7 +332,7 @@ while (!m_Kill) { lock.lock(); - m_ConditionVariable.wait(lock, [this](){ + m_ConditionVariable.wait_for(lock, MAX_SLEEPTIME, [this](){ return m_Kill || m_TaskManager.m_HasWork || m_TaskManager.m_HasLowPriorityWork; }); lock.unlock(); @@ -293,7 +341,9 @@ break; // Fetch work from the global queues. - hasTask = m_TaskManager.PopTask(task); + hasTask = m_TaskManager.PopDelayedTask(task); + if (!hasTask) + hasTask = m_TaskManager.PopTask(task); if (!hasTask) hasTask = m_TaskManager.PopTask(task); if (hasTask) Index: source/ps/tests/test_Future.h =================================================================== --- source/ps/tests/test_Future.h +++ source/ps/tests/test_Future.h @@ -19,8 +19,11 @@ #include "ps/Future.h" +#include #include +#include #include +#include class TestFuture : public CxxTest::TestSuite { @@ -141,4 +144,88 @@ future.Wrap([t = MoveOnlyType{}]{}); } + + class DummyTaskManager + { + public: + void PushDelayedTask(std::chrono::steady_clock::duration delay, std::function task) + { + instructions.push_back({delay, task}); + } + + using Instruction = std::tuple>; + + std::vector instructions; + }; + + void test_repeating_static_delay() + { + using namespace std::literals; + DummyTaskManager tm; + Future future{tm, Repeating::Delay{2ms}, [i = 0]() mutable + { + ++i; + return i == 2 ? Repeating::Result::Stop : Repeating::Result::Repeat; + }}; + + TS_ASSERT_EQUALS(tm.instructions.size(), 1); + + { + auto [delay, task] = tm.instructions.at(0); + tm.instructions.clear(); + TS_ASSERT_EQUALS(delay, 0ms); + task(); + } + + TS_ASSERT_EQUALS(tm.instructions.size(), 1); + + { + auto [delay, task] = tm.instructions.at(0); + tm.instructions.clear(); + TS_ASSERT_EQUALS(delay, 2ms); + task(); + } + + TS_ASSERT_EQUALS(tm.instructions.size(), 0); + TS_ASSERT(future.IsReady()); + } + + void test_repeating_infinit() + { + using namespace std::literals; + DummyTaskManager tm; + + // This task does allways schedule itself. + Future future{tm, Repeating::Delay{0s}, [] + { + return Repeating::Result::Repeat; + }}; + + TS_ASSERT_EQUALS(tm.instructions.size(), 1); + + { + auto [delay, task] = tm.instructions.at(0); + tm.instructions.clear(); + task(); + } + + TS_ASSERT_EQUALS(tm.instructions.size(), 1); + + { + auto [delay, task] = tm.instructions.at(0); + tm.instructions.clear(); + task(); + } + + TS_ASSERT_EQUALS(tm.instructions.size(), 1); + + // If CancelOrWait is called it will not schedule itself anymore. + future.CancelOrWait(); + + auto [delay, task] = tm.instructions.at(0); + tm.instructions.clear(); + task(); + + TS_ASSERT_EQUALS(tm.instructions.size(), 0); + } }; Index: source/ps/tests/test_TaskManager.h =================================================================== --- source/ps/tests/test_TaskManager.h +++ source/ps/tests/test_TaskManager.h @@ -115,4 +115,61 @@ TS_ASSERT_EQUALS(futures[i].Get(), 5); #undef ITERATIONS } + + void test_repeating_timer() + { + using namespace std::literals; + Threading::TaskManager& taskManager = Threading::TaskManager::Instance(); + + const auto start = std::chrono::steady_clock::now(); + Future future{taskManager, Repeating::Delay{50ms}, [i = 0]() mutable + { + ++i; + return i == 11 ? Repeating::Result::Stop : Repeating::Result::Repeat; + }}; + future.Wait(); + const auto stop = std::chrono::steady_clock::now(); + + TS_ASSERT_LESS_THAN_EQUALS(500ms, stop - start); + } + + void test_repeating_timing() + { + using namespace std::literals; + Threading::TaskManager& taskManager = Threading::TaskManager::Instance(); + + // This is realy bad code don't write something like that in the engine. + + std::atomic flag{false}; + Future loading{taskManager, Repeating::Delay{100ms}, [&flag, firstPass = true]() mutable + { + // The first run is not delayed. + if (firstPass) + { + firstPass = false; + return Repeating::Result::Repeat; + } + + // After 100ms the other task should already have set the flag. + TS_ASSERT(flag.load()); + return Repeating::Result::Stop; + }}; + + Future storing{taskManager, Repeating::Delay{10ms}, [&flag, firstPass = true]() mutable + { + // The first run is not delayed. + if (firstPass) + { + firstPass = false; + return Repeating::Result::Repeat; + } + + // Set the flag after 10ms. + flag.store(true); + return Repeating::Result::Stop; + }}; + + storing.Wait(); + loading.Wait(); + } };