1#include "duckdb/parallel/pipeline.hpp"
2
3#include "duckdb/common/algorithm.hpp"
4#include "duckdb/common/printer.hpp"
5#include "duckdb/common/tree_renderer.hpp"
6#include "duckdb/execution/executor.hpp"
7#include "duckdb/execution/operator/aggregate/physical_ungrouped_aggregate.hpp"
8#include "duckdb/execution/operator/scan/physical_table_scan.hpp"
9#include "duckdb/execution/operator/set/physical_recursive_cte.hpp"
10#include "duckdb/main/client_context.hpp"
11#include "duckdb/main/database.hpp"
12#include "duckdb/parallel/pipeline_event.hpp"
13#include "duckdb/parallel/pipeline_executor.hpp"
14#include "duckdb/parallel/task_scheduler.hpp"
15
16namespace duckdb {
17
18class PipelineTask : public ExecutorTask {
19 static constexpr const idx_t PARTIAL_CHUNK_COUNT = 50;
20
21public:
22 explicit PipelineTask(Pipeline &pipeline_p, shared_ptr<Event> event_p)
23 : ExecutorTask(pipeline_p.executor), pipeline(pipeline_p), event(std::move(event_p)) {
24 }
25
26 Pipeline &pipeline;
27 shared_ptr<Event> event;
28 unique_ptr<PipelineExecutor> pipeline_executor;
29
30public:
31 TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override {
32 if (!pipeline_executor) {
33 pipeline_executor = make_uniq<PipelineExecutor>(args&: pipeline.GetClientContext(), args&: pipeline);
34 }
35
36 pipeline_executor->SetTaskForInterrupts(shared_from_this());
37
38 if (mode == TaskExecutionMode::PROCESS_PARTIAL) {
39 auto res = pipeline_executor->Execute(max_chunks: PARTIAL_CHUNK_COUNT);
40
41 switch (res) {
42 case PipelineExecuteResult::NOT_FINISHED:
43 return TaskExecutionResult::TASK_NOT_FINISHED;
44 case PipelineExecuteResult::INTERRUPTED:
45 return TaskExecutionResult::TASK_BLOCKED;
46 case PipelineExecuteResult::FINISHED:
47 break;
48 }
49 } else {
50 auto res = pipeline_executor->Execute();
51 switch (res) {
52 case PipelineExecuteResult::NOT_FINISHED:
53 throw InternalException("Execute without limit should not return NOT_FINISHED");
54 case PipelineExecuteResult::INTERRUPTED:
55 return TaskExecutionResult::TASK_BLOCKED;
56 case PipelineExecuteResult::FINISHED:
57 break;
58 }
59 }
60
61 event->FinishTask();
62 pipeline_executor.reset();
63 return TaskExecutionResult::TASK_FINISHED;
64 }
65};
66
67Pipeline::Pipeline(Executor &executor_p)
68 : executor(executor_p), ready(false), initialized(false), source(nullptr), sink(nullptr) {
69}
70
71ClientContext &Pipeline::GetClientContext() {
72 return executor.context;
73}
74
75bool Pipeline::GetProgress(double &current_percentage, idx_t &source_cardinality) {
76 D_ASSERT(source);
77 source_cardinality = source->estimated_cardinality;
78 if (!initialized) {
79 current_percentage = 0;
80 return true;
81 }
82 auto &client = executor.context;
83 current_percentage = source->GetProgress(context&: client, gstate&: *source_state);
84 return current_percentage >= 0;
85}
86
87void Pipeline::ScheduleSequentialTask(shared_ptr<Event> &event) {
88 vector<shared_ptr<Task>> tasks;
89 tasks.push_back(x: make_uniq<PipelineTask>(args&: *this, args&: event));
90 event->SetTasks(std::move(tasks));
91}
92
93bool Pipeline::ScheduleParallel(shared_ptr<Event> &event) {
94 // check if the sink, source and all intermediate operators support parallelism
95 if (!sink->ParallelSink()) {
96 return false;
97 }
98 if (!source->ParallelSource()) {
99 return false;
100 }
101 for (auto &op_ref : operators) {
102 auto &op = op_ref.get();
103 if (!op.ParallelOperator()) {
104 return false;
105 }
106 }
107 if (sink->RequiresBatchIndex()) {
108 if (!source->SupportsBatchIndex()) {
109 throw InternalException(
110 "Attempting to schedule a pipeline where the sink requires batch index but source does not support it");
111 }
112 }
113 idx_t max_threads = source_state->MaxThreads();
114 return LaunchScanTasks(event, max_threads);
115}
116
117bool Pipeline::IsOrderDependent() const {
118 auto &config = DBConfig::GetConfig(context&: executor.context);
119 if (source) {
120 auto source_order = source->SourceOrder();
121 if (source_order == OrderPreservationType::FIXED_ORDER) {
122 return true;
123 }
124 if (source_order == OrderPreservationType::NO_ORDER) {
125 return false;
126 }
127 }
128 for (auto &op_ref : operators) {
129 auto &op = op_ref.get();
130 if (op.OperatorOrder() == OrderPreservationType::NO_ORDER) {
131 return false;
132 }
133 if (op.OperatorOrder() == OrderPreservationType::FIXED_ORDER) {
134 return true;
135 }
136 }
137 if (!config.options.preserve_insertion_order) {
138 return false;
139 }
140 if (sink && sink->SinkOrderDependent()) {
141 return true;
142 }
143 return false;
144}
145
146void Pipeline::Schedule(shared_ptr<Event> &event) {
147 D_ASSERT(ready);
148 D_ASSERT(sink);
149 Reset();
150 if (!ScheduleParallel(event)) {
151 // could not parallelize this pipeline: push a sequential task instead
152 ScheduleSequentialTask(event);
153 }
154}
155
156bool Pipeline::LaunchScanTasks(shared_ptr<Event> &event, idx_t max_threads) {
157 // split the scan up into parts and schedule the parts
158 auto &scheduler = TaskScheduler::GetScheduler(context&: executor.context);
159 idx_t active_threads = scheduler.NumberOfThreads();
160 if (max_threads > active_threads) {
161 max_threads = active_threads;
162 }
163 if (max_threads <= 1) {
164 // too small to parallelize
165 return false;
166 }
167
168 // launch a task for every thread
169 vector<shared_ptr<Task>> tasks;
170 for (idx_t i = 0; i < max_threads; i++) {
171 tasks.push_back(x: make_uniq<PipelineTask>(args&: *this, args&: event));
172 }
173 event->SetTasks(std::move(tasks));
174 return true;
175}
176
177void Pipeline::ResetSink() {
178 if (sink) {
179 if (!sink->IsSink()) {
180 throw InternalException("Sink of pipeline does not have IsSink set");
181 }
182 lock_guard<mutex> guard(sink->lock);
183 if (!sink->sink_state) {
184 sink->sink_state = sink->GetGlobalSinkState(context&: GetClientContext());
185 }
186 }
187}
188
189void Pipeline::Reset() {
190 ResetSink();
191 for (auto &op_ref : operators) {
192 auto &op = op_ref.get();
193 lock_guard<mutex> guard(op.lock);
194 if (!op.op_state) {
195 op.op_state = op.GetGlobalOperatorState(context&: GetClientContext());
196 }
197 }
198 ResetSource(force: false);
199 // we no longer reset source here because this function is no longer guaranteed to be called by the main thread
200 // source reset needs to be called by the main thread because resetting a source may call into clients like R
201 initialized = true;
202}
203
204void Pipeline::ResetSource(bool force) {
205 if (source && !source->IsSource()) {
206 throw InternalException("Source of pipeline does not have IsSource set");
207 }
208 if (force || !source_state) {
209 source_state = source->GetGlobalSourceState(context&: GetClientContext());
210 }
211}
212
213void Pipeline::Ready() {
214 if (ready) {
215 return;
216 }
217 ready = true;
218 std::reverse(first: operators.begin(), last: operators.end());
219}
220
221void Pipeline::Finalize(Event &event) {
222 if (executor.HasError()) {
223 return;
224 }
225 D_ASSERT(ready);
226 try {
227 auto sink_state = sink->Finalize(pipeline&: *this, event, context&: executor.context, gstate&: *sink->sink_state);
228 sink->sink_state->state = sink_state;
229 } catch (Exception &ex) { // LCOV_EXCL_START
230 executor.PushError(exception: PreservedError(ex));
231 } catch (std::exception &ex) {
232 executor.PushError(exception: PreservedError(ex));
233 } catch (...) {
234 executor.PushError(exception: PreservedError("Unknown exception in Finalize!"));
235 } // LCOV_EXCL_STOP
236}
237
238void Pipeline::AddDependency(shared_ptr<Pipeline> &pipeline) {
239 D_ASSERT(pipeline);
240 dependencies.push_back(x: weak_ptr<Pipeline>(pipeline));
241 pipeline->parents.push_back(x: weak_ptr<Pipeline>(shared_from_this()));
242}
243
244string Pipeline::ToString() const {
245 TreeRenderer renderer;
246 return renderer.ToString(op: *this);
247}
248
249void Pipeline::Print() const {
250 Printer::Print(str: ToString());
251}
252
253void Pipeline::PrintDependencies() const {
254 for (auto &dep : dependencies) {
255 shared_ptr<Pipeline>(dep)->Print();
256 }
257}
258
259vector<reference<PhysicalOperator>> Pipeline::GetOperators() {
260 vector<reference<PhysicalOperator>> result;
261 D_ASSERT(source);
262 result.push_back(x: *source);
263 for (auto &op : operators) {
264 result.push_back(x: op.get());
265 }
266 if (sink) {
267 result.push_back(x: *sink);
268 }
269 return result;
270}
271
272vector<const_reference<PhysicalOperator>> Pipeline::GetOperators() const {
273 vector<const_reference<PhysicalOperator>> result;
274 D_ASSERT(source);
275 result.push_back(x: *source);
276 for (auto &op : operators) {
277 result.push_back(x: op.get());
278 }
279 if (sink) {
280 result.push_back(x: *sink);
281 }
282 return result;
283}
284
285void Pipeline::ClearSource() {
286 source_state.reset();
287 batch_indexes.clear();
288}
289
290idx_t Pipeline::RegisterNewBatchIndex() {
291 lock_guard<mutex> l(batch_lock);
292 idx_t minimum = batch_indexes.empty() ? base_batch_index : *batch_indexes.begin();
293 batch_indexes.insert(x: minimum);
294 return minimum;
295}
296
297idx_t Pipeline::UpdateBatchIndex(idx_t old_index, idx_t new_index) {
298 lock_guard<mutex> l(batch_lock);
299 if (new_index < *batch_indexes.begin()) {
300 throw InternalException("Processing batch index %llu, but previous min batch index was %llu", new_index,
301 *batch_indexes.begin());
302 }
303 auto entry = batch_indexes.find(x: old_index);
304 if (entry == batch_indexes.end()) {
305 throw InternalException("Batch index %llu was not found in set of active batch indexes", old_index);
306 }
307 batch_indexes.erase(position: entry);
308 batch_indexes.insert(x: new_index);
309 return *batch_indexes.begin();
310}
311//===--------------------------------------------------------------------===//
312// Pipeline Build State
313//===--------------------------------------------------------------------===//
314void PipelineBuildState::SetPipelineSource(Pipeline &pipeline, PhysicalOperator &op) {
315 pipeline.source = &op;
316}
317
318void PipelineBuildState::SetPipelineSink(Pipeline &pipeline, optional_ptr<PhysicalOperator> op,
319 idx_t sink_pipeline_count) {
320 pipeline.sink = op;
321 // set the base batch index of this pipeline based on how many other pipelines have this node as their sink
322 pipeline.base_batch_index = BATCH_INCREMENT * sink_pipeline_count;
323}
324
325void PipelineBuildState::AddPipelineOperator(Pipeline &pipeline, PhysicalOperator &op) {
326 pipeline.operators.push_back(x: op);
327}
328
329optional_ptr<PhysicalOperator> PipelineBuildState::GetPipelineSource(Pipeline &pipeline) {
330 return pipeline.source;
331}
332
333optional_ptr<PhysicalOperator> PipelineBuildState::GetPipelineSink(Pipeline &pipeline) {
334 return pipeline.sink;
335}
336
337void PipelineBuildState::SetPipelineOperators(Pipeline &pipeline, vector<reference<PhysicalOperator>> operators) {
338 pipeline.operators = std::move(operators);
339}
340
341shared_ptr<Pipeline> PipelineBuildState::CreateChildPipeline(Executor &executor, Pipeline &pipeline,
342 PhysicalOperator &op) {
343 return executor.CreateChildPipeline(current&: pipeline, op);
344}
345
346vector<reference<PhysicalOperator>> PipelineBuildState::GetPipelineOperators(Pipeline &pipeline) {
347 return pipeline.operators;
348}
349
350} // namespace duckdb
351