| 1 | //===----------------------------------------------------------------------===// |
| 2 | // DuckDB |
| 3 | // |
| 4 | // duckdb/parallel/pipeline_executor.hpp |
| 5 | // |
| 6 | // |
| 7 | //===----------------------------------------------------------------------===// |
| 8 | |
| 9 | #pragma once |
| 10 | |
| 11 | #include "duckdb/common/types/data_chunk.hpp" |
| 12 | #include "duckdb/parallel/interrupt.hpp" |
| 13 | #include "duckdb/parallel/pipeline.hpp" |
| 14 | #include "duckdb/execution/physical_operator.hpp" |
| 15 | #include "duckdb/parallel/thread_context.hpp" |
| 16 | #include "duckdb/execution/execution_context.hpp" |
| 17 | #include "duckdb/common/stack.hpp" |
| 18 | |
| 19 | #include <functional> |
| 20 | |
| 21 | namespace duckdb { |
| 22 | class Executor; |
| 23 | |
//! Outcome reported by a PipelineExecutor after an Execute call
enum class PipelineExecuteResult {
	//! The source is completely exhausted: the PipelineExecutor has been fully executed
	FINISHED = 0,
	//! Execution is not complete yet; the PipelineExecutor can be called again right away
	NOT_FINISHED = 1,
	//! Execution was interrupted: do not call the PipelineExecutor again until the interrupt has been handled as
	//! specified in the InterruptMode
	INTERRUPTED = 2
};
| 34 | |
| 35 | //! The Pipeline class represents an execution pipeline |
| 36 | class PipelineExecutor { |
| 37 | public: |
| 38 | PipelineExecutor(ClientContext &context, Pipeline &pipeline); |
| 39 | |
| 40 | //! Fully execute a pipeline with a source and a sink until the source is completely exhausted |
| 41 | PipelineExecuteResult Execute(); |
| 42 | //! Execute a pipeline with a source and a sink until finished, or until max_chunks were processed from the source |
| 43 | //! Returns true if execution is finished, false if Execute should be called again |
| 44 | PipelineExecuteResult Execute(idx_t max_chunks); |
| 45 | |
| 46 | //! Push a single input DataChunk into the pipeline. |
| 47 | //! Returns either OperatorResultType::NEED_MORE_INPUT or OperatorResultType::FINISHED |
| 48 | //! If OperatorResultType::FINISHED is returned, more input will not change the result anymore |
| 49 | OperatorResultType ExecutePush(DataChunk &input); |
| 50 | //! Called after depleting the source: finalizes the execution of this pipeline executor |
| 51 | //! This should only be called once per PipelineExecutor |
| 52 | void PushFinalize(); |
| 53 | |
| 54 | //! Initializes a chunk with the types that will flow out of ExecutePull |
| 55 | void InitializeChunk(DataChunk &chunk); |
| 56 | //! Execute a pipeline without a sink, and retrieve a single DataChunk |
| 57 | //! Returns an empty chunk when finished. |
| 58 | void ExecutePull(DataChunk &result); |
| 59 | //! Called after depleting the source using ExecutePull |
| 60 | //! This flushes profiler states |
| 61 | void PullFinalize(); |
| 62 | |
| 63 | //! Registers the task in the interrupt_state to allow Source/Sink operators to block the task |
| 64 | void SetTaskForInterrupts(weak_ptr<Task> current_task); |
| 65 | |
| 66 | private: |
| 67 | //! The pipeline to process |
| 68 | Pipeline &pipeline; |
| 69 | //! The thread context of this executor |
| 70 | ThreadContext thread; |
| 71 | //! The total execution context of this executor |
| 72 | ExecutionContext context; |
| 73 | |
| 74 | //! Intermediate chunks for the operators |
| 75 | vector<unique_ptr<DataChunk>> intermediate_chunks; |
| 76 | //! Intermediate states for the operators |
| 77 | vector<unique_ptr<OperatorState>> intermediate_states; |
| 78 | |
| 79 | //! The local source state |
| 80 | unique_ptr<LocalSourceState> local_source_state; |
| 81 | //! The local sink state (if any) |
| 82 | unique_ptr<LocalSinkState> local_sink_state; |
| 83 | //! The interrupt state, holding required information for sink/source operators to block |
| 84 | InterruptState interrupt_state; |
| 85 | |
| 86 | //! The final chunk used for moving data into the sink |
| 87 | DataChunk final_chunk; |
| 88 | |
| 89 | //! The operators that are not yet finished executing and have data remaining |
| 90 | //! If the stack of in_process_operators is empty, we fetch from the source instead |
| 91 | stack<idx_t> in_process_operators; |
| 92 | //! Whether or not the pipeline has been finalized (used for verification only) |
| 93 | bool finalized = false; |
| 94 | //! Whether or not the pipeline has finished processing |
| 95 | int32_t finished_processing_idx = -1; |
| 96 | //! Whether or not this pipeline requires keeping track of the batch index of the source |
| 97 | bool requires_batch_index = false; |
| 98 | |
| 99 | //! Source has indicated it is exhausted |
| 100 | bool exhausted_source = false; |
| 101 | //! Flushing of intermediate operators has started |
| 102 | bool started_flushing = false; |
| 103 | //! Flushing of caching operators is done |
| 104 | bool done_flushing = false; |
| 105 | |
| 106 | //! This flag is set when the pipeline gets interrupted by the Sink -> the final_chunk should be re-sink-ed. |
| 107 | bool remaining_sink_chunk = false; |
| 108 | |
| 109 | //! Current operator being flushed |
| 110 | idx_t flushing_idx; |
| 111 | //! Whether the current flushing_idx should be flushed: this needs to be stored to make flushing code re-entrant |
| 112 | bool should_flush_current_idx = true; |
| 113 | |
| 114 | private: |
| 115 | void StartOperator(PhysicalOperator &op); |
| 116 | void EndOperator(PhysicalOperator &op, optional_ptr<DataChunk> chunk); |
| 117 | |
| 118 | //! Reset the operator index to the first operator |
| 119 | void GoToSource(idx_t ¤t_idx, idx_t initial_idx); |
| 120 | SourceResultType FetchFromSource(DataChunk &result); |
| 121 | |
| 122 | void FinishProcessing(int32_t operator_idx = -1); |
| 123 | bool IsFinished(); |
| 124 | |
| 125 | //! Wrappers for sink/source calls to respective operators |
| 126 | SourceResultType GetData(DataChunk &chunk, OperatorSourceInput &input); |
| 127 | SinkResultType Sink(DataChunk &chunk, OperatorSinkInput &input); |
| 128 | |
| 129 | OperatorResultType ExecutePushInternal(DataChunk &input, idx_t initial_idx = 0); |
| 130 | //! Pushes a chunk through the pipeline and returns a single result chunk |
| 131 | //! Returns whether or not a new input chunk is needed, or whether or not we are finished |
| 132 | OperatorResultType Execute(DataChunk &input, DataChunk &result, idx_t initial_index = 0); |
| 133 | |
| 134 | //! Tries to flush all state from intermediate operators. Will return true if all state is flushed, false in the |
| 135 | //! case of a blocked sink. |
| 136 | bool TryFlushCachingOperators(); |
| 137 | |
| 138 | static bool CanCacheType(const LogicalType &type); |
| 139 | void CacheChunk(DataChunk &input, idx_t operator_idx); |
| 140 | |
| 141 | #ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE |
| 142 | //! Debugging state: number of times blocked |
| 143 | int debug_blocked_sink_count = 0; |
| 144 | int debug_blocked_source_count = 0; |
| 145 | //! Number of times the Sink/Source will block before actually returning data |
| 146 | int debug_blocked_target_count = 1; |
| 147 | #endif |
| 148 | }; |
| 149 | |
| 150 | } // namespace duckdb |
| 151 | |