1//===----------------------------------------------------------------------===//
2// DuckDB
3//
4// duckdb/execution/physical_operator.hpp
5//
6//
7//===----------------------------------------------------------------------===//
8
9#pragma once
10
11#include "duckdb/catalog/catalog.hpp"
12#include "duckdb/common/common.hpp"
13#include "duckdb/common/enums/operator_result_type.hpp"
14#include "duckdb/common/enums/physical_operator_type.hpp"
15#include "duckdb/common/types/data_chunk.hpp"
16#include "duckdb/execution/execution_context.hpp"
17#include "duckdb/optimizer/join_order/join_node.hpp"
18#include "duckdb/common/optional_idx.hpp"
19#include "duckdb/execution/physical_operator_states.hpp"
20#include "duckdb/common/enums/order_preservation_type.hpp"
21
22namespace duckdb {
23class Event;
24class Executor;
25class PhysicalOperator;
26class Pipeline;
27class PipelineBuildState;
28class MetaPipeline;
29
30//! PhysicalOperator is the base class of the physical operators present in the
31//! execution plan
32class PhysicalOperator {
33public:
34 static constexpr const PhysicalOperatorType TYPE = PhysicalOperatorType::INVALID;
35
36public:
37 PhysicalOperator(PhysicalOperatorType type, vector<LogicalType> types, idx_t estimated_cardinality)
38 : type(type), types(std::move(types)), estimated_cardinality(estimated_cardinality) {
39 estimated_props = make_uniq<EstimatedProperties>(args&: estimated_cardinality, args: 0);
40 }
41
42 virtual ~PhysicalOperator() {
43 }
44
45 //! The physical operator type
46 PhysicalOperatorType type;
47 //! The set of children of the operator
48 vector<unique_ptr<PhysicalOperator>> children;
49 //! The types returned by this physical operator
50 vector<LogicalType> types;
51 //! The estimated cardinality of this physical operator
52 idx_t estimated_cardinality;
53 unique_ptr<EstimatedProperties> estimated_props;
54
55 //! The global sink state of this operator
56 unique_ptr<GlobalSinkState> sink_state;
57 //! The global state of this operator
58 unique_ptr<GlobalOperatorState> op_state;
59 //! Lock for (re)setting any of the operator states
60 mutex lock;
61
62public:
63 virtual string GetName() const;
64 virtual string ParamsToString() const {
65 return "";
66 }
67 virtual string ToString() const;
68 void Print() const;
69 virtual vector<const_reference<PhysicalOperator>> GetChildren() const;
70
71 //! Return a vector of the types that will be returned by this operator
72 const vector<LogicalType> &GetTypes() const {
73 return types;
74 }
75
76 virtual bool Equals(const PhysicalOperator &other) const {
77 return false;
78 }
79
80 virtual void Verify();
81
82public:
83 // Operator interface
84 virtual unique_ptr<OperatorState> GetOperatorState(ExecutionContext &context) const;
85 virtual unique_ptr<GlobalOperatorState> GetGlobalOperatorState(ClientContext &context) const;
86 virtual OperatorResultType Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
87 GlobalOperatorState &gstate, OperatorState &state) const;
88 virtual OperatorFinalizeResultType FinalExecute(ExecutionContext &context, DataChunk &chunk,
89 GlobalOperatorState &gstate, OperatorState &state) const;
90
91 virtual bool ParallelOperator() const {
92 return false;
93 }
94
95 virtual bool RequiresFinalExecute() const {
96 return false;
97 }
98
99 //! The influence the operator has on order (insertion order means no influence)
100 virtual OrderPreservationType OperatorOrder() const {
101 return OrderPreservationType::INSERTION_ORDER;
102 }
103
104public:
105 // Source interface
106 virtual unique_ptr<LocalSourceState> GetLocalSourceState(ExecutionContext &context,
107 GlobalSourceState &gstate) const;
108 virtual unique_ptr<GlobalSourceState> GetGlobalSourceState(ClientContext &context) const;
109 virtual SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const;
110
111 virtual idx_t GetBatchIndex(ExecutionContext &context, DataChunk &chunk, GlobalSourceState &gstate,
112 LocalSourceState &lstate) const;
113
114 virtual bool IsSource() const {
115 return false;
116 }
117
118 virtual bool ParallelSource() const {
119 return false;
120 }
121
122 virtual bool SupportsBatchIndex() const {
123 return false;
124 }
125
126 //! The type of order emitted by the operator (as a source)
127 virtual OrderPreservationType SourceOrder() const {
128 return OrderPreservationType::INSERTION_ORDER;
129 }
130
131 //! Returns the current progress percentage, or a negative value if progress bars are not supported
132 virtual double GetProgress(ClientContext &context, GlobalSourceState &gstate) const;
133
134public:
135 // Sink interface
136
137 //! The sink method is called constantly with new input, as long as new input is available. Note that this method
138 //! CAN be called in parallel, proper locking is needed when accessing data inside the GlobalSinkState.
139 virtual SinkResultType Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const;
140 // The combine is called when a single thread has completed execution of its part of the pipeline, it is the final
141 // time that a specific LocalSinkState is accessible. This method can be called in parallel while other Sink() or
142 // Combine() calls are active on the same GlobalSinkState.
143 virtual void Combine(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate) const;
144 //! The finalize is called when ALL threads are finished execution. It is called only once per pipeline, and is
145 //! entirely single threaded.
146 //! If Finalize returns SinkResultType::FINISHED, the sink is marked as finished
147 virtual SinkFinalizeType Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
148 GlobalSinkState &gstate) const;
149 //! For sinks with RequiresBatchIndex set to true, when a new batch starts being processed this method is called
150 //! This allows flushing of the current batch (e.g. to disk)
151 virtual void NextBatch(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate_p) const;
152
153 virtual unique_ptr<LocalSinkState> GetLocalSinkState(ExecutionContext &context) const;
154 virtual unique_ptr<GlobalSinkState> GetGlobalSinkState(ClientContext &context) const;
155
156 //! The maximum amount of memory the operator should use per thread.
157 static idx_t GetMaxThreadMemory(ClientContext &context);
158
159 virtual bool IsSink() const {
160 return false;
161 }
162
163 virtual bool ParallelSink() const {
164 return false;
165 }
166
167 virtual bool RequiresBatchIndex() const {
168 return false;
169 }
170
171 //! Whether or not the sink operator depends on the order of the input chunks
172 //! If this is set to true, we cannot do things like caching intermediate vectors
173 virtual bool SinkOrderDependent() const {
174 return false;
175 }
176
177public:
178 // Pipeline construction
179 virtual vector<const_reference<PhysicalOperator>> GetSources() const;
180 bool AllSourcesSupportBatchIndex() const;
181
182 virtual void BuildPipelines(Pipeline &current, MetaPipeline &meta_pipeline);
183
184public:
185 template <class TARGET>
186 TARGET &Cast() {
187 if (TARGET::TYPE != PhysicalOperatorType::INVALID && type != TARGET::TYPE) {
188 throw InternalException("Failed to cast physical operator to type - physical operator type mismatch");
189 }
190 return reinterpret_cast<TARGET &>(*this);
191 }
192
193 template <class TARGET>
194 const TARGET &Cast() const {
195 if (TARGET::TYPE != PhysicalOperatorType::INVALID && type != TARGET::TYPE) {
196 throw InternalException("Failed to cast physical operator to type - physical operator type mismatch");
197 }
198 return reinterpret_cast<const TARGET &>(*this);
199 }
200};
201
202//! Contains state for the CachingPhysicalOperator
203class CachingOperatorState : public OperatorState {
204public:
205 ~CachingOperatorState() override {
206 }
207
208 void Finalize(const PhysicalOperator &op, ExecutionContext &context) override {
209 }
210
211 unique_ptr<DataChunk> cached_chunk;
212 bool initialized = false;
213 //! Whether or not the chunk can be cached
214 bool can_cache_chunk = false;
215};
216
217//! Base class that caches output from child Operator class. Note that Operators inheriting from this class should also
218//! inherit their state class from the CachingOperatorState.
219class CachingPhysicalOperator : public PhysicalOperator {
220public:
221 static constexpr const idx_t CACHE_THRESHOLD = 64;
222 CachingPhysicalOperator(PhysicalOperatorType type, vector<LogicalType> types, idx_t estimated_cardinality);
223
224 bool caching_supported;
225
226public:
227 OperatorResultType Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
228 GlobalOperatorState &gstate, OperatorState &state) const final;
229 OperatorFinalizeResultType FinalExecute(ExecutionContext &context, DataChunk &chunk, GlobalOperatorState &gstate,
230 OperatorState &state) const final;
231
232 bool RequiresFinalExecute() const final {
233 return caching_supported;
234 }
235
236protected:
237 //! Child classes need to implement the ExecuteInternal method instead of the Execute
238 virtual OperatorResultType ExecuteInternal(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
239 GlobalOperatorState &gstate, OperatorState &state) const = 0;
240
241private:
242 bool CanCacheType(const LogicalType &type);
243};
244
245} // namespace duckdb
246