1 | //===----------------------------------------------------------------------===// |
2 | // DuckDB |
3 | // |
4 | // duckdb/parallel/task_scheduler.hpp |
5 | // |
6 | // |
7 | //===----------------------------------------------------------------------===// |
8 | |
9 | #pragma once |
10 | |
11 | #include "duckdb/common/common.hpp" |
12 | #include "duckdb/common/mutex.hpp" |
13 | #include "duckdb/common/vector.hpp" |
14 | #include "duckdb/parallel/task.hpp" |
15 | #include "duckdb/common/atomic.hpp" |
16 | |
17 | namespace duckdb { |
18 | |
// Types this header only names by reference or through smart pointers;
// forward declarations are enough here, so their headers are not included.

// Referenced by the public scheduler interface
class ClientContext;
class DatabaseInstance;
class TaskScheduler;

// Held only through unique_ptr members in this header
struct ConcurrentQueue;
struct QueueProducerToken;
struct SchedulerThread;
26 | |
27 | struct ProducerToken { |
28 | ProducerToken(TaskScheduler &scheduler, unique_ptr<QueueProducerToken> token); |
29 | ~ProducerToken(); |
30 | |
31 | TaskScheduler &scheduler; |
32 | unique_ptr<QueueProducerToken> token; |
33 | mutex producer_lock; |
34 | }; |
35 | |
36 | //! The TaskScheduler is responsible for managing tasks and threads |
37 | class TaskScheduler { |
38 | // timeout for semaphore wait, default 5ms |
39 | constexpr static int64_t TASK_TIMEOUT_USECS = 5000; |
40 | |
41 | public: |
42 | TaskScheduler(DatabaseInstance &db); |
43 | ~TaskScheduler(); |
44 | |
45 | DUCKDB_API static TaskScheduler &GetScheduler(ClientContext &context); |
46 | DUCKDB_API static TaskScheduler &GetScheduler(DatabaseInstance &db); |
47 | |
48 | unique_ptr<ProducerToken> CreateProducer(); |
49 | //! Schedule a task to be executed by the task scheduler |
50 | void ScheduleTask(ProducerToken &producer, shared_ptr<Task> task); |
51 | //! Fetches a task from a specific producer, returns true if successful or false if no tasks were available |
52 | bool GetTaskFromProducer(ProducerToken &token, shared_ptr<Task> &task); |
53 | //! Run tasks forever until "marker" is set to false, "marker" must remain valid until the thread is joined |
54 | void ExecuteForever(atomic<bool> *marker); |
55 | //! Run tasks until `marker` is set to false, `max_tasks` have been completed, or until there are no more tasks |
56 | //! available. Returns the number of tasks that were completed. |
57 | idx_t ExecuteTasks(atomic<bool> *marker, idx_t max_tasks); |
58 | //! Run tasks until `max_tasks` have been completed, or until there are no more tasks available |
59 | void ExecuteTasks(idx_t max_tasks); |
60 | |
61 | //! Sets the amount of active threads executing tasks for the system; n-1 background threads will be launched. |
62 | //! The main thread will also be used for execution |
63 | void SetThreads(int32_t n); |
64 | //! Returns the number of threads |
65 | DUCKDB_API int32_t NumberOfThreads(); |
66 | |
67 | //! Send signals to n threads, signalling for them to wake up and attempt to execute a task |
68 | void Signal(idx_t n); |
69 | |
70 | private: |
71 | void SetThreadsInternal(int32_t n); |
72 | |
73 | private: |
74 | DatabaseInstance &db; |
75 | //! The task queue |
76 | unique_ptr<ConcurrentQueue> queue; |
77 | //! Lock for modifying the thread count |
78 | mutex thread_lock; |
79 | //! The active background threads of the task scheduler |
80 | vector<unique_ptr<SchedulerThread>> threads; |
81 | //! Markers used by the various threads, if the markers are set to "false" the thread execution is stopped |
82 | vector<unique_ptr<atomic<bool>>> markers; |
83 | }; |
84 | |
85 | } // namespace duckdb |
86 | |