1#include "duckdb/execution/operator/aggregate/physical_ungrouped_aggregate.hpp"
2
3#include "duckdb/catalog/catalog_entry/aggregate_function_catalog_entry.hpp"
4#include "duckdb/common/vector_operations/vector_operations.hpp"
5#include "duckdb/execution/expression_executor.hpp"
6#include "duckdb/execution/operator/aggregate/aggregate_object.hpp"
7#include "duckdb/main/client_context.hpp"
8#include "duckdb/parallel/thread_context.hpp"
9#include "duckdb/planner/expression/bound_aggregate_expression.hpp"
10#include "duckdb/planner/expression/bound_reference_expression.hpp"
11#include "duckdb/execution/radix_partitioned_hashtable.hpp"
12#include "duckdb/parallel/base_pipeline_event.hpp"
13#include "duckdb/common/unordered_set.hpp"
14#include "duckdb/common/algorithm.hpp"
15#include "duckdb/parallel/interrupt.hpp"
16#include <functional>
17#include "duckdb/execution/operator/aggregate/distinct_aggregate_data.hpp"
18
19namespace duckdb {
20
21PhysicalUngroupedAggregate::PhysicalUngroupedAggregate(vector<LogicalType> types,
22 vector<unique_ptr<Expression>> expressions,
23 idx_t estimated_cardinality)
24 : PhysicalOperator(PhysicalOperatorType::UNGROUPED_AGGREGATE, std::move(types), estimated_cardinality),
25 aggregates(std::move(expressions)) {
26
27 distinct_collection_info = DistinctAggregateCollectionInfo::Create(aggregates);
28 if (!distinct_collection_info) {
29 return;
30 }
31 distinct_data = make_uniq<DistinctAggregateData>(args&: *distinct_collection_info);
32}
33
34//===--------------------------------------------------------------------===//
35// Sink
36//===--------------------------------------------------------------------===//
37struct AggregateState {
38 explicit AggregateState(const vector<unique_ptr<Expression>> &aggregate_expressions) {
39 for (auto &aggregate : aggregate_expressions) {
40 D_ASSERT(aggregate->GetExpressionClass() == ExpressionClass::BOUND_AGGREGATE);
41 auto &aggr = aggregate->Cast<BoundAggregateExpression>();
42 auto state = make_unsafe_uniq_array<data_t>(n: aggr.function.state_size());
43 aggr.function.initialize(state.get());
44 aggregates.push_back(x: std::move(state));
45 bind_data.push_back(x: aggr.bind_info.get());
46 destructors.push_back(x: aggr.function.destructor);
47#ifdef DEBUG
48 counts.push_back(0);
49#endif
50 }
51 }
52 ~AggregateState() {
53 D_ASSERT(destructors.size() == aggregates.size());
54 for (idx_t i = 0; i < destructors.size(); i++) {
55 if (!destructors[i]) {
56 continue;
57 }
58 Vector state_vector(Value::POINTER(value: CastPointerToValue(src: aggregates[i].get())));
59 state_vector.SetVectorType(VectorType::FLAT_VECTOR);
60
61 AggregateInputData aggr_input_data(bind_data[i], Allocator::DefaultAllocator());
62 destructors[i](state_vector, aggr_input_data, 1);
63 }
64 }
65
66 void Move(AggregateState &other) {
67 other.aggregates = std::move(aggregates);
68 other.destructors = std::move(destructors);
69 }
70
71 //! The aggregate values
72 vector<unsafe_unique_array<data_t>> aggregates;
73 //! The bind data
74 vector<FunctionData *> bind_data;
75 //! The destructors
76 vector<aggregate_destructor_t> destructors;
77 //! Counts (used for verification)
78 vector<idx_t> counts;
79};
80
81class UngroupedAggregateGlobalState : public GlobalSinkState {
82public:
83 UngroupedAggregateGlobalState(const PhysicalUngroupedAggregate &op, ClientContext &client)
84 : state(op.aggregates), finished(false) {
85 if (op.distinct_data) {
86 distinct_state = make_uniq<DistinctAggregateState>(args&: *op.distinct_data, args&: client);
87 }
88 }
89
90 //! The lock for updating the global aggregate state
91 mutex lock;
92 //! The global aggregate state
93 AggregateState state;
94 //! Whether or not the aggregate is finished
95 bool finished;
96 //! The data related to the distinct aggregates (if there are any)
97 unique_ptr<DistinctAggregateState> distinct_state;
98};
99
100class UngroupedAggregateLocalState : public LocalSinkState {
101public:
102 UngroupedAggregateLocalState(const PhysicalUngroupedAggregate &op, const vector<LogicalType> &child_types,
103 GlobalSinkState &gstate_p, ExecutionContext &context)
104 : state(op.aggregates), child_executor(context.client), aggregate_input_chunk(), filter_set() {
105 auto &gstate = gstate_p.Cast<UngroupedAggregateGlobalState>();
106
107 auto &allocator = Allocator::Get(context&: context.client);
108 InitializeDistinctAggregates(op, gstate, context);
109
110 vector<LogicalType> payload_types;
111 vector<AggregateObject> aggregate_objects;
112 for (auto &aggregate : op.aggregates) {
113 D_ASSERT(aggregate->GetExpressionClass() == ExpressionClass::BOUND_AGGREGATE);
114 auto &aggr = aggregate->Cast<BoundAggregateExpression>();
115 // initialize the payload chunk
116 for (auto &child : aggr.children) {
117 payload_types.push_back(x: child->return_type);
118 child_executor.AddExpression(expr: *child);
119 }
120 aggregate_objects.emplace_back(args: &aggr);
121 }
122 if (!payload_types.empty()) { // for select count(*) from t; there is no payload at all
123 aggregate_input_chunk.Initialize(allocator, types: payload_types);
124 }
125 filter_set.Initialize(context&: context.client, aggregates: aggregate_objects, payload_types: child_types);
126 }
127
128 //! The local aggregate state
129 AggregateState state;
130 //! The executor
131 ExpressionExecutor child_executor;
132 //! The payload chunk, containing all the Vectors for the aggregates
133 DataChunk aggregate_input_chunk;
134 //! Aggregate filter data set
135 AggregateFilterDataSet filter_set;
136 //! The local sink states of the distinct aggregates hash tables
137 vector<unique_ptr<LocalSinkState>> radix_states;
138
139public:
140 void Reset() {
141 aggregate_input_chunk.Reset();
142 }
143 void InitializeDistinctAggregates(const PhysicalUngroupedAggregate &op, const UngroupedAggregateGlobalState &gstate,
144 ExecutionContext &context) {
145
146 if (!op.distinct_data) {
147 return;
148 }
149 auto &data = *op.distinct_data;
150 auto &state = *gstate.distinct_state;
151 D_ASSERT(!data.radix_tables.empty());
152
153 const idx_t aggregate_count = state.radix_states.size();
154 radix_states.resize(new_size: aggregate_count);
155
156 auto &distinct_info = *op.distinct_collection_info;
157
158 for (auto &idx : distinct_info.indices) {
159 idx_t table_idx = distinct_info.table_map[idx];
160 if (data.radix_tables[table_idx] == nullptr) {
161 // This aggregate has identical input as another aggregate, so no table is created for it
162 continue;
163 }
164 auto &radix_table = *data.radix_tables[table_idx];
165 radix_states[table_idx] = radix_table.GetLocalSinkState(context);
166 }
167 }
168};
169
170bool PhysicalUngroupedAggregate::SinkOrderDependent() const {
171 for (auto &expr : aggregates) {
172 auto &aggr = expr->Cast<BoundAggregateExpression>();
173 if (aggr.function.order_dependent == AggregateOrderDependent::ORDER_DEPENDENT) {
174 return true;
175 }
176 }
177 return false;
178}
179
180unique_ptr<GlobalSinkState> PhysicalUngroupedAggregate::GetGlobalSinkState(ClientContext &context) const {
181 return make_uniq<UngroupedAggregateGlobalState>(args: *this, args&: context);
182}
183
184unique_ptr<LocalSinkState> PhysicalUngroupedAggregate::GetLocalSinkState(ExecutionContext &context) const {
185 D_ASSERT(sink_state);
186 auto &gstate = *sink_state;
187 return make_uniq<UngroupedAggregateLocalState>(args: *this, args: children[0]->GetTypes(), args&: gstate, args&: context);
188}
189
190void PhysicalUngroupedAggregate::SinkDistinct(ExecutionContext &context, DataChunk &chunk,
191 OperatorSinkInput &input) const {
192 auto &sink = input.local_state.Cast<UngroupedAggregateLocalState>();
193 auto &global_sink = input.global_state.Cast<UngroupedAggregateGlobalState>();
194 D_ASSERT(distinct_data);
195 auto &distinct_state = *global_sink.distinct_state;
196 auto &distinct_info = *distinct_collection_info;
197 auto &distinct_indices = distinct_info.Indices();
198
199 DataChunk empty_chunk;
200
201 auto &distinct_filter = distinct_info.Indices();
202
203 for (auto &idx : distinct_indices) {
204 auto &aggregate = aggregates[idx]->Cast<BoundAggregateExpression>();
205
206 idx_t table_idx = distinct_info.table_map[idx];
207 if (!distinct_data->radix_tables[table_idx]) {
208 // This distinct aggregate shares its data with another
209 continue;
210 }
211 D_ASSERT(distinct_data->radix_tables[table_idx]);
212 auto &radix_table = *distinct_data->radix_tables[table_idx];
213 auto &radix_global_sink = *distinct_state.radix_states[table_idx];
214 auto &radix_local_sink = *sink.radix_states[table_idx];
215 OperatorSinkInput sink_input {.global_state: radix_global_sink, .local_state: radix_local_sink, .interrupt_state: input.interrupt_state};
216
217 if (aggregate.filter) {
218 // The hashtable can apply a filter, but only on the payload
219 // And in our case, we need to filter the groups (the distinct aggr children)
220
221 // Apply the filter before inserting into the hashtable
222 auto &filtered_data = sink.filter_set.GetFilterData(aggr_idx: idx);
223 idx_t count = filtered_data.ApplyFilter(payload&: chunk);
224 filtered_data.filtered_payload.SetCardinality(count);
225
226 radix_table.Sink(context, chunk&: filtered_data.filtered_payload, input&: sink_input, aggregate_input_chunk&: empty_chunk, filter: distinct_filter);
227 } else {
228 radix_table.Sink(context, chunk, input&: sink_input, aggregate_input_chunk&: empty_chunk, filter: distinct_filter);
229 }
230 }
231}
232
233SinkResultType PhysicalUngroupedAggregate::Sink(ExecutionContext &context, DataChunk &chunk,
234 OperatorSinkInput &input) const {
235 auto &sink = input.local_state.Cast<UngroupedAggregateLocalState>();
236
237 // perform the aggregation inside the local state
238 sink.Reset();
239
240 if (distinct_data) {
241 SinkDistinct(context, chunk, input);
242 }
243
244 DataChunk &payload_chunk = sink.aggregate_input_chunk;
245
246 idx_t payload_idx = 0;
247 idx_t next_payload_idx = 0;
248
249 for (idx_t aggr_idx = 0; aggr_idx < aggregates.size(); aggr_idx++) {
250 auto &aggregate = aggregates[aggr_idx]->Cast<BoundAggregateExpression>();
251
252 payload_idx = next_payload_idx;
253 next_payload_idx = payload_idx + aggregate.children.size();
254
255 if (aggregate.IsDistinct()) {
256 continue;
257 }
258
259 idx_t payload_cnt = 0;
260 // resolve the filter (if any)
261 if (aggregate.filter) {
262 auto &filtered_data = sink.filter_set.GetFilterData(aggr_idx);
263 auto count = filtered_data.ApplyFilter(payload&: chunk);
264
265 sink.child_executor.SetChunk(filtered_data.filtered_payload);
266 payload_chunk.SetCardinality(count);
267 } else {
268 sink.child_executor.SetChunk(chunk);
269 payload_chunk.SetCardinality(chunk);
270 }
271
272#ifdef DEBUG
273 sink.state.counts[aggr_idx] += payload_chunk.size();
274#endif
275
276 // resolve the child expressions of the aggregate (if any)
277 for (idx_t i = 0; i < aggregate.children.size(); ++i) {
278 sink.child_executor.ExecuteExpression(expr_idx: payload_idx + payload_cnt,
279 result&: payload_chunk.data[payload_idx + payload_cnt]);
280 payload_cnt++;
281 }
282
283 auto start_of_input = payload_cnt == 0 ? nullptr : &payload_chunk.data[payload_idx];
284 AggregateInputData aggr_input_data(aggregate.bind_info.get(), Allocator::DefaultAllocator());
285 aggregate.function.simple_update(start_of_input, aggr_input_data, payload_cnt,
286 sink.state.aggregates[aggr_idx].get(), payload_chunk.size());
287 }
288 return SinkResultType::NEED_MORE_INPUT;
289}
290
291//===--------------------------------------------------------------------===//
292// Finalize
293//===--------------------------------------------------------------------===//
294
295void PhysicalUngroupedAggregate::CombineDistinct(ExecutionContext &context, GlobalSinkState &state,
296 LocalSinkState &lstate) const {
297 auto &global_sink = state.Cast<UngroupedAggregateGlobalState>();
298 auto &source = lstate.Cast<UngroupedAggregateLocalState>();
299
300 if (!distinct_data) {
301 return;
302 }
303 auto &distinct_state = global_sink.distinct_state;
304 auto table_count = distinct_data->radix_tables.size();
305 for (idx_t table_idx = 0; table_idx < table_count; table_idx++) {
306 D_ASSERT(distinct_data->radix_tables[table_idx]);
307 auto &radix_table = *distinct_data->radix_tables[table_idx];
308 auto &radix_global_sink = *distinct_state->radix_states[table_idx];
309 auto &radix_local_sink = *source.radix_states[table_idx];
310
311 radix_table.Combine(context, state&: radix_global_sink, lstate&: radix_local_sink);
312 }
313}
314
315void PhysicalUngroupedAggregate::Combine(ExecutionContext &context, GlobalSinkState &state,
316 LocalSinkState &lstate) const {
317 auto &gstate = state.Cast<UngroupedAggregateGlobalState>();
318 auto &source = lstate.Cast<UngroupedAggregateLocalState>();
319 D_ASSERT(!gstate.finished);
320
321 // finalize: combine the local state into the global state
322 // all aggregates are combinable: we might be doing a parallel aggregate
323 // use the combine method to combine the partial aggregates
324 lock_guard<mutex> glock(gstate.lock);
325
326 CombineDistinct(context, state, lstate);
327
328 for (idx_t aggr_idx = 0; aggr_idx < aggregates.size(); aggr_idx++) {
329 auto &aggregate = aggregates[aggr_idx]->Cast<BoundAggregateExpression>();
330
331 if (aggregate.IsDistinct()) {
332 continue;
333 }
334
335 Vector source_state(Value::POINTER(value: CastPointerToValue(src: source.state.aggregates[aggr_idx].get())));
336 Vector dest_state(Value::POINTER(value: CastPointerToValue(src: gstate.state.aggregates[aggr_idx].get())));
337
338 AggregateInputData aggr_input_data(aggregate.bind_info.get(), Allocator::DefaultAllocator());
339 aggregate.function.combine(source_state, dest_state, aggr_input_data, 1);
340#ifdef DEBUG
341 gstate.state.counts[aggr_idx] += source.state.counts[aggr_idx];
342#endif
343 }
344
345 auto &client_profiler = QueryProfiler::Get(context&: context.client);
346 context.thread.profiler.Flush(phys_op: *this, expression_executor&: source.child_executor, name: "child_executor", id: 0);
347 client_profiler.Flush(profiler&: context.thread.profiler);
348}
349
350class UngroupedDistinctAggregateFinalizeTask : public ExecutorTask {
351public:
352 UngroupedDistinctAggregateFinalizeTask(Executor &executor, shared_ptr<Event> event_p,
353 UngroupedAggregateGlobalState &state_p, ClientContext &context,
354 const PhysicalUngroupedAggregate &op)
355 : ExecutorTask(executor), event(std::move(event_p)), gstate(state_p), context(context), op(op) {
356 }
357
358 void AggregateDistinct() {
359 D_ASSERT(gstate.distinct_state);
360 auto &aggregates = op.aggregates;
361 auto &distinct_state = *gstate.distinct_state;
362 auto &distinct_data = *op.distinct_data;
363
364 ThreadContext temp_thread_context(context);
365 ExecutionContext temp_exec_context(context, temp_thread_context, nullptr);
366
367 idx_t payload_idx = 0;
368 idx_t next_payload_idx = 0;
369
370 for (idx_t i = 0; i < aggregates.size(); i++) {
371 auto &aggregate = aggregates[i]->Cast<BoundAggregateExpression>();
372
373 // Forward the payload idx
374 payload_idx = next_payload_idx;
375 next_payload_idx = payload_idx + aggregate.children.size();
376
377 // If aggregate is not distinct, skip it
378 if (!distinct_data.IsDistinct(index: i)) {
379 continue;
380 }
381
382 DataChunk payload_chunk;
383
384 D_ASSERT(distinct_data.info.table_map.count(i));
385 auto table_idx = distinct_data.info.table_map.at(k: i);
386 auto &radix_table_p = distinct_data.radix_tables[table_idx];
387 auto &output_chunk = *distinct_state.distinct_output_chunks[table_idx];
388 auto &grouped_aggregate_data = *distinct_data.grouped_aggregate_data[table_idx];
389
390 payload_chunk.InitializeEmpty(types: grouped_aggregate_data.group_types);
391 payload_chunk.SetCardinality(0);
392
393 //! Create global and local state for the hashtable
394 auto global_source_state = radix_table_p->GetGlobalSourceState(context);
395 auto local_source_state = radix_table_p->GetLocalSourceState(context&: temp_exec_context);
396
397 //! Retrieve the stored data from the hashtable
398 while (true) {
399 output_chunk.Reset();
400
401 InterruptState interrupt_state;
402 OperatorSourceInput source_input {.global_state: *global_source_state, .local_state: *local_source_state, .interrupt_state: interrupt_state};
403 auto res = radix_table_p->GetData(context&: temp_exec_context, chunk&: output_chunk,
404 sink_state&: *distinct_state.radix_states[table_idx], input&: source_input);
405 if (res == SourceResultType::FINISHED) {
406 D_ASSERT(output_chunk.size() == 0);
407 break;
408 } else if (res == SourceResultType::BLOCKED) {
409 throw InternalException(
410 "Unexpected interrupt from radix table GetData in UngroupedDistinctAggregateFinalizeTask");
411 }
412
413 // We dont need to resolve the filter, we already did this in Sink
414 idx_t payload_cnt = aggregate.children.size();
415 for (idx_t i = 0; i < payload_cnt; i++) {
416 payload_chunk.data[i].Reference(other&: output_chunk.data[i]);
417 }
418 payload_chunk.SetCardinality(output_chunk);
419#ifdef DEBUG
420 gstate.state.counts[i] += payload_chunk.size();
421#endif
422
423 auto start_of_input = payload_cnt ? &payload_chunk.data[0] : nullptr;
424 //! Update the aggregate state
425 AggregateInputData aggr_input_data(aggregate.bind_info.get(), Allocator::DefaultAllocator());
426 aggregate.function.simple_update(start_of_input, aggr_input_data, payload_cnt,
427 gstate.state.aggregates[i].get(), payload_chunk.size());
428 }
429 }
430 D_ASSERT(!gstate.finished);
431 gstate.finished = true;
432 }
433
434 TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override {
435 AggregateDistinct();
436 event->FinishTask();
437 return TaskExecutionResult::TASK_FINISHED;
438 }
439
440private:
441 shared_ptr<Event> event;
442 UngroupedAggregateGlobalState &gstate;
443 ClientContext &context;
444 const PhysicalUngroupedAggregate &op;
445};
446
447// TODO: Create tasks and run these in parallel instead of doing this all in Schedule, single threaded
448class UngroupedDistinctAggregateFinalizeEvent : public BasePipelineEvent {
449public:
450 UngroupedDistinctAggregateFinalizeEvent(const PhysicalUngroupedAggregate &op_p,
451 UngroupedAggregateGlobalState &gstate_p, Pipeline &pipeline_p,
452 ClientContext &context)
453 : BasePipelineEvent(pipeline_p), op(op_p), gstate(gstate_p), context(context) {
454 }
455 const PhysicalUngroupedAggregate &op;
456 UngroupedAggregateGlobalState &gstate;
457 ClientContext &context;
458
459public:
460 void Schedule() override {
461 vector<shared_ptr<Task>> tasks;
462 tasks.push_back(x: make_uniq<UngroupedDistinctAggregateFinalizeTask>(args&: pipeline->executor, args: shared_from_this(),
463 args&: gstate, args&: context, args: op));
464 D_ASSERT(!tasks.empty());
465 SetTasks(std::move(tasks));
466 }
467};
468
469class UngroupedDistinctCombineFinalizeEvent : public BasePipelineEvent {
470public:
471 UngroupedDistinctCombineFinalizeEvent(const PhysicalUngroupedAggregate &op_p,
472 UngroupedAggregateGlobalState &gstate_p, Pipeline &pipeline_p,
473 ClientContext &client)
474 : BasePipelineEvent(pipeline_p), op(op_p), gstate(gstate_p), client(client) {
475 }
476
477 const PhysicalUngroupedAggregate &op;
478 UngroupedAggregateGlobalState &gstate;
479 ClientContext &client;
480
481public:
482 void Schedule() override {
483 auto &distinct_state = *gstate.distinct_state;
484 auto &distinct_data = *op.distinct_data;
485 vector<shared_ptr<Task>> tasks;
486 for (idx_t table_idx = 0; table_idx < distinct_data.radix_tables.size(); table_idx++) {
487 distinct_data.radix_tables[table_idx]->ScheduleTasks(executor&: pipeline->executor, event: shared_from_this(),
488 state&: *distinct_state.radix_states[table_idx], tasks);
489 }
490 D_ASSERT(!tasks.empty());
491 SetTasks(std::move(tasks));
492 }
493
494 void FinishEvent() override {
495 //! Now that all tables are combined, it's time to do the distinct aggregations
496 auto new_event = make_shared<UngroupedDistinctAggregateFinalizeEvent>(args: op, args&: gstate, args&: *pipeline, args&: client);
497 this->InsertEvent(replacement_event: std::move(new_event));
498 }
499};
500
501SinkFinalizeType PhysicalUngroupedAggregate::FinalizeDistinct(Pipeline &pipeline, Event &event, ClientContext &context,
502 GlobalSinkState &gstate_p) const {
503 auto &gstate = gstate_p.Cast<UngroupedAggregateGlobalState>();
504 D_ASSERT(distinct_data);
505 auto &distinct_state = *gstate.distinct_state;
506
507 bool any_partitioned = false;
508 for (idx_t table_idx = 0; table_idx < distinct_data->radix_tables.size(); table_idx++) {
509 auto &radix_table_p = distinct_data->radix_tables[table_idx];
510 auto &radix_state = *distinct_state.radix_states[table_idx];
511 bool partitioned = radix_table_p->Finalize(context, gstate_p&: radix_state);
512 if (partitioned) {
513 any_partitioned = true;
514 }
515 }
516 if (any_partitioned) {
517 auto new_event = make_shared<UngroupedDistinctCombineFinalizeEvent>(args: *this, args&: gstate, args&: pipeline, args&: context);
518 event.InsertEvent(replacement_event: std::move(new_event));
519 } else {
520 //! Hashtables aren't partitioned, they dont need to be joined first
521 //! So we can compute the aggregate already
522 auto new_event = make_shared<UngroupedDistinctAggregateFinalizeEvent>(args: *this, args&: gstate, args&: pipeline, args&: context);
523 event.InsertEvent(replacement_event: std::move(new_event));
524 }
525 return SinkFinalizeType::READY;
526}
527
528SinkFinalizeType PhysicalUngroupedAggregate::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
529 GlobalSinkState &gstate_p) const {
530 auto &gstate = gstate_p.Cast<UngroupedAggregateGlobalState>();
531
532 if (distinct_data) {
533 return FinalizeDistinct(pipeline, event, context, gstate_p);
534 }
535
536 D_ASSERT(!gstate.finished);
537 gstate.finished = true;
538 return SinkFinalizeType::READY;
539}
540
541//===--------------------------------------------------------------------===//
542// Source
543//===--------------------------------------------------------------------===//
544void VerifyNullHandling(DataChunk &chunk, AggregateState &state, const vector<unique_ptr<Expression>> &aggregates) {
545#ifdef DEBUG
546 for (idx_t aggr_idx = 0; aggr_idx < aggregates.size(); aggr_idx++) {
547 auto &aggr = aggregates[aggr_idx]->Cast<BoundAggregateExpression>();
548 if (state.counts[aggr_idx] == 0 && aggr.function.null_handling == FunctionNullHandling::DEFAULT_NULL_HANDLING) {
549 // Default is when 0 values go in, NULL comes out
550 UnifiedVectorFormat vdata;
551 chunk.data[aggr_idx].ToUnifiedFormat(1, vdata);
552 D_ASSERT(!vdata.validity.RowIsValid(vdata.sel->get_index(0)));
553 }
554 }
555#endif
556}
557
558SourceResultType PhysicalUngroupedAggregate::GetData(ExecutionContext &context, DataChunk &chunk,
559 OperatorSourceInput &input) const {
560 auto &gstate = sink_state->Cast<UngroupedAggregateGlobalState>();
561 D_ASSERT(gstate.finished);
562
563 // initialize the result chunk with the aggregate values
564 chunk.SetCardinality(1);
565 for (idx_t aggr_idx = 0; aggr_idx < aggregates.size(); aggr_idx++) {
566 auto &aggregate = aggregates[aggr_idx]->Cast<BoundAggregateExpression>();
567
568 Vector state_vector(Value::POINTER(value: CastPointerToValue(src: gstate.state.aggregates[aggr_idx].get())));
569 AggregateInputData aggr_input_data(aggregate.bind_info.get(), Allocator::DefaultAllocator());
570 aggregate.function.finalize(state_vector, aggr_input_data, chunk.data[aggr_idx], 1, 0);
571 }
572 VerifyNullHandling(chunk, state&: gstate.state, aggregates);
573
574 return SourceResultType::FINISHED;
575}
576
577string PhysicalUngroupedAggregate::ParamsToString() const {
578 string result;
579 for (idx_t i = 0; i < aggregates.size(); i++) {
580 auto &aggregate = aggregates[i]->Cast<BoundAggregateExpression>();
581 if (i > 0) {
582 result += "\n";
583 }
584 result += aggregates[i]->GetName();
585 if (aggregate.filter) {
586 result += " Filter: " + aggregate.filter->GetName();
587 }
588 }
589 return result;
590}
591
592} // namespace duckdb
593