1 | //===----------------------------------------------------------------------===// |
2 | // DuckDB |
3 | // |
4 | // duckdb/execution/physical_operator.hpp |
5 | // |
6 | // |
7 | //===----------------------------------------------------------------------===// |
8 | |
9 | #pragma once |
10 | |
11 | #include "duckdb/catalog/catalog.hpp" |
12 | #include "duckdb/common/common.hpp" |
13 | #include "duckdb/common/enums/operator_result_type.hpp" |
14 | #include "duckdb/common/enums/physical_operator_type.hpp" |
15 | #include "duckdb/common/types/data_chunk.hpp" |
16 | #include "duckdb/execution/execution_context.hpp" |
17 | #include "duckdb/optimizer/join_order/join_node.hpp" |
18 | #include "duckdb/common/optional_idx.hpp" |
19 | #include "duckdb/execution/physical_operator_states.hpp" |
20 | #include "duckdb/common/enums/order_preservation_type.hpp" |
21 | |
22 | namespace duckdb { |
// Forward declarations (alphabetical). Declaring the names here lets this header refer to
// these classes without requiring their full definitions.
class Event;
class Executor;
class MetaPipeline;
class PhysicalOperator;
class Pipeline;
class PipelineBuildState;
29 | |
30 | //! PhysicalOperator is the base class of the physical operators present in the |
31 | //! execution plan |
32 | class PhysicalOperator { |
33 | public: |
34 | static constexpr const PhysicalOperatorType TYPE = PhysicalOperatorType::INVALID; |
35 | |
36 | public: |
37 | PhysicalOperator(PhysicalOperatorType type, vector<LogicalType> types, idx_t estimated_cardinality) |
38 | : type(type), types(std::move(types)), estimated_cardinality(estimated_cardinality) { |
39 | estimated_props = make_uniq<EstimatedProperties>(args&: estimated_cardinality, args: 0); |
40 | } |
41 | |
42 | virtual ~PhysicalOperator() { |
43 | } |
44 | |
45 | //! The physical operator type |
46 | PhysicalOperatorType type; |
47 | //! The set of children of the operator |
48 | vector<unique_ptr<PhysicalOperator>> children; |
49 | //! The types returned by this physical operator |
50 | vector<LogicalType> types; |
51 | //! The estimated cardinality of this physical operator |
52 | idx_t estimated_cardinality; |
53 | unique_ptr<EstimatedProperties> estimated_props; |
54 | |
55 | //! The global sink state of this operator |
56 | unique_ptr<GlobalSinkState> sink_state; |
57 | //! The global state of this operator |
58 | unique_ptr<GlobalOperatorState> op_state; |
59 | //! Lock for (re)setting any of the operator states |
60 | mutex lock; |
61 | |
62 | public: |
63 | virtual string GetName() const; |
64 | virtual string ParamsToString() const { |
65 | return "" ; |
66 | } |
67 | virtual string ToString() const; |
68 | void Print() const; |
69 | virtual vector<const_reference<PhysicalOperator>> GetChildren() const; |
70 | |
71 | //! Return a vector of the types that will be returned by this operator |
72 | const vector<LogicalType> &GetTypes() const { |
73 | return types; |
74 | } |
75 | |
76 | virtual bool Equals(const PhysicalOperator &other) const { |
77 | return false; |
78 | } |
79 | |
80 | virtual void Verify(); |
81 | |
82 | public: |
83 | // Operator interface |
84 | virtual unique_ptr<OperatorState> GetOperatorState(ExecutionContext &context) const; |
85 | virtual unique_ptr<GlobalOperatorState> GetGlobalOperatorState(ClientContext &context) const; |
86 | virtual OperatorResultType Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk, |
87 | GlobalOperatorState &gstate, OperatorState &state) const; |
88 | virtual OperatorFinalizeResultType FinalExecute(ExecutionContext &context, DataChunk &chunk, |
89 | GlobalOperatorState &gstate, OperatorState &state) const; |
90 | |
91 | virtual bool ParallelOperator() const { |
92 | return false; |
93 | } |
94 | |
95 | virtual bool RequiresFinalExecute() const { |
96 | return false; |
97 | } |
98 | |
99 | //! The influence the operator has on order (insertion order means no influence) |
100 | virtual OrderPreservationType OperatorOrder() const { |
101 | return OrderPreservationType::INSERTION_ORDER; |
102 | } |
103 | |
104 | public: |
105 | // Source interface |
106 | virtual unique_ptr<LocalSourceState> GetLocalSourceState(ExecutionContext &context, |
107 | GlobalSourceState &gstate) const; |
108 | virtual unique_ptr<GlobalSourceState> GetGlobalSourceState(ClientContext &context) const; |
109 | virtual SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const; |
110 | |
111 | virtual idx_t GetBatchIndex(ExecutionContext &context, DataChunk &chunk, GlobalSourceState &gstate, |
112 | LocalSourceState &lstate) const; |
113 | |
114 | virtual bool IsSource() const { |
115 | return false; |
116 | } |
117 | |
118 | virtual bool ParallelSource() const { |
119 | return false; |
120 | } |
121 | |
122 | virtual bool SupportsBatchIndex() const { |
123 | return false; |
124 | } |
125 | |
126 | //! The type of order emitted by the operator (as a source) |
127 | virtual OrderPreservationType SourceOrder() const { |
128 | return OrderPreservationType::INSERTION_ORDER; |
129 | } |
130 | |
131 | //! Returns the current progress percentage, or a negative value if progress bars are not supported |
132 | virtual double GetProgress(ClientContext &context, GlobalSourceState &gstate) const; |
133 | |
134 | public: |
135 | // Sink interface |
136 | |
137 | //! The sink method is called constantly with new input, as long as new input is available. Note that this method |
138 | //! CAN be called in parallel, proper locking is needed when accessing data inside the GlobalSinkState. |
139 | virtual SinkResultType Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const; |
140 | // The combine is called when a single thread has completed execution of its part of the pipeline, it is the final |
141 | // time that a specific LocalSinkState is accessible. This method can be called in parallel while other Sink() or |
142 | // Combine() calls are active on the same GlobalSinkState. |
143 | virtual void Combine(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate) const; |
144 | //! The finalize is called when ALL threads are finished execution. It is called only once per pipeline, and is |
145 | //! entirely single threaded. |
146 | //! If Finalize returns SinkResultType::FINISHED, the sink is marked as finished |
147 | virtual SinkFinalizeType Finalize(Pipeline &pipeline, Event &event, ClientContext &context, |
148 | GlobalSinkState &gstate) const; |
149 | //! For sinks with RequiresBatchIndex set to true, when a new batch starts being processed this method is called |
150 | //! This allows flushing of the current batch (e.g. to disk) |
151 | virtual void NextBatch(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate_p) const; |
152 | |
153 | virtual unique_ptr<LocalSinkState> GetLocalSinkState(ExecutionContext &context) const; |
154 | virtual unique_ptr<GlobalSinkState> GetGlobalSinkState(ClientContext &context) const; |
155 | |
156 | //! The maximum amount of memory the operator should use per thread. |
157 | static idx_t GetMaxThreadMemory(ClientContext &context); |
158 | |
159 | virtual bool IsSink() const { |
160 | return false; |
161 | } |
162 | |
163 | virtual bool ParallelSink() const { |
164 | return false; |
165 | } |
166 | |
167 | virtual bool RequiresBatchIndex() const { |
168 | return false; |
169 | } |
170 | |
171 | //! Whether or not the sink operator depends on the order of the input chunks |
172 | //! If this is set to true, we cannot do things like caching intermediate vectors |
173 | virtual bool SinkOrderDependent() const { |
174 | return false; |
175 | } |
176 | |
177 | public: |
178 | // Pipeline construction |
179 | virtual vector<const_reference<PhysicalOperator>> GetSources() const; |
180 | bool AllSourcesSupportBatchIndex() const; |
181 | |
182 | virtual void BuildPipelines(Pipeline ¤t, MetaPipeline &meta_pipeline); |
183 | |
184 | public: |
185 | template <class TARGET> |
186 | TARGET &Cast() { |
187 | if (TARGET::TYPE != PhysicalOperatorType::INVALID && type != TARGET::TYPE) { |
188 | throw InternalException("Failed to cast physical operator to type - physical operator type mismatch" ); |
189 | } |
190 | return reinterpret_cast<TARGET &>(*this); |
191 | } |
192 | |
193 | template <class TARGET> |
194 | const TARGET &Cast() const { |
195 | if (TARGET::TYPE != PhysicalOperatorType::INVALID && type != TARGET::TYPE) { |
196 | throw InternalException("Failed to cast physical operator to type - physical operator type mismatch" ); |
197 | } |
198 | return reinterpret_cast<const TARGET &>(*this); |
199 | } |
200 | }; |
201 | |
202 | //! Contains state for the CachingPhysicalOperator |
203 | class CachingOperatorState : public OperatorState { |
204 | public: |
205 | ~CachingOperatorState() override { |
206 | } |
207 | |
208 | void Finalize(const PhysicalOperator &op, ExecutionContext &context) override { |
209 | } |
210 | |
211 | unique_ptr<DataChunk> cached_chunk; |
212 | bool initialized = false; |
213 | //! Whether or not the chunk can be cached |
214 | bool can_cache_chunk = false; |
215 | }; |
216 | |
217 | //! Base class that caches output from child Operator class. Note that Operators inheriting from this class should also |
218 | //! inherit their state class from the CachingOperatorState. |
219 | class CachingPhysicalOperator : public PhysicalOperator { |
220 | public: |
221 | static constexpr const idx_t CACHE_THRESHOLD = 64; |
222 | CachingPhysicalOperator(PhysicalOperatorType type, vector<LogicalType> types, idx_t estimated_cardinality); |
223 | |
224 | bool caching_supported; |
225 | |
226 | public: |
227 | OperatorResultType Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk, |
228 | GlobalOperatorState &gstate, OperatorState &state) const final; |
229 | OperatorFinalizeResultType FinalExecute(ExecutionContext &context, DataChunk &chunk, GlobalOperatorState &gstate, |
230 | OperatorState &state) const final; |
231 | |
232 | bool RequiresFinalExecute() const final { |
233 | return caching_supported; |
234 | } |
235 | |
236 | protected: |
237 | //! Child classes need to implement the ExecuteInternal method instead of the Execute |
238 | virtual OperatorResultType ExecuteInternal(ExecutionContext &context, DataChunk &input, DataChunk &chunk, |
239 | GlobalOperatorState &gstate, OperatorState &state) const = 0; |
240 | |
241 | private: |
242 | bool CanCacheType(const LogicalType &type); |
243 | }; |
244 | |
245 | } // namespace duckdb |
246 | |