#pragma once

#include <mutex>
#include <memory>
#include <functional>

#include <common/logger_useful.h>

#include <common/StringRef.h>
#include <Common/Arena.h>
#include <Common/HashTable/FixedHashMap.h>
#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/TwoLevelHashMap.h>
#include <Common/HashTable/StringHashMap.h>
#include <Common/HashTable/TwoLevelStringHashMap.h>

#include <Common/ThreadPool.h>
#include <Common/UInt128.h>
#include <Common/LRUCache.h>
#include <Common/ColumnsHashing.h>
#include <Common/assert_cast.h>
#include <Common/filesystemHelpers.h>

#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/SizeLimits.h>

#include <Interpreters/AggregateDescription.h>
#include <Interpreters/AggregationCommon.h>

#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnLowCardinality.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
    extern const int NOT_ENOUGH_SPACE;
}

class IBlockOutputStream;

/** Different data structures that can be used for aggregation.
  * For efficiency, the aggregation data itself is put into the pool.
  * Ownership of the data and of the pool (i.e. of the states of aggregate functions)
  * is acquired later, in the `convertToBlocks` function, by the ColumnAggregateFunction object.
  *
  * Most data structures exist in two versions: normal and two-level (TwoLevel).
  * A two-level hash table works a little slower with a small number of different keys,
  * but scales better with a large number of different keys, because it allows
  * some operations (merging, post-processing) to be parallelized in a natural way.
  *
  * To ensure efficient operation over a wide range of conditions,
  * single-level hash tables are used first,
  * and when the number of different keys becomes large enough,
  * they are converted to two-level ones.
  *
  * PS. There are many different approaches to the efficient implementation of parallel and distributed aggregation,
  * best suited to different cases, and this approach is just one of them, chosen for a combination of reasons.
  */
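
/** Illustrative sketch of how the single-level to two-level switch is driven (simplified;
  * the real checks live in Aggregator and also take group_by_two_level_threshold_bytes into account,
  * and `two_level_threshold` here is a hypothetical value standing in for params.group_by_two_level_threshold):
  *
  *     AggregatedDataVariants & result = ...;   /// filled by aggregation
  *     if (!result.isTwoLevel()
  *         && result.isConvertibleToTwoLevel()
  *         && result.size() >= two_level_threshold)
  *         result.convertToTwoLevel();          /// after this, buckets can be merged and post-processed in parallel
  */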

using AggregatedDataWithoutKey = AggregateDataPtr;

using AggregatedDataWithUInt8Key = FixedHashMap<UInt8, AggregateDataPtr>;
using AggregatedDataWithUInt16Key = FixedHashMap<UInt16, AggregateDataPtr>;

using AggregatedDataWithUInt64Key = HashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>;

using AggregatedDataWithShortStringKey = StringHashMap<AggregateDataPtr>;

using AggregatedDataWithStringKey = HashMapWithSavedHash<StringRef, AggregateDataPtr>;

using AggregatedDataWithKeys128 = HashMap<UInt128, AggregateDataPtr, UInt128HashCRC32>;
using AggregatedDataWithKeys256 = HashMap<UInt256, AggregateDataPtr, UInt256HashCRC32>;

using AggregatedDataWithUInt64KeyTwoLevel = TwoLevelHashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>;

using AggregatedDataWithShortStringKeyTwoLevel = TwoLevelStringHashMap<AggregateDataPtr>;

using AggregatedDataWithStringKeyTwoLevel = TwoLevelHashMapWithSavedHash<StringRef, AggregateDataPtr>;

using AggregatedDataWithKeys128TwoLevel = TwoLevelHashMap<UInt128, AggregateDataPtr, UInt128HashCRC32>;
using AggregatedDataWithKeys256TwoLevel = TwoLevelHashMap<UInt256, AggregateDataPtr, UInt256HashCRC32>;

/** Variants with a better hash function, using more than 32 bits of hash.
  * Used for the merge phase of external aggregation, where the number of keys may be far greater than 4 billion,
  * but only a sub-partition of them is kept in memory and merged at a time.
  * TODO We need to switch to a better hash function not only for external aggregation,
  * but also for huge aggregation results on machines with terabytes of RAM.
  */

using AggregatedDataWithUInt64KeyHash64 = HashMap<UInt64, AggregateDataPtr, DefaultHash<UInt64>>;
using AggregatedDataWithStringKeyHash64 = HashMapWithSavedHash<StringRef, AggregateDataPtr, StringRefHash64>;
using AggregatedDataWithKeys128Hash64 = HashMap<UInt128, AggregateDataPtr, UInt128Hash>;
using AggregatedDataWithKeys256Hash64 = HashMap<UInt256, AggregateDataPtr, UInt256Hash>;

template <typename Base>
struct AggregationDataWithNullKey : public Base
{
    using Base::Base;

    bool & hasNullKeyData() { return has_null_key_data; }
    AggregateDataPtr & getNullKeyData() { return null_key_data; }
    bool hasNullKeyData() const { return has_null_key_data; }
    const AggregateDataPtr & getNullKeyData() const { return null_key_data; }
    size_t size() const { return Base::size() + (has_null_key_data ? 1 : 0); }
    bool empty() const { return Base::empty() && !has_null_key_data; }
    void clear()
    {
        Base::clear();
        has_null_key_data = false;
    }
    void clearAndShrink()
    {
        Base::clearAndShrink();
        has_null_key_data = false;
    }

private:
    bool has_null_key_data = false;
    AggregateDataPtr null_key_data = nullptr;
};
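
/// Illustrative sketch of what the extra null-key slot provides (simplified; the real call sites
/// are in ColumnsHashing, `key_is_null` is a hypothetical flag taken from the column's null map,
/// and `place` stands in for an AggregateDataPtr allocated from the arena):
///
///     AggregatedDataWithNullableUInt64Key map;
///     if (key_is_null)
///     {
///         map.hasNullKeyData() = true;
///         map.getNullKeyData() = place;
///     }
///     else
///         map[key] = place;    /// non-null keys go through the underlying hash table as usual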

template <typename Base>
struct AggregationDataWithNullKeyTwoLevel : public Base
{
    using Base::impls;

    AggregationDataWithNullKeyTwoLevel() {}

    template <typename Other>
    explicit AggregationDataWithNullKeyTwoLevel(const Other & other) : Base(other)
    {
        impls[0].hasNullKeyData() = other.hasNullKeyData();
        impls[0].getNullKeyData() = other.getNullKeyData();
    }

    bool & hasNullKeyData() { return impls[0].hasNullKeyData(); }
    AggregateDataPtr & getNullKeyData() { return impls[0].getNullKeyData(); }
    bool hasNullKeyData() const { return impls[0].hasNullKeyData(); }
    const AggregateDataPtr & getNullKeyData() const { return impls[0].getNullKeyData(); }
};

template <typename ... Types>
using HashTableWithNullKey = AggregationDataWithNullKey<HashMapTable<Types ...>>;
template <typename ... Types>
using StringHashTableWithNullKey = AggregationDataWithNullKey<StringHashMap<Types ...>>;

using AggregatedDataWithNullableUInt8Key = AggregationDataWithNullKey<AggregatedDataWithUInt8Key>;
using AggregatedDataWithNullableUInt16Key = AggregationDataWithNullKey<AggregatedDataWithUInt16Key>;

using AggregatedDataWithNullableUInt64Key = AggregationDataWithNullKey<AggregatedDataWithUInt64Key>;
using AggregatedDataWithNullableStringKey = AggregationDataWithNullKey<AggregatedDataWithStringKey>;

using AggregatedDataWithNullableUInt64KeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
        TwoLevelHashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>,
        TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>;

using AggregatedDataWithNullableShortStringKeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
        TwoLevelStringHashMap<AggregateDataPtr, HashTableAllocator, StringHashTableWithNullKey>>;

using AggregatedDataWithNullableStringKeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
        TwoLevelHashMapWithSavedHash<StringRef, AggregateDataPtr, DefaultHash<StringRef>,
        TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>;


/// For the case where there is one numeric key.
/// FieldType is UInt8/16/32/64 for any type with corresponding bit width.
template <typename FieldType, typename TData,
    bool consecutive_keys_optimization = true>
struct AggregationMethodOneNumber
{
    using Data = TData;
    using Key = typename Data::key_type;
    using Mapped = typename Data::mapped_type;

    Data data;

    AggregationMethodOneNumber() {}

    template <typename Other>
    AggregationMethodOneNumber(const Other & other) : data(other.data) {}

    /// To use one `Method` from different threads, use a different `State` for each thread.
    using State = ColumnsHashing::HashMethodOneNumber<typename Data::value_type,
        Mapped, FieldType, consecutive_keys_optimization>;

    /// Whether the low cardinality optimization is used.
    static const bool low_cardinality_optimization = false;

    // Insert the key from the hash table into columns.
    static void insertKeyIntoColumns(const Key & key, MutableColumns & key_columns, const Sizes & /*key_sizes*/)
    {
        auto key_holder = reinterpret_cast<const char *>(&key);
        auto column = static_cast<ColumnVectorHelper *>(key_columns[0].get());
        column->insertRawData<sizeof(FieldType)>(key_holder);
    }
};
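
/// Illustrative sketch of how a `Method` and its `State` cooperate during aggregation
/// (a simplified rendering of what Aggregator::executeImplCase in Aggregator.cpp does;
/// `key_columns`, `key_sizes`, `rows` and `aggregates_pool` are assumed to describe the current block,
/// and createAggregateStates / total_size_of_aggregate_states / align_aggregate_states are members
/// of Aggregator declared further below):
///
///     using Method = AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>;
///     Method method;
///     Method::State state(key_columns, key_sizes, nullptr);    /// one State per thread
///
///     for (size_t row = 0; row < rows; ++row)
///     {
///         auto emplace_result = state.emplaceKey(method.data, row, *aggregates_pool);
///         if (emplace_result.isInserted())
///         {
///             /// A new key: allocate and initialize the aggregate states for it.
///             AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
///             createAggregateStates(place);
///             emplace_result.setMapped(place);
///         }
///         /// else: reuse emplace_result.getMapped() for an already-seen key.
///     }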


/// For the case where there is one string key.
template <typename TData>
struct AggregationMethodString
{
    using Data = TData;
    using Key = typename Data::key_type;
    using Mapped = typename Data::mapped_type;

    Data data;

    AggregationMethodString() {}

    template <typename Other>
    AggregationMethodString(const Other & other) : data(other.data) {}

    using State = ColumnsHashing::HashMethodString<typename Data::value_type, Mapped>;

    static const bool low_cardinality_optimization = false;

    static void insertKeyIntoColumns(const StringRef & key, MutableColumns & key_columns, const Sizes &)
    {
        key_columns[0]->insertData(key.data, key.size);
    }
};


/// Same as above but without cache
template <typename TData>
struct AggregationMethodStringNoCache
{
    using Data = TData;
    using Key = typename Data::key_type;
    using Mapped = typename Data::mapped_type;

    Data data;

    AggregationMethodStringNoCache() {}

    template <typename Other>
    AggregationMethodStringNoCache(const Other & other) : data(other.data) {}

    using State = ColumnsHashing::HashMethodString<typename Data::value_type, Mapped, true, false>;

    static const bool low_cardinality_optimization = false;

    static void insertKeyIntoColumns(const StringRef & key, MutableColumns & key_columns, const Sizes &)
    {
        key_columns[0]->insertData(key.data, key.size);
    }
};


/// For the case where there is one fixed-length string key.
template <typename TData>
struct AggregationMethodFixedString
{
    using Data = TData;
    using Key = typename Data::key_type;
    using Mapped = typename Data::mapped_type;

    Data data;

    AggregationMethodFixedString() {}

    template <typename Other>
    AggregationMethodFixedString(const Other & other) : data(other.data) {}

    using State = ColumnsHashing::HashMethodFixedString<typename Data::value_type, Mapped>;

    static const bool low_cardinality_optimization = false;

    static void insertKeyIntoColumns(const StringRef & key, MutableColumns & key_columns, const Sizes &)
    {
        key_columns[0]->insertData(key.data, key.size);
    }
};

/// Same as above but without cache
template <typename TData>
struct AggregationMethodFixedStringNoCache
{
    using Data = TData;
    using Key = typename Data::key_type;
    using Mapped = typename Data::mapped_type;

    Data data;

    AggregationMethodFixedStringNoCache() {}

    template <typename Other>
    AggregationMethodFixedStringNoCache(const Other & other) : data(other.data) {}

    using State = ColumnsHashing::HashMethodFixedString<typename Data::value_type, Mapped, true, false>;

    static const bool low_cardinality_optimization = false;

    static void insertKeyIntoColumns(const StringRef & key, MutableColumns & key_columns, const Sizes &)
    {
        key_columns[0]->insertData(key.data, key.size);
    }
};


/// Single low cardinality column.
template <typename SingleColumnMethod>
struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
{
    using Base = SingleColumnMethod;
    using BaseState = typename Base::State;

    using Data = typename Base::Data;
    using Key = typename Base::Key;
    using Mapped = typename Base::Mapped;

    using Base::data;

    AggregationMethodSingleLowCardinalityColumn() = default;

    template <typename Other>
    explicit AggregationMethodSingleLowCardinalityColumn(const Other & other) : Base(other) {}

    using State = ColumnsHashing::HashMethodSingleLowCardinalityColumn<BaseState, Mapped, true>;

    static const bool low_cardinality_optimization = true;

    static void insertKeyIntoColumns(const Key & key,
        MutableColumns & key_columns_low_cardinality, const Sizes & /*key_sizes*/)
    {
        auto col = assert_cast<ColumnLowCardinality *>(key_columns_low_cardinality[0].get());

        if constexpr (std::is_same_v<Key, StringRef>)
        {
            col->insertData(key.data, key.size);
        }
        else
        {
            col->insertData(reinterpret_cast<const char *>(&key), sizeof(key));
        }
    }
};


/// For the case where all keys are of fixed length, and they fit in N (for example, 128) bits.
template <typename TData, bool has_nullable_keys_ = false, bool has_low_cardinality_ = false>
struct AggregationMethodKeysFixed
{
    using Data = TData;
    using Key = typename Data::key_type;
    using Mapped = typename Data::mapped_type;
    static constexpr bool has_nullable_keys = has_nullable_keys_;
    static constexpr bool has_low_cardinality = has_low_cardinality_;

    Data data;

    AggregationMethodKeysFixed() {}

    template <typename Other>
    AggregationMethodKeysFixed(const Other & other) : data(other.data) {}

    using State = ColumnsHashing::HashMethodKeysFixed<typename Data::value_type, Key, Mapped, has_nullable_keys, has_low_cardinality>;

    static const bool low_cardinality_optimization = false;

    static void insertKeyIntoColumns(const Key & key, MutableColumns & key_columns, const Sizes & key_sizes)
    {
        size_t keys_size = key_columns.size();

        static constexpr auto bitmap_size = has_nullable_keys ? std::tuple_size<KeysNullMap<Key>>::value : 0;
        /// In any hash key value, column values to be read start just after the bitmap, if it exists.
        size_t pos = bitmap_size;

        for (size_t i = 0; i < keys_size; ++i)
        {
            IColumn * observed_column;
            ColumnUInt8 * null_map;

            bool column_nullable = false;
            if constexpr (has_nullable_keys)
                column_nullable = isColumnNullable(*key_columns[i]);

            /// If we have a nullable column, get its nested column and its null map.
            if (column_nullable)
            {
                ColumnNullable & nullable_col = assert_cast<ColumnNullable &>(*key_columns[i]);
                observed_column = &nullable_col.getNestedColumn();
                null_map = assert_cast<ColumnUInt8 *>(&nullable_col.getNullMapColumn());
            }
            else
            {
                observed_column = key_columns[i].get();
                null_map = nullptr;
            }

            bool is_null = false;
            if (column_nullable)
            {
                /// The current column is nullable. Check whether the value of the
                /// corresponding key is null, and update the null map accordingly.
                size_t bucket = i / 8;
                size_t offset = i % 8;
                UInt8 val = (reinterpret_cast<const UInt8 *>(&key)[bucket] >> offset) & 1;
                null_map->insertValue(val);
                is_null = val == 1;
            }

            if (has_nullable_keys && is_null)
                observed_column->insertDefault();
            else
            {
                size_t size = key_sizes[i];
                observed_column->insertData(reinterpret_cast<const char *>(&key) + pos, size);
                pos += size;
            }
        }
    }
};


/** Aggregates by concatenating serialized key values.
  * The serialized value differs in that it can be unambiguously deserialized given only the position at which it starts.
  * That is, for example, for strings, it contains the serialized length of the string first, and then the bytes.
  * Therefore, when aggregating by several strings, there is no ambiguity.
  */
template <typename TData>
struct AggregationMethodSerialized
{
    using Data = TData;
    using Key = typename Data::key_type;
    using Mapped = typename Data::mapped_type;

    Data data;

    AggregationMethodSerialized() {}

    template <typename Other>
    AggregationMethodSerialized(const Other & other) : data(other.data) {}

    using State = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped>;

    static const bool low_cardinality_optimization = false;

    static void insertKeyIntoColumns(const StringRef & key, MutableColumns & key_columns, const Sizes &)
    {
        auto pos = key.data;
        for (auto & column : key_columns)
            pos = column->deserializeAndInsertFromArena(pos);
    }
};
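
/// Illustrative sketch of how one row's serialized key could be produced (simplified; the real
/// code path goes through ColumnsHashing::HashMethodSerialized, and `row` / `pool` are assumed to be
/// the current row number and the aggregation Arena):
///
///     const char * begin = nullptr;
///     size_t sum_size = 0;
///     for (const auto * column : key_columns)               /// ColumnRawPtrs of the key columns
///         sum_size += column->serializeValueIntoArena(row, pool, begin).size;
///     StringRef key{begin, sum_size};                       /// contiguous, unambiguously parseable
///
/// insertKeyIntoColumns above is the inverse: it walks the same bytes with
/// deserializeAndInsertFromArena to restore the individual key columns.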


class Aggregator;

using ColumnsHashing::HashMethodContext;
using ColumnsHashing::HashMethodContextPtr;

struct AggregatedDataVariants : private boost::noncopyable
{
    /** Working with states of aggregate functions in the pool is arranged in the following (inconvenient) way:
      * - when aggregating, states are created in the pool using IAggregateFunction::create (internally - a `placement new` of an arbitrary structure);
      * - they must then be destroyed using IAggregateFunction::destroy (internally - a call to the destructor of an arbitrary structure);
      * - if aggregation is complete, then, in the Aggregator::convertToBlocks function, pointers to the states of aggregate functions
      *   are written to ColumnAggregateFunction; ColumnAggregateFunction "acquires ownership" of them, that is - calls `destroy` in its destructor.
      * - if during the aggregation, before the call to Aggregator::convertToBlocks, an exception was thrown,
      *   then the states of aggregate functions must still be destroyed,
      *   otherwise, for complex states (e.g. AggregateFunctionUniq), there will be memory leaks;
      * - in this case, to destroy the states, the destructor calls the Aggregator::destroyAllAggregateStates method,
      *   but only if the variable `aggregator` (see below) is not nullptr;
      * - that is, until ownership of the aggregate function states is transferred to ColumnAggregateFunction, set the variable `aggregator`,
      *   so that if an exception occurs, the states are destroyed correctly.
      *
      * PS. This could be fixed by making a pool that knows which aggregate function states are put into it, in what order, and how to destroy them.
      * But this can hardly be done simply, because it is planned to put variable-length strings into the same pool.
      * In this case, the pool will not know at what offsets the objects are stored.
      */
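
    /// A simplified sketch of what this arrangement implies for the destructor (the real definition
    /// is in Aggregator.cpp and additionally guards against exceptions while destroying the states):
    ///
    ///     ~AggregatedDataVariants()
    ///     {
    ///         if (aggregator)                                    /// ownership was not yet transferred
    ///             aggregator->destroyAllAggregateStates(*this);
    ///     }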
    Aggregator * aggregator = nullptr;

    size_t keys_size{};    /// Number of keys. NOTE do we need this field?
    Sizes key_sizes;       /// Dimensions of keys, if the keys are of fixed length.

    /// Pools for states of aggregate functions. Ownership will later be transferred to ColumnAggregateFunction.
    Arenas aggregates_pools;
    Arena * aggregates_pool{};    /// The pool that is currently used for allocation.

    /** Specialization for the case when there are no keys, and for keys that did not fit into max_rows_to_group_by.
      */
    AggregatedDataWithoutKey without_key = nullptr;

    // Disable the consecutive key optimization for UInt8/16, because they use a FixedHashMap
    // and the lookup there is almost free, so we don't need to cache the last lookup result.
    std::unique_ptr<AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key, false>> key8;
    std::unique_ptr<AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key, false>> key16;

    std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64Key>> key32;
    std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>> key64;
    std::unique_ptr<AggregationMethodStringNoCache<AggregatedDataWithShortStringKey>> key_string;
    std::unique_ptr<AggregationMethodFixedStringNoCache<AggregatedDataWithShortStringKey>> key_fixed_string;
    std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128>> keys128;
    std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256>> keys256;
    std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKey>> serialized;

    std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64KeyTwoLevel>> key32_two_level;
    std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyTwoLevel>> key64_two_level;
    std::unique_ptr<AggregationMethodStringNoCache<AggregatedDataWithShortStringKeyTwoLevel>> key_string_two_level;
    std::unique_ptr<AggregationMethodFixedStringNoCache<AggregatedDataWithShortStringKeyTwoLevel>> key_fixed_string_two_level;
    std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel>> keys128_two_level;
    std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel>> keys256_two_level;
    std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKeyTwoLevel>> serialized_two_level;

    std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyHash64>> key64_hash64;
    std::unique_ptr<AggregationMethodString<AggregatedDataWithStringKeyHash64>> key_string_hash64;
    std::unique_ptr<AggregationMethodFixedString<AggregatedDataWithStringKeyHash64>> key_fixed_string_hash64;
    std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128Hash64>> keys128_hash64;
    std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256Hash64>> keys256_hash64;
    std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKeyHash64>> serialized_hash64;

    /// Support for nullable keys.
    std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128, true>> nullable_keys128;
    std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256, true>> nullable_keys256;
    std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, true>> nullable_keys128_two_level;
    std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, true>> nullable_keys256_two_level;

    /// Support for low cardinality.
    std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false>>> low_cardinality_key8;
    std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key, false>>> low_cardinality_key16;
    std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64Key>>> low_cardinality_key32;
    std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key>>> low_cardinality_key64;
    std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithNullableStringKey>>> low_cardinality_key_string;
    std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithNullableStringKey>>> low_cardinality_key_fixed_string;

    std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64KeyTwoLevel>>> low_cardinality_key32_two_level;
    std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64KeyTwoLevel>>> low_cardinality_key64_two_level;
    std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithNullableStringKeyTwoLevel>>> low_cardinality_key_string_two_level;
    std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithNullableStringKeyTwoLevel>>> low_cardinality_key_fixed_string_two_level;

    std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128, false, true>> low_cardinality_keys128;
    std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256, false, true>> low_cardinality_keys256;
    std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, false, true>> low_cardinality_keys128_two_level;
    std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, false, true>> low_cardinality_keys256_two_level;

    /// In this and similar macros, the option without_key is not considered.
    #define APPLY_FOR_AGGREGATED_VARIANTS(M) \
        M(key8, false) \
        M(key16, false) \
        M(key32, false) \
        M(key64, false) \
        M(key_string, false) \
        M(key_fixed_string, false) \
        M(keys128, false) \
        M(keys256, false) \
        M(serialized, false) \
        M(key32_two_level, true) \
        M(key64_two_level, true) \
        M(key_string_two_level, true) \
        M(key_fixed_string_two_level, true) \
        M(keys128_two_level, true) \
        M(keys256_two_level, true) \
        M(serialized_two_level, true) \
        M(key64_hash64, false) \
        M(key_string_hash64, false) \
        M(key_fixed_string_hash64, false) \
        M(keys128_hash64, false) \
        M(keys256_hash64, false) \
        M(serialized_hash64, false) \
        M(nullable_keys128, false) \
        M(nullable_keys256, false) \
        M(nullable_keys128_two_level, true) \
        M(nullable_keys256_two_level, true) \
        M(low_cardinality_key8, false) \
        M(low_cardinality_key16, false) \
        M(low_cardinality_key32, false) \
        M(low_cardinality_key64, false) \
        M(low_cardinality_keys128, false) \
        M(low_cardinality_keys256, false) \
        M(low_cardinality_key_string, false) \
        M(low_cardinality_key_fixed_string, false) \
        M(low_cardinality_key32_two_level, true) \
        M(low_cardinality_key64_two_level, true) \
        M(low_cardinality_keys128_two_level, true) \
        M(low_cardinality_keys256_two_level, true) \
        M(low_cardinality_key_string_two_level, true) \
        M(low_cardinality_key_fixed_string_two_level, true) \

    enum class Type
    {
        EMPTY = 0,
        without_key,

    #define M(NAME, IS_TWO_LEVEL) NAME,
        APPLY_FOR_AGGREGATED_VARIANTS(M)
    #undef M
    };
    Type type = Type::EMPTY;

    AggregatedDataVariants() : aggregates_pools(1, std::make_shared<Arena>()), aggregates_pool(aggregates_pools.back().get()) {}
    bool empty() const { return type == Type::EMPTY; }
    void invalidate() { type = Type::EMPTY; }

    ~AggregatedDataVariants();

    void init(Type type_)
    {
        switch (type_)
        {
            case Type::EMPTY: break;
            case Type::without_key: break;

        #define M(NAME, IS_TWO_LEVEL) \
            case Type::NAME: NAME = std::make_unique<decltype(NAME)::element_type>(); break;
            APPLY_FOR_AGGREGATED_VARIANTS(M)
        #undef M
        }

        type = type_;
    }

    /// Number of rows (different keys).
    size_t size() const
    {
        switch (type)
        {
            case Type::EMPTY: return 0;
            case Type::without_key: return 1;

        #define M(NAME, IS_TWO_LEVEL) \
            case Type::NAME: return NAME->data.size() + (without_key != nullptr);
            APPLY_FOR_AGGREGATED_VARIANTS(M)
        #undef M
        }

        __builtin_unreachable();
    }

    /// The size without taking into account the row in which data is written for the calculation of TOTALS.
    size_t sizeWithoutOverflowRow() const
    {
        switch (type)
        {
            case Type::EMPTY: return 0;
            case Type::without_key: return 1;

        #define M(NAME, IS_TWO_LEVEL) \
            case Type::NAME: return NAME->data.size();
            APPLY_FOR_AGGREGATED_VARIANTS(M)
        #undef M
        }

        __builtin_unreachable();
    }

    const char * getMethodName() const
    {
        switch (type)
        {
            case Type::EMPTY: return "EMPTY";
            case Type::without_key: return "without_key";

        #define M(NAME, IS_TWO_LEVEL) \
            case Type::NAME: return #NAME;
            APPLY_FOR_AGGREGATED_VARIANTS(M)
        #undef M
        }

        __builtin_unreachable();
    }

    bool isTwoLevel() const
    {
        switch (type)
        {
            case Type::EMPTY: return false;
            case Type::without_key: return false;

        #define M(NAME, IS_TWO_LEVEL) \
            case Type::NAME: return IS_TWO_LEVEL;
            APPLY_FOR_AGGREGATED_VARIANTS(M)
        #undef M
        }

        __builtin_unreachable();
    }

    #define APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M) \
        M(key32) \
        M(key64) \
        M(key_string) \
        M(key_fixed_string) \
        M(keys128) \
        M(keys256) \
        M(serialized) \
        M(nullable_keys128) \
        M(nullable_keys256) \
        M(low_cardinality_key32) \
        M(low_cardinality_key64) \
        M(low_cardinality_keys128) \
        M(low_cardinality_keys256) \
        M(low_cardinality_key_string) \
        M(low_cardinality_key_fixed_string) \

    #define APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
        M(key8) \
        M(key16) \
        M(key64_hash64) \
        M(key_string_hash64) \
        M(key_fixed_string_hash64) \
        M(keys128_hash64) \
        M(keys256_hash64) \
        M(serialized_hash64) \
        M(low_cardinality_key8) \
        M(low_cardinality_key16) \

    #define APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) \
        APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
        APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M) \

    bool isConvertibleToTwoLevel() const
    {
        switch (type)
        {
        #define M(NAME) \
            case Type::NAME: return true;

            APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)

        #undef M
            default:
                return false;
        }
    }

    void convertToTwoLevel();

    #define APPLY_FOR_VARIANTS_TWO_LEVEL(M) \
        M(key32_two_level) \
        M(key64_two_level) \
        M(key_string_two_level) \
        M(key_fixed_string_two_level) \
        M(keys128_two_level) \
        M(keys256_two_level) \
        M(serialized_two_level) \
        M(nullable_keys128_two_level) \
        M(nullable_keys256_two_level) \
        M(low_cardinality_key32_two_level) \
        M(low_cardinality_key64_two_level) \
        M(low_cardinality_keys128_two_level) \
        M(low_cardinality_keys256_two_level) \
        M(low_cardinality_key_string_two_level) \
        M(low_cardinality_key_fixed_string_two_level) \

    #define APPLY_FOR_LOW_CARDINALITY_VARIANTS(M) \
        M(low_cardinality_key8) \
        M(low_cardinality_key16) \
        M(low_cardinality_key32) \
        M(low_cardinality_key64) \
        M(low_cardinality_keys128) \
        M(low_cardinality_keys256) \
        M(low_cardinality_key_string) \
        M(low_cardinality_key_fixed_string) \
        M(low_cardinality_key32_two_level) \
        M(low_cardinality_key64_two_level) \
        M(low_cardinality_keys128_two_level) \
        M(low_cardinality_keys256_two_level) \
        M(low_cardinality_key_string_two_level) \
        M(low_cardinality_key_fixed_string_two_level) \

    bool isLowCardinality()
    {
        switch (type)
        {
        #define M(NAME) \
            case Type::NAME: return true;

            APPLY_FOR_LOW_CARDINALITY_VARIANTS(M)
        #undef M
            default:
                return false;
        }
    }

    static HashMethodContextPtr createCache(Type type, const HashMethodContext::Settings & settings)
    {
        switch (type)
        {
            case Type::without_key: return nullptr;

        #define M(NAME, IS_TWO_LEVEL) \
            case Type::NAME: \
            { \
                using TPtr ## NAME = decltype(AggregatedDataVariants::NAME); \
                using T ## NAME = typename TPtr ## NAME ::element_type; \
                return T ## NAME ::State::createContext(settings); \
            }

            APPLY_FOR_AGGREGATED_VARIANTS(M)
        #undef M

            default:
                throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
        }
    }
};

using AggregatedDataVariantsPtr = std::shared_ptr<AggregatedDataVariants>;
using ManyAggregatedDataVariants = std::vector<AggregatedDataVariantsPtr>;
using ManyAggregatedDataVariantsPtr = std::shared_ptr<ManyAggregatedDataVariants>;

/** How are "total" values calculated with WITH TOTALS?
  * (For more details, see TotalsHavingBlockInputStream.)
  *
  * In the absence of group_by_overflow_mode = 'any', the data is aggregated as usual, but the states of the aggregate functions are not finalized.
  * Later, the aggregate function states for all rows (passed through HAVING) are merged into one - this will be TOTALS.
  *
  * If there is group_by_overflow_mode = 'any', the data is aggregated as usual, except for the keys that did not fit into max_rows_to_group_by.
  * For these keys, the data is aggregated into one additional row - see below under the names `overflow_row`, `overflows`...
  * Later, the aggregate function states for all rows (passed through HAVING) are merged into one,
  * and overflow_row is added or not (depending on the totals_mode setting) - this will be TOTALS.
  */


/** Aggregates the source of the blocks.
  */
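
/** Illustrative usage sketch (simplified; `header`, `keys`, `aggregates` and `stream` are assumed
  * to be prepared by the caller, and most limits are left at their "disabled" values):
  *
  *     Aggregator::Params params(
  *         header, keys, aggregates,
  *         false, 0, OverflowMode::THROW,    /// overflow_row, max_rows_to_group_by, group_by_overflow_mode
  *         0, 0,                             /// group_by_two_level_threshold, group_by_two_level_threshold_bytes
  *         0, false,                         /// max_bytes_before_external_group_by, empty_result_for_aggregation_by_empty_set
  *         "", 1, 0);                        /// tmp_path, max_threads, min_free_disk_space
  *
  *     Aggregator aggregator(params);
  *     AggregatedDataVariants result;
  *     aggregator.execute(stream, result);                                  /// consume the input stream
  *     BlocksList blocks = aggregator.convertToBlocks(result, true, 1);     /// final = true, max_threads = 1
  */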
class Aggregator
{
public:
    struct Params
    {
        /// Data structure of source blocks.
        Block src_header;
        /// Data structure of intermediate blocks before merge.
        Block intermediate_header;

        /// What to count.
        const ColumnNumbers keys;
        const AggregateDescriptions aggregates;
        const size_t keys_size;
        const size_t aggregates_size;

        /// Settings for approximate calculation of GROUP BY.
        const bool overflow_row;    /// Whether we need to put aggregates for keys that did not fit into max_rows_to_group_by into AggregatedDataVariants::without_key.
        const size_t max_rows_to_group_by;
        const OverflowMode group_by_overflow_mode;

        /// Two-level aggregation settings (used for a large number of keys).
        /** At how many keys, or at what size of the aggregation state in bytes,
          * two-level aggregation starts to be used. It is enough to reach at least one of the thresholds.
          * 0 means the corresponding threshold is not specified.
          */
        const size_t group_by_two_level_threshold;
        const size_t group_by_two_level_threshold_bytes;

        /// Settings to flush temporary data to the filesystem (external aggregation).
        const size_t max_bytes_before_external_group_by;    /// 0 - do not use external aggregation.

        /// Return an empty result when aggregating without keys on an empty set.
        bool empty_result_for_aggregation_by_empty_set;

        const std::string tmp_path;

        /// This setting is used to determine the cache size. No threads are created.
        size_t max_threads;

        const size_t min_free_disk_space;
        Params(
            const Block & src_header_,
            const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_,
            bool overflow_row_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
            size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
            size_t max_bytes_before_external_group_by_,
            bool empty_result_for_aggregation_by_empty_set_,
            const std::string & tmp_path_, size_t max_threads_,
            size_t min_free_disk_space_)
            : src_header(src_header_),
            keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
            overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
            group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
            max_bytes_before_external_group_by(max_bytes_before_external_group_by_),
            empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_),
            tmp_path(tmp_path_), max_threads(max_threads_),
            min_free_disk_space(min_free_disk_space_)
        {
        }

        /// Only parameters that matter during merge.
        Params(const Block & intermediate_header_,
            const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
            : Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, "", max_threads_, 0)
        {
            intermediate_header = intermediate_header_;
        }
    };

    Aggregator(const Params & params_);

    /// Aggregate the source. Get the result in the form of one of the data structures.
    void execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result);

    using AggregateColumns = std::vector<ColumnRawPtrs>;
    using AggregateColumnsData = std::vector<ColumnAggregateFunction::Container *>;
    using AggregateColumnsConstData = std::vector<const ColumnAggregateFunction::Container *>;
    using AggregateFunctionsPlainPtrs = std::vector<IAggregateFunction *>;

    /// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
    bool executeOnBlock(const Block & block, AggregatedDataVariants & result,
        ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns,    /// Passed so that they are not created anew for each block
        bool & no_more_keys);

    bool executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
        ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns,    /// Passed so that they are not created anew for each block
        bool & no_more_keys);

    /** Convert the aggregation data structure into a block.
      * If overflow_row = true, then aggregates for rows that are not included in max_rows_to_group_by are put in the first block.
      *
      * If final = false, then ColumnAggregateFunction is created as the aggregation columns with the state of the calculations,
      * which can then be combined with other states (for distributed query processing).
      * If final = true, then columns with ready values are created as aggregate columns.
      */
    BlocksList convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const;

    /** Merge several aggregation data structures and output the result as a block stream.
      */
    std::unique_ptr<IBlockInputStream> mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const;
    ManyAggregatedDataVariants prepareVariantsToMerge(ManyAggregatedDataVariants & data_variants) const;

    /** Merge the stream of partially aggregated blocks into one data structure.
      * (Pre-aggregate several blocks that represent the result of independent aggregations from remote servers.)
      */
    void mergeStream(const BlockInputStreamPtr & stream, AggregatedDataVariants & result, size_t max_threads);

    using BucketToBlocks = std::map<Int32, BlocksList>;
    /// Merge partially aggregated blocks separated into buckets into one data structure.
    void mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads);

    /// Merge several partially aggregated blocks into one.
    /// Precondition: for all blocks, the block.info.is_overflows flag must be the same
    /// (either all blocks are from overflow data, or none of them are).
    /// The resulting block has the same value of the is_overflows flag.
    Block mergeBlocks(BlocksList & blocks, bool final);

    /** Split a block with partially-aggregated data into many blocks, as if the two-level method of aggregation had been used.
      * This is needed to simplify merging of that data with other results that are already two-level.
      */
    std::vector<Block> convertBlockToTwoLevel(const Block & block);

    using CancellationHook = std::function<bool()>;

    /** Set a function that checks whether the current task can be aborted.
      */
    void setCancellationHook(const CancellationHook cancellation_hook);

    /// For external aggregation.
    void writeToTemporaryFile(AggregatedDataVariants & data_variants);

    bool hasTemporaryFiles() const { return !temporary_files.empty(); }
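
    /// Illustrative sketch of how external aggregation is triggered (simplified; the real check is made
    /// inside executeOnBlock, and `current_memory_usage` stands in for the tracked memory delta):
    ///
    ///     if (params.max_bytes_before_external_group_by
    ///         && current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by))
    ///         writeToTemporaryFile(result);    /// flush the current hash table to disk, then continue aggregating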

    struct TemporaryFiles
    {
        std::vector<std::unique_ptr<Poco::TemporaryFile>> files;
        size_t sum_size_uncompressed = 0;
        size_t sum_size_compressed = 0;
        mutable std::mutex mutex;

        bool empty() const
        {
            std::lock_guard lock(mutex);
            return files.empty();
        }
    };

    const TemporaryFiles & getTemporaryFiles() const { return temporary_files; }

    /// Get data structure of the result.
    Block getHeader(bool final) const;

protected:
    friend struct AggregatedDataVariants;
    friend class MergingAndConvertingBlockInputStream;
    friend class ConvertingAggregatedToChunksTransform;
    friend class ConvertingAggregatedToChunksSource;

    Params params;

    AggregatedDataVariants::Type method_chosen;
    Sizes key_sizes;

    HashMethodContextPtr aggregation_state_cache;

    AggregateFunctionsPlainPtrs aggregate_functions;

    /** This array serves two purposes.
      *
      * 1. Function arguments are collected side by side, and they do not need to be collected from different places. Also the array is made zero-terminated.
      *    The inner loop (for the case without_key) is almost twice as compact; performance gain of about 30%.
      *
      * 2. Calling a function by pointer is better than a virtual call, because in the case of a virtual call,
      *    GCC 5.1.2 generates code that, at each iteration of the loop, reloads the function address from memory into the register
      *    (the offset value in the virtual function table).
      */
    struct AggregateFunctionInstruction
    {
        const IAggregateFunction * that;
        IAggregateFunction::AddFunc func;
        size_t state_offset;
        const IColumn ** arguments;
        const IAggregateFunction * batch_that;
        const IColumn ** batch_arguments;
        const UInt64 * offsets = nullptr;
    };
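
    /// Illustrative sketch of the inner loop that this zero-terminated layout enables (simplified;
    /// the real loops are in executeWithoutKeyImpl / executeImplCase in Aggregator.cpp, and `place`
    /// stands in for the AggregateDataPtr of the current row's aggregation state):
    ///
    ///     for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
    ///         inst->func(inst->that, place + inst->state_offset, inst->arguments, row, arena);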

    using AggregateFunctionInstructions = std::vector<AggregateFunctionInstruction>;

    Sizes offsets_of_aggregate_states;    /// The offset to the n-th aggregate function in a row of aggregate functions.
    size_t total_size_of_aggregate_states = 0;    /// The total size of the row from the aggregate functions.

    // Info to track the alignment requirement.
    // If there are states whose alignments are v1, ..., vn, align_aggregate_states will be max(v1, ..., vn).
    size_t align_aggregate_states = 1;

    bool all_aggregates_has_trivial_destructor = false;

    /// How much RAM was used to process the query before processing the first block.
    Int64 memory_usage_before_aggregation = 0;

    std::mutex mutex;

    Logger * log = &Logger::get("Aggregator");

    /// Returns true if you can abort the current task.
    CancellationHook isCancelled;

    /// For external aggregation.
    TemporaryFiles temporary_files;

    /** Select the aggregation method based on the number and types of keys. */
    AggregatedDataVariants::Type chooseAggregationMethod();

    /** Create states of aggregate functions for one key.
      */
    void createAggregateStates(AggregateDataPtr & aggregate_data) const;

    /** Call `destroy` methods for states of aggregate functions.
      * Used in the exception handler for aggregation, since RAII in this case is not applicable.
      */
    void destroyAllAggregateStates(AggregatedDataVariants & result);


    /// Process one data block, aggregate the data into a hash table.
    template <typename Method>
    void executeImpl(
        Method & method,
        Arena * aggregates_pool,
        size_t rows,
        ColumnRawPtrs & key_columns,
        AggregateFunctionInstruction * aggregate_instructions,
        bool no_more_keys,
        AggregateDataPtr overflow_row) const;

    /// Specialization for a particular value of no_more_keys.
    template <bool no_more_keys, typename Method>
    void executeImplCase(
        Method & method,
        typename Method::State & state,
        Arena * aggregates_pool,
        size_t rows,
        AggregateFunctionInstruction * aggregate_instructions,
        AggregateDataPtr overflow_row) const;

    template <typename Method>
    void executeImplBatch(
        Method & method,
        typename Method::State & state,
        Arena * aggregates_pool,
        size_t rows,
        AggregateFunctionInstruction * aggregate_instructions) const;

    /// For the case when there are no keys (everything is aggregated into one row).
    void executeWithoutKeyImpl(
        AggregatedDataWithoutKey & res,
        size_t rows,
        AggregateFunctionInstruction * aggregate_instructions,
        Arena * arena) const;

    template <typename Method>
    void writeToTemporaryFileImpl(
        AggregatedDataVariants & data_variants,
        Method & method,
        IBlockOutputStream & out);

protected:
    /// Merge NULL key data from hash table `src` into `dst`.
    template <typename Method, typename Table>
    void mergeDataNullKey(
        Table & table_dst,
        Table & table_src,
        Arena * arena) const;

    /// Merge data from hash table `src` into `dst`.
    template <typename Method, typename Table>
    void mergeDataImpl(
        Table & table_dst,
        Table & table_src,
        Arena * arena) const;

    /// Merge data from hash table `src` into `dst`, but only for keys that already exist in dst. In other cases, merge the data into `overflows`.
    template <typename Method, typename Table>
    void mergeDataNoMoreKeysImpl(
        Table & table_dst,
        AggregatedDataWithoutKey & overflows,
        Table & table_src,
        Arena * arena) const;

    /// Same, but ignores the rest of the keys.
    template <typename Method, typename Table>
    void mergeDataOnlyExistingKeysImpl(
        Table & table_dst,
        Table & table_src,
        Arena * arena) const;

    void mergeWithoutKeyDataImpl(
        ManyAggregatedDataVariants & non_empty_data) const;

    template <typename Method>
    void mergeSingleLevelDataImpl(
        ManyAggregatedDataVariants & non_empty_data) const;

    template <typename Method, typename Table>
    void convertToBlockImpl(
        Method & method,
        Table & data,
        MutableColumns & key_columns,
        AggregateColumnsData & aggregate_columns,
        MutableColumns & final_aggregate_columns,
        bool final) const;

    template <typename Method, typename Table>
    void convertToBlockImplFinal(
        Method & method,
        Table & data,
        MutableColumns & key_columns,
        MutableColumns & final_aggregate_columns) const;

    template <typename Method, typename Table>
    void convertToBlockImplNotFinal(
        Method & method,
        Table & data,
        MutableColumns & key_columns,
        AggregateColumnsData & aggregate_columns) const;

    template <typename Filler>
    Block prepareBlockAndFill(
        AggregatedDataVariants & data_variants,
        bool final,
        size_t rows,
        Filler && filler) const;

    template <typename Method>
    Block convertOneBucketToBlock(
        AggregatedDataVariants & data_variants,
        Method & method,
        bool final,
        size_t bucket) const;

    Block mergeAndConvertOneBucketToBlock(
        ManyAggregatedDataVariants & variants,
        Arena * arena,
        bool final,
        size_t bucket) const;

    Block prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const;
    Block prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const;
    BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const;

    template <typename Method>
    BlocksList prepareBlocksAndFillTwoLevelImpl(
        AggregatedDataVariants & data_variants,
        Method & method,
        bool final,
        ThreadPool * thread_pool) const;

    template <bool no_more_keys, typename Method, typename Table>
    void mergeStreamsImplCase(
        Block & block,
        Arena * aggregates_pool,
        Method & method,
        Table & data,
        AggregateDataPtr overflow_row) const;

    template <typename Method, typename Table>
    void mergeStreamsImpl(
        Block & block,
        Arena * aggregates_pool,
        Method & method,
        Table & data,
        AggregateDataPtr overflow_row,
        bool no_more_keys) const;

    void mergeWithoutKeyStreamsImpl(
        Block & block,
        AggregatedDataVariants & result) const;

    template <typename Method>
    void mergeBucketImpl(
        ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena) const;

    template <typename Method>
    void convertBlockToTwoLevelImpl(
        Method & method,
        Arena * pool,
        ColumnRawPtrs & key_columns,
        const Block & source,
        std::vector<Block> & destinations) const;

    template <typename Method, typename Table>
    void destroyImpl(Table & table) const;

    void destroyWithoutKey(
        AggregatedDataVariants & result) const;


    /** Checks constraints on the maximum number of keys for aggregation.
      * If it is exceeded, then, depending on the group_by_overflow_mode, either
      * - throws an exception;
      * - returns false, which means that execution must be aborted;
      * - sets the variable no_more_keys to true.
      */
    bool checkLimits(size_t result_size, bool & no_more_keys) const;
};


/** Get the aggregation variant by its type. */
template <typename Method> Method & getDataVariant(AggregatedDataVariants & variants);

#define M(NAME, IS_TWO_LEVEL) \
    template <> inline decltype(AggregatedDataVariants::NAME)::element_type & getDataVariant<decltype(AggregatedDataVariants::NAME)::element_type>(AggregatedDataVariants & variants) { return *variants.NAME; }

APPLY_FOR_AGGREGATED_VARIANTS(M)

#undef M

}