1 | #include "duckdb/execution/physical_operator.hpp" |
2 | |
3 | #include "duckdb/common/printer.hpp" |
4 | #include "duckdb/common/string_util.hpp" |
5 | #include "duckdb/common/tree_renderer.hpp" |
6 | #include "duckdb/execution/execution_context.hpp" |
7 | #include "duckdb/execution/operator/set/physical_recursive_cte.hpp" |
8 | #include "duckdb/main/client_context.hpp" |
9 | #include "duckdb/parallel/meta_pipeline.hpp" |
10 | #include "duckdb/parallel/pipeline.hpp" |
11 | #include "duckdb/parallel/thread_context.hpp" |
12 | #include "duckdb/storage/buffer_manager.hpp" |
13 | |
14 | namespace duckdb { |
15 | |
16 | string PhysicalOperator::GetName() const { |
17 | return PhysicalOperatorToString(type); |
18 | } |
19 | |
20 | string PhysicalOperator::ToString() const { |
21 | TreeRenderer renderer; |
22 | return renderer.ToString(op: *this); |
23 | } |
24 | |
25 | // LCOV_EXCL_START |
26 | void PhysicalOperator::Print() const { |
27 | Printer::Print(str: ToString()); |
28 | } |
29 | // LCOV_EXCL_STOP |
30 | |
31 | vector<const_reference<PhysicalOperator>> PhysicalOperator::GetChildren() const { |
32 | vector<const_reference<PhysicalOperator>> result; |
33 | for (auto &child : children) { |
34 | result.push_back(x: *child); |
35 | } |
36 | return result; |
37 | } |
38 | |
39 | //===--------------------------------------------------------------------===// |
40 | // Operator |
41 | //===--------------------------------------------------------------------===// |
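// Default Operator interface: the state getters below return empty state objects, and
// Execute/FinalExecute throw, since only operators that implement the Operator interface
// override them.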
// LCOV_EXCL_START
unique_ptr<OperatorState> PhysicalOperator::GetOperatorState(ExecutionContext &context) const {
	return make_uniq<OperatorState>();
}

unique_ptr<GlobalOperatorState> PhysicalOperator::GetGlobalOperatorState(ClientContext &context) const {
	return make_uniq<GlobalOperatorState>();
}

OperatorResultType PhysicalOperator::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
                                             GlobalOperatorState &gstate, OperatorState &state) const {
	throw InternalException("Calling Execute on a node that is not an operator!");
}

OperatorFinalizeResultType PhysicalOperator::FinalExecute(ExecutionContext &context, DataChunk &chunk,
                                                          GlobalOperatorState &gstate, OperatorState &state) const {
	throw InternalException("Calling FinalExecute on a node that is not an operator!");
}
// LCOV_EXCL_STOP

//===--------------------------------------------------------------------===//
// Source
//===--------------------------------------------------------------------===//
unique_ptr<LocalSourceState> PhysicalOperator::GetLocalSourceState(ExecutionContext &context,
                                                                   GlobalSourceState &gstate) const {
	return make_uniq<LocalSourceState>();
}

unique_ptr<GlobalSourceState> PhysicalOperator::GetGlobalSourceState(ClientContext &context) const {
	return make_uniq<GlobalSourceState>();
}

// LCOV_EXCL_START
SourceResultType PhysicalOperator::GetData(ExecutionContext &context, DataChunk &chunk,
                                           OperatorSourceInput &input) const {
	throw InternalException("Calling GetData on a node that is not a source!");
}

idx_t PhysicalOperator::GetBatchIndex(ExecutionContext &context, DataChunk &chunk, GlobalSourceState &gstate,
                                      LocalSourceState &lstate) const {
	throw InternalException("Calling GetBatchIndex on a node that does not support it");
}

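// Default: a progress of -1 signals that this source cannot report its progress.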
double PhysicalOperator::GetProgress(ClientContext &context, GlobalSourceState &gstate) const {
	return -1;
}
// LCOV_EXCL_STOP

//===--------------------------------------------------------------------===//
// Sink
//===--------------------------------------------------------------------===//
// LCOV_EXCL_START
SinkResultType PhysicalOperator::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
	throw InternalException("Calling Sink on a node that is not a sink!");
}
// LCOV_EXCL_STOP

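// Default sink hooks: Combine and NextBatch are no-ops, and Finalize reports that the sink
// is ready without doing any extra work.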
void PhysicalOperator::Combine(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate) const {
}

SinkFinalizeType PhysicalOperator::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
                                            GlobalSinkState &gstate) const {
	return SinkFinalizeType::READY;
}

void PhysicalOperator::NextBatch(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate_p) const {
}

unique_ptr<LocalSinkState> PhysicalOperator::GetLocalSinkState(ExecutionContext &context) const {
	return make_uniq<LocalSinkState>();
}

unique_ptr<GlobalSinkState> PhysicalOperator::GetGlobalSinkState(ClientContext &context) const {
	return make_uniq<GlobalSinkState>();
}

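// Per-thread memory budget: (max memory / number of threads) / 4.
// For example, with 8 GiB of memory and 8 threads this allows roughly 256 MiB per thread.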
idx_t PhysicalOperator::GetMaxThreadMemory(ClientContext &context) {
	// Memory usage per thread should scale with max mem / num threads
	// We take 1/4th of this, to be conservative
	idx_t max_memory = BufferManager::GetBufferManager(context).GetMaxMemory();
	idx_t num_threads = TaskScheduler::GetScheduler(context).NumberOfThreads();
	return (max_memory / num_threads) / 4;
}

//===--------------------------------------------------------------------===//
// Pipeline Construction
//===--------------------------------------------------------------------===//
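// Default pipeline construction: a sink breaks the plan into pipelines. The sink becomes the
// source of the current pipeline, and a child meta-pipeline is built for its subtree.
// Operators without children become the source of the current pipeline; operators with a
// single child are added as streaming operators and recurse into that child.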
void PhysicalOperator::BuildPipelines(Pipeline &current, MetaPipeline &meta_pipeline) {
	op_state.reset();

	auto &state = meta_pipeline.GetState();
	if (IsSink()) {
		// operator is a sink, build a pipeline
		sink_state.reset();
		D_ASSERT(children.size() == 1);

		// single operator: the operator becomes the data source of the current pipeline
		state.SetPipelineSource(current, *this);

		// we create a new pipeline starting from the child
		auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, *this);
		child_meta_pipeline.Build(*children[0]);
	} else {
		// operator is not a sink! recurse in children
		if (children.empty()) {
			// source
			state.SetPipelineSource(current, *this);
		} else {
			if (children.size() != 1) {
				throw InternalException("Operator not supported in BuildPipelines");
			}
			state.AddPipelineOperator(current, *this);
			children[0]->BuildPipelines(current, meta_pipeline);
		}
	}
}

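// Collects the leaf sources of this operator subtree. A sink counts as a source here, since
// it acts as the data source of the pipeline above it.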
vector<const_reference<PhysicalOperator>> PhysicalOperator::GetSources() const {
	vector<const_reference<PhysicalOperator>> result;
	if (IsSink()) {
		D_ASSERT(children.size() == 1);
		result.push_back(*this);
		return result;
	} else {
		if (children.empty()) {
			// source
			result.push_back(*this);
			return result;
		} else {
			if (children.size() != 1) {
				throw InternalException("Operator not supported in GetSource");
			}
			return children[0]->GetSources();
		}
	}
}

bool PhysicalOperator::AllSourcesSupportBatchIndex() const {
	auto sources = GetSources();
	for (auto &source : sources) {
		if (!source.get().SupportsBatchIndex()) {
			return false;
		}
	}
	return true;
}

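// Debug-only sanity check: every operator subtree must have at least one source.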
void PhysicalOperator::Verify() {
#ifdef DEBUG
	auto sources = GetSources();
	D_ASSERT(!sources.empty());
	for (auto &child : children) {
		child->Verify();
	}
#endif
}

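// LIST and MAP values are never cached; STRUCT values can be cached only if all of their
// child types can be cached.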
bool CachingPhysicalOperator::CanCacheType(const LogicalType &type) {
	switch (type.id()) {
	case LogicalTypeId::LIST:
	case LogicalTypeId::MAP:
		return false;
	case LogicalTypeId::STRUCT: {
		auto &entries = StructType::GetChildTypes(type);
		for (auto &entry : entries) {
			if (!CanCacheType(entry.second)) {
				return false;
			}
		}
		return true;
	}
	default:
		return true;
	}
}

CachingPhysicalOperator::CachingPhysicalOperator(PhysicalOperatorType type, vector<LogicalType> types_p,
                                                 idx_t estimated_cardinality)
    : PhysicalOperator(type, std::move(types_p), estimated_cardinality) {

	caching_supported = true;
	for (auto &col_type : types) {
		if (!CanCacheType(col_type)) {
			caching_supported = false;
			break;
		}
	}
}

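// Executes the child operator and, when possible, buffers small result chunks: chunks below
// CACHE_THRESHOLD tuples are appended to a cached chunk, which is emitted once it is nearly
// full or the child reports that it is finished. Caching is skipped when the client disables
// it, when there is no pipeline or sink, when the sink requires batch indexes, or when the
// pipeline is order-dependent.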
OperatorResultType CachingPhysicalOperator::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
                                                    GlobalOperatorState &gstate, OperatorState &state_p) const {
	auto &state = state_p.Cast<CachingOperatorState>();

	// Execute child operator
	auto child_result = ExecuteInternal(context, input, chunk, gstate, state);

#if STANDARD_VECTOR_SIZE >= 128
	if (!state.initialized) {
		state.initialized = true;
		state.can_cache_chunk = true;

		if (!context.client.config.enable_caching_operators) {
			state.can_cache_chunk = false;
		} else if (!context.pipeline || !caching_supported) {
			state.can_cache_chunk = false;
		} else if (!context.pipeline->GetSink()) {
			// Disabling for pipelines without Sink, i.e. when pulling
			state.can_cache_chunk = false;
		} else if (context.pipeline->GetSink()->RequiresBatchIndex()) {
			state.can_cache_chunk = false;
		} else if (context.pipeline->IsOrderDependent()) {
			state.can_cache_chunk = false;
		}
	}
	if (!state.can_cache_chunk) {
		return child_result;
	}
	// TODO chunk size of 0 should not result in a cache being created!
	if (chunk.size() < CACHE_THRESHOLD) {
		// we have filtered out a significant amount of tuples
		// add this chunk to the cache and continue

		if (!state.cached_chunk) {
			state.cached_chunk = make_uniq<DataChunk>();
			state.cached_chunk->Initialize(Allocator::Get(context.client), chunk.GetTypes());
		}

		state.cached_chunk->Append(chunk);

		if (state.cached_chunk->size() >= (STANDARD_VECTOR_SIZE - CACHE_THRESHOLD) ||
		    child_result == OperatorResultType::FINISHED) {
			// chunk cache full: return it
			chunk.Move(*state.cached_chunk);
			state.cached_chunk->Initialize(Allocator::Get(context.client), chunk.GetTypes());
			return child_result;
		} else {
			// chunk cache not full: return empty result
			chunk.Reset();
		}
	}
#endif

	return child_result;
}

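// Flushes any tuples that are still buffered in the cached chunk once the child is done.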
OperatorFinalizeResultType CachingPhysicalOperator::FinalExecute(ExecutionContext &context, DataChunk &chunk,
                                                                 GlobalOperatorState &gstate,
                                                                 OperatorState &state_p) const {
	auto &state = state_p.Cast<CachingOperatorState>();
	if (state.cached_chunk) {
		chunk.Move(*state.cached_chunk);
		state.cached_chunk.reset();
	} else {
		chunk.SetCardinality(0);
	}
	return OperatorFinalizeResultType::FINISHED;
}

} // namespace duckdb