1#include "duckdb/execution/aggregate_hashtable.hpp"
2
3#include "duckdb/common/exception.hpp"
4#include "duckdb/common/types/null_value.hpp"
5#include "duckdb/common/vector_operations/vector_operations.hpp"
6#include "duckdb/planner/expression/bound_aggregate_expression.hpp"
7#include "duckdb/catalog/catalog_entry/aggregate_function_catalog_entry.hpp"
8#include "duckdb/common/vector_operations/unary_executor.hpp"
9#include "duckdb/common/operator/comparison_operators.hpp"
10
11#include <cmath>
12#include <map>
13
14using namespace duckdb;
15using namespace std;
16
// Convenience constructor: converts the bound aggregate expressions into
// AggregateObjects and delegates to the main constructor below.
SuperLargeHashTable::SuperLargeHashTable(idx_t initial_capacity, vector<TypeId> group_types,
                                         vector<TypeId> payload_types, vector<BoundAggregateExpression *> bindings,
                                         bool parallel)
    : SuperLargeHashTable(initial_capacity, move(group_types), move(payload_types),
                          AggregateObject::CreateAggregateObjects(move(bindings)), parallel) {
}
23
24vector<AggregateObject> AggregateObject::CreateAggregateObjects(vector<BoundAggregateExpression *> bindings) {
25 vector<AggregateObject> aggregates;
26 for (auto &binding : bindings) {
27 auto payload_size = binding->function.state_size();
28 aggregates.push_back(AggregateObject(binding->function, binding->children.size(), payload_size,
29 binding->distinct, binding->return_type));
30 }
31 return aggregates;
32}
33
// Main constructor: computes the tuple layout, builds the template payload of
// initialized aggregate states, creates secondary hash tables for DISTINCT
// aggregates, and allocates the initial table.
SuperLargeHashTable::SuperLargeHashTable(idx_t initial_capacity, vector<TypeId> group_types,
                                         vector<TypeId> payload_types, vector<AggregateObject> aggregate_objects,
                                         bool parallel)
    : aggregates(move(aggregate_objects)), group_types(group_types), payload_types(payload_types), group_width(0),
      payload_width(0), capacity(0), entries(0), data(nullptr), parallel(parallel) {
	// HT tuple layout is as follows:
	// [FLAG][GROUPS][PAYLOAD]
	// [FLAG] is the state of the tuple in memory
	// [GROUPS] is the groups
	// [PAYLOAD] is the payload (i.e. the aggregate states)
	// total byte width of the group columns
	for (idx_t i = 0; i < group_types.size(); i++) {
		group_width += GetTypeIdSize(group_types[i]);
	}
	// total byte width of all aggregate states
	for (idx_t i = 0; i < aggregates.size(); i++) {
		payload_width += aggregates[i].payload_size;
	}
	// template payload: every aggregate's freshly-initialized state, copied
	// into a cell's payload whenever a new group is created
	empty_payload_data = unique_ptr<data_t[]>(new data_t[payload_width]);
	// initialize the aggregates to the NULL value
	auto pointer = empty_payload_data.get();
	for (idx_t i = 0; i < aggregates.size(); i++) {
		auto &aggr = aggregates[i];
		aggr.function.initialize(pointer);
		pointer += aggr.payload_size;
	}

	// FIXME: this always creates this vector, even if no distinct if present.
	// it likely does not matter.
	distinct_hashes.resize(aggregates.size());

	// create additional hash tables for distinct aggrs
	idx_t payload_idx = 0;
	for (idx_t i = 0; i < aggregates.size(); i++) {
		auto &aggr = aggregates[i];
		if (aggr.distinct) {
			// the distinct HT groups on the original group columns plus the
			// aggregate's (first) input column; it has no payload of its own
			vector<TypeId> distinct_group_types(group_types);
			vector<TypeId> distinct_payload_types;
			vector<BoundAggregateExpression *> distinct_aggregates;
			distinct_group_types.push_back(payload_types[payload_idx]);
			distinct_hashes[i] = make_unique<SuperLargeHashTable>(initial_capacity, distinct_group_types,
			                                                     distinct_payload_types, distinct_aggregates);
		}
		// aggregates without input columns still consume one payload slot
		if (aggr.child_count) {
			payload_idx += aggr.child_count;
		} else {
			payload_idx += 1;
		}
	}

	tuple_size = FLAG_SIZE + (group_width + payload_width);
	Resize(initial_capacity);
}
86
SuperLargeHashTable::~SuperLargeHashTable() {
	// run any aggregate state destructors before the memory is released
	Destroy();
}
90
91void SuperLargeHashTable::CallDestructors(Vector &state_vector, idx_t count) {
92 if (count == 0) {
93 return;
94 }
95 for (idx_t i = 0; i < aggregates.size(); i++) {
96 auto &aggr = aggregates[i];
97 if (aggr.function.destructor) {
98 aggr.function.destructor(state_vector, count);
99 }
100 // move to the next aggregate state
101 VectorOperations::AddInPlace(state_vector, aggr.payload_size, count);
102 }
103}
104
105void SuperLargeHashTable::Destroy() {
106 if (!data) {
107 return;
108 }
109 // check if there is a destructor
110 bool has_destructor = false;
111 for (idx_t i = 0; i < aggregates.size(); i++) {
112 if (aggregates[i].function.destructor) {
113 has_destructor = true;
114 }
115 }
116 if (!has_destructor) {
117 return;
118 }
119 // there are aggregates with destructors: loop over the hash table
120 // and call the destructor method for each of the aggregates
121 data_ptr_t data_pointers[STANDARD_VECTOR_SIZE];
122 Vector state_vector(TypeId::POINTER, (data_ptr_t)data_pointers);
123 idx_t count = 0;
124 for (data_ptr_t ptr = data, end = data + capacity * tuple_size; ptr < end; ptr += tuple_size) {
125 if (*ptr == FULL_CELL) {
126 // found entry
127 data_pointers[count++] = ptr + FLAG_SIZE + group_width;
128 if (count == STANDARD_VECTOR_SIZE) {
129 // vector is full: call the destructors
130 CallDestructors(state_vector, count);
131 count = 0;
132 }
133 }
134 }
135 CallDestructors(state_vector, count);
136}
137
// Grow the hash table to the given size (a power of two). If the table already
// contains entries, a new table is built and every entry is re-inserted.
void SuperLargeHashTable::Resize(idx_t size) {
	// the table only ever grows
	if (size <= capacity) {
		throw Exception("Cannot downsize a hash table!");
	}
	if (size < STANDARD_VECTOR_SIZE) {
		size = STANDARD_VECTOR_SIZE;
	}
	// size needs to be a power of 2, so (hash & bitmask) == (hash % size)
	assert((size & (size - 1)) == 0);
	bitmask = size - 1;

	if (entries > 0) {
		// table already holds data: build a new table of the target size and
		// re-insert every occupied cell into it
		auto new_table = make_unique<SuperLargeHashTable>(size, group_types, payload_types, aggregates, parallel);

		DataChunk groups;
		groups.Initialize(group_types);

		Vector addresses(TypeId::POINTER);
		auto data_pointers = FlatVector::GetData<data_ptr_t>(addresses);

		data_ptr_t ptr = data;
		data_ptr_t end = data + capacity * tuple_size;

		assert(new_table->tuple_size == this->tuple_size);

		while (true) {
			groups.Reset();

			// scan the table for full cells starting from the scan position
			idx_t found_entries = 0;
			for (; ptr < end && found_entries < STANDARD_VECTOR_SIZE; ptr += tuple_size) {
				if (*ptr == FULL_CELL) {
					// found entry; record a pointer to its group data
					data_pointers[found_entries++] = ptr + FLAG_SIZE;
				}
			}
			if (found_entries == 0) {
				break;
			}
			// fetch the group columns from the recorded cell pointers
			// NOTE(review): Gather::Set is called once per column with the same
			// address vector — presumably it advances the pointers past each
			// column internally; confirm against its implementation
			groups.SetCardinality(found_entries);
			for (idx_t i = 0; i < groups.column_count(); i++) {
				auto &column = groups.data[i];
				VectorOperations::Gather::Set(addresses, column, found_entries);
			}

			groups.Verify();
			assert(groups.size() == found_entries);
			// insert the groups into the new table to obtain the addresses of
			// their freshly created payload areas
			Vector new_addresses(TypeId::POINTER);
			new_table->FindOrCreateGroups(groups, new_addresses);

			// NB: both address vectors already point to the payload start
			assert(addresses.type == new_addresses.type && addresses.type == TypeId::POINTER);

			// relocate the aggregate states with a raw byte copy
			// NOTE(review): assumes states are safe to move via memcpy — confirm
			auto new_address_data = FlatVector::GetData<data_ptr_t>(new_addresses);
			for (idx_t i = 0; i < found_entries; i++) {
				memcpy(new_address_data[i], data_pointers[i], payload_width);
			}
		}

		assert(this->entries == new_table->entries);

		// steal the new table's storage; merge the string heap so strings
		// referenced from group data stay alive
		this->data = move(new_table->data);
		this->owned_data = move(new_table->owned_data);
		this->capacity = new_table->capacity;
		this->string_heap.MergeHeap(new_table->string_heap);
		new_table->data = nullptr;
	} else {
		// empty table: allocate storage and mark every cell as empty
		data = new data_t[size * tuple_size];
		owned_data = unique_ptr<data_t[]>(data);
		for (idx_t i = 0; i < size; i++) {
			data[i * tuple_size] = EMPTY_CELL;
		}

		capacity = size;
	}

	endptr = data + tuple_size * capacity;
}
217
// Add a chunk of (groups, payload) rows to the table: find or create the group
// entries, then fold the payload values into each group's aggregate states.
void SuperLargeHashTable::AddChunk(DataChunk &groups, DataChunk &payload) {
	if (groups.size() == 0) {
		return;
	}

	// after this call, addresses points at the start of each row's aggregate
	// payload inside the HT
	Vector addresses(TypeId::POINTER);
	FindOrCreateGroups(groups, addresses);

	// now every cell has an entry
	// update the aggregates
	idx_t payload_idx = 0;

	for (idx_t aggr_idx = 0; aggr_idx < aggregates.size(); aggr_idx++) {
		assert(payload.column_count() > payload_idx);

		// for any entries for which a group was found, update the aggregate
		auto &aggr = aggregates[aggr_idx];
		// aggregates without input columns still consume one payload column
		auto input_count = max((idx_t)1, (idx_t)aggr.child_count);
		if (aggr.distinct) {
			// DISTINCT aggregate: only rows whose (group, input) combination has
			// not been seen before may update the state; the secondary HT tracks
			// which combinations were seen
			// construct chunk for secondary hash table probing
			vector<TypeId> probe_types(group_types);
			for (idx_t i = 0; i < aggr.child_count; i++) {
				probe_types.push_back(payload_types[payload_idx]);
			}
			DataChunk probe_chunk;
			probe_chunk.Initialize(probe_types);
			for (idx_t group_idx = 0; group_idx < group_types.size(); group_idx++) {
				probe_chunk.data[group_idx].Reference(groups.data[group_idx]);
			}
			for (idx_t i = 0; i < aggr.child_count; i++) {
				probe_chunk.data[group_types.size() + i].Reference(payload.data[payload_idx + i]);
			}
			probe_chunk.SetCardinality(groups);
			probe_chunk.Verify();

			Vector dummy_addresses(TypeId::POINTER);
			SelectionVector new_groups(STANDARD_VECTOR_SIZE);
			// this is the actual meat, find out which groups plus payload
			// value have not been seen yet
			idx_t new_group_count =
			    distinct_hashes[aggr_idx]->FindOrCreateGroups(probe_chunk, dummy_addresses, new_groups);

			// now fix up the payload and addresses accordingly by creating
			// a selection vector
			if (new_group_count > 0) {
				// restrict both the state addresses and the input columns to the
				// rows that are new, then update only those
				Vector distinct_addresses;
				distinct_addresses.Slice(addresses, new_groups, new_group_count);
				for (idx_t i = 0; i < aggr.child_count; i++) {
					payload.data[payload_idx + i].Slice(new_groups, new_group_count);
					payload.data[payload_idx + i].Verify(new_group_count);
				}

				distinct_addresses.Verify(new_group_count);

				aggr.function.update(&payload.data[payload_idx], input_count, distinct_addresses, new_group_count);
			}
		} else {
			// regular aggregate: update the state of every row
			aggr.function.update(&payload.data[payload_idx], input_count, addresses, payload.size());
		}

		// move to the next aggregate: skip its input columns and advance the
		// state pointers past its state bytes
		payload_idx += input_count;
		VectorOperations::AddInPlace(addresses, aggr.payload_size, payload.size());
	}
}
283
// Fetch the raw aggregate state values for the given groups into result.
void SuperLargeHashTable::FetchAggregates(DataChunk &groups, DataChunk &result) {
	groups.Verify();
	assert(groups.column_count() == group_types.size());
	for (idx_t i = 0; i < result.column_count(); i++) {
		assert(result.data[i].type == payload_types[i]);
	}
	result.SetCardinality(groups);
	if (groups.size() == 0) {
		return;
	}
	// find the groups associated with the addresses
	// FIXME: this should not use the FindOrCreateGroups, creating them is unnecessary
	Vector addresses(TypeId::POINTER);
	FindOrCreateGroups(groups, addresses);
	// now fetch the aggregates
	// NOTE(review): the assert below restricts this path to INT64 payloads and
	// the states are read as raw bytes via Gather::Set, without calling the
	// aggregate's finalize — confirm this is intended for all callers
	for (idx_t aggr_idx = 0; aggr_idx < aggregates.size(); aggr_idx++) {
		assert(result.column_count() > aggr_idx);
		assert(payload_types[aggr_idx] == TypeId::INT64);

		VectorOperations::Gather::Set(addresses, result.data[aggr_idx], groups.size());
	}
}
306
307void SuperLargeHashTable::HashGroups(DataChunk &groups, Vector &addresses) {
308 // create a set of hashes for the groups
309 Vector hashes(TypeId::HASH);
310 groups.Hash(hashes);
311
312 // now compute the entry in the table based on the hash using a modulo
313 // multiply the position by the tuple size and add the base address
314 UnaryExecutor::Execute<hash_t, data_ptr_t>(hashes, addresses, groups.size(), [&](hash_t element) {
315 assert((element & bitmask) == (element % capacity));
316 return data + ((element & bitmask) * tuple_size);
317 });
318}
319
320template <class T>
321static void templated_scatter(VectorData &gdata, Vector &addresses, const SelectionVector &sel, idx_t count,
322 idx_t type_size) {
323 auto data = (T *)gdata.data;
324 auto pointers = FlatVector::GetData<uintptr_t>(addresses);
325 if (gdata.nullmask->any()) {
326 for (idx_t i = 0; i < count; i++) {
327 auto pointer_idx = sel.get_index(i);
328 auto group_idx = gdata.sel->get_index(pointer_idx);
329 auto ptr = (T *)pointers[pointer_idx];
330
331 if ((*gdata.nullmask)[group_idx]) {
332 *ptr = NullValue<T>();
333 } else {
334 *ptr = data[group_idx];
335 }
336 pointers[pointer_idx] += type_size;
337 }
338 } else {
339 for (idx_t i = 0; i < count; i++) {
340 auto pointer_idx = sel.get_index(i);
341 auto group_idx = gdata.sel->get_index(pointer_idx);
342 auto ptr = (T *)pointers[pointer_idx];
343
344 *ptr = data[group_idx];
345 pointers[pointer_idx] += type_size;
346 }
347 }
348}
349
// Serialize the group columns of the selected rows into the HT cells pointed
// to by addresses, dispatching on the column type. The cell pointers are
// advanced past each column as it is written.
void SuperLargeHashTable::ScatterGroups(DataChunk &groups, unique_ptr<VectorData[]> &group_data, Vector &addresses,
                                        const SelectionVector &sel, idx_t count) {
	for (idx_t grp_idx = 0; grp_idx < groups.column_count(); grp_idx++) {
		auto &data = groups.data[grp_idx];
		auto &gdata = group_data[grp_idx];

		auto type_size = GetTypeIdSize(data.type);

		switch (data.type) {
		case TypeId::BOOL:
		case TypeId::INT8:
			templated_scatter<int8_t>(gdata, addresses, sel, count, type_size);
			break;
		case TypeId::INT16:
			templated_scatter<int16_t>(gdata, addresses, sel, count, type_size);
			break;
		case TypeId::INT32:
			templated_scatter<int32_t>(gdata, addresses, sel, count, type_size);
			break;
		case TypeId::INT64:
			templated_scatter<int64_t>(gdata, addresses, sel, count, type_size);
			break;
		case TypeId::FLOAT:
			templated_scatter<float>(gdata, addresses, sel, count, type_size);
			break;
		case TypeId::DOUBLE:
			templated_scatter<double>(gdata, addresses, sel, count, type_size);
			break;
		case TypeId::VARCHAR: {
			// strings are handled inline: short strings are stored directly,
			// long strings are copied into the table's string heap so the
			// stored string_t stays valid for the lifetime of the HT
			auto data = (string_t *)gdata.data;
			auto pointers = FlatVector::GetData<uintptr_t>(addresses);

			for (idx_t i = 0; i < count; i++) {
				auto pointer_idx = sel.get_index(i);
				auto group_idx = gdata.sel->get_index(pointer_idx);
				auto ptr = (string_t *)pointers[pointer_idx];

				if ((*gdata.nullmask)[group_idx]) {
					// NULL string: store the NULL sentinel
					*ptr = NullValue<string_t>();
				} else if (data[group_idx].IsInlined()) {
					*ptr = data[group_idx];
				} else {
					*ptr = string_heap.AddString(data[group_idx]);
				}
				pointers[pointer_idx] += type_size;
			}
			break;
		}
		default:
			throw Exception("Unsupported type for group vector");
		}
	}
}
403
404template <class T>
405static void templated_compare_groups(VectorData &gdata, Vector &addresses, SelectionVector &sel, idx_t &count,
406 idx_t type_size, SelectionVector &no_match, idx_t &no_match_count) {
407 auto data = (T *)gdata.data;
408 auto pointers = FlatVector::GetData<uintptr_t>(addresses);
409 idx_t match_count = 0;
410 if (gdata.nullmask->any()) {
411 for (idx_t i = 0; i < count; i++) {
412 auto idx = sel.get_index(i);
413 auto group_idx = gdata.sel->get_index(idx);
414 auto value = (T *)pointers[idx];
415
416 if ((*gdata.nullmask)[group_idx]) {
417 if (IsNullValue<T>(*value)) {
418 // match: move to next value to compare
419 sel.set_index(match_count++, idx);
420 pointers[idx] += type_size;
421 } else {
422 no_match.set_index(no_match_count++, idx);
423 }
424 } else {
425 if (Equals::Operation<T>(data[group_idx], *value)) {
426 sel.set_index(match_count++, idx);
427 pointers[idx] += type_size;
428 } else {
429 no_match.set_index(no_match_count++, idx);
430 }
431 }
432 }
433 } else {
434 for (idx_t i = 0; i < count; i++) {
435 auto idx = sel.get_index(i);
436 auto group_idx = gdata.sel->get_index(idx);
437 auto value = (T *)pointers[idx];
438
439 if (Equals::Operation<T>(data[group_idx], *value)) {
440 sel.set_index(match_count++, idx);
441 pointers[idx] += type_size;
442 } else {
443 no_match.set_index(no_match_count++, idx);
444 }
445 }
446 }
447 count = match_count;
448}
449
450static idx_t CompareGroups(DataChunk &groups, unique_ptr<VectorData[]> &group_data, Vector &addresses,
451 SelectionVector &sel, idx_t count, SelectionVector &no_match) {
452 idx_t no_match_count = 0;
453 for (idx_t group_idx = 0; group_idx < groups.column_count(); group_idx++) {
454 auto &data = groups.data[group_idx];
455 auto &gdata = group_data[group_idx];
456 auto type_size = GetTypeIdSize(data.type);
457 switch (data.type) {
458 case TypeId::BOOL:
459 case TypeId::INT8:
460 templated_compare_groups<int8_t>(gdata, addresses, sel, count, type_size, no_match, no_match_count);
461 break;
462 case TypeId::INT16:
463 templated_compare_groups<int16_t>(gdata, addresses, sel, count, type_size, no_match, no_match_count);
464 break;
465 case TypeId::INT32:
466 templated_compare_groups<int32_t>(gdata, addresses, sel, count, type_size, no_match, no_match_count);
467 break;
468 case TypeId::INT64:
469 templated_compare_groups<int64_t>(gdata, addresses, sel, count, type_size, no_match, no_match_count);
470 break;
471 case TypeId::FLOAT:
472 templated_compare_groups<float>(gdata, addresses, sel, count, type_size, no_match, no_match_count);
473 break;
474 case TypeId::DOUBLE:
475 templated_compare_groups<double>(gdata, addresses, sel, count, type_size, no_match, no_match_count);
476 break;
477 case TypeId::VARCHAR:
478 templated_compare_groups<string_t>(gdata, addresses, sel, count, type_size, no_match, no_match_count);
479 break;
480 default:
481 throw Exception("Unsupported type for group vector");
482 }
483 }
484 return no_match_count;
485}
486
// this is to support distinct aggregations where we need to record whether we
// have already seen a value for a group
//
// Find (or create) the HT entry for every row in "groups" using linear
// probing. On return, "addresses" points to each row's aggregate payload,
// "new_groups" selects the rows whose group was newly created, and the return
// value is the number of such new groups.
idx_t SuperLargeHashTable::FindOrCreateGroups(DataChunk &groups, Vector &addresses, SelectionVector &new_groups) {
	// resize at 50% capacity, also need to fit the entire vector
	if (entries > capacity / 2 || capacity - entries <= STANDARD_VECTOR_SIZE) {
		Resize(capacity * 2);
	}

	// we need to be able to fit at least one vector of data
	assert(capacity - entries > STANDARD_VECTOR_SIZE);
	assert(addresses.type == TypeId::POINTER);

	// hash the groups to get the addresses of the candidate cells
	HashGroups(groups, addresses);

	addresses.Normalify(groups.size());
	auto data_pointers = FlatVector::GetData<data_ptr_t>(addresses);

	// group_pointers track each row's position within a cell's group data
	// while the stored groups are written/compared
	data_ptr_t group_pointers[STANDARD_VECTOR_SIZE];
	Vector pointers(TypeId::POINTER, (data_ptr_t)group_pointers);

	// set up the selection vectors
	SelectionVector v1(STANDARD_VECTOR_SIZE);
	SelectionVector v2(STANDARD_VECTOR_SIZE);
	SelectionVector empty_vector(STANDARD_VECTOR_SIZE);

	// we start out with all entries [0, 1, 2, ..., groups.size()]
	const SelectionVector *sel_vector = &FlatVector::IncrementalSelectionVector;
	SelectionVector *next_vector = &v1;
	SelectionVector *no_match_vector = &v2;
	idx_t remaining_entries = groups.size();

	// orrify all the groups (get a uniform VectorData view per column)
	auto group_data = unique_ptr<VectorData[]>(new VectorData[groups.column_count()]);
	for (idx_t grp_idx = 0; grp_idx < groups.column_count(); grp_idx++) {
		groups.data[grp_idx].Orrify(groups.size(), group_data[grp_idx]);
	}

	idx_t new_group_count = 0;
	// probing loop: each iteration resolves every remaining row against its
	// current candidate cell; non-matching rows advance to the next cell
	while (remaining_entries > 0) {
		idx_t entry_count = 0;
		idx_t empty_count = 0;

		// first figure out for each remaining whether or not it belongs to a full or empty group
		for (idx_t i = 0; i < remaining_entries; i++) {
			idx_t index = sel_vector->get_index(i);
			auto entry = data_pointers[index];
			if (*entry == EMPTY_CELL) {
				// cell is empty; mark the cell as filled
				*entry = FULL_CELL;
				empty_vector.set_index(empty_count++, index);
				new_groups.set_index(new_group_count++, index);
				// initialize the payload info for the column
				memcpy(entry + FLAG_SIZE + group_width, empty_payload_data.get(), payload_width);
			} else {
				// cell is occupied: add to check list
				next_vector->set_index(entry_count++, index);
			}
			// group_pointers -> start of the cell's group data
			// data_pointers  -> start of the cell's payload
			group_pointers[index] = entry + FLAG_SIZE;
			data_pointers[index] = entry + FLAG_SIZE + group_width;
		}

		if (empty_count > 0) {
			// for each of the locations that are empty, serialize the group columns to the locations
			ScatterGroups(groups, group_data, pointers, empty_vector, empty_count);
			entries += empty_count;
		}
		// now we have only the tuples remaining that might match to an existing group
		// start performing comparisons with each of the groups
		idx_t no_match_count = CompareGroups(groups, group_data, pointers, *next_vector, entry_count, *no_match_vector);

		// each of the entries that do not match we move them to the next entry in the HT
		for (idx_t i = 0; i < no_match_count; i++) {
			idx_t index = no_match_vector->get_index(i);
			// data_pointers currently points at the payload start, so adding
			// payload_width lands exactly on the next tuple's flag byte
			data_pointers[index] += payload_width;
			assert(((uint64_t)(data_pointers[index] - data)) % tuple_size == 0);
			if (data_pointers[index] >= endptr) {
				// wrap around to the start of the table
				data_pointers[index] = data;
			}
		}
		// the no-match rows become the remaining set for the next iteration
		sel_vector = no_match_vector;
		std::swap(next_vector, no_match_vector);
		remaining_entries = no_match_count;
	}
	return new_group_count;
}
573
574void SuperLargeHashTable::FindOrCreateGroups(DataChunk &groups, Vector &addresses) {
575 // create a dummy new_groups sel vector
576 SelectionVector new_groups(STANDARD_VECTOR_SIZE);
577 FindOrCreateGroups(groups, addresses, new_groups);
578}
579
// Resumable scan over the hash table: emits up to STANDARD_VECTOR_SIZE groups
// per call, writing the group values into "groups" and the finalized aggregate
// results into "result". scan_position is a byte offset into the table that is
// updated so the next call resumes where this one stopped. Returns the number
// of groups produced; 0 signals the scan is done.
idx_t SuperLargeHashTable::Scan(idx_t &scan_position, DataChunk &groups, DataChunk &result) {
	data_ptr_t ptr;
	data_ptr_t start = data + scan_position;
	data_ptr_t end = data + capacity * tuple_size;
	if (start >= end) {
		return 0;
	}

	Vector addresses(TypeId::POINTER);
	auto data_pointers = FlatVector::GetData<data_ptr_t>(addresses);

	// scan the table for full cells starting from the scan position
	idx_t entry = 0;
	for (ptr = start; ptr < end && entry < STANDARD_VECTOR_SIZE; ptr += tuple_size) {
		if (*ptr == FULL_CELL) {
			// found entry; record a pointer to its group data
			data_pointers[entry++] = ptr + FLAG_SIZE;
		}
	}
	if (entry == 0) {
		return 0;
	}
	groups.SetCardinality(entry);
	result.SetCardinality(entry);
	// fetch the group columns
	// NOTE(review): Gather::Set is called once per column with the same address
	// vector — presumably it advances the pointers past each column; confirm
	for (idx_t i = 0; i < groups.column_count(); i++) {
		auto &column = groups.data[i];
		VectorOperations::Gather::Set(addresses, column, groups.size());
	}

	// finalize each aggregate's state into the result column, advancing the
	// state pointers past each aggregate as we go
	for (idx_t i = 0; i < aggregates.size(); i++) {
		auto &target = result.data[i];
		auto &aggr = aggregates[i];
		aggr.function.finalize(addresses, target, groups.size());

		VectorOperations::AddInPlace(addresses, aggr.payload_size, groups.size());
	}
	// remember where to resume on the next call
	scan_position = ptr - data;
	return entry;
}
620