1 | //===----------------------------------------------------------------------===// |
2 | // DuckDB |
3 | // |
4 | // duckdb/parallel/pipeline_executor.hpp |
5 | // |
6 | // |
7 | //===----------------------------------------------------------------------===// |
8 | |
9 | #pragma once |
10 | |
11 | #include "duckdb/common/types/data_chunk.hpp" |
12 | #include "duckdb/parallel/interrupt.hpp" |
13 | #include "duckdb/parallel/pipeline.hpp" |
14 | #include "duckdb/execution/physical_operator.hpp" |
15 | #include "duckdb/parallel/thread_context.hpp" |
16 | #include "duckdb/execution/execution_context.hpp" |
17 | #include "duckdb/common/stack.hpp" |
18 | |
19 | #include <functional> |
20 | |
21 | namespace duckdb { |
22 | class Executor; |
23 | |
//! Outcome reported by a call to PipelineExecutor::Execute
enum class PipelineExecuteResult {
	//! The source is completely exhausted: the PipelineExecutor has been fully executed
	FINISHED,
	//! Execution is not complete yet; the PipelineExecutor can be called again right away
	NOT_FINISHED,
	//! Execution was interrupted: the PipelineExecutor must not be called again until the interrupt has been
	//! handled as specified in the InterruptMode
	INTERRUPTED
};
34 | |
35 | //! The Pipeline class represents an execution pipeline |
36 | class PipelineExecutor { |
37 | public: |
38 | PipelineExecutor(ClientContext &context, Pipeline &pipeline); |
39 | |
40 | //! Fully execute a pipeline with a source and a sink until the source is completely exhausted |
41 | PipelineExecuteResult Execute(); |
42 | //! Execute a pipeline with a source and a sink until finished, or until max_chunks were processed from the source |
43 | //! Returns true if execution is finished, false if Execute should be called again |
44 | PipelineExecuteResult Execute(idx_t max_chunks); |
45 | |
46 | //! Push a single input DataChunk into the pipeline. |
47 | //! Returns either OperatorResultType::NEED_MORE_INPUT or OperatorResultType::FINISHED |
48 | //! If OperatorResultType::FINISHED is returned, more input will not change the result anymore |
49 | OperatorResultType ExecutePush(DataChunk &input); |
50 | //! Called after depleting the source: finalizes the execution of this pipeline executor |
51 | //! This should only be called once per PipelineExecutor |
52 | void PushFinalize(); |
53 | |
54 | //! Initializes a chunk with the types that will flow out of ExecutePull |
55 | void InitializeChunk(DataChunk &chunk); |
56 | //! Execute a pipeline without a sink, and retrieve a single DataChunk |
57 | //! Returns an empty chunk when finished. |
58 | void ExecutePull(DataChunk &result); |
59 | //! Called after depleting the source using ExecutePull |
60 | //! This flushes profiler states |
61 | void PullFinalize(); |
62 | |
63 | //! Registers the task in the interrupt_state to allow Source/Sink operators to block the task |
64 | void SetTaskForInterrupts(weak_ptr<Task> current_task); |
65 | |
66 | private: |
67 | //! The pipeline to process |
68 | Pipeline &pipeline; |
69 | //! The thread context of this executor |
70 | ThreadContext thread; |
71 | //! The total execution context of this executor |
72 | ExecutionContext context; |
73 | |
74 | //! Intermediate chunks for the operators |
75 | vector<unique_ptr<DataChunk>> intermediate_chunks; |
76 | //! Intermediate states for the operators |
77 | vector<unique_ptr<OperatorState>> intermediate_states; |
78 | |
79 | //! The local source state |
80 | unique_ptr<LocalSourceState> local_source_state; |
81 | //! The local sink state (if any) |
82 | unique_ptr<LocalSinkState> local_sink_state; |
83 | //! The interrupt state, holding required information for sink/source operators to block |
84 | InterruptState interrupt_state; |
85 | |
86 | //! The final chunk used for moving data into the sink |
87 | DataChunk final_chunk; |
88 | |
89 | //! The operators that are not yet finished executing and have data remaining |
90 | //! If the stack of in_process_operators is empty, we fetch from the source instead |
91 | stack<idx_t> in_process_operators; |
92 | //! Whether or not the pipeline has been finalized (used for verification only) |
93 | bool finalized = false; |
94 | //! Whether or not the pipeline has finished processing |
95 | int32_t finished_processing_idx = -1; |
96 | //! Whether or not this pipeline requires keeping track of the batch index of the source |
97 | bool requires_batch_index = false; |
98 | |
99 | //! Source has indicated it is exhausted |
100 | bool exhausted_source = false; |
101 | //! Flushing of intermediate operators has started |
102 | bool started_flushing = false; |
103 | //! Flushing of caching operators is done |
104 | bool done_flushing = false; |
105 | |
106 | //! This flag is set when the pipeline gets interrupted by the Sink -> the final_chunk should be re-sink-ed. |
107 | bool remaining_sink_chunk = false; |
108 | |
109 | //! Current operator being flushed |
110 | idx_t flushing_idx; |
111 | //! Whether the current flushing_idx should be flushed: this needs to be stored to make flushing code re-entrant |
112 | bool should_flush_current_idx = true; |
113 | |
114 | private: |
115 | void StartOperator(PhysicalOperator &op); |
116 | void EndOperator(PhysicalOperator &op, optional_ptr<DataChunk> chunk); |
117 | |
118 | //! Reset the operator index to the first operator |
119 | void GoToSource(idx_t ¤t_idx, idx_t initial_idx); |
120 | SourceResultType FetchFromSource(DataChunk &result); |
121 | |
122 | void FinishProcessing(int32_t operator_idx = -1); |
123 | bool IsFinished(); |
124 | |
125 | //! Wrappers for sink/source calls to respective operators |
126 | SourceResultType GetData(DataChunk &chunk, OperatorSourceInput &input); |
127 | SinkResultType Sink(DataChunk &chunk, OperatorSinkInput &input); |
128 | |
129 | OperatorResultType ExecutePushInternal(DataChunk &input, idx_t initial_idx = 0); |
130 | //! Pushes a chunk through the pipeline and returns a single result chunk |
131 | //! Returns whether or not a new input chunk is needed, or whether or not we are finished |
132 | OperatorResultType Execute(DataChunk &input, DataChunk &result, idx_t initial_index = 0); |
133 | |
134 | //! Tries to flush all state from intermediate operators. Will return true if all state is flushed, false in the |
135 | //! case of a blocked sink. |
136 | bool TryFlushCachingOperators(); |
137 | |
138 | static bool CanCacheType(const LogicalType &type); |
139 | void CacheChunk(DataChunk &input, idx_t operator_idx); |
140 | |
141 | #ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE |
142 | //! Debugging state: number of times blocked |
143 | int debug_blocked_sink_count = 0; |
144 | int debug_blocked_source_count = 0; |
145 | //! Number of times the Sink/Source will block before actually returning data |
146 | int debug_blocked_target_count = 1; |
147 | #endif |
148 | }; |
149 | |
150 | } // namespace duckdb |
151 | |