1 | #include "duckdb/execution/operator/aggregate/physical_ungrouped_aggregate.hpp" |
2 | |
3 | #include "duckdb/catalog/catalog_entry/aggregate_function_catalog_entry.hpp" |
4 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
5 | #include "duckdb/execution/expression_executor.hpp" |
6 | #include "duckdb/execution/operator/aggregate/aggregate_object.hpp" |
7 | #include "duckdb/main/client_context.hpp" |
8 | #include "duckdb/parallel/thread_context.hpp" |
9 | #include "duckdb/planner/expression/bound_aggregate_expression.hpp" |
10 | #include "duckdb/planner/expression/bound_reference_expression.hpp" |
11 | #include "duckdb/execution/radix_partitioned_hashtable.hpp" |
12 | #include "duckdb/parallel/base_pipeline_event.hpp" |
13 | #include "duckdb/common/unordered_set.hpp" |
14 | #include "duckdb/common/algorithm.hpp" |
15 | #include "duckdb/parallel/interrupt.hpp" |
16 | #include <functional> |
17 | #include "duckdb/execution/operator/aggregate/distinct_aggregate_data.hpp" |
18 | |
19 | namespace duckdb { |
20 | |
21 | PhysicalUngroupedAggregate::PhysicalUngroupedAggregate(vector<LogicalType> types, |
22 | vector<unique_ptr<Expression>> expressions, |
23 | idx_t estimated_cardinality) |
24 | : PhysicalOperator(PhysicalOperatorType::UNGROUPED_AGGREGATE, std::move(types), estimated_cardinality), |
25 | aggregates(std::move(expressions)) { |
26 | |
27 | distinct_collection_info = DistinctAggregateCollectionInfo::Create(aggregates); |
28 | if (!distinct_collection_info) { |
29 | return; |
30 | } |
31 | distinct_data = make_uniq<DistinctAggregateData>(args&: *distinct_collection_info); |
32 | } |
33 | |
34 | //===--------------------------------------------------------------------===// |
35 | // Sink |
36 | //===--------------------------------------------------------------------===// |
37 | struct AggregateState { |
38 | explicit AggregateState(const vector<unique_ptr<Expression>> &aggregate_expressions) { |
39 | for (auto &aggregate : aggregate_expressions) { |
40 | D_ASSERT(aggregate->GetExpressionClass() == ExpressionClass::BOUND_AGGREGATE); |
41 | auto &aggr = aggregate->Cast<BoundAggregateExpression>(); |
42 | auto state = make_unsafe_uniq_array<data_t>(n: aggr.function.state_size()); |
43 | aggr.function.initialize(state.get()); |
44 | aggregates.push_back(x: std::move(state)); |
45 | bind_data.push_back(x: aggr.bind_info.get()); |
46 | destructors.push_back(x: aggr.function.destructor); |
47 | #ifdef DEBUG |
48 | counts.push_back(0); |
49 | #endif |
50 | } |
51 | } |
52 | ~AggregateState() { |
53 | D_ASSERT(destructors.size() == aggregates.size()); |
54 | for (idx_t i = 0; i < destructors.size(); i++) { |
55 | if (!destructors[i]) { |
56 | continue; |
57 | } |
58 | Vector state_vector(Value::POINTER(value: CastPointerToValue(src: aggregates[i].get()))); |
59 | state_vector.SetVectorType(VectorType::FLAT_VECTOR); |
60 | |
61 | AggregateInputData aggr_input_data(bind_data[i], Allocator::DefaultAllocator()); |
62 | destructors[i](state_vector, aggr_input_data, 1); |
63 | } |
64 | } |
65 | |
66 | void Move(AggregateState &other) { |
67 | other.aggregates = std::move(aggregates); |
68 | other.destructors = std::move(destructors); |
69 | } |
70 | |
71 | //! The aggregate values |
72 | vector<unsafe_unique_array<data_t>> aggregates; |
73 | //! The bind data |
74 | vector<FunctionData *> bind_data; |
75 | //! The destructors |
76 | vector<aggregate_destructor_t> destructors; |
77 | //! Counts (used for verification) |
78 | vector<idx_t> counts; |
79 | }; |
80 | |
81 | class UngroupedAggregateGlobalState : public GlobalSinkState { |
82 | public: |
83 | UngroupedAggregateGlobalState(const PhysicalUngroupedAggregate &op, ClientContext &client) |
84 | : state(op.aggregates), finished(false) { |
85 | if (op.distinct_data) { |
86 | distinct_state = make_uniq<DistinctAggregateState>(args&: *op.distinct_data, args&: client); |
87 | } |
88 | } |
89 | |
90 | //! The lock for updating the global aggregate state |
91 | mutex lock; |
92 | //! The global aggregate state |
93 | AggregateState state; |
94 | //! Whether or not the aggregate is finished |
95 | bool finished; |
96 | //! The data related to the distinct aggregates (if there are any) |
97 | unique_ptr<DistinctAggregateState> distinct_state; |
98 | }; |
99 | |
100 | class UngroupedAggregateLocalState : public LocalSinkState { |
101 | public: |
102 | UngroupedAggregateLocalState(const PhysicalUngroupedAggregate &op, const vector<LogicalType> &child_types, |
103 | GlobalSinkState &gstate_p, ExecutionContext &context) |
104 | : state(op.aggregates), child_executor(context.client), aggregate_input_chunk(), filter_set() { |
105 | auto &gstate = gstate_p.Cast<UngroupedAggregateGlobalState>(); |
106 | |
107 | auto &allocator = Allocator::Get(context&: context.client); |
108 | InitializeDistinctAggregates(op, gstate, context); |
109 | |
110 | vector<LogicalType> payload_types; |
111 | vector<AggregateObject> aggregate_objects; |
112 | for (auto &aggregate : op.aggregates) { |
113 | D_ASSERT(aggregate->GetExpressionClass() == ExpressionClass::BOUND_AGGREGATE); |
114 | auto &aggr = aggregate->Cast<BoundAggregateExpression>(); |
115 | // initialize the payload chunk |
116 | for (auto &child : aggr.children) { |
117 | payload_types.push_back(x: child->return_type); |
118 | child_executor.AddExpression(expr: *child); |
119 | } |
120 | aggregate_objects.emplace_back(args: &aggr); |
121 | } |
122 | if (!payload_types.empty()) { // for select count(*) from t; there is no payload at all |
123 | aggregate_input_chunk.Initialize(allocator, types: payload_types); |
124 | } |
125 | filter_set.Initialize(context&: context.client, aggregates: aggregate_objects, payload_types: child_types); |
126 | } |
127 | |
128 | //! The local aggregate state |
129 | AggregateState state; |
130 | //! The executor |
131 | ExpressionExecutor child_executor; |
132 | //! The payload chunk, containing all the Vectors for the aggregates |
133 | DataChunk aggregate_input_chunk; |
134 | //! Aggregate filter data set |
135 | AggregateFilterDataSet filter_set; |
136 | //! The local sink states of the distinct aggregates hash tables |
137 | vector<unique_ptr<LocalSinkState>> radix_states; |
138 | |
139 | public: |
140 | void Reset() { |
141 | aggregate_input_chunk.Reset(); |
142 | } |
143 | void InitializeDistinctAggregates(const PhysicalUngroupedAggregate &op, const UngroupedAggregateGlobalState &gstate, |
144 | ExecutionContext &context) { |
145 | |
146 | if (!op.distinct_data) { |
147 | return; |
148 | } |
149 | auto &data = *op.distinct_data; |
150 | auto &state = *gstate.distinct_state; |
151 | D_ASSERT(!data.radix_tables.empty()); |
152 | |
153 | const idx_t aggregate_count = state.radix_states.size(); |
154 | radix_states.resize(new_size: aggregate_count); |
155 | |
156 | auto &distinct_info = *op.distinct_collection_info; |
157 | |
158 | for (auto &idx : distinct_info.indices) { |
159 | idx_t table_idx = distinct_info.table_map[idx]; |
160 | if (data.radix_tables[table_idx] == nullptr) { |
161 | // This aggregate has identical input as another aggregate, so no table is created for it |
162 | continue; |
163 | } |
164 | auto &radix_table = *data.radix_tables[table_idx]; |
165 | radix_states[table_idx] = radix_table.GetLocalSinkState(context); |
166 | } |
167 | } |
168 | }; |
169 | |
170 | bool PhysicalUngroupedAggregate::SinkOrderDependent() const { |
171 | for (auto &expr : aggregates) { |
172 | auto &aggr = expr->Cast<BoundAggregateExpression>(); |
173 | if (aggr.function.order_dependent == AggregateOrderDependent::ORDER_DEPENDENT) { |
174 | return true; |
175 | } |
176 | } |
177 | return false; |
178 | } |
179 | |
180 | unique_ptr<GlobalSinkState> PhysicalUngroupedAggregate::GetGlobalSinkState(ClientContext &context) const { |
181 | return make_uniq<UngroupedAggregateGlobalState>(args: *this, args&: context); |
182 | } |
183 | |
184 | unique_ptr<LocalSinkState> PhysicalUngroupedAggregate::GetLocalSinkState(ExecutionContext &context) const { |
185 | D_ASSERT(sink_state); |
186 | auto &gstate = *sink_state; |
187 | return make_uniq<UngroupedAggregateLocalState>(args: *this, args: children[0]->GetTypes(), args&: gstate, args&: context); |
188 | } |
189 | |
190 | void PhysicalUngroupedAggregate::SinkDistinct(ExecutionContext &context, DataChunk &chunk, |
191 | OperatorSinkInput &input) const { |
192 | auto &sink = input.local_state.Cast<UngroupedAggregateLocalState>(); |
193 | auto &global_sink = input.global_state.Cast<UngroupedAggregateGlobalState>(); |
194 | D_ASSERT(distinct_data); |
195 | auto &distinct_state = *global_sink.distinct_state; |
196 | auto &distinct_info = *distinct_collection_info; |
197 | auto &distinct_indices = distinct_info.Indices(); |
198 | |
199 | DataChunk empty_chunk; |
200 | |
201 | auto &distinct_filter = distinct_info.Indices(); |
202 | |
203 | for (auto &idx : distinct_indices) { |
204 | auto &aggregate = aggregates[idx]->Cast<BoundAggregateExpression>(); |
205 | |
206 | idx_t table_idx = distinct_info.table_map[idx]; |
207 | if (!distinct_data->radix_tables[table_idx]) { |
208 | // This distinct aggregate shares its data with another |
209 | continue; |
210 | } |
211 | D_ASSERT(distinct_data->radix_tables[table_idx]); |
212 | auto &radix_table = *distinct_data->radix_tables[table_idx]; |
213 | auto &radix_global_sink = *distinct_state.radix_states[table_idx]; |
214 | auto &radix_local_sink = *sink.radix_states[table_idx]; |
215 | OperatorSinkInput sink_input {.global_state: radix_global_sink, .local_state: radix_local_sink, .interrupt_state: input.interrupt_state}; |
216 | |
217 | if (aggregate.filter) { |
218 | // The hashtable can apply a filter, but only on the payload |
219 | // And in our case, we need to filter the groups (the distinct aggr children) |
220 | |
221 | // Apply the filter before inserting into the hashtable |
222 | auto &filtered_data = sink.filter_set.GetFilterData(aggr_idx: idx); |
223 | idx_t count = filtered_data.ApplyFilter(payload&: chunk); |
224 | filtered_data.filtered_payload.SetCardinality(count); |
225 | |
226 | radix_table.Sink(context, chunk&: filtered_data.filtered_payload, input&: sink_input, aggregate_input_chunk&: empty_chunk, filter: distinct_filter); |
227 | } else { |
228 | radix_table.Sink(context, chunk, input&: sink_input, aggregate_input_chunk&: empty_chunk, filter: distinct_filter); |
229 | } |
230 | } |
231 | } |
232 | |
233 | SinkResultType PhysicalUngroupedAggregate::Sink(ExecutionContext &context, DataChunk &chunk, |
234 | OperatorSinkInput &input) const { |
235 | auto &sink = input.local_state.Cast<UngroupedAggregateLocalState>(); |
236 | |
237 | // perform the aggregation inside the local state |
238 | sink.Reset(); |
239 | |
240 | if (distinct_data) { |
241 | SinkDistinct(context, chunk, input); |
242 | } |
243 | |
244 | DataChunk &payload_chunk = sink.aggregate_input_chunk; |
245 | |
246 | idx_t payload_idx = 0; |
247 | idx_t next_payload_idx = 0; |
248 | |
249 | for (idx_t aggr_idx = 0; aggr_idx < aggregates.size(); aggr_idx++) { |
250 | auto &aggregate = aggregates[aggr_idx]->Cast<BoundAggregateExpression>(); |
251 | |
252 | payload_idx = next_payload_idx; |
253 | next_payload_idx = payload_idx + aggregate.children.size(); |
254 | |
255 | if (aggregate.IsDistinct()) { |
256 | continue; |
257 | } |
258 | |
259 | idx_t payload_cnt = 0; |
260 | // resolve the filter (if any) |
261 | if (aggregate.filter) { |
262 | auto &filtered_data = sink.filter_set.GetFilterData(aggr_idx); |
263 | auto count = filtered_data.ApplyFilter(payload&: chunk); |
264 | |
265 | sink.child_executor.SetChunk(filtered_data.filtered_payload); |
266 | payload_chunk.SetCardinality(count); |
267 | } else { |
268 | sink.child_executor.SetChunk(chunk); |
269 | payload_chunk.SetCardinality(chunk); |
270 | } |
271 | |
272 | #ifdef DEBUG |
273 | sink.state.counts[aggr_idx] += payload_chunk.size(); |
274 | #endif |
275 | |
276 | // resolve the child expressions of the aggregate (if any) |
277 | for (idx_t i = 0; i < aggregate.children.size(); ++i) { |
278 | sink.child_executor.ExecuteExpression(expr_idx: payload_idx + payload_cnt, |
279 | result&: payload_chunk.data[payload_idx + payload_cnt]); |
280 | payload_cnt++; |
281 | } |
282 | |
283 | auto start_of_input = payload_cnt == 0 ? nullptr : &payload_chunk.data[payload_idx]; |
284 | AggregateInputData aggr_input_data(aggregate.bind_info.get(), Allocator::DefaultAllocator()); |
285 | aggregate.function.simple_update(start_of_input, aggr_input_data, payload_cnt, |
286 | sink.state.aggregates[aggr_idx].get(), payload_chunk.size()); |
287 | } |
288 | return SinkResultType::NEED_MORE_INPUT; |
289 | } |
290 | |
291 | //===--------------------------------------------------------------------===// |
292 | // Finalize |
293 | //===--------------------------------------------------------------------===// |
294 | |
295 | void PhysicalUngroupedAggregate::CombineDistinct(ExecutionContext &context, GlobalSinkState &state, |
296 | LocalSinkState &lstate) const { |
297 | auto &global_sink = state.Cast<UngroupedAggregateGlobalState>(); |
298 | auto &source = lstate.Cast<UngroupedAggregateLocalState>(); |
299 | |
300 | if (!distinct_data) { |
301 | return; |
302 | } |
303 | auto &distinct_state = global_sink.distinct_state; |
304 | auto table_count = distinct_data->radix_tables.size(); |
305 | for (idx_t table_idx = 0; table_idx < table_count; table_idx++) { |
306 | D_ASSERT(distinct_data->radix_tables[table_idx]); |
307 | auto &radix_table = *distinct_data->radix_tables[table_idx]; |
308 | auto &radix_global_sink = *distinct_state->radix_states[table_idx]; |
309 | auto &radix_local_sink = *source.radix_states[table_idx]; |
310 | |
311 | radix_table.Combine(context, state&: radix_global_sink, lstate&: radix_local_sink); |
312 | } |
313 | } |
314 | |
315 | void PhysicalUngroupedAggregate::Combine(ExecutionContext &context, GlobalSinkState &state, |
316 | LocalSinkState &lstate) const { |
317 | auto &gstate = state.Cast<UngroupedAggregateGlobalState>(); |
318 | auto &source = lstate.Cast<UngroupedAggregateLocalState>(); |
319 | D_ASSERT(!gstate.finished); |
320 | |
321 | // finalize: combine the local state into the global state |
322 | // all aggregates are combinable: we might be doing a parallel aggregate |
323 | // use the combine method to combine the partial aggregates |
324 | lock_guard<mutex> glock(gstate.lock); |
325 | |
326 | CombineDistinct(context, state, lstate); |
327 | |
328 | for (idx_t aggr_idx = 0; aggr_idx < aggregates.size(); aggr_idx++) { |
329 | auto &aggregate = aggregates[aggr_idx]->Cast<BoundAggregateExpression>(); |
330 | |
331 | if (aggregate.IsDistinct()) { |
332 | continue; |
333 | } |
334 | |
335 | Vector source_state(Value::POINTER(value: CastPointerToValue(src: source.state.aggregates[aggr_idx].get()))); |
336 | Vector dest_state(Value::POINTER(value: CastPointerToValue(src: gstate.state.aggregates[aggr_idx].get()))); |
337 | |
338 | AggregateInputData aggr_input_data(aggregate.bind_info.get(), Allocator::DefaultAllocator()); |
339 | aggregate.function.combine(source_state, dest_state, aggr_input_data, 1); |
340 | #ifdef DEBUG |
341 | gstate.state.counts[aggr_idx] += source.state.counts[aggr_idx]; |
342 | #endif |
343 | } |
344 | |
345 | auto &client_profiler = QueryProfiler::Get(context&: context.client); |
346 | context.thread.profiler.Flush(phys_op: *this, expression_executor&: source.child_executor, name: "child_executor" , id: 0); |
347 | client_profiler.Flush(profiler&: context.thread.profiler); |
348 | } |
349 | |
350 | class UngroupedDistinctAggregateFinalizeTask : public ExecutorTask { |
351 | public: |
352 | UngroupedDistinctAggregateFinalizeTask(Executor &executor, shared_ptr<Event> event_p, |
353 | UngroupedAggregateGlobalState &state_p, ClientContext &context, |
354 | const PhysicalUngroupedAggregate &op) |
355 | : ExecutorTask(executor), event(std::move(event_p)), gstate(state_p), context(context), op(op) { |
356 | } |
357 | |
358 | void AggregateDistinct() { |
359 | D_ASSERT(gstate.distinct_state); |
360 | auto &aggregates = op.aggregates; |
361 | auto &distinct_state = *gstate.distinct_state; |
362 | auto &distinct_data = *op.distinct_data; |
363 | |
364 | ThreadContext temp_thread_context(context); |
365 | ExecutionContext temp_exec_context(context, temp_thread_context, nullptr); |
366 | |
367 | idx_t payload_idx = 0; |
368 | idx_t next_payload_idx = 0; |
369 | |
370 | for (idx_t i = 0; i < aggregates.size(); i++) { |
371 | auto &aggregate = aggregates[i]->Cast<BoundAggregateExpression>(); |
372 | |
373 | // Forward the payload idx |
374 | payload_idx = next_payload_idx; |
375 | next_payload_idx = payload_idx + aggregate.children.size(); |
376 | |
377 | // If aggregate is not distinct, skip it |
378 | if (!distinct_data.IsDistinct(index: i)) { |
379 | continue; |
380 | } |
381 | |
382 | DataChunk payload_chunk; |
383 | |
384 | D_ASSERT(distinct_data.info.table_map.count(i)); |
385 | auto table_idx = distinct_data.info.table_map.at(k: i); |
386 | auto &radix_table_p = distinct_data.radix_tables[table_idx]; |
387 | auto &output_chunk = *distinct_state.distinct_output_chunks[table_idx]; |
388 | auto &grouped_aggregate_data = *distinct_data.grouped_aggregate_data[table_idx]; |
389 | |
390 | payload_chunk.InitializeEmpty(types: grouped_aggregate_data.group_types); |
391 | payload_chunk.SetCardinality(0); |
392 | |
393 | //! Create global and local state for the hashtable |
394 | auto global_source_state = radix_table_p->GetGlobalSourceState(context); |
395 | auto local_source_state = radix_table_p->GetLocalSourceState(context&: temp_exec_context); |
396 | |
397 | //! Retrieve the stored data from the hashtable |
398 | while (true) { |
399 | output_chunk.Reset(); |
400 | |
401 | InterruptState interrupt_state; |
402 | OperatorSourceInput source_input {.global_state: *global_source_state, .local_state: *local_source_state, .interrupt_state: interrupt_state}; |
403 | auto res = radix_table_p->GetData(context&: temp_exec_context, chunk&: output_chunk, |
404 | sink_state&: *distinct_state.radix_states[table_idx], input&: source_input); |
405 | if (res == SourceResultType::FINISHED) { |
406 | D_ASSERT(output_chunk.size() == 0); |
407 | break; |
408 | } else if (res == SourceResultType::BLOCKED) { |
409 | throw InternalException( |
410 | "Unexpected interrupt from radix table GetData in UngroupedDistinctAggregateFinalizeTask" ); |
411 | } |
412 | |
413 | // We dont need to resolve the filter, we already did this in Sink |
414 | idx_t payload_cnt = aggregate.children.size(); |
415 | for (idx_t i = 0; i < payload_cnt; i++) { |
416 | payload_chunk.data[i].Reference(other&: output_chunk.data[i]); |
417 | } |
418 | payload_chunk.SetCardinality(output_chunk); |
419 | #ifdef DEBUG |
420 | gstate.state.counts[i] += payload_chunk.size(); |
421 | #endif |
422 | |
423 | auto start_of_input = payload_cnt ? &payload_chunk.data[0] : nullptr; |
424 | //! Update the aggregate state |
425 | AggregateInputData aggr_input_data(aggregate.bind_info.get(), Allocator::DefaultAllocator()); |
426 | aggregate.function.simple_update(start_of_input, aggr_input_data, payload_cnt, |
427 | gstate.state.aggregates[i].get(), payload_chunk.size()); |
428 | } |
429 | } |
430 | D_ASSERT(!gstate.finished); |
431 | gstate.finished = true; |
432 | } |
433 | |
434 | TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override { |
435 | AggregateDistinct(); |
436 | event->FinishTask(); |
437 | return TaskExecutionResult::TASK_FINISHED; |
438 | } |
439 | |
440 | private: |
441 | shared_ptr<Event> event; |
442 | UngroupedAggregateGlobalState &gstate; |
443 | ClientContext &context; |
444 | const PhysicalUngroupedAggregate &op; |
445 | }; |
446 | |
447 | // TODO: Create tasks and run these in parallel instead of doing this all in Schedule, single threaded |
448 | class UngroupedDistinctAggregateFinalizeEvent : public BasePipelineEvent { |
449 | public: |
450 | UngroupedDistinctAggregateFinalizeEvent(const PhysicalUngroupedAggregate &op_p, |
451 | UngroupedAggregateGlobalState &gstate_p, Pipeline &pipeline_p, |
452 | ClientContext &context) |
453 | : BasePipelineEvent(pipeline_p), op(op_p), gstate(gstate_p), context(context) { |
454 | } |
455 | const PhysicalUngroupedAggregate &op; |
456 | UngroupedAggregateGlobalState &gstate; |
457 | ClientContext &context; |
458 | |
459 | public: |
460 | void Schedule() override { |
461 | vector<shared_ptr<Task>> tasks; |
462 | tasks.push_back(x: make_uniq<UngroupedDistinctAggregateFinalizeTask>(args&: pipeline->executor, args: shared_from_this(), |
463 | args&: gstate, args&: context, args: op)); |
464 | D_ASSERT(!tasks.empty()); |
465 | SetTasks(std::move(tasks)); |
466 | } |
467 | }; |
468 | |
469 | class UngroupedDistinctCombineFinalizeEvent : public BasePipelineEvent { |
470 | public: |
471 | UngroupedDistinctCombineFinalizeEvent(const PhysicalUngroupedAggregate &op_p, |
472 | UngroupedAggregateGlobalState &gstate_p, Pipeline &pipeline_p, |
473 | ClientContext &client) |
474 | : BasePipelineEvent(pipeline_p), op(op_p), gstate(gstate_p), client(client) { |
475 | } |
476 | |
477 | const PhysicalUngroupedAggregate &op; |
478 | UngroupedAggregateGlobalState &gstate; |
479 | ClientContext &client; |
480 | |
481 | public: |
482 | void Schedule() override { |
483 | auto &distinct_state = *gstate.distinct_state; |
484 | auto &distinct_data = *op.distinct_data; |
485 | vector<shared_ptr<Task>> tasks; |
486 | for (idx_t table_idx = 0; table_idx < distinct_data.radix_tables.size(); table_idx++) { |
487 | distinct_data.radix_tables[table_idx]->ScheduleTasks(executor&: pipeline->executor, event: shared_from_this(), |
488 | state&: *distinct_state.radix_states[table_idx], tasks); |
489 | } |
490 | D_ASSERT(!tasks.empty()); |
491 | SetTasks(std::move(tasks)); |
492 | } |
493 | |
494 | void FinishEvent() override { |
495 | //! Now that all tables are combined, it's time to do the distinct aggregations |
496 | auto new_event = make_shared<UngroupedDistinctAggregateFinalizeEvent>(args: op, args&: gstate, args&: *pipeline, args&: client); |
497 | this->InsertEvent(replacement_event: std::move(new_event)); |
498 | } |
499 | }; |
500 | |
501 | SinkFinalizeType PhysicalUngroupedAggregate::FinalizeDistinct(Pipeline &pipeline, Event &event, ClientContext &context, |
502 | GlobalSinkState &gstate_p) const { |
503 | auto &gstate = gstate_p.Cast<UngroupedAggregateGlobalState>(); |
504 | D_ASSERT(distinct_data); |
505 | auto &distinct_state = *gstate.distinct_state; |
506 | |
507 | bool any_partitioned = false; |
508 | for (idx_t table_idx = 0; table_idx < distinct_data->radix_tables.size(); table_idx++) { |
509 | auto &radix_table_p = distinct_data->radix_tables[table_idx]; |
510 | auto &radix_state = *distinct_state.radix_states[table_idx]; |
511 | bool partitioned = radix_table_p->Finalize(context, gstate_p&: radix_state); |
512 | if (partitioned) { |
513 | any_partitioned = true; |
514 | } |
515 | } |
516 | if (any_partitioned) { |
517 | auto new_event = make_shared<UngroupedDistinctCombineFinalizeEvent>(args: *this, args&: gstate, args&: pipeline, args&: context); |
518 | event.InsertEvent(replacement_event: std::move(new_event)); |
519 | } else { |
520 | //! Hashtables aren't partitioned, they dont need to be joined first |
521 | //! So we can compute the aggregate already |
522 | auto new_event = make_shared<UngroupedDistinctAggregateFinalizeEvent>(args: *this, args&: gstate, args&: pipeline, args&: context); |
523 | event.InsertEvent(replacement_event: std::move(new_event)); |
524 | } |
525 | return SinkFinalizeType::READY; |
526 | } |
527 | |
528 | SinkFinalizeType PhysicalUngroupedAggregate::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, |
529 | GlobalSinkState &gstate_p) const { |
530 | auto &gstate = gstate_p.Cast<UngroupedAggregateGlobalState>(); |
531 | |
532 | if (distinct_data) { |
533 | return FinalizeDistinct(pipeline, event, context, gstate_p); |
534 | } |
535 | |
536 | D_ASSERT(!gstate.finished); |
537 | gstate.finished = true; |
538 | return SinkFinalizeType::READY; |
539 | } |
540 | |
541 | //===--------------------------------------------------------------------===// |
542 | // Source |
543 | //===--------------------------------------------------------------------===// |
544 | void VerifyNullHandling(DataChunk &chunk, AggregateState &state, const vector<unique_ptr<Expression>> &aggregates) { |
545 | #ifdef DEBUG |
546 | for (idx_t aggr_idx = 0; aggr_idx < aggregates.size(); aggr_idx++) { |
547 | auto &aggr = aggregates[aggr_idx]->Cast<BoundAggregateExpression>(); |
548 | if (state.counts[aggr_idx] == 0 && aggr.function.null_handling == FunctionNullHandling::DEFAULT_NULL_HANDLING) { |
549 | // Default is when 0 values go in, NULL comes out |
550 | UnifiedVectorFormat vdata; |
551 | chunk.data[aggr_idx].ToUnifiedFormat(1, vdata); |
552 | D_ASSERT(!vdata.validity.RowIsValid(vdata.sel->get_index(0))); |
553 | } |
554 | } |
555 | #endif |
556 | } |
557 | |
558 | SourceResultType PhysicalUngroupedAggregate::GetData(ExecutionContext &context, DataChunk &chunk, |
559 | OperatorSourceInput &input) const { |
560 | auto &gstate = sink_state->Cast<UngroupedAggregateGlobalState>(); |
561 | D_ASSERT(gstate.finished); |
562 | |
563 | // initialize the result chunk with the aggregate values |
564 | chunk.SetCardinality(1); |
565 | for (idx_t aggr_idx = 0; aggr_idx < aggregates.size(); aggr_idx++) { |
566 | auto &aggregate = aggregates[aggr_idx]->Cast<BoundAggregateExpression>(); |
567 | |
568 | Vector state_vector(Value::POINTER(value: CastPointerToValue(src: gstate.state.aggregates[aggr_idx].get()))); |
569 | AggregateInputData aggr_input_data(aggregate.bind_info.get(), Allocator::DefaultAllocator()); |
570 | aggregate.function.finalize(state_vector, aggr_input_data, chunk.data[aggr_idx], 1, 0); |
571 | } |
572 | VerifyNullHandling(chunk, state&: gstate.state, aggregates); |
573 | |
574 | return SourceResultType::FINISHED; |
575 | } |
576 | |
577 | string PhysicalUngroupedAggregate::ParamsToString() const { |
578 | string result; |
579 | for (idx_t i = 0; i < aggregates.size(); i++) { |
580 | auto &aggregate = aggregates[i]->Cast<BoundAggregateExpression>(); |
581 | if (i > 0) { |
582 | result += "\n" ; |
583 | } |
584 | result += aggregates[i]->GetName(); |
585 | if (aggregate.filter) { |
586 | result += " Filter: " + aggregate.filter->GetName(); |
587 | } |
588 | } |
589 | return result; |
590 | } |
591 | |
592 | } // namespace duckdb |
593 | |