| 1 | //===----------------------------------------------------------------------===// |
| 2 | // DuckDB |
| 3 | // |
| 4 | // duckdb/execution/physical_operator.hpp |
| 5 | // |
| 6 | // |
| 7 | //===----------------------------------------------------------------------===// |
| 8 | |
| 9 | #pragma once |
| 10 | |
| 11 | #include "duckdb/catalog/catalog.hpp" |
| 12 | #include "duckdb/common/common.hpp" |
| 13 | #include "duckdb/common/enums/operator_result_type.hpp" |
| 14 | #include "duckdb/common/enums/physical_operator_type.hpp" |
| 15 | #include "duckdb/common/types/data_chunk.hpp" |
| 16 | #include "duckdb/execution/execution_context.hpp" |
| 17 | #include "duckdb/optimizer/join_order/join_node.hpp" |
| 18 | #include "duckdb/common/optional_idx.hpp" |
| 19 | #include "duckdb/execution/physical_operator_states.hpp" |
| 20 | #include "duckdb/common/enums/order_preservation_type.hpp" |
| 21 | |
| 22 | namespace duckdb { |
// Forward declarations: these types are only named in declarations within this header
// (references, parameters), so their full definitions are not needed here.
class Event;
class Executor;
class MetaPipeline;
class PhysicalOperator;
class Pipeline;
class PipelineBuildState;
| 29 | |
| 30 | //! PhysicalOperator is the base class of the physical operators present in the |
| 31 | //! execution plan |
| 32 | class PhysicalOperator { |
| 33 | public: |
| 34 | static constexpr const PhysicalOperatorType TYPE = PhysicalOperatorType::INVALID; |
| 35 | |
| 36 | public: |
| 37 | PhysicalOperator(PhysicalOperatorType type, vector<LogicalType> types, idx_t estimated_cardinality) |
| 38 | : type(type), types(std::move(types)), estimated_cardinality(estimated_cardinality) { |
| 39 | estimated_props = make_uniq<EstimatedProperties>(args&: estimated_cardinality, args: 0); |
| 40 | } |
| 41 | |
| 42 | virtual ~PhysicalOperator() { |
| 43 | } |
| 44 | |
| 45 | //! The physical operator type |
| 46 | PhysicalOperatorType type; |
| 47 | //! The set of children of the operator |
| 48 | vector<unique_ptr<PhysicalOperator>> children; |
| 49 | //! The types returned by this physical operator |
| 50 | vector<LogicalType> types; |
| 51 | //! The estimated cardinality of this physical operator |
| 52 | idx_t estimated_cardinality; |
| 53 | unique_ptr<EstimatedProperties> estimated_props; |
| 54 | |
| 55 | //! The global sink state of this operator |
| 56 | unique_ptr<GlobalSinkState> sink_state; |
| 57 | //! The global state of this operator |
| 58 | unique_ptr<GlobalOperatorState> op_state; |
| 59 | //! Lock for (re)setting any of the operator states |
| 60 | mutex lock; |
| 61 | |
| 62 | public: |
| 63 | virtual string GetName() const; |
| 64 | virtual string ParamsToString() const { |
| 65 | return "" ; |
| 66 | } |
| 67 | virtual string ToString() const; |
| 68 | void Print() const; |
| 69 | virtual vector<const_reference<PhysicalOperator>> GetChildren() const; |
| 70 | |
| 71 | //! Return a vector of the types that will be returned by this operator |
| 72 | const vector<LogicalType> &GetTypes() const { |
| 73 | return types; |
| 74 | } |
| 75 | |
| 76 | virtual bool Equals(const PhysicalOperator &other) const { |
| 77 | return false; |
| 78 | } |
| 79 | |
| 80 | virtual void Verify(); |
| 81 | |
| 82 | public: |
| 83 | // Operator interface |
| 84 | virtual unique_ptr<OperatorState> GetOperatorState(ExecutionContext &context) const; |
| 85 | virtual unique_ptr<GlobalOperatorState> GetGlobalOperatorState(ClientContext &context) const; |
| 86 | virtual OperatorResultType Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk, |
| 87 | GlobalOperatorState &gstate, OperatorState &state) const; |
| 88 | virtual OperatorFinalizeResultType FinalExecute(ExecutionContext &context, DataChunk &chunk, |
| 89 | GlobalOperatorState &gstate, OperatorState &state) const; |
| 90 | |
| 91 | virtual bool ParallelOperator() const { |
| 92 | return false; |
| 93 | } |
| 94 | |
| 95 | virtual bool RequiresFinalExecute() const { |
| 96 | return false; |
| 97 | } |
| 98 | |
| 99 | //! The influence the operator has on order (insertion order means no influence) |
| 100 | virtual OrderPreservationType OperatorOrder() const { |
| 101 | return OrderPreservationType::INSERTION_ORDER; |
| 102 | } |
| 103 | |
| 104 | public: |
| 105 | // Source interface |
| 106 | virtual unique_ptr<LocalSourceState> GetLocalSourceState(ExecutionContext &context, |
| 107 | GlobalSourceState &gstate) const; |
| 108 | virtual unique_ptr<GlobalSourceState> GetGlobalSourceState(ClientContext &context) const; |
| 109 | virtual SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const; |
| 110 | |
| 111 | virtual idx_t GetBatchIndex(ExecutionContext &context, DataChunk &chunk, GlobalSourceState &gstate, |
| 112 | LocalSourceState &lstate) const; |
| 113 | |
| 114 | virtual bool IsSource() const { |
| 115 | return false; |
| 116 | } |
| 117 | |
| 118 | virtual bool ParallelSource() const { |
| 119 | return false; |
| 120 | } |
| 121 | |
| 122 | virtual bool SupportsBatchIndex() const { |
| 123 | return false; |
| 124 | } |
| 125 | |
| 126 | //! The type of order emitted by the operator (as a source) |
| 127 | virtual OrderPreservationType SourceOrder() const { |
| 128 | return OrderPreservationType::INSERTION_ORDER; |
| 129 | } |
| 130 | |
| 131 | //! Returns the current progress percentage, or a negative value if progress bars are not supported |
| 132 | virtual double GetProgress(ClientContext &context, GlobalSourceState &gstate) const; |
| 133 | |
| 134 | public: |
| 135 | // Sink interface |
| 136 | |
| 137 | //! The sink method is called constantly with new input, as long as new input is available. Note that this method |
| 138 | //! CAN be called in parallel, proper locking is needed when accessing data inside the GlobalSinkState. |
| 139 | virtual SinkResultType Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const; |
| 140 | // The combine is called when a single thread has completed execution of its part of the pipeline, it is the final |
| 141 | // time that a specific LocalSinkState is accessible. This method can be called in parallel while other Sink() or |
| 142 | // Combine() calls are active on the same GlobalSinkState. |
| 143 | virtual void Combine(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate) const; |
| 144 | //! The finalize is called when ALL threads are finished execution. It is called only once per pipeline, and is |
| 145 | //! entirely single threaded. |
| 146 | //! If Finalize returns SinkResultType::FINISHED, the sink is marked as finished |
| 147 | virtual SinkFinalizeType Finalize(Pipeline &pipeline, Event &event, ClientContext &context, |
| 148 | GlobalSinkState &gstate) const; |
| 149 | //! For sinks with RequiresBatchIndex set to true, when a new batch starts being processed this method is called |
| 150 | //! This allows flushing of the current batch (e.g. to disk) |
| 151 | virtual void NextBatch(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate_p) const; |
| 152 | |
| 153 | virtual unique_ptr<LocalSinkState> GetLocalSinkState(ExecutionContext &context) const; |
| 154 | virtual unique_ptr<GlobalSinkState> GetGlobalSinkState(ClientContext &context) const; |
| 155 | |
| 156 | //! The maximum amount of memory the operator should use per thread. |
| 157 | static idx_t GetMaxThreadMemory(ClientContext &context); |
| 158 | |
| 159 | virtual bool IsSink() const { |
| 160 | return false; |
| 161 | } |
| 162 | |
| 163 | virtual bool ParallelSink() const { |
| 164 | return false; |
| 165 | } |
| 166 | |
| 167 | virtual bool RequiresBatchIndex() const { |
| 168 | return false; |
| 169 | } |
| 170 | |
| 171 | //! Whether or not the sink operator depends on the order of the input chunks |
| 172 | //! If this is set to true, we cannot do things like caching intermediate vectors |
| 173 | virtual bool SinkOrderDependent() const { |
| 174 | return false; |
| 175 | } |
| 176 | |
| 177 | public: |
| 178 | // Pipeline construction |
| 179 | virtual vector<const_reference<PhysicalOperator>> GetSources() const; |
| 180 | bool AllSourcesSupportBatchIndex() const; |
| 181 | |
| 182 | virtual void BuildPipelines(Pipeline ¤t, MetaPipeline &meta_pipeline); |
| 183 | |
| 184 | public: |
| 185 | template <class TARGET> |
| 186 | TARGET &Cast() { |
| 187 | if (TARGET::TYPE != PhysicalOperatorType::INVALID && type != TARGET::TYPE) { |
| 188 | throw InternalException("Failed to cast physical operator to type - physical operator type mismatch" ); |
| 189 | } |
| 190 | return reinterpret_cast<TARGET &>(*this); |
| 191 | } |
| 192 | |
| 193 | template <class TARGET> |
| 194 | const TARGET &Cast() const { |
| 195 | if (TARGET::TYPE != PhysicalOperatorType::INVALID && type != TARGET::TYPE) { |
| 196 | throw InternalException("Failed to cast physical operator to type - physical operator type mismatch" ); |
| 197 | } |
| 198 | return reinterpret_cast<const TARGET &>(*this); |
| 199 | } |
| 200 | }; |
| 201 | |
| 202 | //! Contains state for the CachingPhysicalOperator |
| 203 | class CachingOperatorState : public OperatorState { |
| 204 | public: |
| 205 | ~CachingOperatorState() override { |
| 206 | } |
| 207 | |
| 208 | void Finalize(const PhysicalOperator &op, ExecutionContext &context) override { |
| 209 | } |
| 210 | |
| 211 | unique_ptr<DataChunk> cached_chunk; |
| 212 | bool initialized = false; |
| 213 | //! Whether or not the chunk can be cached |
| 214 | bool can_cache_chunk = false; |
| 215 | }; |
| 216 | |
| 217 | //! Base class that caches output from child Operator class. Note that Operators inheriting from this class should also |
| 218 | //! inherit their state class from the CachingOperatorState. |
| 219 | class CachingPhysicalOperator : public PhysicalOperator { |
| 220 | public: |
| 221 | static constexpr const idx_t CACHE_THRESHOLD = 64; |
| 222 | CachingPhysicalOperator(PhysicalOperatorType type, vector<LogicalType> types, idx_t estimated_cardinality); |
| 223 | |
| 224 | bool caching_supported; |
| 225 | |
| 226 | public: |
| 227 | OperatorResultType Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk, |
| 228 | GlobalOperatorState &gstate, OperatorState &state) const final; |
| 229 | OperatorFinalizeResultType FinalExecute(ExecutionContext &context, DataChunk &chunk, GlobalOperatorState &gstate, |
| 230 | OperatorState &state) const final; |
| 231 | |
| 232 | bool RequiresFinalExecute() const final { |
| 233 | return caching_supported; |
| 234 | } |
| 235 | |
| 236 | protected: |
| 237 | //! Child classes need to implement the ExecuteInternal method instead of the Execute |
| 238 | virtual OperatorResultType ExecuteInternal(ExecutionContext &context, DataChunk &input, DataChunk &chunk, |
| 239 | GlobalOperatorState &gstate, OperatorState &state) const = 0; |
| 240 | |
| 241 | private: |
| 242 | bool CanCacheType(const LogicalType &type); |
| 243 | }; |
| 244 | |
| 245 | } // namespace duckdb |
| 246 | |