Changeset View
Standalone View
source/ps/ThreadPool.h
- This file was added.
/* Copyright (C) 2021 Wildfire Games. | |||||||||||||||
* This file is part of 0 A.D. | |||||||||||||||
* | |||||||||||||||
* 0 A.D. is free software: you can redistribute it and/or modify | |||||||||||||||
* it under the terms of the GNU General Public License as published by | |||||||||||||||
* the Free Software Foundation, either version 2 of the License, or | |||||||||||||||
* (at your option) any later version. | |||||||||||||||
* | |||||||||||||||
* 0 A.D. is distributed in the hope that it will be useful, | |||||||||||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | |||||||||||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |||||||||||||||
* GNU General Public License for more details. | |||||||||||||||
* | |||||||||||||||
* You should have received a copy of the GNU General Public License | |||||||||||||||
* along with 0 A.D. If not, see <http://www.gnu.org/licenses/>. | |||||||||||||||
*/ | |||||||||||||||
#ifndef INCLUDED_THREADPOOL | |||||||||||||||
#define INCLUDED_THREADPOOL | |||||||||||||||
#include "ps/Future.h" | |||||||||||||||
#include <memory> | |||||||||||||||
StanUnsubmitted Not Done Inline Actions
Stan: | |||||||||||||||
Not Done Inline ActionsStill. Can some of the below classes be marked final? they don't seem overridable anyway. Stan: Still.
Can some of the below classes be marked final? they don't seem overridable anyway. | |||||||||||||||
#include <vector> | |||||||||||||||
Not Done Inline Actions
Stan: | |||||||||||||||
class TestThreadPool; | |||||||||||||||
namespace ThreadPool | |||||||||||||||
{ | |||||||||||||||
class Pool; | |||||||||||||||
Not Done Inline ActionsI'm not sure that it should be called ThreadPool. Because I think that TaskManager intersects with ThreadPool, but ThreadPool doesn't include the whole TaskManager. vladislavbelov: I'm not sure that it should be called `ThreadPool`. Because I think that `TaskManager`… | |||||||||||||||
Not Done Inline ActionsWhat's about that? vladislavbelov: What's about that? | |||||||||||||||
class Thread; | |||||||||||||||
class ThreadExecutor; | |||||||||||||||
class IExecutor | |||||||||||||||
Not Done Inline ActionsTaskPriority. vladislavbelov: `TaskPriority`. | |||||||||||||||
{ | |||||||||||||||
protected: | |||||||||||||||
// This is purely an interface to reduce duplication, | |||||||||||||||
// and not meant to be used polymorphically, so it has a non-virtual, protected destructor. | |||||||||||||||
~IExecutor() = default; | |||||||||||||||
public: | |||||||||||||||
/** | |||||||||||||||
* Submit a task in a fire-and-forget manner. The task is not guaranteed to run before program exit. | |||||||||||||||
*/ | |||||||||||||||
Not Done Inline ActionsWhat's the point of making a class for it? Why not using WorkerAffinity = ...;? Also affinity means subset not a single element in other words affinity is a mask. Also in our case WorkerAffinity is useless for performance, it helps only for a profiler-like initializations than can be done in thread`s init. vladislavbelov: What's the point of making a class for it? Why not `using WorkerAffinity = ...;`? Also affinity… | |||||||||||||||
void Execute(std::function<void()>&& func) | |||||||||||||||
{ | |||||||||||||||
ExecuteTask(std::move(func)); | |||||||||||||||
} | |||||||||||||||
/** | |||||||||||||||
* Submit a task and get a future that will hold its result. | |||||||||||||||
* The task is not guaranteed to run before program exit. | |||||||||||||||
* Note that if you discard the future, the task is auto-cancelled, | |||||||||||||||
* so use Execute for fire-and-forget tasks. | |||||||||||||||
*/ | |||||||||||||||
template<typename T> | |||||||||||||||
[[nodiscard]] Future<std::invoke_result_t<T>> Submit(T&& func) | |||||||||||||||
{ | |||||||||||||||
Future<std::invoke_result_t<T>> ret; | |||||||||||||||
ExecuteTask(ret.Wrap(std::move(func))); | |||||||||||||||
return ret; | |||||||||||||||
} | |||||||||||||||
protected: | |||||||||||||||
// Do the actual work. | |||||||||||||||
virtual void ExecuteTask(PackagedTask&& task) = 0; | |||||||||||||||
}; | |||||||||||||||
Done Inline Actions. vladislavbelov: . | |||||||||||||||
class PoolExecutor final : public IExecutor | |||||||||||||||
{ | |||||||||||||||
public: | |||||||||||||||
PoolExecutor(Pool& p) : m_Pool(p) {} | |||||||||||||||
protected: | |||||||||||||||
Done Inline ActionsThe same should be for copying then. vladislavbelov: The same should be for copying then. | |||||||||||||||
virtual void ExecuteTask(PackagedTask&& task) override; | |||||||||||||||
Pool& m_Pool; | |||||||||||||||
}; | |||||||||||||||
class ThreadExecutor final : public IExecutor | |||||||||||||||
{ | |||||||||||||||
friend class Pool; | |||||||||||||||
Not Done Inline ActionsWhy std::list over vector / array Stan: Why std::list over vector / array
Does it make sense to use custom allocators for those ? | |||||||||||||||
Done Inline Actionsstd::vector cannot use non-movable objects for now, and array is constant-size. I don't think custom allocators particularly make sense. wraitii: std::vector cannot use non-movable objects for now, and array is constant-size.
I don't think… | |||||||||||||||
friend class EachThreadExecutor; | |||||||||||||||
public: | |||||||||||||||
/** | |||||||||||||||
* Shadow the IExecutor one - we may need to rewrap the function. | |||||||||||||||
*/ | |||||||||||||||
template<typename T> | |||||||||||||||
[[nodiscard]] Future<std::invoke_result_t<T>> Submit(T&& func) | |||||||||||||||
{ | |||||||||||||||
Future<std::invoke_result_t<T>> ret; | |||||||||||||||
if (m_Owns) | |||||||||||||||
// Extend the lifetime of the thread by packaging it in the task. | |||||||||||||||
Not Done Inline ActionsThat should not be public. vladislavbelov: That should not be public. | |||||||||||||||
ExecuteTask(ret.Wrap([thread=m_Thread.ownedPtr, func=std::move(func)]() mutable { | |||||||||||||||
func(); | |||||||||||||||
// Here it gets tricky: the task will end, releasing its shared_ptr, thus destroying the thread | |||||||||||||||
// and thus potentially joining itself (aka crashing). | |||||||||||||||
// To avoid that, we'll set the thread m_Kill to true right away, detach, and the stack will do the rest. | |||||||||||||||
KillOwnedThread(thread); | |||||||||||||||
})); | |||||||||||||||
else | |||||||||||||||
ExecuteTask(ret.Wrap(func)); | |||||||||||||||
return ret; | |||||||||||||||
} | |||||||||||||||
ThreadExecutor(ThreadExecutor&&); | |||||||||||||||
~ThreadExecutor(); | |||||||||||||||
protected: | |||||||||||||||
// Reference constructor. | |||||||||||||||
ThreadExecutor(Thread& thread); | |||||||||||||||
// Owning constructor | |||||||||||||||
struct CreateThread {}; | |||||||||||||||
ThreadExecutor(CreateThread); | |||||||||||||||
Not Done Inline ActionsCancelTasks or ClearQueue. vladislavbelov: `CancelTasks` or `ClearQueue`. | |||||||||||||||
ThreadExecutor(const ThreadExecutor&); | |||||||||||||||
virtual void ExecuteTask(PackagedTask&& task) override; | |||||||||||||||
static void KillOwnedThread(const std::shared_ptr<Thread>& thread); | |||||||||||||||
protected: | |||||||||||||||
union MaybeOwnedThread | |||||||||||||||
{ | |||||||||||||||
MaybeOwnedThread(Thread* p); | |||||||||||||||
Not Done Inline Actionstypo? Stan: typo? | |||||||||||||||
MaybeOwnedThread(CreateThread); | |||||||||||||||
Not Done Inline ActionsIt's not a TaskManager responsibility to provide executors. TaskManager by its interface works only with tasks. A task executor shouldn't retrieved when the task is ready to execute and not when it's submitted. vladislavbelov: It's not a `TaskManager` responsibility to provide executors. `TaskManager` by its interface… | |||||||||||||||
Done Inline ActionsSee comment above on IExecutor - I need some more info on this I think.
So what I called a 'worker' here is what you call a 'task executor', right? But my IExecutor class is intended to be the lightweight handle-to-a-worker from e.g. http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2020/p0443r12.html , not an 'execution context'. My design has the client code get an executor from the taskManager/thread pool (I agree that it makes the name a bit confusing, but perhaps we didn't understand the classes to do the same in the first place), then push work via that. The executor system is how I implement thread affinity and priority. I could skip that and just push work via the task manager directly - I didn't do it because it seemed the C++ standard would go more flexible, and I wanted the concepts here to be vaguely replaceable with standard ones someday. Can you clarify what you would have me change? wraitii: See comment above on IExecutor - I need some more info on this I think.
> A task executor… | |||||||||||||||
Not Done Inline ActionsI'd prefer to not have IExecutor at all. It's just an additional step for the code client. A simpler task manager can be extended to a more flexible one. But I don't think we will ever need it. We don't need to solve any possible problem. vladislavbelov: I'd prefer to not have `IExecutor` at all. It's just an additional step for the code client. A… | |||||||||||||||
MaybeOwnedThread(bool owns, MaybeOwnedThread&&); | |||||||||||||||
MaybeOwnedThread(bool owns, const MaybeOwnedThread&); | |||||||||||||||
~MaybeOwnedThread() {}; | |||||||||||||||
Thread* ptr; | |||||||||||||||
std::shared_ptr<Thread> ownedPtr; | |||||||||||||||
} m_Thread; | |||||||||||||||
bool m_Owns; | |||||||||||||||
}; | |||||||||||||||
class EachThreadExecutor | |||||||||||||||
{ | |||||||||||||||
public: | |||||||||||||||
EachThreadExecutor(std::vector<ThreadExecutor>& e) : m_Executors(e) {} | |||||||||||||||
Not Done Inline ActionsMethods for testing shouldn't be in a public interface. vladislavbelov: Methods for testing shouldn't be in a public interface. | |||||||||||||||
Done Inline ActionsThis is also not for testing, but used for e.g. sending work from the pathfinder, so we can push work to each thread. Edit: though with some more thinking I see how this isn't extremely useful, tbh. wraitii: This is also not for testing, but used for e.g. sending work from the pathfinder, so we can… | |||||||||||||||
std::vector<ThreadExecutor>::iterator begin() { return m_Executors.begin(); } | |||||||||||||||
std::vector<ThreadExecutor>::iterator end() { return m_Executors.end(); } | |||||||||||||||
private: | |||||||||||||||
std::vector<ThreadExecutor>& m_Executors; | |||||||||||||||
}; | |||||||||||||||
Not Done Inline Actionsnamespace. vladislavbelov: `namespace`. | |||||||||||||||
/** | |||||||||||||||
* ThreadPool::Pool is the actual thead pool. | |||||||||||||||
* It is ready to accept work from the moment it's created, | |||||||||||||||
* but will only create actual threads once InitThreads is called. | |||||||||||||||
*/ | |||||||||||||||
class Pool | |||||||||||||||
{ | |||||||||||||||
friend class ::TestThreadPool; | |||||||||||||||
friend class PoolExecutor; | |||||||||||||||
friend class Thread; | |||||||||||||||
public: | |||||||||||||||
Pool(); | |||||||||||||||
Not Done Inline ActionsIt seems the Pool name is overused. Pool can't do any work it only can provide threads and not thread execution interfaces. For that purposes there is TaskManager or TaskQueue. vladislavbelov: It seems the `Pool` name is overused. Pool can't do any work it only can provide threads and… | |||||||||||||||
Pool(const Pool&) = delete; | |||||||||||||||
Pool(Pool&&) = delete; | |||||||||||||||
/** | |||||||||||||||
* On destruction Reset is called (@see Reset) and all threads are joined. | |||||||||||||||
*/ | |||||||||||||||
~Pool(); | |||||||||||||||
/** | |||||||||||||||
* Create worker threads. Their number is initially automatically determined. | |||||||||||||||
*/ | |||||||||||||||
void InitThreads(); | |||||||||||||||
/** | |||||||||||||||
* The thread pool's threads are initialised before ConfigDB. This is called when the latter is. | |||||||||||||||
*/ | |||||||||||||||
void ReadConfig(); | |||||||||||||||
public: | |||||||||||||||
/** | |||||||||||||||
* Clears all tasks from the thread pool & its threads, waiting for started tasks. | |||||||||||||||
* Threads are not deleted. | |||||||||||||||
*/ | |||||||||||||||
void Reset(); | |||||||||||||||
/** | |||||||||||||||
* The Pool executor assigns work to the pool, not a specific worker. | |||||||||||||||
* Tasks are not guaranteed to be run in a separate thread. | |||||||||||||||
* Prefer this executor if you will have work to do asynchronously, | |||||||||||||||
* but starting it immediately is not required. | |||||||||||||||
*/ | |||||||||||||||
PoolExecutor GetExecutor(); | |||||||||||||||
/** | |||||||||||||||
* The thread executor assigns work to a specific worker. | |||||||||||||||
* The underlying thread is guaranteed to live at least as long as the executor, | |||||||||||||||
* unless the thread pool is reset. | |||||||||||||||
Not Done Inline Actionsconst? Stan: const? | |||||||||||||||
* @param mustBeAsync - if true, a temporary thread may be started. | |||||||||||||||
*/ | |||||||||||||||
ThreadExecutor GetWorker(bool mustBeAsync = false); | |||||||||||||||
/** | |||||||||||||||
* Returns an executor that can be used to start (optionally different) work on (optionally all) threads. | |||||||||||||||
* Lifetime guarantee: until Reset() is called. | |||||||||||||||
*/ | |||||||||||||||
Not Done Inline Actionsconst? Stan: const? | |||||||||||||||
Done Inline Actions(on this and above) wraitii: (on this and above)
I think it's technically const, but semantically it doesn't make much sense… | |||||||||||||||
EachThreadExecutor GetAllWorkers(); | |||||||||||||||
size_t GetNbOfThreads(); | |||||||||||||||
protected: | |||||||||||||||
void InitThreads(size_t n); | |||||||||||||||
class Impl; | |||||||||||||||
std::unique_ptr<Impl> m; | |||||||||||||||
}; | |||||||||||||||
} // ThreadPool | |||||||||||||||
extern ThreadPool::Pool g_ThreadPool; | |||||||||||||||
#endif // INCLUDED_THREADPOOL | |||||||||||||||
Not Done Inline ActionsUse static method instead. vladislavbelov: Use static method instead. |