Index: source/network/NetClient.h =================================================================== --- source/network/NetClient.h +++ source/network/NetClient.h @@ -27,7 +27,6 @@ #include #include -#include class CGame; class CNetClientSession; @@ -314,8 +313,6 @@ /// Current network session (or NULL if not connected) CNetClientSession* m_Session; - std::thread m_PollingThread; - /// Turn manager associated with the current game (or NULL if we haven't started the game yet) CNetClientTurnManager* m_ClientTurnManager; Index: source/network/NetClient.cpp =================================================================== --- source/network/NetClient.cpp +++ source/network/NetClient.cpp @@ -35,7 +35,6 @@ #include "ps/Game.h" #include "ps/Loader.h" #include "ps/Profile.h" -#include "ps/Threading.h" #include "scriptinterface/ScriptInterface.h" #include "simulation2/Simulation2.h" #include "network/StunClient.h" @@ -191,13 +190,12 @@ m_ControllerSecret = secret; } - bool CNetClient::SetupConnection(ENetHost* enetClient) { CNetClientSession* session = new CNetClientSession(*this); bool ok = session->Connect(m_ServerAddress, m_ServerPort, enetClient); SetAndOwnSession(session); - m_PollingThread = std::thread(Threading::HandleExceptions::Wrapper, m_Session); + CNetClientSession::StartRecurrentTask(session); return ok; } @@ -297,16 +295,11 @@ void CNetClient::DestroyConnection() { if (m_Session) + { + // This deletes session on its own. m_Session->Shutdown(); - - if (m_PollingThread.joinable()) - // Use detach() over join() because we don't want to wait for the session - // (which may be polling or trying to send messages). - m_PollingThread.detach(); - - // The polling thread will cleanup the session on its own, - // mark it as nullptr here so we know we're done using it. 
- m_Session = nullptr; + m_Session = nullptr; + } } void CNetClient::Poll() Index: source/network/NetFileTransfer.h =================================================================== --- source/network/NetFileTransfer.h +++ source/network/NetFileTransfer.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2020 Wildfire Games. +/* Copyright (C) 2021 Wildfire Games. * This file is part of 0 A.D. * * 0 A.D. is free software: you can redistribute it and/or modify @@ -108,6 +108,11 @@ */ void Poll(); + /** + * @return true if there is some work to be done. + */ + bool HasWork() const; + private: Status OnFileTransferResponse(const CFileTransferResponseMessage& message); Status OnFileTransferData(const CFileTransferDataMessage& message); Index: source/network/NetFileTransfer.cpp =================================================================== --- source/network/NetFileTransfer.cpp +++ source/network/NetFileTransfer.cpp @@ -1,4 +1,4 @@ -/* Copyright (C) 2019 Wildfire Games. +/* Copyright (C) 2021 Wildfire Games. * This file is part of 0 A.D. * * 0 A.D. is free software: you can redistribute it and/or modify @@ -188,3 +188,8 @@ // TODO: need to garbage-collect finished tasks } + +bool CNetFileTransferer::HasWork() const +{ + return !m_FileSendTasks.empty(); +} Index: source/network/NetSession.h =================================================================== --- source/network/NetSession.h +++ source/network/NetSession.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2020 Wildfire Games. +/* Copyright (C) 2021 Wildfire Games. * This file is part of 0 A.D. * * 0 A.D. is free software: you can redistribute it and/or modify @@ -23,6 +23,7 @@ #include "network/NetFileTransfer.h" #include "network/NetHost.h" #include "ps/CStr.h" +#include "ps/ThreadPoolForward.h" #include @@ -78,7 +79,8 @@ * The client NetSession is threaded to avoid getting timeouts if the main thread hangs. * Call Connect() before starting this loop. 
*/ - static void RunNetLoop(CNetClientSession* session); + static void StartRecurrentTask(CNetClientSession* session); + ThreadPool::RecurrentTaskStatus RunNetUpdate(Future& future, ThreadPool::PoolExecutor& exec); /** * Shut down the net session. @@ -110,7 +112,7 @@ /** * Process queued incoming messages. */ - void Poll(); + void Poll(const ENetEvent& event); /** * Flush queued outgoing network messages. Index: source/network/NetSession.cpp =================================================================== --- source/network/NetSession.cpp +++ source/network/NetSession.cpp @@ -1,4 +1,4 @@ -/* Copyright (C) 2019 Wildfire Games. +/* Copyright (C) 2021 Wildfire Games. * This file is part of 0 A.D. * * 0 A.D. is free software: you can redistribute it and/or modify @@ -25,10 +25,9 @@ #include "NetStats.h" #include "ps/CLogger.h" #include "ps/Profile.h" +#include "ps/ThreadPool.h" #include "scriptinterface/ScriptInterface.h" -constexpr int NETCLIENT_POLL_TIMEOUT = 50; - constexpr int CHANNEL_COUNT = 1; CNetClientSession::CNetClientSession(CNetClient& client) : @@ -92,44 +91,59 @@ return true; } -void CNetClientSession::RunNetLoop(CNetClientSession* session) +void CNetClientSession::StartRecurrentTask(CNetClientSession* session) { - ENSURE(!session->m_LoopRunning); session->m_LoopRunning = true; + ThreadPool::TaskManager::Instance().AddRecurrentTask(30, [session, future=Future()](ThreadPool::PoolExecutor& exec) mutable { + return session->RunNetUpdate(future, exec); + }); +} - debug_SetThreadName("NetClientSession loop"); +ThreadPool::RecurrentTaskStatus CNetClientSession::RunNetUpdate(Future& future, ThreadPool::PoolExecutor& exec) +{ + if (future.Valid() && !future.IsReady()) + return ThreadPool::RecurrentTaskStatus::RETRY; - while (!session->m_ShouldShutdown) + if (m_ShouldShutdown) { - ENSURE(session->m_Host && session->m_Server); - - session->m_FileTransferer.Poll(); - session->Poll(); - session->Flush(); - - session->m_LastReceivedTime = enet_time_get() - 
session->m_Server->lastReceiveTime; - session->m_MeanRTT = session->m_Server->roundTripTime; + m_LoopRunning = false; + delete this; + return ThreadPool::RecurrentTaskStatus::STOP; } - session->m_LoopRunning = false; + ENSURE(m_Host && m_Server); - // Deleting the session is handled in this thread as it might outlive the CNetClient. - SAFE_DELETE(session); + m_LastReceivedTime = enet_time_get() - m_Server->lastReceiveTime; + m_MeanRTT = m_Server->roundTripTime; + + // Check if we have some work to do, if not exit early. + ENetEvent event; + // TODO: handle errors. + bool hasWork = enet_host_service(m_Host, &event, 0) > 0; + if (!hasWork && m_FileTransferer.HasWork()) + hasWork = true; + else if (!m_OutgoingMessages.empty()) + hasWork = true; + if (hasWork) + future = exec.Submit([event, this]() mutable { + PROFILE2("NetClient - Update"); + m_FileTransferer.Poll(); + Flush(); + do + Poll(event); + while (enet_host_service(m_Host, &event, 0) > 0); + }); + return ThreadPool::RecurrentTaskStatus::OK; } void CNetClientSession::Shutdown() { + // On the next timer loop, this will clean up the net session. m_ShouldShutdown = true; } -void CNetClientSession::Poll() +void CNetClientSession::Poll(const ENetEvent& event) { - ENetEvent event; - - // Use the timeout to make the thread wait and save CPU time. - if (enet_host_service(m_Host, &event, NETCLIENT_POLL_TIMEOUT) <= 0) - return; - if (event.type == ENET_EVENT_TYPE_CONNECT) { ENSURE(event.peer == m_Server); Index: source/ps/ThreadPool.h =================================================================== --- source/ps/ThreadPool.h +++ source/ps/ThreadPool.h @@ -19,6 +19,7 @@ #define INCLUDED_THREADPOOL #include "ps/Future.h" +#include "ps/ThreadPoolForward.h" #include #include @@ -28,21 +29,14 @@ namespace ThreadPool { -class TaskManager; class WorkerThread; -enum class Priority -{ - NORMAL, - LOW -}; - /** * Execute work on the global queue. 
* The low-priority queue has a differently-typed executor to highlight the different * starvation guaranteeds. */ -template +template class GlobalExecutor final : public IExecutor { friend class TaskManager; @@ -124,6 +118,18 @@ */ EachThreadExecutor& GetAllWorkers(); + /** + * Add a recurrent task running approximately every X ms. + * Recurrent tasks are run on the timer thread, and should run in microseconds or less. + * A typical use-case is to check if some actual work needs to be performed, and use the PoolExecutor + * passed as argument to do this on one of the regular workers. + * Note that the timer is allowed to drift over time. + * @param repeatms - Time, in milliseconds, between two calls. + * The actual value will depend on the threadpool timer. + * @param func - Function to run. + */ + void AddRecurrentTask(u32 repeatms, std::function&)>&& func); + private: class Impl; std::unique_ptr m; Index: source/ps/ThreadPool.cpp =================================================================== --- source/ps/ThreadPool.cpp +++ source/ps/ThreadPool.cpp @@ -28,6 +28,7 @@ #include "ps/Profiler2.h" #include +#include #include #include #include @@ -60,7 +61,24 @@ std::unique_ptr g_TaskManager; +/** + * Run the timer thread every X. + * NB: this is currently allowed to drift over time, as that's presumed not too important. + * This value should be low enough that tasks can execute somewhat real-time, + * but high enough that the thread is largely idle & won't have negative scheduling effects. + */ +static constexpr std::chrono::microseconds TIMER_INTERVAL = std::chrono::microseconds(30000); + +/** + * The timer thread runs recurrent tasks on its own thread to avoid context switching. + * This means these tasks must be fast. + * When the wakeup takes more than this much time, print a warning. + * This is conservative, it's meant to detect code errors. 
+ */ +static constexpr std::chrono::microseconds MAX_TIMER_WAKUP_TIME = std::chrono::microseconds(1000); + class Thread; +class TimerThread; } using QueueItem = std::function; @@ -148,6 +166,58 @@ ThreadPool::TaskManager::Impl& m_TaskManager; }; +class ThreadPool::TimerThread final : public Thread +{ +public: + TimerThread(const GlobalExecutor& exec) : m_Executor(exec) + { + Start(this); + }; + + ~TimerThread() + { + m_Kill = true; + Clear(); + if (m_Thread.joinable()) + m_Thread.join(); + } + + /** + * Clear recurrent & one-off tasks. + */ + void Clear() + { + std::lock_guard lock(m_Mutex); + m_RecurrentTasks.clear(); + } + + /** + * Add a recurrent task. The task will be run on the next timer call. + * @param repeat - How often to run this task. 0 is every time, 1 is every 2, 2 every 3, and so on. + * @param func - Task to run. + */ + void PushRecurrentTask(u32 repeat, std::function&)>&& func) + { + m_RecurrentTasks.emplace_back(repeat, std::move(func)); + } + +protected: + void RunUntilDeath(); + std::mutex m_Mutex; + std::condition_variable m_ConditionVariable; + + GlobalExecutor m_Executor; + + struct RecurrentTask + { + RecurrentTask(u32 repeat, std::function&)>&& func) : m_Repeat(repeat), m_Func(std::move(func)) {} + u32 m_Repeat = 0; + u32 m_Cooldown = 0; + std::function&)> m_Func; + }; + std::vector m_RecurrentTasks; +}; + /** * PImpl-ed implementation of the threadpool's task manager. * @@ -194,6 +264,7 @@ // Ideally this would be a vector, since it does get iterated, but that requires movable types, // and we want the executors to be long-lived, thus these need to not move. std::list m_Workers; + TimerThread m_TimerWorker; // This does not contain all workers, see comment on avoiding starvation above. 
std::vector m_DedicatedExecutors; @@ -215,7 +286,7 @@ ThreadPool::TaskManager::~TaskManager() {} ThreadPool::TaskManager::Impl::Impl(TaskManager& backref, size_t nbWorkers) - : m_TaskManager(backref) + : m_TaskManager(backref), m_TimerWorker(m_TaskManager.GetExecutor()) { m_FirstReversedIdx = nbWorkers - GetNbOfReversedPriorityWorker(nbWorkers); for (size_t i = 0; i < nbWorkers; ++i) @@ -240,6 +311,7 @@ void ThreadPool::TaskManager::Clear() { m->Clear(); } void ThreadPool::TaskManager::Impl::Clear() { + m_TimerWorker.Clear(); for (WorkerThread& worker : m_Workers) worker.Clear(); } @@ -311,6 +383,15 @@ return *g_TaskManager; } +void ThreadPool::TaskManager::AddRecurrentTask(u32 repeatms, std::function&)>&& func) +{ + u32 repeat = round(std::chrono::duration(repeatms) / TIMER_INTERVAL); + // Need to remove one additional tick as 0 means "every timer" + if (repeat > 0) + --repeat; + m->m_TimerWorker.PushRecurrentTask(repeat, std::move(func)); +} + // Executor definitions template @@ -391,6 +472,77 @@ } } +void ThreadPool::TimerThread::RunUntilDeath() +{ + std::string name("ThreadPool Timer"); + g_Profiler2.RegisterCurrentThread(name); + debug_SetThreadName(name.c_str()); + + std::chrono::microseconds sleepTime = TIMER_INTERVAL; + std::chrono::steady_clock clock; + std::chrono::time_point t0 = clock.now(); + u32 warningCounter = 0; + std::vector indicesToDrop; + + std::unique_lock lock(m_Mutex); + while (!m_Kill) + { + // Tolerate spurious wake ups. + m_ConditionVariable.wait_for(lock, sleepTime); + if (m_Kill) + break; + // We give no guarantees on actual timing (because that's much easier), + // so just fire tasks. 
+ t0 = clock.now(); + + indicesToDrop.clear(); + for (size_t i = 0; i < m_RecurrentTasks.size(); ++i) + { + RecurrentTask& task = m_RecurrentTasks[i]; + if (task.m_Cooldown == 0) + { + RecurrentTaskStatus status = task.m_Func(m_Executor); + if (status == RecurrentTaskStatus::OK) + task.m_Cooldown = task.m_Repeat; + else if (status == RecurrentTaskStatus::STOP) + indicesToDrop.push_back(i); + // Else keep as-is for retry. + } + else + --task.m_Cooldown; + } + for (size_t idx : indicesToDrop) + { + m_RecurrentTasks[idx] = std::move(m_RecurrentTasks.back()); + m_RecurrentTasks.pop_back(); + } + indicesToDrop.clear(); + + auto wakeTime = clock.now() - t0; + // Since it's measured, correct for run time, but this won't prevent + // actual drift over time. + sleepTime = std::chrono::duration_cast(TIMER_INTERVAL - wakeTime); + + // Warning logic - we want to warn if the timer is too often too slow + // since that probably indicates a mistake somewhere (e.g. the wrong function is run). + // However we don't want spurious warnings, so add 2 for each overrun, and remove 1 otherwise. + // This will warn if enough runs are slow in a short time period. + if (wakeTime > MAX_TIMER_WAKUP_TIME) + { + warningCounter += 2; + if (warningCounter > 50) + { + warningCounter = 0; + LOGWARNING("ThreadPool Timer: starting tasks took %ims, which is more than the limit of %ims", + std::chrono::duration_cast(sleepTime).count(), + std::chrono::duration_cast(MAX_TIMER_WAKUP_TIME).count()); + } + } + else if (warningCounter > 0) + --warningCounter; + } +} + // Defined here - needs access to derived types. template void ThreadPool::Thread::DoStart(T* object) Index: source/ps/ThreadPoolForward.h =================================================================== --- /dev/null +++ source/ps/ThreadPoolForward.h @@ -0,0 +1,51 @@ +/* Copyright (C) 2021 Wildfire Games. + * This file is part of 0 A.D. + * + * 0 A.D. 
is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 2 of the License, or + * (at your option) any later version. + * + * 0 A.D. is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with 0 A.D. If not, see . + */ + +#ifndef INCLUDED_THREADPOOL_FORWARD +#define INCLUDED_THREADPOOL_FORWARD + +#include "ps/FutureForward.h" + +namespace ThreadPool +{ +enum class RecurrentTaskStatus +{ + // Task went OK - nothing special to report. + OK, + // Retry next timer call. + RETRY, + // Drop the recurrent task + STOP, +}; + +enum class Priority +{ + NORMAL, + LOW +}; + +template +class GlobalExecutor; +using PoolExecutor = GlobalExecutor; +using LowPrioPoolExecutor = GlobalExecutor; +class ThreadExecutor; +class EachThreadExecutor; + +class TaskManager; +} // ThreadPool + +#endif // INCLUDED_THREADPOOL_FORWARD