| 1 | //===----------------------------------------------------------------------===// |
| 2 | // DuckDB |
| 3 | // |
| 4 | // duckdb/parallel/task_scheduler.hpp |
| 5 | // |
| 6 | // |
| 7 | //===----------------------------------------------------------------------===// |
| 8 | |
| 9 | #pragma once |
| 10 | |
| 11 | #include "duckdb/common/common.hpp" |
| 12 | #include "duckdb/common/mutex.hpp" |
| 13 | #include "duckdb/common/vector.hpp" |
| 14 | #include "duckdb/parallel/task.hpp" |
| 15 | #include "duckdb/common/atomic.hpp" |
| 16 | |
| 17 | namespace duckdb { |
| 18 | |
| 19 | struct ConcurrentQueue; |
| 20 | struct QueueProducerToken; |
| 21 | class ClientContext; |
| 22 | class DatabaseInstance; |
| 23 | class TaskScheduler; |
| 24 | |
| 25 | struct SchedulerThread; |
| 26 | |
| 27 | struct ProducerToken { |
| 28 | ProducerToken(TaskScheduler &scheduler, unique_ptr<QueueProducerToken> token); |
| 29 | ~ProducerToken(); |
| 30 | |
| 31 | TaskScheduler &scheduler; |
| 32 | unique_ptr<QueueProducerToken> token; |
| 33 | mutex producer_lock; |
| 34 | }; |
| 35 | |
| 36 | //! The TaskScheduler is responsible for managing tasks and threads |
| 37 | class TaskScheduler { |
| 38 | // timeout for semaphore wait, default 5ms |
| 39 | constexpr static int64_t TASK_TIMEOUT_USECS = 5000; |
| 40 | |
| 41 | public: |
| 42 | TaskScheduler(DatabaseInstance &db); |
| 43 | ~TaskScheduler(); |
| 44 | |
| 45 | DUCKDB_API static TaskScheduler &GetScheduler(ClientContext &context); |
| 46 | DUCKDB_API static TaskScheduler &GetScheduler(DatabaseInstance &db); |
| 47 | |
| 48 | unique_ptr<ProducerToken> CreateProducer(); |
| 49 | //! Schedule a task to be executed by the task scheduler |
| 50 | void ScheduleTask(ProducerToken &producer, shared_ptr<Task> task); |
| 51 | //! Fetches a task from a specific producer, returns true if successful or false if no tasks were available |
| 52 | bool GetTaskFromProducer(ProducerToken &token, shared_ptr<Task> &task); |
| 53 | //! Run tasks forever until "marker" is set to false, "marker" must remain valid until the thread is joined |
| 54 | void ExecuteForever(atomic<bool> *marker); |
| 55 | //! Run tasks until `marker` is set to false, `max_tasks` have been completed, or until there are no more tasks |
| 56 | //! available. Returns the number of tasks that were completed. |
| 57 | idx_t ExecuteTasks(atomic<bool> *marker, idx_t max_tasks); |
| 58 | //! Run tasks until `max_tasks` have been completed, or until there are no more tasks available |
| 59 | void ExecuteTasks(idx_t max_tasks); |
| 60 | |
| 61 | //! Sets the amount of active threads executing tasks for the system; n-1 background threads will be launched. |
| 62 | //! The main thread will also be used for execution |
| 63 | void SetThreads(int32_t n); |
| 64 | //! Returns the number of threads |
| 65 | DUCKDB_API int32_t NumberOfThreads(); |
| 66 | |
| 67 | //! Send signals to n threads, signalling for them to wake up and attempt to execute a task |
| 68 | void Signal(idx_t n); |
| 69 | |
| 70 | private: |
| 71 | void SetThreadsInternal(int32_t n); |
| 72 | |
| 73 | private: |
| 74 | DatabaseInstance &db; |
| 75 | //! The task queue |
| 76 | unique_ptr<ConcurrentQueue> queue; |
| 77 | //! Lock for modifying the thread count |
| 78 | mutex thread_lock; |
| 79 | //! The active background threads of the task scheduler |
| 80 | vector<unique_ptr<SchedulerThread>> threads; |
| 81 | //! Markers used by the various threads, if the markers are set to "false" the thread execution is stopped |
| 82 | vector<unique_ptr<atomic<bool>>> markers; |
| 83 | }; |
| 84 | |
| 85 | } // namespace duckdb |
| 86 | |