1//===----------------------------------------------------------------------===//
2// DuckDB
3//
4// duckdb/execution/executor.hpp
5//
6//
7//===----------------------------------------------------------------------===//
8
9#pragma once
10
11#include "duckdb/common/common.hpp"
12#include "duckdb/common/enums/pending_execution_result.hpp"
13#include "duckdb/common/mutex.hpp"
14#include "duckdb/common/pair.hpp"
15#include "duckdb/common/reference_map.hpp"
16#include "duckdb/parallel/pipeline.hpp"
17
18namespace duckdb {
// Forward declarations of types referenced by the Executor interface.
// They are grouped by keyword and sorted alphabetically. The original
// class/struct keyword is kept for each type so the declarations match
// their definitions.
class ClientContext;
class DataChunk;
class OperatorState;
class PhysicalOperator;
class PipelineExecutor;
class QueryProfiler;
class Task;
class ThreadContext;

struct PipelineEventStack;
struct ProducerToken;
struct ScheduleEventData;
31
32class Executor {
33 friend class Pipeline;
34 friend class PipelineTask;
35 friend class PipelineBuildState;
36
37public:
38 explicit Executor(ClientContext &context);
39 ~Executor();
40
41 ClientContext &context;
42
43public:
44 static Executor &Get(ClientContext &context);
45
46 void Initialize(PhysicalOperator &physical_plan);
47 void Initialize(unique_ptr<PhysicalOperator> physical_plan);
48
49 void CancelTasks();
50 PendingExecutionResult ExecuteTask();
51
52 void Reset();
53
54 vector<LogicalType> GetTypes();
55
56 unique_ptr<DataChunk> FetchChunk();
57
58 //! Push a new error
59 void PushError(PreservedError exception);
60
61 //! True if an error has been thrown
62 bool HasError();
63 //! Throw the exception that was pushed using PushError.
64 //! Should only be called if HasError returns true
65 void ThrowException();
66
67 //! Work on tasks for this specific executor, until there are no tasks remaining
68 void WorkOnTasks();
69
70 //! Flush a thread context into the client context
71 void Flush(ThreadContext &context);
72
73 //! Reschedules a task that was blocked
74 void RescheduleTask(shared_ptr<Task> &task);
75
76 //! Add the task to be rescheduled
77 void AddToBeRescheduled(shared_ptr<Task> &task);
78
79 //! Returns the progress of the pipelines
80 bool GetPipelinesProgress(double &current_progress);
81
82 void CompletePipeline() {
83 completed_pipelines++;
84 }
85 ProducerToken &GetToken() {
86 return *producer;
87 }
88 void AddEvent(shared_ptr<Event> event);
89
90 void AddRecursiveCTE(PhysicalOperator &rec_cte);
91 void ReschedulePipelines(const vector<shared_ptr<MetaPipeline>> &pipelines, vector<shared_ptr<Event>> &events);
92
93 //! Whether or not the root of the pipeline is a result collector object
94 bool HasResultCollector();
95 //! Returns the query result - can only be used if `HasResultCollector` returns true
96 unique_ptr<QueryResult> GetResult();
97
98 //! Returns true if all pipelines have been completed
99 bool ExecutionIsFinished();
100
101private:
102 void InitializeInternal(PhysicalOperator &physical_plan);
103
104 void ScheduleEvents(const vector<shared_ptr<MetaPipeline>> &meta_pipelines);
105 static void ScheduleEventsInternal(ScheduleEventData &event_data);
106
107 static void VerifyScheduledEvents(const ScheduleEventData &event_data);
108 static void VerifyScheduledEventsInternal(const idx_t i, const vector<Event *> &vertices, vector<bool> &visited,
109 vector<bool> &recursion_stack);
110
111 static void SchedulePipeline(const shared_ptr<MetaPipeline> &pipeline, ScheduleEventData &event_data);
112
113 bool NextExecutor();
114
115 shared_ptr<Pipeline> CreateChildPipeline(Pipeline &current, PhysicalOperator &op);
116
117 void VerifyPipeline(Pipeline &pipeline);
118 void VerifyPipelines();
119
120private:
121 optional_ptr<PhysicalOperator> physical_plan;
122 unique_ptr<PhysicalOperator> owned_plan;
123
124 mutex executor_lock;
125 mutex error_lock;
126 //! All pipelines of the query plan
127 vector<shared_ptr<Pipeline>> pipelines;
128 //! The root pipelines of the query
129 vector<shared_ptr<Pipeline>> root_pipelines;
130 //! The recursive CTE's in this query plan
131 vector<reference<PhysicalOperator>> recursive_ctes;
132 //! The pipeline executor for the root pipeline
133 unique_ptr<PipelineExecutor> root_executor;
134 //! The current root pipeline index
135 idx_t root_pipeline_idx;
136 //! The producer of this query
137 unique_ptr<ProducerToken> producer;
138 //! Exceptions that occurred during the execution of the current query
139 vector<PreservedError> exceptions;
140 //! List of events
141 vector<shared_ptr<Event>> events;
142 //! The query profiler
143 shared_ptr<QueryProfiler> profiler;
144
145 //! The amount of completed pipelines of the query
146 atomic<idx_t> completed_pipelines;
147 //! The total amount of pipelines in the query
148 idx_t total_pipelines;
149 //! Whether or not execution is cancelled
150 bool cancelled;
151
152 //! The last pending execution result (if any)
153 PendingExecutionResult execution_result;
154 //! The current task in process (if any)
155 shared_ptr<Task> task;
156
157 //! Task that have been descheduled
158 unordered_map<Task *, shared_ptr<Task>> to_be_rescheduled_tasks;
159};
160} // namespace duckdb
161