#include "duckdb/parallel/pipeline_executor.hpp"
#include "duckdb/main/client_context.hpp"
#include "duckdb/common/limits.hpp"

#ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE
#include <thread>
#include <chrono>
#endif

namespace duckdb {

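// The PipelineExecutor executes a single Pipeline on one thread: it pulls chunks from the pipeline's source,
// pushes them through the intermediate operators, and hands the results to the sink. The constructor wires up
// the per-thread state: a local sink state (including batch-index bookkeeping when the sink requires it), a
// local source state, and one intermediate chunk + operator state per intermediate operator.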
PipelineExecutor::PipelineExecutor(ClientContext &context_p, Pipeline &pipeline_p)
    : pipeline(pipeline_p), thread(context_p), context(context_p, thread, &pipeline_p) {
	D_ASSERT(pipeline.source_state);
	if (pipeline.sink) {
		local_sink_state = pipeline.sink->GetLocalSinkState(context);
		requires_batch_index = pipeline.sink->RequiresBatchIndex() && pipeline.source->SupportsBatchIndex();
		if (requires_batch_index) {
			auto &partition_info = local_sink_state->partition_info;
			if (!partition_info.batch_index.IsValid()) {
				// batch index is not set yet - initialize before fetching anything
				partition_info.batch_index = pipeline.RegisterNewBatchIndex();
				partition_info.min_batch_index = partition_info.batch_index;
			}
		}
	}
	local_source_state = pipeline.source->GetLocalSourceState(context, *pipeline.source_state);

	intermediate_chunks.reserve(pipeline.operators.size());
	intermediate_states.reserve(pipeline.operators.size());
	for (idx_t i = 0; i < pipeline.operators.size(); i++) {
		auto &prev_operator = i == 0 ? *pipeline.source : pipeline.operators[i - 1].get();
		auto &current_operator = pipeline.operators[i].get();

		auto chunk = make_uniq<DataChunk>();
		chunk->Initialize(Allocator::Get(context.client), prev_operator.GetTypes());
		intermediate_chunks.push_back(std::move(chunk));

		auto op_state = current_operator.GetOperatorState(context);
		intermediate_states.push_back(std::move(op_state));

		if (current_operator.IsSink() && current_operator.sink_state->state == SinkFinalizeType::NO_OUTPUT_POSSIBLE) {
			// one of the operators has already figured out no output is possible
			// we can skip executing the pipeline
			FinishProcessing();
		}
	}
	InitializeChunk(final_chunk);
}

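// Caching operators (typically operators that buffer small intermediate chunks) may still hold data after the
// source is exhausted. This method drains them by calling FinalExecute on every operator that requires it and
// pushing whatever comes out through the remainder of the pipeline. It returns false when a downstream Sink
// blocks, in which case the method is re-entered later and resumes at `flushing_idx`.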
bool PipelineExecutor::TryFlushCachingOperators() {
	if (!started_flushing) {
		// Remainder of this method assumes any in-process operators are from flushing
		D_ASSERT(in_process_operators.empty());
		started_flushing = true;
		flushing_idx = IsFinished() ? idx_t(finished_processing_idx) : 0;
	}

	// Go over each operator and keep flushing them using `FinalExecute` until empty
	while (flushing_idx < pipeline.operators.size()) {
		if (!pipeline.operators[flushing_idx].get().RequiresFinalExecute()) {
			flushing_idx++;
			continue;
		}

		// This slightly awkward way of increasing the flushing idx is to make the code re-entrant: we need to
		// call this method again in the case of a Sink returning BLOCKED.
		if (!should_flush_current_idx && in_process_operators.empty()) {
			should_flush_current_idx = true;
			flushing_idx++;
			continue;
		}

		auto &curr_chunk =
		    flushing_idx + 1 >= intermediate_chunks.size() ? final_chunk : *intermediate_chunks[flushing_idx + 1];
		auto &current_operator = pipeline.operators[flushing_idx].get();

		OperatorFinalizeResultType finalize_result;
		OperatorResultType push_result;

		if (in_process_operators.empty()) {
			StartOperator(current_operator);
			finalize_result = current_operator.FinalExecute(context, curr_chunk, *current_operator.op_state,
			                                                *intermediate_states[flushing_idx]);
			EndOperator(current_operator, &curr_chunk);
		} else {
			// Reset flag and reflush the last chunk we were flushing
			finalize_result = OperatorFinalizeResultType::HAVE_MORE_OUTPUT;
		}

		push_result = ExecutePushInternal(curr_chunk, flushing_idx + 1);

		if (finalize_result == OperatorFinalizeResultType::HAVE_MORE_OUTPUT) {
			should_flush_current_idx = true;
		} else {
			should_flush_current_idx = false;
		}

		if (push_result == OperatorResultType::BLOCKED) {
			remaining_sink_chunk = true;
			return false;
		} else if (push_result == OperatorResultType::FINISHED) {
			break;
		}
	}
	return true;
}

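// Main execution loop for a pipeline with a sink. Each iteration processes (at most) one source chunk; the
// flags below encode where the previous call left off, so that execution can resume after an async interrupt:
//  - remaining_sink_chunk:  the Sink blocked, retry sinking final_chunk
//  - in_process_operators:  an operator still has output pending for the current source chunk
//  - exhausted_source / done_flushing:  the source is drained / the caching operators have been flushed
// Returns FINISHED when the pipeline is fully executed, NOT_FINISHED when max_chunks was reached first, and
// INTERRUPTED when a source or sink blocked asynchronously.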
PipelineExecuteResult PipelineExecutor::Execute(idx_t max_chunks) {
	D_ASSERT(pipeline.sink);
	auto &source_chunk = pipeline.operators.empty() ? final_chunk : *intermediate_chunks[0];
	for (idx_t i = 0; i < max_chunks; i++) {
		if (context.client.interrupted) {
			throw InterruptException();
		}

		OperatorResultType result;
		if (exhausted_source && done_flushing && !remaining_sink_chunk && in_process_operators.empty()) {
			break;
		} else if (remaining_sink_chunk) {
			// The pipeline was interrupted by the Sink. We should retry sinking the final chunk.
			result = ExecutePushInternal(final_chunk);
			remaining_sink_chunk = false;
		} else if (!in_process_operators.empty() && !started_flushing) {
			// The pipeline was interrupted by the Sink when pushing a source chunk through the pipeline. We need to
			// re-push the same source chunk through the pipeline because there are in-process operators, meaning that
			// the current source chunk has not been fully processed yet.
			D_ASSERT(source_chunk.size() > 0);
			result = ExecutePushInternal(source_chunk);
		} else if (exhausted_source && !done_flushing) {
			// The source was exhausted, try flushing all operators
			auto flush_completed = TryFlushCachingOperators();
			if (flush_completed) {
				done_flushing = true;
				break;
			} else {
				return PipelineExecuteResult::INTERRUPTED;
			}
		} else if (!exhausted_source) {
			// "Regular" path: fetch a chunk from the source and push it through the pipeline
			source_chunk.Reset();
			SourceResultType source_result = FetchFromSource(source_chunk);

			if (source_result == SourceResultType::BLOCKED) {
				return PipelineExecuteResult::INTERRUPTED;
			}

			if (source_result == SourceResultType::FINISHED) {
				exhausted_source = true;
				if (source_chunk.size() == 0) {
					continue;
				}
			}
			result = ExecutePushInternal(source_chunk);
		} else {
			throw InternalException("Unexpected state reached in pipeline executor");
		}

		// SINK INTERRUPT
		if (result == OperatorResultType::BLOCKED) {
			remaining_sink_chunk = true;
			return PipelineExecuteResult::INTERRUPTED;
		}

		if (result == OperatorResultType::FINISHED) {
			break;
		}
	}

	if ((!exhausted_source || !done_flushing) && !IsFinished()) {
		return PipelineExecuteResult::NOT_FINISHED;
	}

	PushFinalize();

	return PipelineExecuteResult::FINISHED;
}

PipelineExecuteResult PipelineExecutor::Execute() {
	return Execute(NumericLimits<idx_t>::Maximum());
}

OperatorResultType PipelineExecutor::ExecutePush(DataChunk &input) { // LCOV_EXCL_START
	return ExecutePushInternal(input);
} // LCOV_EXCL_STOP

void PipelineExecutor::FinishProcessing(int32_t operator_idx) {
	finished_processing_idx = operator_idx < 0 ? NumericLimits<int32_t>::Maximum() : operator_idx;
	in_process_operators = stack<idx_t>();
}

bool PipelineExecutor::IsFinished() {
	return finished_processing_idx >= 0;
}

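// Pushes a single input chunk through the intermediate operators and into the Sink. The loop keeps executing
// as long as an operator reports HAVE_MORE_OUTPUT for the same input; it returns NEED_MORE_INPUT once the
// input is fully consumed, BLOCKED if the Sink blocked, or FINISHED if the Sink signalled it needs no more data.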
OperatorResultType PipelineExecutor::ExecutePushInternal(DataChunk &input, idx_t initial_idx) {
	D_ASSERT(pipeline.sink);
	if (input.size() == 0) { // LCOV_EXCL_START
		return OperatorResultType::NEED_MORE_INPUT;
	} // LCOV_EXCL_STOP

	// this loop will continuously push the input chunk through the pipeline as long as:
	// - the OperatorResultType for the Execute is HAVE_MORE_OUTPUT
	// - the Sink doesn't block
	while (true) {
		OperatorResultType result;
		// Note: if input is the final_chunk, we don't do any executing, the chunk just needs to be sinked
		if (&input != &final_chunk) {
			final_chunk.Reset();
			result = Execute(input, final_chunk, initial_idx);
			if (result == OperatorResultType::FINISHED) {
				return OperatorResultType::FINISHED;
			}
		} else {
			result = OperatorResultType::NEED_MORE_INPUT;
		}
		auto &sink_chunk = final_chunk;
		if (sink_chunk.size() > 0) {
			StartOperator(*pipeline.sink);
			D_ASSERT(pipeline.sink);
			D_ASSERT(pipeline.sink->sink_state);
			OperatorSinkInput sink_input {*pipeline.sink->sink_state, *local_sink_state, interrupt_state};

			auto sink_result = Sink(sink_chunk, sink_input);

			EndOperator(*pipeline.sink, nullptr);

			if (sink_result == SinkResultType::BLOCKED) {
				return OperatorResultType::BLOCKED;
			} else if (sink_result == SinkResultType::FINISHED) {
				FinishProcessing();
				return OperatorResultType::FINISHED;
			}
		}
		if (result == OperatorResultType::NEED_MORE_INPUT) {
			return OperatorResultType::NEED_MORE_INPUT;
		}
	}
}

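// Finalizes the push-based execution on this thread: runs the Sink's Combine to merge the local sink state
// into the global sink state, finalizes the per-operator states, and flushes the profiler information.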
void PipelineExecutor::PushFinalize() {
	if (finalized) {
		throw InternalException("Calling PushFinalize on a pipeline that has been finalized already");
	}

	D_ASSERT(local_sink_state);

	finalized = true;

	// run the combine for the sink
	pipeline.sink->Combine(context, *pipeline.sink->sink_state, *local_sink_state);

	// flush all query profiler info
	for (idx_t i = 0; i < intermediate_states.size(); i++) {
		intermediate_states[i]->Finalize(pipeline.operators[i].get(), context);
	}
	pipeline.executor.Flush(thread);
	local_sink_state.reset();
}

// TODO: Refactoring the StreamingQueryResult to use Push-based execution should eliminate the need for this code
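// ExecutePull drives a sink-less pipeline from the consumer side: it repeatedly fetches from the source
// (busy-waiting on the interrupt done-signal whenever the source blocks) and runs the intermediate operators
// until a non-empty result chunk is produced or the source is exhausted.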
void PipelineExecutor::ExecutePull(DataChunk &result) {
	if (IsFinished()) {
		return;
	}
	auto &executor = pipeline.executor;
	try {
		D_ASSERT(!pipeline.sink);
		auto &source_chunk = pipeline.operators.empty() ? result : *intermediate_chunks[0];
		while (result.size() == 0 && !exhausted_source) {
			if (in_process_operators.empty()) {
				source_chunk.Reset();

				auto done_signal = make_shared<InterruptDoneSignalState>();
				interrupt_state = InterruptState(done_signal);
				SourceResultType source_result;

				// Repeatedly try to fetch from the source until it doesn't block. Note that it may block multiple times
				while (true) {
					source_result = FetchFromSource(source_chunk);

					// No interrupt happened, all good.
					if (source_result != SourceResultType::BLOCKED) {
						break;
					}

					// Busy wait for async callback from source operator
					done_signal->Await();
				}

				if (source_result == SourceResultType::FINISHED) {
					exhausted_source = true;
					if (source_chunk.size() == 0) {
						break;
					}
				}
			}
			if (!pipeline.operators.empty()) {
				auto state = Execute(source_chunk, result);
				if (state == OperatorResultType::FINISHED) {
					break;
				}
			}
		}
	} catch (const Exception &ex) { // LCOV_EXCL_START
		if (executor.HasError()) {
			executor.ThrowException();
		}
		throw;
	} catch (std::exception &ex) {
		if (executor.HasError()) {
			executor.ThrowException();
		}
		throw;
	} catch (...) {
		if (executor.HasError()) {
			executor.ThrowException();
		}
		throw;
	} // LCOV_EXCL_STOP
}

void PipelineExecutor::PullFinalize() {
	if (finalized) {
		throw InternalException("Calling PullFinalize on a pipeline that has been finalized already");
	}
	finalized = true;
	pipeline.executor.Flush(thread);
}

void PipelineExecutor::GoToSource(idx_t &current_idx, idx_t initial_idx) {
	// we go back to the first operator (the source)
	current_idx = initial_idx;
	if (!in_process_operators.empty()) {
		// ... UNLESS there is an in-process operator
		// if there is an in-process operator, we start executing at the latest one
		// for example, if we have a join operator that has tuples left, we first need to emit those tuples
		current_idx = in_process_operators.top();
		in_process_operators.pop();
	}
	D_ASSERT(current_idx >= initial_idx);
}

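// Executes the intermediate operators on the given input chunk, writing the output of the last operator into
// `result`. The chunk is passed forward one operator at a time; when an operator produces no output, execution
// jumps back (via GoToSource) to pull more data, and when an operator reports HAVE_MORE_OUTPUT an in-process
// marker is pushed so the operator is resumed on the next call.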
OperatorResultType PipelineExecutor::Execute(DataChunk &input, DataChunk &result, idx_t initial_idx) {
	if (input.size() == 0) { // LCOV_EXCL_START
		return OperatorResultType::NEED_MORE_INPUT;
	} // LCOV_EXCL_STOP
	D_ASSERT(!pipeline.operators.empty());

	idx_t current_idx;
	GoToSource(current_idx, initial_idx);
	if (current_idx == initial_idx) {
		current_idx++;
	}
	if (current_idx > pipeline.operators.size()) {
		result.Reference(input);
		return OperatorResultType::NEED_MORE_INPUT;
	}
	while (true) {
		if (context.client.interrupted) {
			throw InterruptException();
		}
		// now figure out where to put the chunk
		// if current_idx is the last possible index (>= operators.size()) we write to the result
		// otherwise we write to an intermediate chunk
		auto current_intermediate = current_idx;
		auto &current_chunk =
		    current_intermediate >= intermediate_chunks.size() ? result : *intermediate_chunks[current_intermediate];
		current_chunk.Reset();
		if (current_idx == initial_idx) {
			// we went back to the source: we need more input
			return OperatorResultType::NEED_MORE_INPUT;
		} else {
			auto &prev_chunk =
			    current_intermediate == initial_idx + 1 ? input : *intermediate_chunks[current_intermediate - 1];
			auto operator_idx = current_idx - 1;
			auto &current_operator = pipeline.operators[operator_idx].get();

			// if current_idx > source_idx, we pass the previous operator's output through the Execute of the
			// current operator
			StartOperator(current_operator);
			auto result = current_operator.Execute(context, prev_chunk, current_chunk, *current_operator.op_state,
			                                       *intermediate_states[current_intermediate - 1]);
			EndOperator(current_operator, &current_chunk);
			if (result == OperatorResultType::HAVE_MORE_OUTPUT) {
				// more data remains in this operator
				// push in-process marker
				in_process_operators.push(current_idx);
			} else if (result == OperatorResultType::FINISHED) {
				D_ASSERT(current_chunk.size() == 0);
				FinishProcessing(current_idx);
				return OperatorResultType::FINISHED;
			}
			current_chunk.Verify();
		}

		if (current_chunk.size() == 0) {
			// no output from this operator!
			if (current_idx == initial_idx) {
				// if we got no output from the scan, we are done
				break;
			} else {
				// if we got no output from an intermediate op
				// we go back and try to pull data from the source again
				GoToSource(current_idx, initial_idx);
				continue;
			}
		} else {
			// we got output! continue to the next operator
			current_idx++;
			if (current_idx > pipeline.operators.size()) {
				// if we got output and are at the last operator, we are finished executing for this output chunk
				// return the data and push it into the chunk
				break;
			}
		}
	}
	return in_process_operators.empty() ? OperatorResultType::NEED_MORE_INPUT : OperatorResultType::HAVE_MORE_OUTPUT;
}

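// Registers the currently executing task with the interrupt state: when a blocked source or sink completes
// asynchronously, its callback can use this task handle to reschedule the pipeline.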
void PipelineExecutor::SetTaskForInterrupts(weak_ptr<Task> current_task) {
	interrupt_state = InterruptState(std::move(current_task));
}

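// Thin wrappers around the source's GetData and the sink's Sink calls. When DUCKDB_DEBUG_ASYNC_SINK_SOURCE is
// defined, they first simulate asynchronicity a configurable number of times per operator: they return BLOCKED
// and spawn a detached thread that fires the interrupt callback after a short sleep, exercising the
// interrupt/resume machinery on every source and sink.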
SourceResultType PipelineExecutor::GetData(DataChunk &chunk, OperatorSourceInput &input) {
	//! Testing feature to enable async source on every operator
#ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE
	if (debug_blocked_source_count < debug_blocked_target_count) {
		debug_blocked_source_count++;

		auto &callback_state = input.interrupt_state;
		std::thread rewake_thread([callback_state] {
			std::this_thread::sleep_for(std::chrono::milliseconds(1));
			callback_state.Callback();
		});
		rewake_thread.detach();

		return SourceResultType::BLOCKED;
	}
#endif

	return pipeline.source->GetData(context, chunk, input);
}

SinkResultType PipelineExecutor::Sink(DataChunk &chunk, OperatorSinkInput &input) {
	//! Testing feature to enable async sink on every operator
#ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE
	if (debug_blocked_sink_count < debug_blocked_target_count) {
		debug_blocked_sink_count++;

		auto &callback_state = input.interrupt_state;
		std::thread rewake_thread([callback_state] {
			std::this_thread::sleep_for(std::chrono::milliseconds(1));
			callback_state.Callback();
		});
		rewake_thread.detach();

		return SinkResultType::BLOCKED;
	}
#endif
	return pipeline.sink->Sink(context, chunk, input);
}

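// Fetches the next chunk from the source. If the sink requires batch indexes, this also maintains the batch
// index bookkeeping: an exhausted source maps to the maximum batch index, and whenever the index changes the
// sink's NextBatch is called (before min_batch_index is updated) so it gets a chance to flush the previous batch.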
SourceResultType PipelineExecutor::FetchFromSource(DataChunk &result) {
	StartOperator(*pipeline.source);

	OperatorSourceInput source_input = {*pipeline.source_state, *local_source_state, interrupt_state};
	auto res = GetData(result, source_input);

	// Ensures that a blocked source does not produce any output
	D_ASSERT(res != SourceResultType::BLOCKED || result.size() == 0);

	if (requires_batch_index && res != SourceResultType::BLOCKED) {
		idx_t next_batch_index;
		if (result.size() == 0) {
			next_batch_index = NumericLimits<int64_t>::Maximum();
		} else {
			next_batch_index =
			    pipeline.source->GetBatchIndex(context, result, *pipeline.source_state, *local_source_state);
			next_batch_index += pipeline.base_batch_index;
		}
		auto &partition_info = local_sink_state->partition_info;
		if (next_batch_index != partition_info.batch_index.GetIndex()) {
			// batch index has changed - update it
			if (partition_info.batch_index.GetIndex() > next_batch_index) {
				throw InternalException(
				    "Pipeline batch index - got lower batch index %llu (down from previous batch index of %llu)",
				    next_batch_index, partition_info.batch_index.GetIndex());
			}
			auto current_batch = partition_info.batch_index.GetIndex();
			partition_info.batch_index = next_batch_index;
			// call NextBatch before updating min_batch_index to provide the opportunity to flush the previous batch
			pipeline.sink->NextBatch(context, *pipeline.sink->sink_state, *local_sink_state);
			partition_info.min_batch_index = pipeline.UpdateBatchIndex(current_batch, next_batch_index);
		}
	}

	EndOperator(*pipeline.source, &result);

	return res;
}

void PipelineExecutor::InitializeChunk(DataChunk &chunk) {
	auto &last_op = pipeline.operators.empty() ? *pipeline.source : pipeline.operators.back().get();
	chunk.Initialize(Allocator::DefaultAllocator(), last_op.GetTypes());
}

void PipelineExecutor::StartOperator(PhysicalOperator &op) {
	if (context.client.interrupted) {
		throw InterruptException();
	}
	context.thread.profiler.StartOperator(&op);
}

void PipelineExecutor::EndOperator(PhysicalOperator &op, optional_ptr<DataChunk> chunk) {
	context.thread.profiler.EndOperator(chunk);

	if (chunk) {
		chunk->Verify();
	}
}

} // namespace duckdb