1#include "duckdb/execution/physical_operator.hpp"
2
3#include "duckdb/common/printer.hpp"
4#include "duckdb/common/string_util.hpp"
5#include "duckdb/common/tree_renderer.hpp"
6#include "duckdb/execution/execution_context.hpp"
7#include "duckdb/execution/operator/set/physical_recursive_cte.hpp"
8#include "duckdb/main/client_context.hpp"
9#include "duckdb/parallel/meta_pipeline.hpp"
10#include "duckdb/parallel/pipeline.hpp"
11#include "duckdb/parallel/thread_context.hpp"
12#include "duckdb/storage/buffer_manager.hpp"
13
14namespace duckdb {
15
16string PhysicalOperator::GetName() const {
17 return PhysicalOperatorToString(type);
18}
19
20string PhysicalOperator::ToString() const {
21 TreeRenderer renderer;
22 return renderer.ToString(op: *this);
23}
24
25// LCOV_EXCL_START
26void PhysicalOperator::Print() const {
27 Printer::Print(str: ToString());
28}
29// LCOV_EXCL_STOP
30
31vector<const_reference<PhysicalOperator>> PhysicalOperator::GetChildren() const {
32 vector<const_reference<PhysicalOperator>> result;
33 for (auto &child : children) {
34 result.push_back(x: *child);
35 }
36 return result;
37}
38
39//===--------------------------------------------------------------------===//
40// Operator
41//===--------------------------------------------------------------------===//
42// LCOV_EXCL_START
43unique_ptr<OperatorState> PhysicalOperator::GetOperatorState(ExecutionContext &context) const {
44 return make_uniq<OperatorState>();
45}
46
47unique_ptr<GlobalOperatorState> PhysicalOperator::GetGlobalOperatorState(ClientContext &context) const {
48 return make_uniq<GlobalOperatorState>();
49}
50
51OperatorResultType PhysicalOperator::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
52 GlobalOperatorState &gstate, OperatorState &state) const {
53 throw InternalException("Calling Execute on a node that is not an operator!");
54}
55
56OperatorFinalizeResultType PhysicalOperator::FinalExecute(ExecutionContext &context, DataChunk &chunk,
57 GlobalOperatorState &gstate, OperatorState &state) const {
58 throw InternalException("Calling FinalExecute on a node that is not an operator!");
59}
60// LCOV_EXCL_STOP
61
62//===--------------------------------------------------------------------===//
63// Source
64//===--------------------------------------------------------------------===//
65unique_ptr<LocalSourceState> PhysicalOperator::GetLocalSourceState(ExecutionContext &context,
66 GlobalSourceState &gstate) const {
67 return make_uniq<LocalSourceState>();
68}
69
70unique_ptr<GlobalSourceState> PhysicalOperator::GetGlobalSourceState(ClientContext &context) const {
71 return make_uniq<GlobalSourceState>();
72}
73
74// LCOV_EXCL_START
75SourceResultType PhysicalOperator::GetData(ExecutionContext &context, DataChunk &chunk,
76 OperatorSourceInput &input) const {
77 throw InternalException("Calling GetData on a node that is not a source!");
78}
79
80idx_t PhysicalOperator::GetBatchIndex(ExecutionContext &context, DataChunk &chunk, GlobalSourceState &gstate,
81 LocalSourceState &lstate) const {
82 throw InternalException("Calling GetBatchIndex on a node that does not support it");
83}
84
85double PhysicalOperator::GetProgress(ClientContext &context, GlobalSourceState &gstate) const {
86 return -1;
87}
88// LCOV_EXCL_STOP
89
90//===--------------------------------------------------------------------===//
91// Sink
92//===--------------------------------------------------------------------===//
93// LCOV_EXCL_START
94SinkResultType PhysicalOperator::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
95 throw InternalException("Calling Sink on a node that is not a sink!");
96}
97
98// LCOV_EXCL_STOP
99
100void PhysicalOperator::Combine(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate) const {
101}
102
103SinkFinalizeType PhysicalOperator::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
104 GlobalSinkState &gstate) const {
105 return SinkFinalizeType::READY;
106}
107
108void PhysicalOperator::NextBatch(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate_p) const {
109}
110
111unique_ptr<LocalSinkState> PhysicalOperator::GetLocalSinkState(ExecutionContext &context) const {
112 return make_uniq<LocalSinkState>();
113}
114
115unique_ptr<GlobalSinkState> PhysicalOperator::GetGlobalSinkState(ClientContext &context) const {
116 return make_uniq<GlobalSinkState>();
117}
118
119idx_t PhysicalOperator::GetMaxThreadMemory(ClientContext &context) {
120 // Memory usage per thread should scale with max mem / num threads
121 // We take 1/4th of this, to be conservative
122 idx_t max_memory = BufferManager::GetBufferManager(context).GetMaxMemory();
123 idx_t num_threads = TaskScheduler::GetScheduler(context).NumberOfThreads();
124 return (max_memory / num_threads) / 4;
125}
126
127//===--------------------------------------------------------------------===//
128// Pipeline Construction
129//===--------------------------------------------------------------------===//
130void PhysicalOperator::BuildPipelines(Pipeline &current, MetaPipeline &meta_pipeline) {
131 op_state.reset();
132
133 auto &state = meta_pipeline.GetState();
134 if (IsSink()) {
135 // operator is a sink, build a pipeline
136 sink_state.reset();
137 D_ASSERT(children.size() == 1);
138
139 // single operator: the operator becomes the data source of the current pipeline
140 state.SetPipelineSource(pipeline&: current, op&: *this);
141
142 // we create a new pipeline starting from the child
143 auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, op&: *this);
144 child_meta_pipeline.Build(op&: *children[0]);
145 } else {
146 // operator is not a sink! recurse in children
147 if (children.empty()) {
148 // source
149 state.SetPipelineSource(pipeline&: current, op&: *this);
150 } else {
151 if (children.size() != 1) {
152 throw InternalException("Operator not supported in BuildPipelines");
153 }
154 state.AddPipelineOperator(pipeline&: current, op&: *this);
155 children[0]->BuildPipelines(current, meta_pipeline);
156 }
157 }
158}
159
160vector<const_reference<PhysicalOperator>> PhysicalOperator::GetSources() const {
161 vector<const_reference<PhysicalOperator>> result;
162 if (IsSink()) {
163 D_ASSERT(children.size() == 1);
164 result.push_back(x: *this);
165 return result;
166 } else {
167 if (children.empty()) {
168 // source
169 result.push_back(x: *this);
170 return result;
171 } else {
172 if (children.size() != 1) {
173 throw InternalException("Operator not supported in GetSource");
174 }
175 return children[0]->GetSources();
176 }
177 }
178}
179
180bool PhysicalOperator::AllSourcesSupportBatchIndex() const {
181 auto sources = GetSources();
182 for (auto &source : sources) {
183 if (!source.get().SupportsBatchIndex()) {
184 return false;
185 }
186 }
187 return true;
188}
189
190void PhysicalOperator::Verify() {
191#ifdef DEBUG
192 auto sources = GetSources();
193 D_ASSERT(!sources.empty());
194 for (auto &child : children) {
195 child->Verify();
196 }
197#endif
198}
199
200bool CachingPhysicalOperator::CanCacheType(const LogicalType &type) {
201 switch (type.id()) {
202 case LogicalTypeId::LIST:
203 case LogicalTypeId::MAP:
204 return false;
205 case LogicalTypeId::STRUCT: {
206 auto &entries = StructType::GetChildTypes(type);
207 for (auto &entry : entries) {
208 if (!CanCacheType(type: entry.second)) {
209 return false;
210 }
211 }
212 return true;
213 }
214 default:
215 return true;
216 }
217}
218
219CachingPhysicalOperator::CachingPhysicalOperator(PhysicalOperatorType type, vector<LogicalType> types_p,
220 idx_t estimated_cardinality)
221 : PhysicalOperator(type, std::move(types_p), estimated_cardinality) {
222
223 caching_supported = true;
224 for (auto &col_type : types) {
225 if (!CanCacheType(type: col_type)) {
226 caching_supported = false;
227 break;
228 }
229 }
230}
231
232OperatorResultType CachingPhysicalOperator::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
233 GlobalOperatorState &gstate, OperatorState &state_p) const {
234 auto &state = state_p.Cast<CachingOperatorState>();
235
236 // Execute child operator
237 auto child_result = ExecuteInternal(context, input, chunk, gstate, state);
238
239#if STANDARD_VECTOR_SIZE >= 128
240 if (!state.initialized) {
241 state.initialized = true;
242 state.can_cache_chunk = true;
243
244 if (!context.client.config.enable_caching_operators) {
245 state.can_cache_chunk = false;
246 } else if (!context.pipeline || !caching_supported) {
247 state.can_cache_chunk = false;
248 } else if (!context.pipeline->GetSink()) {
249 // Disabling for pipelines without Sink, i.e. when pulling
250 state.can_cache_chunk = false;
251 } else if (context.pipeline->GetSink()->RequiresBatchIndex()) {
252 state.can_cache_chunk = false;
253 } else if (context.pipeline->IsOrderDependent()) {
254 state.can_cache_chunk = false;
255 }
256 }
257 if (!state.can_cache_chunk) {
258 return child_result;
259 }
260 // TODO chunk size of 0 should not result in a cache being created!
261 if (chunk.size() < CACHE_THRESHOLD) {
262 // we have filtered out a significant amount of tuples
263 // add this chunk to the cache and continue
264
265 if (!state.cached_chunk) {
266 state.cached_chunk = make_uniq<DataChunk>();
267 state.cached_chunk->Initialize(allocator&: Allocator::Get(context&: context.client), types: chunk.GetTypes());
268 }
269
270 state.cached_chunk->Append(other: chunk);
271
272 if (state.cached_chunk->size() >= (STANDARD_VECTOR_SIZE - CACHE_THRESHOLD) ||
273 child_result == OperatorResultType::FINISHED) {
274 // chunk cache full: return it
275 chunk.Move(chunk&: *state.cached_chunk);
276 state.cached_chunk->Initialize(allocator&: Allocator::Get(context&: context.client), types: chunk.GetTypes());
277 return child_result;
278 } else {
279 // chunk cache not full return empty result
280 chunk.Reset();
281 }
282 }
283#endif
284
285 return child_result;
286}
287
288OperatorFinalizeResultType CachingPhysicalOperator::FinalExecute(ExecutionContext &context, DataChunk &chunk,
289 GlobalOperatorState &gstate,
290 OperatorState &state_p) const {
291 auto &state = state_p.Cast<CachingOperatorState>();
292 if (state.cached_chunk) {
293 chunk.Move(chunk&: *state.cached_chunk);
294 state.cached_chunk.reset();
295 } else {
296 chunk.SetCardinality(0);
297 }
298 return OperatorFinalizeResultType::FINISHED;
299}
300
301} // namespace duckdb
302