1//===----------------------------------------------------------------------===//
2// DuckDB
3//
4// duckdb/parallel/task_scheduler.hpp
5//
6//
7//===----------------------------------------------------------------------===//
8
9#pragma once
10
11#include "duckdb/common/common.hpp"
12#include "duckdb/common/mutex.hpp"
13#include "duckdb/common/vector.hpp"
14#include "duckdb/parallel/task.hpp"
15#include "duckdb/common/atomic.hpp"
16
17namespace duckdb {
18
// Forward declarations. This header only refers to these types through
// references and smart pointers, so their full definitions are not pulled in here.
class ClientContext;
class DatabaseInstance;
class TaskScheduler;

struct ConcurrentQueue;
struct QueueProducerToken;
struct SchedulerThread;
26
27struct ProducerToken {
28 ProducerToken(TaskScheduler &scheduler, unique_ptr<QueueProducerToken> token);
29 ~ProducerToken();
30
31 TaskScheduler &scheduler;
32 unique_ptr<QueueProducerToken> token;
33 mutex producer_lock;
34};
35
36//! The TaskScheduler is responsible for managing tasks and threads
37class TaskScheduler {
38 // timeout for semaphore wait, default 5ms
39 constexpr static int64_t TASK_TIMEOUT_USECS = 5000;
40
41public:
42 TaskScheduler(DatabaseInstance &db);
43 ~TaskScheduler();
44
45 DUCKDB_API static TaskScheduler &GetScheduler(ClientContext &context);
46 DUCKDB_API static TaskScheduler &GetScheduler(DatabaseInstance &db);
47
48 unique_ptr<ProducerToken> CreateProducer();
49 //! Schedule a task to be executed by the task scheduler
50 void ScheduleTask(ProducerToken &producer, shared_ptr<Task> task);
51 //! Fetches a task from a specific producer, returns true if successful or false if no tasks were available
52 bool GetTaskFromProducer(ProducerToken &token, shared_ptr<Task> &task);
53 //! Run tasks forever until "marker" is set to false, "marker" must remain valid until the thread is joined
54 void ExecuteForever(atomic<bool> *marker);
55 //! Run tasks until `marker` is set to false, `max_tasks` have been completed, or until there are no more tasks
56 //! available. Returns the number of tasks that were completed.
57 idx_t ExecuteTasks(atomic<bool> *marker, idx_t max_tasks);
58 //! Run tasks until `max_tasks` have been completed, or until there are no more tasks available
59 void ExecuteTasks(idx_t max_tasks);
60
61 //! Sets the amount of active threads executing tasks for the system; n-1 background threads will be launched.
62 //! The main thread will also be used for execution
63 void SetThreads(int32_t n);
64 //! Returns the number of threads
65 DUCKDB_API int32_t NumberOfThreads();
66
67 //! Send signals to n threads, signalling for them to wake up and attempt to execute a task
68 void Signal(idx_t n);
69
70private:
71 void SetThreadsInternal(int32_t n);
72
73private:
74 DatabaseInstance &db;
75 //! The task queue
76 unique_ptr<ConcurrentQueue> queue;
77 //! Lock for modifying the thread count
78 mutex thread_lock;
79 //! The active background threads of the task scheduler
80 vector<unique_ptr<SchedulerThread>> threads;
81 //! Markers used by the various threads, if the markers are set to "false" the thread execution is stopped
82 vector<unique_ptr<atomic<bool>>> markers;
83};
84
85} // namespace duckdb
86