#include "duckdb/parallel/pipeline_executor.hpp"
#include "duckdb/main/client_context.hpp"
#include "duckdb/common/limits.hpp"

#ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE
#include <thread>
#include <chrono>
#endif

namespace duckdb {

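// Sets up one thread's traversal of the pipeline: creates the local sink and source states, one intermediate
// chunk and operator state per operator, and registers an initial batch index if the sink requires one.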
PipelineExecutor::PipelineExecutor(ClientContext &context_p, Pipeline &pipeline_p)
    : pipeline(pipeline_p), thread(context_p), context(context_p, thread, &pipeline_p) {
	D_ASSERT(pipeline.source_state);
	if (pipeline.sink) {
		local_sink_state = pipeline.sink->GetLocalSinkState(context);
		requires_batch_index = pipeline.sink->RequiresBatchIndex() && pipeline.source->SupportsBatchIndex();
		if (requires_batch_index) {
			auto &partition_info = local_sink_state->partition_info;
			if (!partition_info.batch_index.IsValid()) {
				// batch index is not set yet - initialize before fetching anything
				partition_info.batch_index = pipeline.RegisterNewBatchIndex();
				partition_info.min_batch_index = partition_info.batch_index;
			}
		}
	}
	local_source_state = pipeline.source->GetLocalSourceState(context, *pipeline.source_state);

	intermediate_chunks.reserve(pipeline.operators.size());
	intermediate_states.reserve(pipeline.operators.size());
	for (idx_t i = 0; i < pipeline.operators.size(); i++) {
		auto &prev_operator = i == 0 ? *pipeline.source : pipeline.operators[i - 1].get();
		auto &current_operator = pipeline.operators[i].get();

		auto chunk = make_uniq<DataChunk>();
		chunk->Initialize(Allocator::Get(context.client), prev_operator.GetTypes());
		intermediate_chunks.push_back(std::move(chunk));

		auto op_state = current_operator.GetOperatorState(context);
		intermediate_states.push_back(std::move(op_state));

		if (current_operator.IsSink() && current_operator.sink_state->state == SinkFinalizeType::NO_OUTPUT_POSSIBLE) {
			// one of the operators has already figured out no output is possible
			// we can skip executing the pipeline
			FinishProcessing();
		}
	}
	InitializeChunk(final_chunk);
}

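// Flushes all caching operators (those for which RequiresFinalExecute() is true) by repeatedly calling
// FinalExecute and pushing the produced chunks towards the sink. Returns false if the sink blocked, in which
// case this method is re-entered later and resumes where it left off.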
bool PipelineExecutor::TryFlushCachingOperators() {
	if (!started_flushing) {
		// Remainder of this method assumes any in-process operators are from flushing
		D_ASSERT(in_process_operators.empty());
		started_flushing = true;
		flushing_idx = IsFinished() ? idx_t(finished_processing_idx) : 0;
	}

	// Go over each operator and keep flushing them using `FinalExecute` until empty
	while (flushing_idx < pipeline.operators.size()) {
		if (!pipeline.operators[flushing_idx].get().RequiresFinalExecute()) {
			flushing_idx++;
			continue;
		}

		// This slightly awkward way of increasing the flushing idx is to make the code re-entrant: we need to call
		// this method again in the case of a Sink returning BLOCKED.
		if (!should_flush_current_idx && in_process_operators.empty()) {
			should_flush_current_idx = true;
			flushing_idx++;
			continue;
		}

		auto &curr_chunk =
		    flushing_idx + 1 >= intermediate_chunks.size() ? final_chunk : *intermediate_chunks[flushing_idx + 1];
		auto &current_operator = pipeline.operators[flushing_idx].get();

		OperatorFinalizeResultType finalize_result;
		OperatorResultType push_result;

		if (in_process_operators.empty()) {
			StartOperator(current_operator);
			finalize_result = current_operator.FinalExecute(context, curr_chunk, *current_operator.op_state,
			                                                *intermediate_states[flushing_idx]);
			EndOperator(current_operator, &curr_chunk);
		} else {
			// Reset flag and reflush the last chunk we were flushing.
			finalize_result = OperatorFinalizeResultType::HAVE_MORE_OUTPUT;
		}

		push_result = ExecutePushInternal(curr_chunk, flushing_idx + 1);

		if (finalize_result == OperatorFinalizeResultType::HAVE_MORE_OUTPUT) {
			should_flush_current_idx = true;
		} else {
			should_flush_current_idx = false;
		}

		if (push_result == OperatorResultType::BLOCKED) {
			remaining_sink_chunk = true;
			return false;
		} else if (push_result == OperatorResultType::FINISHED) {
			break;
		}
	}
	return true;
}

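// Main push-based execution loop: processes up to max_chunks chunks from the source through the operators and
// into the sink. Returns FINISHED when the pipeline is fully executed, INTERRUPTED when a source or sink
// blocked, and NOT_FINISHED when max_chunks was reached before the source was exhausted.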
PipelineExecuteResult PipelineExecutor::Execute(idx_t max_chunks) {
	D_ASSERT(pipeline.sink);
	auto &source_chunk = pipeline.operators.empty() ? final_chunk : *intermediate_chunks[0];
	for (idx_t i = 0; i < max_chunks; i++) {
		if (context.client.interrupted) {
			throw InterruptException();
		}

		OperatorResultType result;
		if (exhausted_source && done_flushing && !remaining_sink_chunk && in_process_operators.empty()) {
			break;
		} else if (remaining_sink_chunk) {
			// The pipeline was interrupted by the Sink. We should retry sinking the final chunk.
			result = ExecutePushInternal(final_chunk);
			remaining_sink_chunk = false;
		} else if (!in_process_operators.empty() && !started_flushing) {
			// The pipeline was interrupted by the Sink when pushing a source chunk through the pipeline. We need to
			// re-push the same source chunk through the pipeline because there are in_process operators, meaning that
			// this source chunk has not been fully processed yet.
			D_ASSERT(source_chunk.size() > 0);
			result = ExecutePushInternal(source_chunk);
		} else if (exhausted_source && !done_flushing) {
			// The source was exhausted, try flushing all operators
			auto flush_completed = TryFlushCachingOperators();
			if (flush_completed) {
				done_flushing = true;
				break;
			} else {
				return PipelineExecuteResult::INTERRUPTED;
			}
		} else if (!exhausted_source) {
			// "Regular" path: fetch a chunk from the source and push it through the pipeline
			source_chunk.Reset();
			SourceResultType source_result = FetchFromSource(source_chunk);

			if (source_result == SourceResultType::BLOCKED) {
				return PipelineExecuteResult::INTERRUPTED;
			}

			if (source_result == SourceResultType::FINISHED) {
				exhausted_source = true;
				if (source_chunk.size() == 0) {
					continue;
				}
			}
			result = ExecutePushInternal(source_chunk);
		} else {
			throw InternalException("Unexpected state reached in pipeline executor");
		}

		// SINK INTERRUPT
		if (result == OperatorResultType::BLOCKED) {
			remaining_sink_chunk = true;
			return PipelineExecuteResult::INTERRUPTED;
		}

		if (result == OperatorResultType::FINISHED) {
			break;
		}
	}

	if ((!exhausted_source || !done_flushing) && !IsFinished()) {
		return PipelineExecuteResult::NOT_FINISHED;
	}

	PushFinalize();

	return PipelineExecuteResult::FINISHED;
}

PipelineExecuteResult PipelineExecutor::Execute() {
	return Execute(NumericLimits<idx_t>::Maximum());
}

OperatorResultType PipelineExecutor::ExecutePush(DataChunk &input) { // LCOV_EXCL_START
	return ExecutePushInternal(input);
} // LCOV_EXCL_STOP

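// Marks the pipeline as finished from the given operator index onwards (a negative index finishes the entire
// pipeline) and discards any in-process operators; IsFinished() reports whether this has happened.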
void PipelineExecutor::FinishProcessing(int32_t operator_idx) {
	finished_processing_idx = operator_idx < 0 ? NumericLimits<int32_t>::Maximum() : operator_idx;
	in_process_operators = stack<idx_t>();
}

bool PipelineExecutor::IsFinished() {
	return finished_processing_idx >= 0;
}

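// Pushes a single input chunk through the operators (starting at initial_idx) and into the sink, looping as
// long as the operators report HAVE_MORE_OUTPUT. Returns BLOCKED if the sink blocked, so the caller can retry
// sinking final_chunk later.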
OperatorResultType PipelineExecutor::ExecutePushInternal(DataChunk &input, idx_t initial_idx) {
	D_ASSERT(pipeline.sink);
	if (input.size() == 0) { // LCOV_EXCL_START
		return OperatorResultType::NEED_MORE_INPUT;
	} // LCOV_EXCL_STOP

	// this loop will continuously push the input chunk through the pipeline as long as:
	// - the OperatorResultType for the Execute is HAVE_MORE_OUTPUT
	// - the Sink doesn't block
	while (true) {
		OperatorResultType result;
		// Note: if input is the final_chunk, we don't do any executing, the chunk just needs to be sunk
		if (&input != &final_chunk) {
			final_chunk.Reset();
			result = Execute(input, final_chunk, initial_idx);
			if (result == OperatorResultType::FINISHED) {
				return OperatorResultType::FINISHED;
			}
		} else {
			result = OperatorResultType::NEED_MORE_INPUT;
		}
		auto &sink_chunk = final_chunk;
		if (sink_chunk.size() > 0) {
			StartOperator(*pipeline.sink);
			D_ASSERT(pipeline.sink);
			D_ASSERT(pipeline.sink->sink_state);
			OperatorSinkInput sink_input {*pipeline.sink->sink_state, *local_sink_state, interrupt_state};

			auto sink_result = Sink(sink_chunk, sink_input);

			EndOperator(*pipeline.sink, nullptr);

			if (sink_result == SinkResultType::BLOCKED) {
				return OperatorResultType::BLOCKED;
			} else if (sink_result == SinkResultType::FINISHED) {
				FinishProcessing();
				return OperatorResultType::FINISHED;
			}
		}
		if (result == OperatorResultType::NEED_MORE_INPUT) {
			return OperatorResultType::NEED_MORE_INPUT;
		}
	}
}

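// Finalizes push-based execution: combines the thread-local sink state into the global sink state and flushes
// the profiler information gathered by this thread. May only be called once per executor.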
void PipelineExecutor::PushFinalize() {
	if (finalized) {
		throw InternalException("Calling PushFinalize on a pipeline that has been finalized already");
	}

	D_ASSERT(local_sink_state);

	finalized = true;

	// run the combine for the sink
	pipeline.sink->Combine(context, *pipeline.sink->sink_state, *local_sink_state);

	// flush all query profiler info
	for (idx_t i = 0; i < intermediate_states.size(); i++) {
		intermediate_states[i]->Finalize(pipeline.operators[i].get(), context);
	}
	pipeline.executor.Flush(thread);
	local_sink_state.reset();
}

// TODO: Refactoring the StreamingQueryResult to use Push-based execution should eliminate the need for this code
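// Pull-based execution used for sink-less pipelines (e.g. streaming query results): fetches chunks from the
// source, waiting on the done signal whenever the source blocks, and runs them through the operators until a
// non-empty result chunk is produced or the source is exhausted.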
void PipelineExecutor::ExecutePull(DataChunk &result) {
	if (IsFinished()) {
		return;
	}
	auto &executor = pipeline.executor;
	try {
		D_ASSERT(!pipeline.sink);
		auto &source_chunk = pipeline.operators.empty() ? result : *intermediate_chunks[0];
		while (result.size() == 0 && !exhausted_source) {
			if (in_process_operators.empty()) {
				source_chunk.Reset();

				auto done_signal = make_shared<InterruptDoneSignalState>();
				interrupt_state = InterruptState(done_signal);
				SourceResultType source_result;

				// Repeatedly try to fetch from the source until it doesn't block. Note that it may block multiple times
				while (true) {
					source_result = FetchFromSource(source_chunk);

					// No interrupt happened, all good.
					if (source_result != SourceResultType::BLOCKED) {
						break;
					}

					// Busy wait for async callback from source operator
					done_signal->Await();
				}

				if (source_result == SourceResultType::FINISHED) {
					exhausted_source = true;
					if (source_chunk.size() == 0) {
						break;
					}
				}
			}
			if (!pipeline.operators.empty()) {
				auto state = Execute(source_chunk, result);
				if (state == OperatorResultType::FINISHED) {
					break;
				}
			}
		}
	} catch (const Exception &ex) { // LCOV_EXCL_START
		if (executor.HasError()) {
			executor.ThrowException();
		}
		throw;
	} catch (std::exception &ex) {
		if (executor.HasError()) {
			executor.ThrowException();
		}
		throw;
	} catch (...) {
		if (executor.HasError()) {
			executor.ThrowException();
		}
		throw;
	} // LCOV_EXCL_STOP
}

void PipelineExecutor::PullFinalize() {
	if (finalized) {
		throw InternalException("Calling PullFinalize on a pipeline that has been finalized already");
	}
	finalized = true;
	pipeline.executor.Flush(thread);
}

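// Determines the operator index at which execution should resume: normally the source (initial_idx), unless an
// in-process operator still has pending output, in which case execution resumes at that operator.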
void PipelineExecutor::GoToSource(idx_t &current_idx, idx_t initial_idx) {
	// we go back to the first operator (the source)
	current_idx = initial_idx;
	if (!in_process_operators.empty()) {
		// ... UNLESS there is an in-process operator
		// if there is an in-process operator, we start executing at the latest one
		// for example, if we have a join operator that has tuples left, we first need to emit those tuples
		current_idx = in_process_operators.top();
		in_process_operators.pop();
	}
	D_ASSERT(current_idx >= initial_idx);
}

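// Runs a single input chunk through the operator chain, writing the final output into result. Operators that
// report HAVE_MORE_OUTPUT are pushed onto in_process_operators so the same input can be re-executed later to
// drain their remaining output.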
OperatorResultType PipelineExecutor::Execute(DataChunk &input, DataChunk &result, idx_t initial_idx) {
	if (input.size() == 0) { // LCOV_EXCL_START
		return OperatorResultType::NEED_MORE_INPUT;
	} // LCOV_EXCL_STOP
	D_ASSERT(!pipeline.operators.empty());

	idx_t current_idx;
	GoToSource(current_idx, initial_idx);
	if (current_idx == initial_idx) {
		current_idx++;
	}
	if (current_idx > pipeline.operators.size()) {
		result.Reference(input);
		return OperatorResultType::NEED_MORE_INPUT;
	}
	while (true) {
		if (context.client.interrupted) {
			throw InterruptException();
		}
		// now figure out where to put the chunk
		// if current_idx is the last possible index (>= operators.size()) we write to the result
		// otherwise we write to an intermediate chunk
		auto current_intermediate = current_idx;
		auto &current_chunk =
		    current_intermediate >= intermediate_chunks.size() ? result : *intermediate_chunks[current_intermediate];
		current_chunk.Reset();
		if (current_idx == initial_idx) {
			// we went back to the source: we need more input
			return OperatorResultType::NEED_MORE_INPUT;
		} else {
			auto &prev_chunk =
			    current_intermediate == initial_idx + 1 ? input : *intermediate_chunks[current_intermediate - 1];
			auto operator_idx = current_idx - 1;
			auto &current_operator = pipeline.operators[operator_idx].get();

			// if current_idx > source_idx, we pass the previous operators' output through the Execute of the current
			// operator
			StartOperator(current_operator);
			auto result = current_operator.Execute(context, prev_chunk, current_chunk, *current_operator.op_state,
			                                       *intermediate_states[current_intermediate - 1]);
			EndOperator(current_operator, &current_chunk);
			if (result == OperatorResultType::HAVE_MORE_OUTPUT) {
				// more data remains in this operator
				// push in-process marker
				in_process_operators.push(current_idx);
			} else if (result == OperatorResultType::FINISHED) {
				D_ASSERT(current_chunk.size() == 0);
				FinishProcessing(current_idx);
				return OperatorResultType::FINISHED;
			}
			current_chunk.Verify();
		}

		if (current_chunk.size() == 0) {
			// no output from this operator!
			if (current_idx == initial_idx) {
				// if we got no output from the scan, we are done
				break;
			} else {
				// if we got no output from an intermediate op
				// we go back and try to pull data from the source again
				GoToSource(current_idx, initial_idx);
				continue;
			}
		} else {
			// we got output! continue to the next operator
			current_idx++;
			if (current_idx > pipeline.operators.size()) {
				// if we got output and are at the last operator, we are finished executing for this output chunk
				// return the data and push it into the chunk
				break;
			}
		}
	}
	return in_process_operators.empty() ? OperatorResultType::NEED_MORE_INPUT : OperatorResultType::HAVE_MORE_OUTPUT;
}

void PipelineExecutor::SetTaskForInterrupts(weak_ptr<Task> current_task) {
	interrupt_state = InterruptState(std::move(current_task));
}

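// Wrapper around the source's GetData. Under DUCKDB_DEBUG_ASYNC_SINK_SOURCE this first returns BLOCKED a
// configurable number of times, re-waking the task from a detached thread, to exercise the async source path.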
SourceResultType PipelineExecutor::GetData(DataChunk &chunk, OperatorSourceInput &input) {
	//! Testing feature to enable async source on every operator
#ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE
	if (debug_blocked_source_count < debug_blocked_target_count) {
		debug_blocked_source_count++;

		auto &callback_state = input.interrupt_state;
		std::thread rewake_thread([callback_state] {
			std::this_thread::sleep_for(std::chrono::milliseconds(1));
			callback_state.Callback();
		});
		rewake_thread.detach();

		return SourceResultType::BLOCKED;
	}
#endif

	return pipeline.source->GetData(context, chunk, input);
}

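// Wrapper around the sink's Sink method, with the same DUCKDB_DEBUG_ASYNC_SINK_SOURCE blocking behavior as
// GetData above, to exercise the async sink path.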
SinkResultType PipelineExecutor::Sink(DataChunk &chunk, OperatorSinkInput &input) {
	//! Testing feature to enable async sink on every operator
#ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE
	if (debug_blocked_sink_count < debug_blocked_target_count) {
		debug_blocked_sink_count++;

		auto &callback_state = input.interrupt_state;
		std::thread rewake_thread([callback_state] {
			std::this_thread::sleep_for(std::chrono::milliseconds(1));
			callback_state.Callback();
		});
		rewake_thread.detach();

		return SinkResultType::BLOCKED;
	}
#endif
	return pipeline.sink->Sink(context, chunk, input);
}

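// Fetches a chunk from the source and, if the sink requires batch indexes, computes the new batch index and
// propagates it (calling NextBatch on the sink whenever the index changes).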
SourceResultType PipelineExecutor::FetchFromSource(DataChunk &result) {
	StartOperator(*pipeline.source);

	OperatorSourceInput source_input = {*pipeline.source_state, *local_source_state, interrupt_state};
	auto res = GetData(result, source_input);

	// a source that reports BLOCKED must not have produced any data
	D_ASSERT(res != SourceResultType::BLOCKED || result.size() == 0);

	if (requires_batch_index && res != SourceResultType::BLOCKED) {
		idx_t next_batch_index;
		if (result.size() == 0) {
			next_batch_index = NumericLimits<int64_t>::Maximum();
		} else {
			next_batch_index =
			    pipeline.source->GetBatchIndex(context, result, *pipeline.source_state, *local_source_state);
			next_batch_index += pipeline.base_batch_index;
		}
		auto &partition_info = local_sink_state->partition_info;
		if (next_batch_index != partition_info.batch_index.GetIndex()) {
			// batch index has changed - update it
			if (partition_info.batch_index.GetIndex() > next_batch_index) {
				throw InternalException(
				    "Pipeline batch index - gotten lower batch index %llu (down from previous batch index of %llu)",
				    next_batch_index, partition_info.batch_index.GetIndex());
			}
			auto current_batch = partition_info.batch_index.GetIndex();
			partition_info.batch_index = next_batch_index;
			// call NextBatch before updating min_batch_index to provide the opportunity to flush the previous batch
			pipeline.sink->NextBatch(context, *pipeline.sink->sink_state, *local_sink_state);
			partition_info.min_batch_index = pipeline.UpdateBatchIndex(current_batch, next_batch_index);
		}
	}

	EndOperator(*pipeline.source, &result);

	return res;
}

void PipelineExecutor::InitializeChunk(DataChunk &chunk) {
	auto &last_op = pipeline.operators.empty() ? *pipeline.source : pipeline.operators.back().get();
	chunk.Initialize(Allocator::DefaultAllocator(), last_op.GetTypes());
}

void PipelineExecutor::StartOperator(PhysicalOperator &op) {
	if (context.client.interrupted) {
		throw InterruptException();
	}
	context.thread.profiler.StartOperator(&op);
}

void PipelineExecutor::EndOperator(PhysicalOperator &op, optional_ptr<DataChunk> chunk) {
	context.thread.profiler.EndOperator(chunk);

	if (chunk) {
		chunk->Verify();
	}
}

} // namespace duckdb