| 1 | //===----------------------------------------------------------------------===// | 
| 2 | //                         DuckDB | 
| 3 | // | 
| 4 | // duckdb/parallel/task_scheduler.hpp | 
| 5 | // | 
| 6 | // | 
| 7 | //===----------------------------------------------------------------------===// | 
| 8 |  | 
| 9 | #pragma once | 
| 10 |  | 
| 11 | #include "duckdb/common/common.hpp" | 
| 12 | #include "duckdb/common/mutex.hpp" | 
| 13 | #include "duckdb/common/vector.hpp" | 
| 14 | #include "duckdb/parallel/task.hpp" | 
| 15 | #include "duckdb/common/atomic.hpp" | 
| 16 |  | 
| 17 | namespace duckdb { | 
| 18 |  | 
| 19 | struct ConcurrentQueue; | 
| 20 | struct QueueProducerToken; | 
| 21 | class ClientContext; | 
| 22 | class DatabaseInstance; | 
| 23 | class TaskScheduler; | 
| 24 |  | 
| 25 | struct SchedulerThread; | 
| 26 |  | 
| 27 | struct ProducerToken { | 
| 28 | 	ProducerToken(TaskScheduler &scheduler, unique_ptr<QueueProducerToken> token); | 
| 29 | 	~ProducerToken(); | 
| 30 |  | 
| 31 | 	TaskScheduler &scheduler; | 
| 32 | 	unique_ptr<QueueProducerToken> token; | 
| 33 | 	mutex producer_lock; | 
| 34 | }; | 
| 35 |  | 
| 36 | //! The TaskScheduler is responsible for managing tasks and threads | 
| 37 | class TaskScheduler { | 
| 38 | 	// timeout for semaphore wait, default 5ms | 
| 39 | 	constexpr static int64_t TASK_TIMEOUT_USECS = 5000; | 
| 40 |  | 
| 41 | public: | 
| 42 | 	TaskScheduler(DatabaseInstance &db); | 
| 43 | 	~TaskScheduler(); | 
| 44 |  | 
| 45 | 	DUCKDB_API static TaskScheduler &GetScheduler(ClientContext &context); | 
| 46 | 	DUCKDB_API static TaskScheduler &GetScheduler(DatabaseInstance &db); | 
| 47 |  | 
| 48 | 	unique_ptr<ProducerToken> CreateProducer(); | 
| 49 | 	//! Schedule a task to be executed by the task scheduler | 
| 50 | 	void ScheduleTask(ProducerToken &producer, shared_ptr<Task> task); | 
| 51 | 	//! Fetches a task from a specific producer, returns true if successful or false if no tasks were available | 
| 52 | 	bool GetTaskFromProducer(ProducerToken &token, shared_ptr<Task> &task); | 
| 53 | 	//! Run tasks forever until "marker" is set to false, "marker" must remain valid until the thread is joined | 
| 54 | 	void ExecuteForever(atomic<bool> *marker); | 
| 55 | 	//! Run tasks until `marker` is set to false, `max_tasks` have been completed, or until there are no more tasks | 
| 56 | 	//! available. Returns the number of tasks that were completed. | 
| 57 | 	idx_t ExecuteTasks(atomic<bool> *marker, idx_t max_tasks); | 
| 58 | 	//! Run tasks until `max_tasks` have been completed, or until there are no more tasks available | 
| 59 | 	void ExecuteTasks(idx_t max_tasks); | 
| 60 |  | 
| 61 | 	//! Sets the amount of active threads executing tasks for the system; n-1 background threads will be launched. | 
| 62 | 	//! The main thread will also be used for execution | 
| 63 | 	void SetThreads(int32_t n); | 
| 64 | 	//! Returns the number of threads | 
| 65 | 	DUCKDB_API int32_t NumberOfThreads(); | 
| 66 |  | 
| 67 | 	//! Send signals to n threads, signalling for them to wake up and attempt to execute a task | 
| 68 | 	void Signal(idx_t n); | 
| 69 |  | 
| 70 | private: | 
| 71 | 	void SetThreadsInternal(int32_t n); | 
| 72 |  | 
| 73 | private: | 
| 74 | 	DatabaseInstance &db; | 
| 75 | 	//! The task queue | 
| 76 | 	unique_ptr<ConcurrentQueue> queue; | 
| 77 | 	//! Lock for modifying the thread count | 
| 78 | 	mutex thread_lock; | 
| 79 | 	//! The active background threads of the task scheduler | 
| 80 | 	vector<unique_ptr<SchedulerThread>> threads; | 
| 81 | 	//! Markers used by the various threads, if the markers are set to "false" the thread execution is stopped | 
| 82 | 	vector<unique_ptr<atomic<bool>>> markers; | 
| 83 | }; | 
| 84 |  | 
| 85 | } // namespace duckdb | 
| 86 |  |