1#include "duckdb/parallel/pipeline_executor.hpp"
2#include "duckdb/main/client_context.hpp"
3#include "duckdb/common/limits.hpp"
4
5#ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE
6#include <thread>
7#include <chrono>
8#endif
9
10namespace duckdb {
11
12PipelineExecutor::PipelineExecutor(ClientContext &context_p, Pipeline &pipeline_p)
13 : pipeline(pipeline_p), thread(context_p), context(context_p, thread, &pipeline_p) {
14 D_ASSERT(pipeline.source_state);
15 if (pipeline.sink) {
16 local_sink_state = pipeline.sink->GetLocalSinkState(context);
17 requires_batch_index = pipeline.sink->RequiresBatchIndex() && pipeline.source->SupportsBatchIndex();
18 if (requires_batch_index) {
19 auto &partition_info = local_sink_state->partition_info;
20 if (!partition_info.batch_index.IsValid()) {
21 // batch index is not set yet - initialize before fetching anything
22 partition_info.batch_index = pipeline.RegisterNewBatchIndex();
23 partition_info.min_batch_index = partition_info.batch_index;
24 }
25 }
26 }
27 local_source_state = pipeline.source->GetLocalSourceState(context, gstate&: *pipeline.source_state);
28
29 intermediate_chunks.reserve(n: pipeline.operators.size());
30 intermediate_states.reserve(n: pipeline.operators.size());
31 for (idx_t i = 0; i < pipeline.operators.size(); i++) {
32 auto &prev_operator = i == 0 ? *pipeline.source : pipeline.operators[i - 1].get();
33 auto &current_operator = pipeline.operators[i].get();
34
35 auto chunk = make_uniq<DataChunk>();
36 chunk->Initialize(allocator&: Allocator::Get(context&: context.client), types: prev_operator.GetTypes());
37 intermediate_chunks.push_back(x: std::move(chunk));
38
39 auto op_state = current_operator.GetOperatorState(context);
40 intermediate_states.push_back(x: std::move(op_state));
41
42 if (current_operator.IsSink() && current_operator.sink_state->state == SinkFinalizeType::NO_OUTPUT_POSSIBLE) {
43 // one of the operators has already figured out no output is possible
44 // we can skip executing the pipeline
45 FinishProcessing();
46 }
47 }
48 InitializeChunk(chunk&: final_chunk);
49}
50
51bool PipelineExecutor::TryFlushCachingOperators() {
52 if (!started_flushing) {
53 // Remainder of this method assumes any in process operators are from flushing
54 D_ASSERT(in_process_operators.empty());
55 started_flushing = true;
56 flushing_idx = IsFinished() ? idx_t(finished_processing_idx) : 0;
57 }
58
59 // Go over each operator and keep flushing them using `FinalExecute` until empty
60 while (flushing_idx < pipeline.operators.size()) {
61 if (!pipeline.operators[flushing_idx].get().RequiresFinalExecute()) {
62 flushing_idx++;
63 continue;
64 }
65
66 // This slightly awkward way of increasing the flushing idx is to make the code re-entrant: We need to call this
67 // method again in the case of a Sink returning BLOCKED.
68 if (!should_flush_current_idx && in_process_operators.empty()) {
69 should_flush_current_idx = true;
70 flushing_idx++;
71 continue;
72 }
73
74 auto &curr_chunk =
75 flushing_idx + 1 >= intermediate_chunks.size() ? final_chunk : *intermediate_chunks[flushing_idx + 1];
76 auto &current_operator = pipeline.operators[flushing_idx].get();
77
78 OperatorFinalizeResultType finalize_result;
79 OperatorResultType push_result;
80
81 if (in_process_operators.empty()) {
82 StartOperator(op&: current_operator);
83 finalize_result = current_operator.FinalExecute(context, chunk&: curr_chunk, gstate&: *current_operator.op_state,
84 state&: *intermediate_states[flushing_idx]);
85 EndOperator(op&: current_operator, chunk: &curr_chunk);
86 } else {
87 // Reset flag and reflush the last chunk we were flushing.
88 finalize_result = OperatorFinalizeResultType::HAVE_MORE_OUTPUT;
89 }
90
91 push_result = ExecutePushInternal(input&: curr_chunk, initial_idx: flushing_idx + 1);
92
93 if (finalize_result == OperatorFinalizeResultType::HAVE_MORE_OUTPUT) {
94 should_flush_current_idx = true;
95 } else {
96 should_flush_current_idx = false;
97 }
98
99 if (push_result == OperatorResultType::BLOCKED) {
100 remaining_sink_chunk = true;
101 return false;
102 } else if (push_result == OperatorResultType::FINISHED) {
103 break;
104 }
105 }
106 return true;
107}
108
109PipelineExecuteResult PipelineExecutor::Execute(idx_t max_chunks) {
110 D_ASSERT(pipeline.sink);
111 auto &source_chunk = pipeline.operators.empty() ? final_chunk : *intermediate_chunks[0];
112 for (idx_t i = 0; i < max_chunks; i++) {
113 if (context.client.interrupted) {
114 throw InterruptException();
115 }
116
117 OperatorResultType result;
118 if (exhausted_source && done_flushing && !remaining_sink_chunk && in_process_operators.empty()) {
119 break;
120 } else if (remaining_sink_chunk) {
121 // The pipeline was interrupted by the Sink. We should retry sinking the final chunk.
122 result = ExecutePushInternal(input&: final_chunk);
123 remaining_sink_chunk = false;
124 } else if (!in_process_operators.empty() && !started_flushing) {
125 // The pipeline was interrupted by the Sink when pushing a source chunk through the pipeline. We need to
126 // re-push the same source chunk through the pipeline because there are in_process operators, meaning that
127 // the result for the pipeline
128 D_ASSERT(source_chunk.size() > 0);
129 result = ExecutePushInternal(input&: source_chunk);
130 } else if (exhausted_source && !done_flushing) {
131 // The source was exhausted, try flushing all operators
132 auto flush_completed = TryFlushCachingOperators();
133 if (flush_completed) {
134 done_flushing = true;
135 break;
136 } else {
137 return PipelineExecuteResult::INTERRUPTED;
138 }
139 } else if (!exhausted_source) {
140 // "Regular" path: fetch a chunk from the source and push it through the pipeline
141 source_chunk.Reset();
142 SourceResultType source_result = FetchFromSource(result&: source_chunk);
143
144 if (source_result == SourceResultType::BLOCKED) {
145 return PipelineExecuteResult::INTERRUPTED;
146 }
147
148 if (source_result == SourceResultType::FINISHED) {
149 exhausted_source = true;
150 if (source_chunk.size() == 0) {
151 continue;
152 }
153 }
154 result = ExecutePushInternal(input&: source_chunk);
155 } else {
156 throw InternalException("Unexpected state reached in pipeline executor");
157 }
158
159 // SINK INTERRUPT
160 if (result == OperatorResultType::BLOCKED) {
161 remaining_sink_chunk = true;
162 return PipelineExecuteResult::INTERRUPTED;
163 }
164
165 if (result == OperatorResultType::FINISHED) {
166 break;
167 }
168 }
169
170 if ((!exhausted_source || !done_flushing) && !IsFinished()) {
171 return PipelineExecuteResult::NOT_FINISHED;
172 }
173
174 PushFinalize();
175
176 return PipelineExecuteResult::FINISHED;
177}
178
179PipelineExecuteResult PipelineExecutor::Execute() {
180 return Execute(max_chunks: NumericLimits<idx_t>::Maximum());
181}
182
183OperatorResultType PipelineExecutor::ExecutePush(DataChunk &input) { // LCOV_EXCL_START
184 return ExecutePushInternal(input);
185} // LCOV_EXCL_STOP
186
187void PipelineExecutor::FinishProcessing(int32_t operator_idx) {
188 finished_processing_idx = operator_idx < 0 ? NumericLimits<int32_t>::Maximum() : operator_idx;
189 in_process_operators = stack<idx_t>();
190}
191
192bool PipelineExecutor::IsFinished() {
193 return finished_processing_idx >= 0;
194}
195
196OperatorResultType PipelineExecutor::ExecutePushInternal(DataChunk &input, idx_t initial_idx) {
197 D_ASSERT(pipeline.sink);
198 if (input.size() == 0) { // LCOV_EXCL_START
199 return OperatorResultType::NEED_MORE_INPUT;
200 } // LCOV_EXCL_STOP
201
202 // this loop will continuously push the input chunk through the pipeline as long as:
203 // - the OperatorResultType for the Execute is HAVE_MORE_OUTPUT
204 // - the Sink doesn't block
205 while (true) {
206 OperatorResultType result;
207 // Note: if input is the final_chunk, we don't do any executing, the chunk just needs to be sinked
208 if (&input != &final_chunk) {
209 final_chunk.Reset();
210 result = Execute(input, result&: final_chunk, initial_index: initial_idx);
211 if (result == OperatorResultType::FINISHED) {
212 return OperatorResultType::FINISHED;
213 }
214 } else {
215 result = OperatorResultType::NEED_MORE_INPUT;
216 }
217 auto &sink_chunk = final_chunk;
218 if (sink_chunk.size() > 0) {
219 StartOperator(op&: *pipeline.sink);
220 D_ASSERT(pipeline.sink);
221 D_ASSERT(pipeline.sink->sink_state);
222 OperatorSinkInput sink_input {.global_state: *pipeline.sink->sink_state, .local_state: *local_sink_state, .interrupt_state: interrupt_state};
223
224 auto sink_result = Sink(chunk&: sink_chunk, input&: sink_input);
225
226 EndOperator(op&: *pipeline.sink, chunk: nullptr);
227
228 if (sink_result == SinkResultType::BLOCKED) {
229 return OperatorResultType::BLOCKED;
230 } else if (sink_result == SinkResultType::FINISHED) {
231 FinishProcessing();
232 return OperatorResultType::FINISHED;
233 }
234 }
235 if (result == OperatorResultType::NEED_MORE_INPUT) {
236 return OperatorResultType::NEED_MORE_INPUT;
237 }
238 }
239}
240
241void PipelineExecutor::PushFinalize() {
242 if (finalized) {
243 throw InternalException("Calling PushFinalize on a pipeline that has been finalized already");
244 }
245
246 D_ASSERT(local_sink_state);
247
248 finalized = true;
249
250 // run the combine for the sink
251 pipeline.sink->Combine(context, gstate&: *pipeline.sink->sink_state, lstate&: *local_sink_state);
252
253 // flush all query profiler info
254 for (idx_t i = 0; i < intermediate_states.size(); i++) {
255 intermediate_states[i]->Finalize(op: pipeline.operators[i].get(), context);
256 }
257 pipeline.executor.Flush(tcontext&: thread);
258 local_sink_state.reset();
259}
260
261// TODO: Refactoring the StreamingQueryResult to use Push-based execution should eliminate the need for this code
262void PipelineExecutor::ExecutePull(DataChunk &result) {
263 if (IsFinished()) {
264 return;
265 }
266 auto &executor = pipeline.executor;
267 try {
268 D_ASSERT(!pipeline.sink);
269 auto &source_chunk = pipeline.operators.empty() ? result : *intermediate_chunks[0];
270 while (result.size() == 0 && !exhausted_source) {
271 if (in_process_operators.empty()) {
272 source_chunk.Reset();
273
274 auto done_signal = make_shared<InterruptDoneSignalState>();
275 interrupt_state = InterruptState(done_signal);
276 SourceResultType source_result;
277
278 // Repeatedly try to fetch from the source until it doesn't block. Note that it may block multiple times
279 while (true) {
280 source_result = FetchFromSource(result&: source_chunk);
281
282 // No interrupt happened, all good.
283 if (source_result != SourceResultType::BLOCKED) {
284 break;
285 }
286
287 // Busy wait for async callback from source operator
288 done_signal->Await();
289 }
290
291 if (source_result == SourceResultType::FINISHED) {
292 exhausted_source = true;
293 if (source_chunk.size() == 0) {
294 break;
295 }
296 }
297 }
298 if (!pipeline.operators.empty()) {
299 auto state = Execute(input&: source_chunk, result);
300 if (state == OperatorResultType::FINISHED) {
301 break;
302 }
303 }
304 }
305 } catch (const Exception &ex) { // LCOV_EXCL_START
306 if (executor.HasError()) {
307 executor.ThrowException();
308 }
309 throw;
310 } catch (std::exception &ex) {
311 if (executor.HasError()) {
312 executor.ThrowException();
313 }
314 throw;
315 } catch (...) {
316 if (executor.HasError()) {
317 executor.ThrowException();
318 }
319 throw;
320 } // LCOV_EXCL_STOP
321}
322
323void PipelineExecutor::PullFinalize() {
324 if (finalized) {
325 throw InternalException("Calling PullFinalize on a pipeline that has been finalized already");
326 }
327 finalized = true;
328 pipeline.executor.Flush(tcontext&: thread);
329}
330
331void PipelineExecutor::GoToSource(idx_t &current_idx, idx_t initial_idx) {
332 // we go back to the first operator (the source)
333 current_idx = initial_idx;
334 if (!in_process_operators.empty()) {
335 // ... UNLESS there is an in process operator
336 // if there is an in-process operator, we start executing at the latest one
337 // for example, if we have a join operator that has tuples left, we first need to emit those tuples
338 current_idx = in_process_operators.top();
339 in_process_operators.pop();
340 }
341 D_ASSERT(current_idx >= initial_idx);
342}
343
344OperatorResultType PipelineExecutor::Execute(DataChunk &input, DataChunk &result, idx_t initial_idx) {
345 if (input.size() == 0) { // LCOV_EXCL_START
346 return OperatorResultType::NEED_MORE_INPUT;
347 } // LCOV_EXCL_STOP
348 D_ASSERT(!pipeline.operators.empty());
349
350 idx_t current_idx;
351 GoToSource(current_idx, initial_idx);
352 if (current_idx == initial_idx) {
353 current_idx++;
354 }
355 if (current_idx > pipeline.operators.size()) {
356 result.Reference(chunk&: input);
357 return OperatorResultType::NEED_MORE_INPUT;
358 }
359 while (true) {
360 if (context.client.interrupted) {
361 throw InterruptException();
362 }
363 // now figure out where to put the chunk
364 // if current_idx is the last possible index (>= operators.size()) we write to the result
365 // otherwise we write to an intermediate chunk
366 auto current_intermediate = current_idx;
367 auto &current_chunk =
368 current_intermediate >= intermediate_chunks.size() ? result : *intermediate_chunks[current_intermediate];
369 current_chunk.Reset();
370 if (current_idx == initial_idx) {
371 // we went back to the source: we need more input
372 return OperatorResultType::NEED_MORE_INPUT;
373 } else {
374 auto &prev_chunk =
375 current_intermediate == initial_idx + 1 ? input : *intermediate_chunks[current_intermediate - 1];
376 auto operator_idx = current_idx - 1;
377 auto &current_operator = pipeline.operators[operator_idx].get();
378
379 // if current_idx > source_idx, we pass the previous operators' output through the Execute of the current
380 // operator
381 StartOperator(op&: current_operator);
382 auto result = current_operator.Execute(context, input&: prev_chunk, chunk&: current_chunk, gstate&: *current_operator.op_state,
383 state&: *intermediate_states[current_intermediate - 1]);
384 EndOperator(op&: current_operator, chunk: &current_chunk);
385 if (result == OperatorResultType::HAVE_MORE_OUTPUT) {
386 // more data remains in this operator
387 // push in-process marker
388 in_process_operators.push(x: current_idx);
389 } else if (result == OperatorResultType::FINISHED) {
390 D_ASSERT(current_chunk.size() == 0);
391 FinishProcessing(operator_idx: current_idx);
392 return OperatorResultType::FINISHED;
393 }
394 current_chunk.Verify();
395 }
396
397 if (current_chunk.size() == 0) {
398 // no output from this operator!
399 if (current_idx == initial_idx) {
400 // if we got no output from the scan, we are done
401 break;
402 } else {
403 // if we got no output from an intermediate op
404 // we go back and try to pull data from the source again
405 GoToSource(current_idx, initial_idx);
406 continue;
407 }
408 } else {
409 // we got output! continue to the next operator
410 current_idx++;
411 if (current_idx > pipeline.operators.size()) {
412 // if we got output and are at the last operator, we are finished executing for this output chunk
413 // return the data and push it into the chunk
414 break;
415 }
416 }
417 }
418 return in_process_operators.empty() ? OperatorResultType::NEED_MORE_INPUT : OperatorResultType::HAVE_MORE_OUTPUT;
419}
420
421void PipelineExecutor::SetTaskForInterrupts(weak_ptr<Task> current_task) {
422 interrupt_state = InterruptState(std::move(current_task));
423}
424
425SourceResultType PipelineExecutor::GetData(DataChunk &chunk, OperatorSourceInput &input) {
426 //! Testing feature to enable async source on every operator
427#ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE
428 if (debug_blocked_source_count < debug_blocked_target_count) {
429 debug_blocked_source_count++;
430
431 auto &callback_state = input.interrupt_state;
432 std::thread rewake_thread([callback_state] {
433 std::this_thread::sleep_for(std::chrono::milliseconds(1));
434 callback_state.Callback();
435 });
436 rewake_thread.detach();
437
438 return SourceResultType::BLOCKED;
439 }
440#endif
441
442 return pipeline.source->GetData(context, chunk, input);
443}
444
445SinkResultType PipelineExecutor::Sink(DataChunk &chunk, OperatorSinkInput &input) {
446 //! Testing feature to enable async sink on every operator
447#ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE
448 if (debug_blocked_sink_count < debug_blocked_target_count) {
449 debug_blocked_sink_count++;
450
451 auto &callback_state = input.interrupt_state;
452 std::thread rewake_thread([callback_state] {
453 std::this_thread::sleep_for(std::chrono::milliseconds(1));
454 callback_state.Callback();
455 });
456 rewake_thread.detach();
457
458 return SinkResultType::BLOCKED;
459 }
460#endif
461 return pipeline.sink->Sink(context, chunk, input);
462}
463
464SourceResultType PipelineExecutor::FetchFromSource(DataChunk &result) {
465 StartOperator(op&: *pipeline.source);
466
467 OperatorSourceInput source_input = {.global_state: *pipeline.source_state, .local_state: *local_source_state, .interrupt_state: interrupt_state};
468 auto res = GetData(chunk&: result, input&: source_input);
469
470 // Ensures Sinks only return empty results when Blocking or Finished
471 D_ASSERT(res != SourceResultType::BLOCKED || result.size() == 0);
472
473 if (requires_batch_index && res != SourceResultType::BLOCKED) {
474 idx_t next_batch_index;
475 if (result.size() == 0) {
476 next_batch_index = NumericLimits<int64_t>::Maximum();
477 } else {
478 next_batch_index =
479 pipeline.source->GetBatchIndex(context, chunk&: result, gstate&: *pipeline.source_state, lstate&: *local_source_state);
480 next_batch_index += pipeline.base_batch_index;
481 }
482 auto &partition_info = local_sink_state->partition_info;
483 if (next_batch_index != partition_info.batch_index.GetIndex()) {
484 // batch index has changed - update it
485 if (partition_info.batch_index.GetIndex() > next_batch_index) {
486 throw InternalException(
487 "Pipeline batch index - gotten lower batch index %llu (down from previous batch index of %llu)",
488 next_batch_index, partition_info.batch_index.GetIndex());
489 }
490 auto current_batch = partition_info.batch_index.GetIndex();
491 partition_info.batch_index = next_batch_index;
492 // call NextBatch before updating min_batch_index to provide the opportunity to flush the previous batch
493 pipeline.sink->NextBatch(context, state&: *pipeline.sink->sink_state, lstate_p&: *local_sink_state);
494 partition_info.min_batch_index = pipeline.UpdateBatchIndex(old_index: current_batch, new_index: next_batch_index);
495 }
496 }
497
498 EndOperator(op&: *pipeline.source, chunk: &result);
499
500 return res;
501}
502
503void PipelineExecutor::InitializeChunk(DataChunk &chunk) {
504 auto &last_op = pipeline.operators.empty() ? *pipeline.source : pipeline.operators.back().get();
505 chunk.Initialize(allocator&: Allocator::DefaultAllocator(), types: last_op.GetTypes());
506}
507
508void PipelineExecutor::StartOperator(PhysicalOperator &op) {
509 if (context.client.interrupted) {
510 throw InterruptException();
511 }
512 context.thread.profiler.StartOperator(phys_op: &op);
513}
514
515void PipelineExecutor::EndOperator(PhysicalOperator &op, optional_ptr<DataChunk> chunk) {
516 context.thread.profiler.EndOperator(chunk);
517
518 if (chunk) {
519 chunk->Verify();
520 }
521}
522
523} // namespace duckdb
524