1 | //===----------------------------------------------------------------------===// |
2 | // DuckDB |
3 | // |
4 | // duckdb/common/types/row_operations/row_aggregate.cpp |
5 | // |
6 | // |
7 | //===----------------------------------------------------------------------===// |
8 | #include "duckdb/catalog/catalog_entry/aggregate_function_catalog_entry.hpp" |
9 | #include "duckdb/common/row_operations/row_operations.hpp" |
10 | #include "duckdb/common/types/row/tuple_data_layout.hpp" |
11 | #include "duckdb/execution/operator/aggregate/aggregate_object.hpp" |
12 | |
13 | namespace duckdb { |
14 | |
15 | void RowOperations::InitializeStates(TupleDataLayout &layout, Vector &addresses, const SelectionVector &sel, |
16 | idx_t count) { |
17 | if (count == 0) { |
18 | return; |
19 | } |
20 | auto pointers = FlatVector::GetData<data_ptr_t>(vector&: addresses); |
21 | auto &offsets = layout.GetOffsets(); |
22 | auto aggr_idx = layout.ColumnCount(); |
23 | |
24 | for (const auto &aggr : layout.GetAggregates()) { |
25 | for (idx_t i = 0; i < count; ++i) { |
26 | auto row_idx = sel.get_index(idx: i); |
27 | auto row = pointers[row_idx]; |
28 | aggr.function.initialize(row + offsets[aggr_idx]); |
29 | } |
30 | ++aggr_idx; |
31 | } |
32 | } |
33 | |
34 | void RowOperations::DestroyStates(RowOperationsState &state, TupleDataLayout &layout, Vector &addresses, idx_t count) { |
35 | if (count == 0) { |
36 | return; |
37 | } |
38 | // Move to the first aggregate state |
39 | VectorOperations::AddInPlace(left&: addresses, delta: layout.GetAggrOffset(), count); |
40 | for (const auto &aggr : layout.GetAggregates()) { |
41 | if (aggr.function.destructor) { |
42 | AggregateInputData aggr_input_data(aggr.GetFunctionData(), state.allocator); |
43 | aggr.function.destructor(addresses, aggr_input_data, count); |
44 | } |
45 | // Move to the next aggregate state |
46 | VectorOperations::AddInPlace(left&: addresses, delta: aggr.payload_size, count); |
47 | } |
48 | } |
49 | |
50 | void RowOperations::UpdateStates(RowOperationsState &state, AggregateObject &aggr, Vector &addresses, |
51 | DataChunk &payload, idx_t arg_idx, idx_t count) { |
52 | AggregateInputData aggr_input_data(aggr.GetFunctionData(), state.allocator); |
53 | aggr.function.update(aggr.child_count == 0 ? nullptr : &payload.data[arg_idx], aggr_input_data, aggr.child_count, |
54 | addresses, count); |
55 | } |
56 | |
57 | void RowOperations::UpdateFilteredStates(RowOperationsState &state, AggregateFilterData &filter_data, |
58 | AggregateObject &aggr, Vector &addresses, DataChunk &payload, idx_t arg_idx) { |
59 | idx_t count = filter_data.ApplyFilter(payload); |
60 | if (count == 0) { |
61 | return; |
62 | } |
63 | |
64 | Vector filtered_addresses(addresses, filter_data.true_sel, count); |
65 | filtered_addresses.Flatten(count); |
66 | |
67 | UpdateStates(state, aggr, addresses&: filtered_addresses, payload&: filter_data.filtered_payload, arg_idx, count); |
68 | } |
69 | |
70 | void RowOperations::CombineStates(RowOperationsState &state, TupleDataLayout &layout, Vector &sources, Vector &targets, |
71 | idx_t count) { |
72 | if (count == 0) { |
73 | return; |
74 | } |
75 | |
76 | // Move to the first aggregate states |
77 | VectorOperations::AddInPlace(left&: sources, delta: layout.GetAggrOffset(), count); |
78 | VectorOperations::AddInPlace(left&: targets, delta: layout.GetAggrOffset(), count); |
79 | for (auto &aggr : layout.GetAggregates()) { |
80 | D_ASSERT(aggr.function.combine); |
81 | AggregateInputData aggr_input_data(aggr.GetFunctionData(), state.allocator); |
82 | aggr.function.combine(sources, targets, aggr_input_data, count); |
83 | |
84 | // Move to the next aggregate states |
85 | VectorOperations::AddInPlace(left&: sources, delta: aggr.payload_size, count); |
86 | VectorOperations::AddInPlace(left&: targets, delta: aggr.payload_size, count); |
87 | } |
88 | } |
89 | |
90 | void RowOperations::FinalizeStates(RowOperationsState &state, TupleDataLayout &layout, Vector &addresses, |
91 | DataChunk &result, idx_t aggr_idx) { |
92 | // Move to the first aggregate state |
93 | VectorOperations::AddInPlace(left&: addresses, delta: layout.GetAggrOffset(), count: result.size()); |
94 | |
95 | auto &aggregates = layout.GetAggregates(); |
96 | for (idx_t i = 0; i < aggregates.size(); i++) { |
97 | auto &target = result.data[aggr_idx + i]; |
98 | auto &aggr = aggregates[i]; |
99 | AggregateInputData aggr_input_data(aggr.GetFunctionData(), state.allocator); |
100 | aggr.function.finalize(addresses, aggr_input_data, target, result.size(), 0); |
101 | |
102 | // Move to the next aggregate state |
103 | VectorOperations::AddInPlace(left&: addresses, delta: aggr.payload_size, count: result.size()); |
104 | } |
105 | } |
106 | |
107 | } // namespace duckdb |
108 | |