1//===----------------------------------------------------------------------===//
2// DuckDB
3//
4// duckdb/parallel/pipeline_executor.hpp
5//
6//
7//===----------------------------------------------------------------------===//
8
9#pragma once
10
11#include "duckdb/common/types/data_chunk.hpp"
12#include "duckdb/parallel/interrupt.hpp"
13#include "duckdb/parallel/pipeline.hpp"
14#include "duckdb/execution/physical_operator.hpp"
15#include "duckdb/parallel/thread_context.hpp"
16#include "duckdb/execution/execution_context.hpp"
17#include "duckdb/common/stack.hpp"
18
19#include <functional>
20
21namespace duckdb {
22class Executor;
23
//! Outcome reported by a call to PipelineExecutor::Execute
enum class PipelineExecuteResult {
	//! The source is completely exhausted: the PipelineExecutor has been fully executed
	FINISHED,
	//! Execution is not complete yet; the PipelineExecutor may be called again right away
	NOT_FINISHED,
	//! Execution was interrupted: do not call the PipelineExecutor again until the interrupt has been handled
	//! in the way specified by the InterruptMode
	INTERRUPTED
};
34
35//! The Pipeline class represents an execution pipeline
36class PipelineExecutor {
37public:
38 PipelineExecutor(ClientContext &context, Pipeline &pipeline);
39
40 //! Fully execute a pipeline with a source and a sink until the source is completely exhausted
41 PipelineExecuteResult Execute();
42 //! Execute a pipeline with a source and a sink until finished, or until max_chunks were processed from the source
43 //! Returns true if execution is finished, false if Execute should be called again
44 PipelineExecuteResult Execute(idx_t max_chunks);
45
46 //! Push a single input DataChunk into the pipeline.
47 //! Returns either OperatorResultType::NEED_MORE_INPUT or OperatorResultType::FINISHED
48 //! If OperatorResultType::FINISHED is returned, more input will not change the result anymore
49 OperatorResultType ExecutePush(DataChunk &input);
50 //! Called after depleting the source: finalizes the execution of this pipeline executor
51 //! This should only be called once per PipelineExecutor
52 void PushFinalize();
53
54 //! Initializes a chunk with the types that will flow out of ExecutePull
55 void InitializeChunk(DataChunk &chunk);
56 //! Execute a pipeline without a sink, and retrieve a single DataChunk
57 //! Returns an empty chunk when finished.
58 void ExecutePull(DataChunk &result);
59 //! Called after depleting the source using ExecutePull
60 //! This flushes profiler states
61 void PullFinalize();
62
63 //! Registers the task in the interrupt_state to allow Source/Sink operators to block the task
64 void SetTaskForInterrupts(weak_ptr<Task> current_task);
65
66private:
67 //! The pipeline to process
68 Pipeline &pipeline;
69 //! The thread context of this executor
70 ThreadContext thread;
71 //! The total execution context of this executor
72 ExecutionContext context;
73
74 //! Intermediate chunks for the operators
75 vector<unique_ptr<DataChunk>> intermediate_chunks;
76 //! Intermediate states for the operators
77 vector<unique_ptr<OperatorState>> intermediate_states;
78
79 //! The local source state
80 unique_ptr<LocalSourceState> local_source_state;
81 //! The local sink state (if any)
82 unique_ptr<LocalSinkState> local_sink_state;
83 //! The interrupt state, holding required information for sink/source operators to block
84 InterruptState interrupt_state;
85
86 //! The final chunk used for moving data into the sink
87 DataChunk final_chunk;
88
89 //! The operators that are not yet finished executing and have data remaining
90 //! If the stack of in_process_operators is empty, we fetch from the source instead
91 stack<idx_t> in_process_operators;
92 //! Whether or not the pipeline has been finalized (used for verification only)
93 bool finalized = false;
94 //! Whether or not the pipeline has finished processing
95 int32_t finished_processing_idx = -1;
96 //! Whether or not this pipeline requires keeping track of the batch index of the source
97 bool requires_batch_index = false;
98
99 //! Source has indicated it is exhausted
100 bool exhausted_source = false;
101 //! Flushing of intermediate operators has started
102 bool started_flushing = false;
103 //! Flushing of caching operators is done
104 bool done_flushing = false;
105
106 //! This flag is set when the pipeline gets interrupted by the Sink -> the final_chunk should be re-sink-ed.
107 bool remaining_sink_chunk = false;
108
109 //! Current operator being flushed
110 idx_t flushing_idx;
111 //! Whether the current flushing_idx should be flushed: this needs to be stored to make flushing code re-entrant
112 bool should_flush_current_idx = true;
113
114private:
115 void StartOperator(PhysicalOperator &op);
116 void EndOperator(PhysicalOperator &op, optional_ptr<DataChunk> chunk);
117
118 //! Reset the operator index to the first operator
119 void GoToSource(idx_t &current_idx, idx_t initial_idx);
120 SourceResultType FetchFromSource(DataChunk &result);
121
122 void FinishProcessing(int32_t operator_idx = -1);
123 bool IsFinished();
124
125 //! Wrappers for sink/source calls to respective operators
126 SourceResultType GetData(DataChunk &chunk, OperatorSourceInput &input);
127 SinkResultType Sink(DataChunk &chunk, OperatorSinkInput &input);
128
129 OperatorResultType ExecutePushInternal(DataChunk &input, idx_t initial_idx = 0);
130 //! Pushes a chunk through the pipeline and returns a single result chunk
131 //! Returns whether or not a new input chunk is needed, or whether or not we are finished
132 OperatorResultType Execute(DataChunk &input, DataChunk &result, idx_t initial_index = 0);
133
134 //! Tries to flush all state from intermediate operators. Will return true if all state is flushed, false in the
135 //! case of a blocked sink.
136 bool TryFlushCachingOperators();
137
138 static bool CanCacheType(const LogicalType &type);
139 void CacheChunk(DataChunk &input, idx_t operator_idx);
140
141#ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE
142 //! Debugging state: number of times blocked
143 int debug_blocked_sink_count = 0;
144 int debug_blocked_source_count = 0;
145 //! Number of times the Sink/Source will block before actually returning data
146 int debug_blocked_target_count = 1;
147#endif
148};
149
150} // namespace duckdb
151