| 1 | //===----------------------------------------------------------------------===// |
| 2 | // DuckDB |
| 3 | // |
| 4 | // duckdb/common/types/row_operations/row_aggregate.cpp |
| 5 | // |
| 6 | // |
| 7 | //===----------------------------------------------------------------------===// |
| 8 | #include "duckdb/catalog/catalog_entry/aggregate_function_catalog_entry.hpp" |
| 9 | #include "duckdb/common/row_operations/row_operations.hpp" |
| 10 | #include "duckdb/common/types/row/tuple_data_layout.hpp" |
| 11 | #include "duckdb/execution/operator/aggregate/aggregate_object.hpp" |
| 12 | |
| 13 | namespace duckdb { |
| 14 | |
| 15 | void RowOperations::InitializeStates(TupleDataLayout &layout, Vector &addresses, const SelectionVector &sel, |
| 16 | idx_t count) { |
| 17 | if (count == 0) { |
| 18 | return; |
| 19 | } |
| 20 | auto pointers = FlatVector::GetData<data_ptr_t>(vector&: addresses); |
| 21 | auto &offsets = layout.GetOffsets(); |
| 22 | auto aggr_idx = layout.ColumnCount(); |
| 23 | |
| 24 | for (const auto &aggr : layout.GetAggregates()) { |
| 25 | for (idx_t i = 0; i < count; ++i) { |
| 26 | auto row_idx = sel.get_index(idx: i); |
| 27 | auto row = pointers[row_idx]; |
| 28 | aggr.function.initialize(row + offsets[aggr_idx]); |
| 29 | } |
| 30 | ++aggr_idx; |
| 31 | } |
| 32 | } |
| 33 | |
| 34 | void RowOperations::DestroyStates(RowOperationsState &state, TupleDataLayout &layout, Vector &addresses, idx_t count) { |
| 35 | if (count == 0) { |
| 36 | return; |
| 37 | } |
| 38 | // Move to the first aggregate state |
| 39 | VectorOperations::AddInPlace(left&: addresses, delta: layout.GetAggrOffset(), count); |
| 40 | for (const auto &aggr : layout.GetAggregates()) { |
| 41 | if (aggr.function.destructor) { |
| 42 | AggregateInputData aggr_input_data(aggr.GetFunctionData(), state.allocator); |
| 43 | aggr.function.destructor(addresses, aggr_input_data, count); |
| 44 | } |
| 45 | // Move to the next aggregate state |
| 46 | VectorOperations::AddInPlace(left&: addresses, delta: aggr.payload_size, count); |
| 47 | } |
| 48 | } |
| 49 | |
| 50 | void RowOperations::UpdateStates(RowOperationsState &state, AggregateObject &aggr, Vector &addresses, |
| 51 | DataChunk &payload, idx_t arg_idx, idx_t count) { |
| 52 | AggregateInputData aggr_input_data(aggr.GetFunctionData(), state.allocator); |
| 53 | aggr.function.update(aggr.child_count == 0 ? nullptr : &payload.data[arg_idx], aggr_input_data, aggr.child_count, |
| 54 | addresses, count); |
| 55 | } |
| 56 | |
| 57 | void RowOperations::UpdateFilteredStates(RowOperationsState &state, AggregateFilterData &filter_data, |
| 58 | AggregateObject &aggr, Vector &addresses, DataChunk &payload, idx_t arg_idx) { |
| 59 | idx_t count = filter_data.ApplyFilter(payload); |
| 60 | if (count == 0) { |
| 61 | return; |
| 62 | } |
| 63 | |
| 64 | Vector filtered_addresses(addresses, filter_data.true_sel, count); |
| 65 | filtered_addresses.Flatten(count); |
| 66 | |
| 67 | UpdateStates(state, aggr, addresses&: filtered_addresses, payload&: filter_data.filtered_payload, arg_idx, count); |
| 68 | } |
| 69 | |
| 70 | void RowOperations::CombineStates(RowOperationsState &state, TupleDataLayout &layout, Vector &sources, Vector &targets, |
| 71 | idx_t count) { |
| 72 | if (count == 0) { |
| 73 | return; |
| 74 | } |
| 75 | |
| 76 | // Move to the first aggregate states |
| 77 | VectorOperations::AddInPlace(left&: sources, delta: layout.GetAggrOffset(), count); |
| 78 | VectorOperations::AddInPlace(left&: targets, delta: layout.GetAggrOffset(), count); |
| 79 | for (auto &aggr : layout.GetAggregates()) { |
| 80 | D_ASSERT(aggr.function.combine); |
| 81 | AggregateInputData aggr_input_data(aggr.GetFunctionData(), state.allocator); |
| 82 | aggr.function.combine(sources, targets, aggr_input_data, count); |
| 83 | |
| 84 | // Move to the next aggregate states |
| 85 | VectorOperations::AddInPlace(left&: sources, delta: aggr.payload_size, count); |
| 86 | VectorOperations::AddInPlace(left&: targets, delta: aggr.payload_size, count); |
| 87 | } |
| 88 | } |
| 89 | |
| 90 | void RowOperations::FinalizeStates(RowOperationsState &state, TupleDataLayout &layout, Vector &addresses, |
| 91 | DataChunk &result, idx_t aggr_idx) { |
| 92 | // Move to the first aggregate state |
| 93 | VectorOperations::AddInPlace(left&: addresses, delta: layout.GetAggrOffset(), count: result.size()); |
| 94 | |
| 95 | auto &aggregates = layout.GetAggregates(); |
| 96 | for (idx_t i = 0; i < aggregates.size(); i++) { |
| 97 | auto &target = result.data[aggr_idx + i]; |
| 98 | auto &aggr = aggregates[i]; |
| 99 | AggregateInputData aggr_input_data(aggr.GetFunctionData(), state.allocator); |
| 100 | aggr.function.finalize(addresses, aggr_input_data, target, result.size(), 0); |
| 101 | |
| 102 | // Move to the next aggregate state |
| 103 | VectorOperations::AddInPlace(left&: addresses, delta: aggr.payload_size, count: result.size()); |
| 104 | } |
| 105 | } |
| 106 | |
| 107 | } // namespace duckdb |
| 108 | |