1//===----------------------------------------------------------------------===//
2// DuckDB
3//
4// duckdb/common/types/row_operations/row_aggregate.cpp
5//
6//
7//===----------------------------------------------------------------------===//
8#include "duckdb/catalog/catalog_entry/aggregate_function_catalog_entry.hpp"
9#include "duckdb/common/row_operations/row_operations.hpp"
10#include "duckdb/common/types/row/tuple_data_layout.hpp"
11#include "duckdb/execution/operator/aggregate/aggregate_object.hpp"
12
13namespace duckdb {
14
15void RowOperations::InitializeStates(TupleDataLayout &layout, Vector &addresses, const SelectionVector &sel,
16 idx_t count) {
17 if (count == 0) {
18 return;
19 }
20 auto pointers = FlatVector::GetData<data_ptr_t>(vector&: addresses);
21 auto &offsets = layout.GetOffsets();
22 auto aggr_idx = layout.ColumnCount();
23
24 for (const auto &aggr : layout.GetAggregates()) {
25 for (idx_t i = 0; i < count; ++i) {
26 auto row_idx = sel.get_index(idx: i);
27 auto row = pointers[row_idx];
28 aggr.function.initialize(row + offsets[aggr_idx]);
29 }
30 ++aggr_idx;
31 }
32}
33
34void RowOperations::DestroyStates(RowOperationsState &state, TupleDataLayout &layout, Vector &addresses, idx_t count) {
35 if (count == 0) {
36 return;
37 }
38 // Move to the first aggregate state
39 VectorOperations::AddInPlace(left&: addresses, delta: layout.GetAggrOffset(), count);
40 for (const auto &aggr : layout.GetAggregates()) {
41 if (aggr.function.destructor) {
42 AggregateInputData aggr_input_data(aggr.GetFunctionData(), state.allocator);
43 aggr.function.destructor(addresses, aggr_input_data, count);
44 }
45 // Move to the next aggregate state
46 VectorOperations::AddInPlace(left&: addresses, delta: aggr.payload_size, count);
47 }
48}
49
50void RowOperations::UpdateStates(RowOperationsState &state, AggregateObject &aggr, Vector &addresses,
51 DataChunk &payload, idx_t arg_idx, idx_t count) {
52 AggregateInputData aggr_input_data(aggr.GetFunctionData(), state.allocator);
53 aggr.function.update(aggr.child_count == 0 ? nullptr : &payload.data[arg_idx], aggr_input_data, aggr.child_count,
54 addresses, count);
55}
56
57void RowOperations::UpdateFilteredStates(RowOperationsState &state, AggregateFilterData &filter_data,
58 AggregateObject &aggr, Vector &addresses, DataChunk &payload, idx_t arg_idx) {
59 idx_t count = filter_data.ApplyFilter(payload);
60 if (count == 0) {
61 return;
62 }
63
64 Vector filtered_addresses(addresses, filter_data.true_sel, count);
65 filtered_addresses.Flatten(count);
66
67 UpdateStates(state, aggr, addresses&: filtered_addresses, payload&: filter_data.filtered_payload, arg_idx, count);
68}
69
70void RowOperations::CombineStates(RowOperationsState &state, TupleDataLayout &layout, Vector &sources, Vector &targets,
71 idx_t count) {
72 if (count == 0) {
73 return;
74 }
75
76 // Move to the first aggregate states
77 VectorOperations::AddInPlace(left&: sources, delta: layout.GetAggrOffset(), count);
78 VectorOperations::AddInPlace(left&: targets, delta: layout.GetAggrOffset(), count);
79 for (auto &aggr : layout.GetAggregates()) {
80 D_ASSERT(aggr.function.combine);
81 AggregateInputData aggr_input_data(aggr.GetFunctionData(), state.allocator);
82 aggr.function.combine(sources, targets, aggr_input_data, count);
83
84 // Move to the next aggregate states
85 VectorOperations::AddInPlace(left&: sources, delta: aggr.payload_size, count);
86 VectorOperations::AddInPlace(left&: targets, delta: aggr.payload_size, count);
87 }
88}
89
90void RowOperations::FinalizeStates(RowOperationsState &state, TupleDataLayout &layout, Vector &addresses,
91 DataChunk &result, idx_t aggr_idx) {
92 // Move to the first aggregate state
93 VectorOperations::AddInPlace(left&: addresses, delta: layout.GetAggrOffset(), count: result.size());
94
95 auto &aggregates = layout.GetAggregates();
96 for (idx_t i = 0; i < aggregates.size(); i++) {
97 auto &target = result.data[aggr_idx + i];
98 auto &aggr = aggregates[i];
99 AggregateInputData aggr_input_data(aggr.GetFunctionData(), state.allocator);
100 aggr.function.finalize(addresses, aggr_input_data, target, result.size(), 0);
101
102 // Move to the next aggregate state
103 VectorOperations::AddInPlace(left&: addresses, delta: aggr.payload_size, count: result.size());
104 }
105}
106
107} // namespace duckdb
108