#include "duckdb/execution/operator/aggregate/physical_hash_aggregate.hpp"

#include "duckdb/common/vector_operations/vector_operations.hpp"
#include "duckdb/execution/expression_executor.hpp"
#include "duckdb/planner/expression/bound_aggregate_expression.hpp"
#include "duckdb/planner/expression/bound_constant_expression.hpp"
#include "duckdb/catalog/catalog_entry/aggregate_function_catalog_entry.hpp"

using namespace duckdb;
using namespace std;

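//! The operator state for PhysicalHashAggregate: it owns the aggregate hash table together
//! with the intermediate chunks and expression executors used to build and scan it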
class PhysicalHashAggregateState : public PhysicalOperatorState {
public:
	PhysicalHashAggregateState(PhysicalHashAggregate *parent, PhysicalOperator *child);

	//! Materialized GROUP BY expressions
	DataChunk group_chunk;
	//! Materialized aggregates
	DataChunk aggregate_chunk;
	//! The current position to scan the HT for output tuples
	idx_t ht_scan_position;
	//! The number of tuples consumed from the child so far
	idx_t tuples_scanned;
	//! The HT
	unique_ptr<SuperLargeHashTable> ht;
	//! The payload chunk, only used while filling the HT
	DataChunk payload_chunk;
	//! Expression executor for the GROUP BY chunk
	ExpressionExecutor group_executor;
	//! Expression executor for the payload
	ExpressionExecutor payload_executor;
};

PhysicalHashAggregate::PhysicalHashAggregate(vector<TypeId> types, vector<unique_ptr<Expression>> expressions,
                                             PhysicalOperatorType type)
    : PhysicalHashAggregate(types, move(expressions), {}, type) {
}

PhysicalHashAggregate::PhysicalHashAggregate(vector<TypeId> types, vector<unique_ptr<Expression>> expressions,
                                             vector<unique_ptr<Expression>> groups, PhysicalOperatorType type)
    : PhysicalOperator(type, types), groups(move(groups)) {
	// fake a single group with a constant value for aggregation without groups
	if (this->groups.size() == 0) {
		auto ce = make_unique<BoundConstantExpression>(Value::TINYINT(42));
		this->groups.push_back(move(ce));
		is_implicit_aggr = true;
	} else {
		is_implicit_aggr = false;
	}
	// get the list of all aggregates to be computed
	for (auto &expr : expressions) {
		assert(expr->expression_class == ExpressionClass::BOUND_AGGREGATE);
		assert(expr->IsAggregate());
		aggregates.push_back(move(expr));
	}
}

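// GetChunkInternal operates in two phases: on the first call it drains the child operator and
// adds every chunk to the aggregate hash table; each call then scans the next batch of groups
// out of the hash table, using ht_scan_position to resume where the previous call left off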
void PhysicalHashAggregate::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) {
	auto state = reinterpret_cast<PhysicalHashAggregateState *>(state_);
	do {
		// resolve the child chunk if there is one
		children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
		if (state->child_chunk.size() == 0) {
			break;
		}
		// aggregation with groups
		DataChunk &group_chunk = state->group_chunk;
		DataChunk &payload_chunk = state->payload_chunk;
		state->group_executor.Execute(state->child_chunk, group_chunk);
		state->payload_executor.SetChunk(state->child_chunk);

		payload_chunk.Reset();
		idx_t payload_idx = 0, payload_expr_idx = 0;
		payload_chunk.SetCardinality(group_chunk);
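		// evaluate the argument expressions of each aggregate into the payload chunk;
		// aggregates without arguments (e.g. COUNT(*)) keep their payload column but leave
		// it untouched, so only payload_idx is advanced for them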
		for (idx_t i = 0; i < aggregates.size(); i++) {
			auto &aggr = (BoundAggregateExpression &)*aggregates[i];
			if (aggr.children.size()) {
				for (idx_t j = 0; j < aggr.children.size(); ++j) {
					state->payload_executor.ExecuteExpression(payload_expr_idx, payload_chunk.data[payload_idx]);
					payload_idx++;
					payload_expr_idx++;
				}
			} else {
				payload_idx++;
			}
		}

		group_chunk.Verify();
		payload_chunk.Verify();
		assert(payload_chunk.column_count() == 0 || group_chunk.size() == payload_chunk.size());

		state->ht->AddChunk(group_chunk, payload_chunk);
		state->tuples_scanned += state->child_chunk.size();
	} while (state->child_chunk.size() > 0);

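	// the child is exhausted: scan the next batch of groups and aggregate results out of the HT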
	state->group_chunk.Reset();
	state->aggregate_chunk.Reset();
	idx_t elements_found = state->ht->Scan(state->ht_scan_position, state->group_chunk, state->aggregate_chunk);

	// special case: an aggregation without groups over an empty input
	// the query still has to produce exactly one row (e.g. COUNT(*) of an empty table is 0),
	// so emit the finalized initial state of every aggregate
	if (elements_found == 0 && state->tuples_scanned == 0 && is_implicit_aggr) {
		assert(chunk.column_count() == aggregates.size());
		// for each column in the aggregates, set to initial state
		chunk.SetCardinality(1);
		for (idx_t i = 0; i < chunk.column_count(); i++) {
			assert(aggregates[i]->GetExpressionClass() == ExpressionClass::BOUND_AGGREGATE);
			auto &aggr = (BoundAggregateExpression &)*aggregates[i];
			auto aggr_state = unique_ptr<data_t[]>(new data_t[aggr.function.state_size()]);
			aggr.function.initialize(aggr_state.get());

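			// wrap the freshly initialized state in a constant pointer vector and finalize it
			// directly into the corresponding output column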
			Vector state_vector(Value::POINTER((uintptr_t)aggr_state.get()));
			aggr.function.finalize(state_vector, chunk.data[i], 1);
		}
		state->finished = true;
		return;
	}
	if (elements_found == 0 && !state->finished) {
		state->finished = true;
		return;
	}
	// we scanned a batch of groups from the hash table
	// now compute the final projection: group columns (if any) followed by the aggregates
	idx_t chunk_index = 0;
	chunk.SetCardinality(elements_found);
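	// a regular GROUP BY result chunk holds both the group and the aggregate columns;
	// for implicit aggregation the faked constant group is not part of the output,
	// so only the aggregate columns are referenced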
	if (state->group_chunk.column_count() + state->aggregate_chunk.column_count() == chunk.column_count()) {
		for (idx_t col_idx = 0; col_idx < state->group_chunk.column_count(); col_idx++) {
			chunk.data[chunk_index++].Reference(state->group_chunk.data[col_idx]);
		}
	} else {
		assert(state->aggregate_chunk.column_count() == chunk.column_count());
	}

	for (idx_t col_idx = 0; col_idx < state->aggregate_chunk.column_count(); col_idx++) {
		chunk.data[chunk_index++].Reference(state->aggregate_chunk.data[col_idx]);
	}
}

unique_ptr<PhysicalOperatorState> PhysicalHashAggregate::GetOperatorState() {
	assert(children.size() > 0);
	auto state = make_unique<PhysicalHashAggregateState>(this, children[0].get());
	state->tuples_scanned = 0;
	vector<TypeId> group_types, payload_types;
	vector<BoundAggregateExpression *> aggregate_kind;
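	// collect the group key types and register every aggregate argument with the payload
	// executor; aggregates without arguments (e.g. COUNT(*)) still reserve a payload column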
	for (auto &expr : groups) {
		group_types.push_back(expr->return_type);
	}
	for (auto &expr : aggregates) {
		assert(expr->GetExpressionClass() == ExpressionClass::BOUND_AGGREGATE);
		auto &aggr = (BoundAggregateExpression &)*expr;
		aggregate_kind.push_back(&aggr);
		if (aggr.children.size()) {
			for (idx_t i = 0; i < aggr.children.size(); ++i) {
				payload_types.push_back(aggr.children[i]->return_type);
				state->payload_executor.AddExpression(*aggr.children[i]);
			}
		} else {
			// COUNT(*)
			payload_types.push_back(TypeId::INT64);
		}
	}
	if (payload_types.size() > 0) {
		state->payload_chunk.Initialize(payload_types);
	}

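	// create the aggregate hash table; 1024 is the initial number of entries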
	state->ht = make_unique<SuperLargeHashTable>(1024, group_types, payload_types, aggregate_kind);
	return move(state);
}

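// the state constructor sizes the intermediate chunks from the parent's group and aggregate
// types; the group executor is bound to the GROUP BY expressions in the initializer list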
PhysicalHashAggregateState::PhysicalHashAggregateState(PhysicalHashAggregate *parent, PhysicalOperator *child)
    : PhysicalOperatorState(child), ht_scan_position(0), tuples_scanned(0), group_executor(parent->groups) {
	vector<TypeId> group_types, aggregate_types;
	for (auto &expr : parent->groups) {
		group_types.push_back(expr->return_type);
	}
	group_chunk.Initialize(group_types);
	for (auto &expr : parent->aggregates) {
		aggregate_types.push_back(expr->return_type);
	}
	if (aggregate_types.size() > 0) {
		aggregate_chunk.Initialize(aggregate_types);
	}
}