1 | #pragma once |
2 | |
3 | #include <mutex> |
4 | #include <memory> |
5 | #include <functional> |
6 | |
7 | #include <common/logger_useful.h> |
8 | |
9 | #include <common/StringRef.h> |
10 | #include <Common/Arena.h> |
11 | #include <Common/HashTable/FixedHashMap.h> |
12 | #include <Common/HashTable/HashMap.h> |
13 | #include <Common/HashTable/TwoLevelHashMap.h> |
14 | #include <Common/HashTable/StringHashMap.h> |
15 | #include <Common/HashTable/TwoLevelStringHashMap.h> |
16 | |
17 | #include <Common/ThreadPool.h> |
18 | #include <Common/UInt128.h> |
19 | #include <Common/LRUCache.h> |
20 | #include <Common/ColumnsHashing.h> |
21 | #include <Common/assert_cast.h> |
22 | #include <Common/filesystemHelpers.h> |
23 | |
24 | #include <DataStreams/IBlockStream_fwd.h> |
25 | #include <DataStreams/SizeLimits.h> |
26 | |
27 | #include <Interpreters/AggregateDescription.h> |
28 | #include <Interpreters/AggregationCommon.h> |
29 | |
30 | #include <Columns/ColumnString.h> |
31 | #include <Columns/ColumnFixedString.h> |
32 | #include <Columns/ColumnAggregateFunction.h> |
33 | #include <Columns/ColumnVector.h> |
34 | #include <Columns/ColumnNullable.h> |
35 | #include <Columns/ColumnLowCardinality.h> |
36 | |
37 | |
38 | namespace DB |
39 | { |
40 | |
41 | namespace ErrorCodes |
42 | { |
43 | extern const int UNKNOWN_AGGREGATED_DATA_VARIANT; |
44 | extern const int NOT_ENOUGH_SPACE; |
45 | } |
46 | |
47 | class IBlockOutputStream; |
48 | |
49 | |
50 | /** Different data structures that can be used for aggregation |
51 | * For efficiency, the aggregation data itself is put into the pool. |
52 | * Data and pool ownership (states of aggregate functions) |
53 | * is acquired later - in `convertToBlocks` function, by the ColumnAggregateFunction object. |
54 | * |
55 | * Most data structures exist in two versions: normal and two-level (TwoLevel). |
56 | * A two-level hash table works a little slower with a small number of different keys, |
 * but with a large number of different keys scales better, because it allows
 * parallelizing some operations (merging, post-processing) in a natural way.
59 | * |
60 | * To ensure efficient work over a wide range of conditions, |
61 | * first single-level hash tables are used, |
62 | * and when the number of different keys is large enough, |
63 | * they are converted to two-level ones. |
64 | * |
65 | * PS. There are many different approaches to the effective implementation of parallel and distributed aggregation, |
66 | * best suited for different cases, and this approach is just one of them, chosen for a combination of reasons. |
67 | */ |
68 | |
/// Aggregation state for the case without keys: a single pointer to the aggregate data.
using AggregatedDataWithoutKey = AggregateDataPtr;

/// 8-bit and 16-bit keys are stored in a FixedHashMap.
using AggregatedDataWithUInt8Key = FixedHashMap<UInt8, AggregateDataPtr>;
using AggregatedDataWithUInt16Key = FixedHashMap<UInt16, AggregateDataPtr>;

/// Single-level map for a 64-bit key, hashed with HashCRC32.
using AggregatedDataWithUInt64Key = HashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>;

/// Single-level map based on StringHashMap.
using AggregatedDataWithShortStringKey = StringHashMap<AggregateDataPtr>;

/// Single-level map keyed by StringRef; the table type keeps the saved hash (HashMapWithSavedHash).
using AggregatedDataWithStringKey = HashMapWithSavedHash<StringRef, AggregateDataPtr>;

/// Single-level maps for keys packed into 128 or 256 bits.
using AggregatedDataWithKeys128 = HashMap<UInt128, AggregateDataPtr, UInt128HashCRC32>;
using AggregatedDataWithKeys256 = HashMap<UInt256, AggregateDataPtr, UInt256HashCRC32>;

/// Two-level counterparts of the single-level maps above (same key types and hash functions).
using AggregatedDataWithUInt64KeyTwoLevel = TwoLevelHashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>;

using AggregatedDataWithShortStringKeyTwoLevel = TwoLevelStringHashMap<AggregateDataPtr>;

using AggregatedDataWithStringKeyTwoLevel = TwoLevelHashMapWithSavedHash<StringRef, AggregateDataPtr>;

using AggregatedDataWithKeys128TwoLevel = TwoLevelHashMap<UInt128, AggregateDataPtr, UInt128HashCRC32>;
using AggregatedDataWithKeys256TwoLevel = TwoLevelHashMap<UInt256, AggregateDataPtr, UInt256HashCRC32>;
91 | |
/** Variants with a better hash function that uses more than 32 bits for the hash.
  * Used for the merging phase of external aggregation, where the number of keys may be far greater than 4 billion,
  *  but we keep in memory and merge only a sub-partition of them simultaneously.
  * TODO We need to switch to a better hash function not only for external aggregation,
  *  but also for huge aggregation results on machines with terabytes of RAM.
  */

using AggregatedDataWithUInt64KeyHash64 = HashMap<UInt64, AggregateDataPtr, DefaultHash<UInt64>>;
using AggregatedDataWithStringKeyHash64 = HashMapWithSavedHash<StringRef, AggregateDataPtr, StringRefHash64>;
using AggregatedDataWithKeys128Hash64 = HashMap<UInt128, AggregateDataPtr, UInt128Hash>;
using AggregatedDataWithKeys256Hash64 = HashMap<UInt256, AggregateDataPtr, UInt256Hash>;
103 | |
104 | template <typename Base> |
105 | struct AggregationDataWithNullKey : public Base |
106 | { |
107 | using Base::Base; |
108 | |
109 | bool & hasNullKeyData() { return has_null_key_data; } |
110 | AggregateDataPtr & getNullKeyData() { return null_key_data; } |
111 | bool hasNullKeyData() const { return has_null_key_data; } |
112 | const AggregateDataPtr & getNullKeyData() const { return null_key_data; } |
113 | size_t size() const { return Base::size() + (has_null_key_data ? 1 : 0); } |
114 | bool empty() const { return Base::empty() && !has_null_key_data; } |
115 | void clear() |
116 | { |
117 | Base::clear(); |
118 | has_null_key_data = false; |
119 | } |
120 | void clearAndShrink() |
121 | { |
122 | Base::clearAndShrink(); |
123 | has_null_key_data = false; |
124 | } |
125 | |
126 | private: |
127 | bool has_null_key_data = false; |
128 | AggregateDataPtr null_key_data = nullptr; |
129 | }; |
130 | |
131 | template <typename Base> |
132 | struct AggregationDataWithNullKeyTwoLevel : public Base |
133 | { |
134 | using Base::impls; |
135 | |
136 | AggregationDataWithNullKeyTwoLevel() {} |
137 | |
138 | template <typename Other> |
139 | explicit AggregationDataWithNullKeyTwoLevel(const Other & other) : Base(other) |
140 | { |
141 | impls[0].hasNullKeyData() = other.hasNullKeyData(); |
142 | impls[0].getNullKeyData() = other.getNullKeyData(); |
143 | } |
144 | |
145 | bool & hasNullKeyData() { return impls[0].hasNullKeyData(); } |
146 | AggregateDataPtr & getNullKeyData() { return impls[0].getNullKeyData(); } |
147 | bool hasNullKeyData() const { return impls[0].hasNullKeyData(); } |
148 | const AggregateDataPtr & getNullKeyData() const { return impls[0].getNullKeyData(); } |
149 | }; |
150 | |
/// Table templates with the NULL key slot added; passed below as the sub-table type of two-level maps.
template <typename ... Types>
using HashTableWithNullKey = AggregationDataWithNullKey<HashMapTable<Types ...>>;
template <typename ... Types>
using StringHashTableWithNullKey = AggregationDataWithNullKey<StringHashMap<Types ...>>;

/// Single-level maps with the NULL key slot added.
using AggregatedDataWithNullableUInt8Key = AggregationDataWithNullKey<AggregatedDataWithUInt8Key>;
using AggregatedDataWithNullableUInt16Key = AggregationDataWithNullKey<AggregatedDataWithUInt16Key>;

using AggregatedDataWithNullableUInt64Key = AggregationDataWithNullKey<AggregatedDataWithUInt64Key>;
using AggregatedDataWithNullableStringKey = AggregationDataWithNullKey<AggregatedDataWithStringKey>;

/// Two-level maps whose sub-tables carry the NULL key slot; the wrapper exposes the slot of the first sub-table.
using AggregatedDataWithNullableUInt64KeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
        TwoLevelHashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>,
        TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>;

using AggregatedDataWithNullableShortStringKeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
        TwoLevelStringHashMap<AggregateDataPtr, HashTableAllocator, StringHashTableWithNullKey>>;

using AggregatedDataWithNullableStringKeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
        TwoLevelHashMapWithSavedHash<StringRef, AggregateDataPtr, DefaultHash<StringRef>,
        TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>;
172 | |
173 | |
174 | /// For the case where there is one numeric key. |
175 | /// FieldType is UInt8/16/32/64 for any type with corresponding bit width. |
176 | template <typename FieldType, typename TData, |
177 | bool consecutive_keys_optimization = true> |
178 | struct AggregationMethodOneNumber |
179 | { |
180 | using Data = TData; |
181 | using Key = typename Data::key_type; |
182 | using Mapped = typename Data::mapped_type; |
183 | |
184 | Data data; |
185 | |
186 | AggregationMethodOneNumber() {} |
187 | |
188 | template <typename Other> |
189 | AggregationMethodOneNumber(const Other & other) : data(other.data) {} |
190 | |
191 | /// To use one `Method` in different threads, use different `State`. |
192 | using State = ColumnsHashing::HashMethodOneNumber<typename Data::value_type, |
193 | Mapped, FieldType, consecutive_keys_optimization>; |
194 | |
195 | /// Use optimization for low cardinality. |
196 | static const bool low_cardinality_optimization = false; |
197 | |
198 | // Insert the key from the hash table into columns. |
199 | static void insertKeyIntoColumns(const Key & key, MutableColumns & key_columns, const Sizes & /*key_sizes*/) |
200 | { |
201 | auto key_holder = reinterpret_cast<const char *>(&key); |
202 | auto column = static_cast<ColumnVectorHelper *>(key_columns[0].get()); |
203 | column->insertRawData<sizeof(FieldType)>(key_holder); |
204 | } |
205 | }; |
206 | |
207 | |
208 | /// For the case where there is one string key. |
209 | template <typename TData> |
210 | struct AggregationMethodString |
211 | { |
212 | using Data = TData; |
213 | using Key = typename Data::key_type; |
214 | using Mapped = typename Data::mapped_type; |
215 | |
216 | Data data; |
217 | |
218 | AggregationMethodString() {} |
219 | |
220 | template <typename Other> |
221 | AggregationMethodString(const Other & other) : data(other.data) {} |
222 | |
223 | using State = ColumnsHashing::HashMethodString<typename Data::value_type, Mapped>; |
224 | |
225 | static const bool low_cardinality_optimization = false; |
226 | |
227 | static void insertKeyIntoColumns(const StringRef & key, MutableColumns & key_columns, const Sizes &) |
228 | { |
229 | key_columns[0]->insertData(key.data, key.size); |
230 | } |
231 | }; |
232 | |
233 | |
234 | /// Same as above but without cache |
235 | template <typename TData> |
236 | struct AggregationMethodStringNoCache |
237 | { |
238 | using Data = TData; |
239 | using Key = typename Data::key_type; |
240 | using Mapped = typename Data::mapped_type; |
241 | |
242 | Data data; |
243 | |
244 | AggregationMethodStringNoCache() {} |
245 | |
246 | template <typename Other> |
247 | AggregationMethodStringNoCache(const Other & other) : data(other.data) {} |
248 | |
249 | using State = ColumnsHashing::HashMethodString<typename Data::value_type, Mapped, true, false>; |
250 | |
251 | static const bool low_cardinality_optimization = false; |
252 | |
253 | static void insertKeyIntoColumns(const StringRef & key, MutableColumns & key_columns, const Sizes &) |
254 | { |
255 | key_columns[0]->insertData(key.data, key.size); |
256 | } |
257 | }; |
258 | |
259 | |
260 | /// For the case where there is one fixed-length string key. |
261 | template <typename TData> |
262 | struct AggregationMethodFixedString |
263 | { |
264 | using Data = TData; |
265 | using Key = typename Data::key_type; |
266 | using Mapped = typename Data::mapped_type; |
267 | |
268 | Data data; |
269 | |
270 | AggregationMethodFixedString() {} |
271 | |
272 | template <typename Other> |
273 | AggregationMethodFixedString(const Other & other) : data(other.data) {} |
274 | |
275 | using State = ColumnsHashing::HashMethodFixedString<typename Data::value_type, Mapped>; |
276 | |
277 | static const bool low_cardinality_optimization = false; |
278 | |
279 | static void insertKeyIntoColumns(const StringRef & key, MutableColumns & key_columns, const Sizes &) |
280 | { |
281 | key_columns[0]->insertData(key.data, key.size); |
282 | } |
283 | }; |
284 | |
285 | /// Same as above but without cache |
286 | template <typename TData> |
287 | struct AggregationMethodFixedStringNoCache |
288 | { |
289 | using Data = TData; |
290 | using Key = typename Data::key_type; |
291 | using Mapped = typename Data::mapped_type; |
292 | |
293 | Data data; |
294 | |
295 | AggregationMethodFixedStringNoCache() {} |
296 | |
297 | template <typename Other> |
298 | AggregationMethodFixedStringNoCache(const Other & other) : data(other.data) {} |
299 | |
300 | using State = ColumnsHashing::HashMethodFixedString<typename Data::value_type, Mapped, true, false>; |
301 | |
302 | static const bool low_cardinality_optimization = false; |
303 | |
304 | static void insertKeyIntoColumns(const StringRef & key, MutableColumns & key_columns, const Sizes &) |
305 | { |
306 | key_columns[0]->insertData(key.data, key.size); |
307 | } |
308 | }; |
309 | |
310 | |
311 | /// Single low cardinality column. |
312 | template <typename SingleColumnMethod> |
313 | struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod |
314 | { |
315 | using Base = SingleColumnMethod; |
316 | using BaseState = typename Base::State; |
317 | |
318 | using Data = typename Base::Data; |
319 | using Key = typename Base::Key; |
320 | using Mapped = typename Base::Mapped; |
321 | |
322 | using Base::data; |
323 | |
324 | AggregationMethodSingleLowCardinalityColumn() = default; |
325 | |
326 | template <typename Other> |
327 | explicit AggregationMethodSingleLowCardinalityColumn(const Other & other) : Base(other) {} |
328 | |
329 | using State = ColumnsHashing::HashMethodSingleLowCardinalityColumn<BaseState, Mapped, true>; |
330 | |
331 | static const bool low_cardinality_optimization = true; |
332 | |
333 | static void insertKeyIntoColumns(const Key & key, |
334 | MutableColumns & key_columns_low_cardinality, const Sizes & /*key_sizes*/) |
335 | { |
336 | auto col = assert_cast<ColumnLowCardinality *>(key_columns_low_cardinality[0].get()); |
337 | |
338 | if constexpr (std::is_same_v<Key, StringRef>) |
339 | { |
340 | col->insertData(key.data, key.size); |
341 | } |
342 | else |
343 | { |
344 | col->insertData(reinterpret_cast<const char *>(&key), sizeof(key)); |
345 | } |
346 | } |
347 | }; |
348 | |
349 | |
350 | /// For the case where all keys are of fixed length, and they fit in N (for example, 128) bits. |
351 | template <typename TData, bool has_nullable_keys_ = false, bool has_low_cardinality_ = false> |
352 | struct AggregationMethodKeysFixed |
353 | { |
354 | using Data = TData; |
355 | using Key = typename Data::key_type; |
356 | using Mapped = typename Data::mapped_type; |
357 | static constexpr bool has_nullable_keys = has_nullable_keys_; |
358 | static constexpr bool has_low_cardinality = has_low_cardinality_; |
359 | |
360 | Data data; |
361 | |
362 | AggregationMethodKeysFixed() {} |
363 | |
364 | template <typename Other> |
365 | AggregationMethodKeysFixed(const Other & other) : data(other.data) {} |
366 | |
367 | using State = ColumnsHashing::HashMethodKeysFixed<typename Data::value_type, Key, Mapped, has_nullable_keys, has_low_cardinality>; |
368 | |
369 | static const bool low_cardinality_optimization = false; |
370 | |
371 | static void insertKeyIntoColumns(const Key & key, MutableColumns & key_columns, const Sizes & key_sizes) |
372 | { |
373 | size_t keys_size = key_columns.size(); |
374 | |
375 | static constexpr auto bitmap_size = has_nullable_keys ? std::tuple_size<KeysNullMap<Key>>::value : 0; |
376 | /// In any hash key value, column values to be read start just after the bitmap, if it exists. |
377 | size_t pos = bitmap_size; |
378 | |
379 | for (size_t i = 0; i < keys_size; ++i) |
380 | { |
381 | IColumn * observed_column; |
382 | ColumnUInt8 * null_map; |
383 | |
384 | bool column_nullable = false; |
385 | if constexpr (has_nullable_keys) |
386 | column_nullable = isColumnNullable(*key_columns[i]); |
387 | |
388 | /// If we have a nullable column, get its nested column and its null map. |
389 | if (column_nullable) |
390 | { |
391 | ColumnNullable & nullable_col = assert_cast<ColumnNullable &>(*key_columns[i]); |
392 | observed_column = &nullable_col.getNestedColumn(); |
393 | null_map = assert_cast<ColumnUInt8 *>(&nullable_col.getNullMapColumn()); |
394 | } |
395 | else |
396 | { |
397 | observed_column = key_columns[i].get(); |
398 | null_map = nullptr; |
399 | } |
400 | |
401 | bool is_null = false; |
402 | if (column_nullable) |
403 | { |
404 | /// The current column is nullable. Check if the value of the |
405 | /// corresponding key is nullable. Update the null map accordingly. |
406 | size_t bucket = i / 8; |
407 | size_t offset = i % 8; |
408 | UInt8 val = (reinterpret_cast<const UInt8 *>(&key)[bucket] >> offset) & 1; |
409 | null_map->insertValue(val); |
410 | is_null = val == 1; |
411 | } |
412 | |
413 | if (has_nullable_keys && is_null) |
414 | observed_column->insertDefault(); |
415 | else |
416 | { |
417 | size_t size = key_sizes[i]; |
418 | observed_column->insertData(reinterpret_cast<const char *>(&key) + pos, size); |
419 | pos += size; |
420 | } |
421 | } |
422 | } |
423 | }; |
424 | |
425 | |
426 | /** Aggregates by concatenating serialized key values. |
427 | * The serialized value differs in that it uniquely allows to deserialize it, having only the position with which it starts. |
428 | * That is, for example, for strings, it contains first the serialized length of the string, and then the bytes. |
429 | * Therefore, when aggregating by several strings, there is no ambiguity. |
430 | */ |
431 | template <typename TData> |
432 | struct AggregationMethodSerialized |
433 | { |
434 | using Data = TData; |
435 | using Key = typename Data::key_type; |
436 | using Mapped = typename Data::mapped_type; |
437 | |
438 | Data data; |
439 | |
440 | AggregationMethodSerialized() {} |
441 | |
442 | template <typename Other> |
443 | AggregationMethodSerialized(const Other & other) : data(other.data) {} |
444 | |
445 | using State = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped>; |
446 | |
447 | static const bool low_cardinality_optimization = false; |
448 | |
449 | static void insertKeyIntoColumns(const StringRef & key, MutableColumns & key_columns, const Sizes &) |
450 | { |
451 | auto pos = key.data; |
452 | for (auto & column : key_columns) |
453 | pos = column->deserializeAndInsertFromArena(pos); |
454 | } |
455 | }; |
456 | |
457 | |
458 | class Aggregator; |
459 | |
460 | using ColumnsHashing::HashMethodContext; |
461 | using ColumnsHashing::HashMethodContextPtr; |
462 | |
463 | struct AggregatedDataVariants : private boost::noncopyable |
464 | { |
465 | /** Working with states of aggregate functions in the pool is arranged in the following (inconvenient) way: |
466 | * - when aggregating, states are created in the pool using IAggregateFunction::create (inside - `placement new` of arbitrary structure); |
467 | * - they must then be destroyed using IAggregateFunction::destroy (inside - calling the destructor of arbitrary structure); |
468 | * - if aggregation is complete, then, in the Aggregator::convertToBlocks function, pointers to the states of aggregate functions |
469 | * are written to ColumnAggregateFunction; ColumnAggregateFunction "acquires ownership" of them, that is - calls `destroy` in its destructor. |
470 | * - if during the aggregation, before call to Aggregator::convertToBlocks, an exception was thrown, |
471 | * then the states of aggregate functions must still be destroyed, |
472 | * otherwise, for complex states (eg, AggregateFunctionUniq), there will be memory leaks; |
473 | * - in this case, to destroy states, the destructor calls Aggregator::destroyAggregateStates method, |
474 | * but only if the variable aggregator (see below) is not nullptr; |
475 | * - that is, until you transfer ownership of the aggregate function states in the ColumnAggregateFunction, set the variable `aggregator`, |
476 | * so that when an exception occurs, the states are correctly destroyed. |
477 | * |
478 | * PS. This can be corrected by making a pool that knows about which states of aggregate functions and in which order are put in it, and knows how to destroy them. |
479 | * But this can hardly be done simply because it is planned to put variable-length strings into the same pool. |
480 | * In this case, the pool will not be able to know with what offsets objects are stored. |
481 | */ |
482 | Aggregator * aggregator = nullptr; |
483 | |
size_t keys_size{};  /// Number of keys. NOTE do we need this field?
Sizes key_sizes;     /// Dimensions of keys, if keys of fixed length

/// Pools for states of aggregate functions. Ownership will be later transferred to ColumnAggregateFunction.
Arenas aggregates_pools;
Arena * aggregates_pool{};    /// The pool that is currently used for allocation.

/** Specialization for the case when there are no keys, and for keys not fitted into max_rows_to_group_by.
  */
AggregatedDataWithoutKey without_key = nullptr;

/// One pointer per aggregation variant follows. init() allocates only the one matching the chosen Type.

// Disable consecutive key optimization for Uint8/16, because they use a FixedHashMap
// and the lookup there is almost free, so we don't need to cache the last lookup result
std::unique_ptr<AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key, false>>           key8;
std::unique_ptr<AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key, false>>         key16;

/// Single-level variants.
std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64Key>>         key32;
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>>         key64;
std::unique_ptr<AggregationMethodStringNoCache<AggregatedDataWithShortStringKey>>        key_string;
std::unique_ptr<AggregationMethodFixedStringNoCache<AggregatedDataWithShortStringKey>>   key_fixed_string;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128>>                   keys128;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256>>                   keys256;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKey>>                serialized;

/// Two-level counterparts of the single-level variants above.
std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64KeyTwoLevel>> key32_two_level;
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyTwoLevel>> key64_two_level;
std::unique_ptr<AggregationMethodStringNoCache<AggregatedDataWithShortStringKeyTwoLevel>>       key_string_two_level;
std::unique_ptr<AggregationMethodFixedStringNoCache<AggregatedDataWithShortStringKeyTwoLevel>>  key_fixed_string_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel>>           keys128_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel>>           keys256_two_level;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKeyTwoLevel>>        serialized_two_level;

/// Variants built on the *Hash64 tables.
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyHash64>>   key64_hash64;
std::unique_ptr<AggregationMethodString<AggregatedDataWithStringKeyHash64>>              key_string_hash64;
std::unique_ptr<AggregationMethodFixedString<AggregatedDataWithStringKeyHash64>>         key_fixed_string_hash64;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128Hash64>>             keys128_hash64;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256Hash64>>             keys256_hash64;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKeyHash64>>          serialized_hash64;

/// Support for nullable keys.
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128, true>>             nullable_keys128;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256, true>>             nullable_keys256;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, true>>     nullable_keys128_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, true>>     nullable_keys256_two_level;

/// Support for low cardinality.
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false>>> low_cardinality_key8;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key, false>>> low_cardinality_key16;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64Key>>> low_cardinality_key32;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key>>> low_cardinality_key64;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithNullableStringKey>>> low_cardinality_key_string;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithNullableStringKey>>> low_cardinality_key_fixed_string;

/// Two-level low cardinality variants for a single key column.
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64KeyTwoLevel>>> low_cardinality_key32_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64KeyTwoLevel>>> low_cardinality_key64_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithNullableStringKeyTwoLevel>>> low_cardinality_key_string_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithNullableStringKeyTwoLevel>>> low_cardinality_key_fixed_string_two_level;

/// Fixed-size packed keys with the has_low_cardinality template flag set.
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128, false, true>>      low_cardinality_keys128;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256, false, true>>      low_cardinality_keys256;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, false, true>> low_cardinality_keys128_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, false, true>> low_cardinality_keys256_two_level;
546 | |
/// In this and similar macros, the option without_key is not considered.
/// Lists every aggregation variant; `M` is invoked as M(NAME, IS_TWO_LEVEL), where NAME is the
/// name of the corresponding member and IS_TWO_LEVEL tells whether the variant is two-level.
/// The order of entries defines the order of enumerators in `Type`.
#define APPLY_FOR_AGGREGATED_VARIANTS(M) \
    M(key8,                       false) \
    M(key16,                      false) \
    M(key32,                      false) \
    M(key64,                      false) \
    M(key_string,                 false) \
    M(key_fixed_string,           false) \
    M(keys128,                    false) \
    M(keys256,                    false) \
    M(serialized,                 false) \
    M(key32_two_level,            true) \
    M(key64_two_level,            true) \
    M(key_string_two_level,       true) \
    M(key_fixed_string_two_level, true) \
    M(keys128_two_level,          true) \
    M(keys256_two_level,          true) \
    M(serialized_two_level,       true) \
    M(key64_hash64,               false) \
    M(key_string_hash64,          false) \
    M(key_fixed_string_hash64,    false) \
    M(keys128_hash64,             false) \
    M(keys256_hash64,             false) \
    M(serialized_hash64,          false) \
    M(nullable_keys128,           false) \
    M(nullable_keys256,           false) \
    M(nullable_keys128_two_level, true) \
    M(nullable_keys256_two_level, true) \
    M(low_cardinality_key8, false) \
    M(low_cardinality_key16, false) \
    M(low_cardinality_key32, false) \
    M(low_cardinality_key64, false) \
    M(low_cardinality_keys128, false) \
    M(low_cardinality_keys256, false) \
    M(low_cardinality_key_string, false) \
    M(low_cardinality_key_fixed_string, false) \
    M(low_cardinality_key32_two_level, true) \
    M(low_cardinality_key64_two_level, true) \
    M(low_cardinality_keys128_two_level, true) \
    M(low_cardinality_keys256_two_level, true) \
    M(low_cardinality_key_string_two_level, true) \
    M(low_cardinality_key_fixed_string_two_level, true) \
589 | |
590 | enum class Type |
591 | { |
592 | EMPTY = 0, |
593 | without_key, |
594 | |
595 | #define M(NAME, IS_TWO_LEVEL) NAME, |
596 | APPLY_FOR_AGGREGATED_VARIANTS(M) |
597 | #undef M |
598 | }; |
599 | Type type = Type::EMPTY; |
600 | |
601 | AggregatedDataVariants() : aggregates_pools(1, std::make_shared<Arena>()), aggregates_pool(aggregates_pools.back().get()) {} |
602 | bool empty() const { return type == Type::EMPTY; } |
603 | void invalidate() { type = Type::EMPTY; } |
604 | |
605 | ~AggregatedDataVariants(); |
606 | |
607 | void init(Type type_) |
608 | { |
609 | switch (type_) |
610 | { |
611 | case Type::EMPTY: break; |
612 | case Type::without_key: break; |
613 | |
614 | #define M(NAME, IS_TWO_LEVEL) \ |
615 | case Type::NAME: NAME = std::make_unique<decltype(NAME)::element_type>(); break; |
616 | APPLY_FOR_AGGREGATED_VARIANTS(M) |
617 | #undef M |
618 | } |
619 | |
620 | type = type_; |
621 | } |
622 | |
623 | /// Number of rows (different keys). |
624 | size_t size() const |
625 | { |
626 | switch (type) |
627 | { |
628 | case Type::EMPTY: return 0; |
629 | case Type::without_key: return 1; |
630 | |
631 | #define M(NAME, IS_TWO_LEVEL) \ |
632 | case Type::NAME: return NAME->data.size() + (without_key != nullptr); |
633 | APPLY_FOR_AGGREGATED_VARIANTS(M) |
634 | #undef M |
635 | } |
636 | |
637 | __builtin_unreachable(); |
638 | } |
639 | |
640 | /// The size without taking into account the row in which data is written for the calculation of TOTALS. |
641 | size_t sizeWithoutOverflowRow() const |
642 | { |
643 | switch (type) |
644 | { |
645 | case Type::EMPTY: return 0; |
646 | case Type::without_key: return 1; |
647 | |
648 | #define M(NAME, IS_TWO_LEVEL) \ |
649 | case Type::NAME: return NAME->data.size(); |
650 | APPLY_FOR_AGGREGATED_VARIANTS(M) |
651 | #undef M |
652 | } |
653 | |
654 | __builtin_unreachable(); |
655 | } |
656 | |
657 | const char * getMethodName() const |
658 | { |
659 | switch (type) |
660 | { |
661 | case Type::EMPTY: return "EMPTY" ; |
662 | case Type::without_key: return "without_key" ; |
663 | |
664 | #define M(NAME, IS_TWO_LEVEL) \ |
665 | case Type::NAME: return #NAME; |
666 | APPLY_FOR_AGGREGATED_VARIANTS(M) |
667 | #undef M |
668 | } |
669 | |
670 | __builtin_unreachable(); |
671 | } |
672 | |
673 | bool isTwoLevel() const |
674 | { |
675 | switch (type) |
676 | { |
677 | case Type::EMPTY: return false; |
678 | case Type::without_key: return false; |
679 | |
680 | #define M(NAME, IS_TWO_LEVEL) \ |
681 | case Type::NAME: return IS_TWO_LEVEL; |
682 | APPLY_FOR_AGGREGATED_VARIANTS(M) |
683 | #undef M |
684 | } |
685 | |
686 | __builtin_unreachable(); |
687 | } |
688 | |
/// Single-level variants for which a member with the same name plus the `_two_level` suffix exists.
/// `M` is invoked as M(NAME). isConvertibleToTwoLevel() returns true exactly for these.
#define APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M) \
    M(key32)            \
    M(key64)            \
    M(key_string)       \
    M(key_fixed_string) \
    M(keys128)          \
    M(keys256)          \
    M(serialized)       \
    M(nullable_keys128) \
    M(nullable_keys256) \
    M(low_cardinality_key32) \
    M(low_cardinality_key64) \
    M(low_cardinality_keys128) \
    M(low_cardinality_keys256) \
    M(low_cardinality_key_string) \
    M(low_cardinality_key_fixed_string) \
705 | |
/// Single-level variants that have no `_two_level` counterpart among the members.
/// `M` is invoked as M(NAME).
#define APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
    M(key8)             \
    M(key16)            \
    M(key64_hash64)     \
    M(key_string_hash64)\
    M(key_fixed_string_hash64) \
    M(keys128_hash64)   \
    M(keys256_hash64)   \
    M(serialized_hash64) \
    M(low_cardinality_key8) \
    M(low_cardinality_key16) \
717 | |
/// All single-level variants: the non-convertible list followed by the convertible one.
#define APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) \
    APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
    APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M) \
721 | |
722 | bool isConvertibleToTwoLevel() const |
723 | { |
724 | switch (type) |
725 | { |
726 | #define M(NAME) \ |
727 | case Type::NAME: return true; |
728 | |
729 | APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M) |
730 | |
731 | #undef M |
732 | default: |
733 | return false; |
734 | } |
735 | } |
736 | |
/// Declaration only; the definition is not in this part of the file.
/// Presumably switches the current single-level variant to its two-level counterpart
/// (see isConvertibleToTwoLevel) - confirm against the definition.
void convertToTwoLevel();
738 | |
/** All two-level variants. The names are those of APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL
  *  with the `_two_level` suffix, in the same order.
  * X-macro: M is invoked once per variant name, in the order written.
  * The last entry deliberately has no line continuation, so the macro ends on its own line
  *  and cannot absorb whatever follows it.
  */
#define APPLY_FOR_VARIANTS_TWO_LEVEL(M) \
    M(key32_two_level) \
    M(key64_two_level) \
    M(key_string_two_level) \
    M(key_fixed_string_two_level) \
    M(keys128_two_level) \
    M(keys256_two_level) \
    M(serialized_two_level) \
    M(nullable_keys128_two_level) \
    M(nullable_keys256_two_level) \
    M(low_cardinality_key32_two_level) \
    M(low_cardinality_key64_two_level) \
    M(low_cardinality_keys128_two_level) \
    M(low_cardinality_keys256_two_level) \
    M(low_cardinality_key_string_two_level) \
    M(low_cardinality_key_fixed_string_two_level)
755 | |
/** Every variant whose name starts with `low_cardinality_`: the single-level ones first,
  *  then the `_two_level` ones.
  * X-macro: M is invoked once per variant name, in the order written.
  * The last entry deliberately has no line continuation, so the macro ends on its own line
  *  and cannot absorb whatever follows it.
  */
#define APPLY_FOR_LOW_CARDINALITY_VARIANTS(M) \
    M(low_cardinality_key8) \
    M(low_cardinality_key16) \
    M(low_cardinality_key32) \
    M(low_cardinality_key64) \
    M(low_cardinality_keys128) \
    M(low_cardinality_keys256) \
    M(low_cardinality_key_string) \
    M(low_cardinality_key_fixed_string) \
    M(low_cardinality_key32_two_level) \
    M(low_cardinality_key64_two_level) \
    M(low_cardinality_keys128_two_level) \
    M(low_cardinality_keys256_two_level) \
    M(low_cardinality_key_string_two_level) \
    M(low_cardinality_key_fixed_string_two_level)
771 | |
772 | bool isLowCardinality() |
773 | { |
774 | switch (type) |
775 | { |
776 | #define M(NAME) \ |
777 | case Type::NAME: return true; |
778 | |
779 | APPLY_FOR_LOW_CARDINALITY_VARIANTS(M) |
780 | #undef M |
781 | default: |
782 | return false; |
783 | } |
784 | } |
785 | |
786 | static HashMethodContextPtr createCache(Type type, const HashMethodContext::Settings & settings) |
787 | { |
788 | switch (type) |
789 | { |
790 | case Type::without_key: return nullptr; |
791 | |
792 | #define M(NAME, IS_TWO_LEVEL) \ |
793 | case Type::NAME: \ |
794 | { \ |
795 | using TPtr ## NAME = decltype(AggregatedDataVariants::NAME); \ |
796 | using T ## NAME = typename TPtr ## NAME ::element_type; \ |
797 | return T ## NAME ::State::createContext(settings); \ |
798 | } |
799 | |
800 | APPLY_FOR_AGGREGATED_VARIANTS(M) |
801 | #undef M |
802 | |
803 | default: |
804 | throw Exception("Unknown aggregated data variant." , ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); |
805 | } |
806 | } |
807 | }; |
808 | |
/// Shared-ownership handle to one AggregatedDataVariants.
using AggregatedDataVariantsPtr = std::shared_ptr<AggregatedDataVariants>;
/// A collection of such handles.
using ManyAggregatedDataVariants = std::vector<AggregatedDataVariantsPtr>;
/// Shared-ownership handle to a whole collection of them.
using ManyAggregatedDataVariantsPtr = std::shared_ptr<ManyAggregatedDataVariants>;
812 | |
813 | /** How are "total" values calculated with WITH TOTALS? |
814 | * (For more details, see TotalsHavingBlockInputStream.) |
815 | * |
816 | * In the absence of group_by_overflow_mode = 'any', the data is aggregated as usual, but the states of the aggregate functions are not finalized. |
817 | * Later, the aggregate function states for all rows (passed through HAVING) are merged into one - this will be TOTALS. |
818 | * |
819 | * If there is group_by_overflow_mode = 'any', the data is aggregated as usual, except for the keys that did not fit in max_rows_to_group_by. |
820 | * For these keys, the data is aggregated into one additional row - see below under the names `overflow_row`, `overflows`... |
  * Later, the aggregate function states for all rows (passed through HAVING) are merged into one,
  *  and overflow_row is either added to it or not (depending on the totals_mode setting) - this will be TOTALS.
823 | */ |
824 | |
825 | |
826 | /** Aggregates the source of the blocks. |
827 | */ |
828 | class Aggregator |
829 | { |
830 | public: |
831 | struct Params |
832 | { |
833 | /// Data structure of source blocks. |
834 | Block ; |
835 | /// Data structure of intermediate blocks before merge. |
836 | Block ; |
837 | |
838 | /// What to count. |
839 | const ColumnNumbers keys; |
840 | const AggregateDescriptions aggregates; |
841 | const size_t keys_size; |
842 | const size_t aggregates_size; |
843 | |
844 | /// The settings of approximate calculation of GROUP BY. |
845 | const bool overflow_row; /// Do we need to put into AggregatedDataVariants::without_key aggregates for keys that are not in max_rows_to_group_by. |
846 | const size_t max_rows_to_group_by; |
847 | const OverflowMode group_by_overflow_mode; |
848 | |
849 | /// Two-level aggregation settings (used for a large number of keys). |
850 | /** With how many keys or the size of the aggregation state in bytes, |
851 | * two-level aggregation begins to be used. Enough to reach of at least one of the thresholds. |
852 | * 0 - the corresponding threshold is not specified. |
853 | */ |
854 | const size_t group_by_two_level_threshold; |
855 | const size_t group_by_two_level_threshold_bytes; |
856 | |
857 | /// Settings to flush temporary data to the filesystem (external aggregation). |
858 | const size_t max_bytes_before_external_group_by; /// 0 - do not use external aggregation. |
859 | |
860 | /// Return empty result when aggregating without keys on empty set. |
861 | bool empty_result_for_aggregation_by_empty_set; |
862 | |
863 | const std::string tmp_path; |
864 | |
865 | /// Settings is used to determine cache size. No threads are created. |
866 | size_t max_threads; |
867 | |
868 | const size_t min_free_disk_space; |
869 | Params( |
870 | const Block & , |
871 | const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, |
872 | bool overflow_row_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_, |
873 | size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_, |
874 | size_t max_bytes_before_external_group_by_, |
875 | bool empty_result_for_aggregation_by_empty_set_, |
876 | const std::string & tmp_path_, size_t max_threads_, |
877 | size_t min_free_disk_space_) |
878 | : src_header(src_header_), |
879 | keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()), |
880 | overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_), |
881 | group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_), |
882 | max_bytes_before_external_group_by(max_bytes_before_external_group_by_), |
883 | empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_), |
884 | tmp_path(tmp_path_), max_threads(max_threads_), |
885 | min_free_disk_space(min_free_disk_space_) |
886 | { |
887 | } |
888 | |
889 | /// Only parameters that matter during merge. |
890 | Params(const Block & , |
891 | const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_) |
892 | : Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, "" , max_threads_, 0) |
893 | { |
894 | intermediate_header = intermediate_header_; |
895 | } |
896 | }; |
897 | |
898 | Aggregator(const Params & params_); |
899 | |
900 | /// Aggregate the source. Get the result in the form of one of the data structures. |
901 | void execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result); |
902 | |
903 | using AggregateColumns = std::vector<ColumnRawPtrs>; |
904 | using AggregateColumnsData = std::vector<ColumnAggregateFunction::Container *>; |
905 | using AggregateColumnsConstData = std::vector<const ColumnAggregateFunction::Container *>; |
906 | using AggregateFunctionsPlainPtrs = std::vector<IAggregateFunction *>; |
907 | |
908 | /// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break'). |
909 | bool executeOnBlock(const Block & block, AggregatedDataVariants & result, |
910 | ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block |
911 | bool & no_more_keys); |
912 | |
913 | bool executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result, |
914 | ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block |
915 | bool & no_more_keys); |
916 | |
917 | /** Convert the aggregation data structure into a block. |
918 | * If overflow_row = true, then aggregates for rows that are not included in max_rows_to_group_by are put in the first block. |
919 | * |
920 | * If final = false, then ColumnAggregateFunction is created as the aggregation columns with the state of the calculations, |
921 | * which can then be combined with other states (for distributed query processing). |
922 | * If final = true, then columns with ready values are created as aggregate columns. |
923 | */ |
924 | BlocksList convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const; |
925 | |
926 | /** Merge several aggregation data structures and output the result as a block stream. |
927 | */ |
928 | std::unique_ptr<IBlockInputStream> mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const; |
929 | ManyAggregatedDataVariants prepareVariantsToMerge(ManyAggregatedDataVariants & data_variants) const; |
930 | |
931 | /** Merge the stream of partially aggregated blocks into one data structure. |
932 | * (Pre-aggregate several blocks that represent the result of independent aggregations from remote servers.) |
933 | */ |
934 | void mergeStream(const BlockInputStreamPtr & stream, AggregatedDataVariants & result, size_t max_threads); |
935 | |
936 | using BucketToBlocks = std::map<Int32, BlocksList>; |
937 | /// Merge partially aggregated blocks separated to buckets into one data structure. |
938 | void mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads); |
939 | |
940 | /// Merge several partially aggregated blocks into one. |
941 | /// Precondition: for all blocks block.info.is_overflows flag must be the same. |
942 | /// (either all blocks are from overflow data or none blocks are). |
943 | /// The resulting block has the same value of is_overflows flag. |
944 | Block mergeBlocks(BlocksList & blocks, bool final); |
945 | |
946 | /** Split block with partially-aggregated data to many blocks, as if two-level method of aggregation was used. |
947 | * This is needed to simplify merging of that data with other results, that are already two-level. |
948 | */ |
949 | std::vector<Block> convertBlockToTwoLevel(const Block & block); |
950 | |
951 | using CancellationHook = std::function<bool()>; |
952 | |
953 | /** Set a function that checks whether the current task can be aborted. |
954 | */ |
955 | void setCancellationHook(const CancellationHook cancellation_hook); |
956 | |
957 | /// For external aggregation. |
958 | void writeToTemporaryFile(AggregatedDataVariants & data_variants); |
959 | |
960 | bool hasTemporaryFiles() const { return !temporary_files.empty(); } |
961 | |
962 | struct TemporaryFiles |
963 | { |
964 | std::vector<std::unique_ptr<Poco::TemporaryFile>> files; |
965 | size_t sum_size_uncompressed = 0; |
966 | size_t sum_size_compressed = 0; |
967 | mutable std::mutex mutex; |
968 | |
969 | bool empty() const |
970 | { |
971 | std::lock_guard lock(mutex); |
972 | return files.empty(); |
973 | } |
974 | }; |
975 | |
976 | const TemporaryFiles & getTemporaryFiles() const { return temporary_files; } |
977 | |
978 | /// Get data structure of the result. |
979 | Block (bool final) const; |
980 | |
981 | protected: |
982 | friend struct AggregatedDataVariants; |
983 | friend class MergingAndConvertingBlockInputStream; |
984 | friend class ConvertingAggregatedToChunksTransform; |
985 | friend class ConvertingAggregatedToChunksSource; |
986 | |
987 | Params params; |
988 | |
989 | AggregatedDataVariants::Type method_chosen; |
990 | Sizes key_sizes; |
991 | |
992 | HashMethodContextPtr aggregation_state_cache; |
993 | |
994 | AggregateFunctionsPlainPtrs aggregate_functions; |
995 | |
996 | /** This array serves two purposes. |
997 | * |
998 | * 1. Function arguments are collected side by side, and they do not need to be collected from different places. Also the array is made zero-terminated. |
999 | * The inner loop (for the case without_key) is almost twice as compact; performance gain of about 30%. |
1000 | * |
1001 | * 2. Calling a function by pointer is better than a virtual call, because in the case of a virtual call, |
1002 | * GCC 5.1.2 generates code that, at each iteration of the loop, reloads the function address from memory into the register |
1003 | * (the offset value in the virtual function table). |
1004 | */ |
1005 | struct AggregateFunctionInstruction |
1006 | { |
1007 | const IAggregateFunction * that; |
1008 | IAggregateFunction::AddFunc func; |
1009 | size_t state_offset; |
1010 | const IColumn ** arguments; |
1011 | const IAggregateFunction * batch_that; |
1012 | const IColumn ** batch_arguments; |
1013 | const UInt64 * offsets = nullptr; |
1014 | }; |
1015 | |
1016 | using AggregateFunctionInstructions = std::vector<AggregateFunctionInstruction>; |
1017 | |
1018 | Sizes offsets_of_aggregate_states; /// The offset to the n-th aggregate function in a row of aggregate functions. |
1019 | size_t total_size_of_aggregate_states = 0; /// The total size of the row from the aggregate functions. |
1020 | |
1021 | // add info to track alignment requirement |
1022 | // If there are states whose alignmentment are v1, ..vn, align_aggregate_states will be max(v1, ... vn) |
1023 | size_t align_aggregate_states = 1; |
1024 | |
1025 | bool all_aggregates_has_trivial_destructor = false; |
1026 | |
1027 | /// How many RAM were used to process the query before processing the first block. |
1028 | Int64 memory_usage_before_aggregation = 0; |
1029 | |
1030 | std::mutex mutex; |
1031 | |
1032 | Logger * log = &Logger::get("Aggregator" ); |
1033 | |
1034 | /// Returns true if you can abort the current task. |
1035 | CancellationHook isCancelled; |
1036 | |
1037 | /// For external aggregation. |
1038 | TemporaryFiles temporary_files; |
1039 | |
1040 | /** Select the aggregation method based on the number and types of keys. */ |
1041 | AggregatedDataVariants::Type chooseAggregationMethod(); |
1042 | |
1043 | /** Create states of aggregate functions for one key. |
1044 | */ |
1045 | void createAggregateStates(AggregateDataPtr & aggregate_data) const; |
1046 | |
1047 | /** Call `destroy` methods for states of aggregate functions. |
1048 | * Used in the exception handler for aggregation, since RAII in this case is not applicable. |
1049 | */ |
1050 | void destroyAllAggregateStates(AggregatedDataVariants & result); |
1051 | |
1052 | |
1053 | /// Process one data block, aggregate the data into a hash table. |
1054 | template <typename Method> |
1055 | void executeImpl( |
1056 | Method & method, |
1057 | Arena * aggregates_pool, |
1058 | size_t rows, |
1059 | ColumnRawPtrs & key_columns, |
1060 | AggregateFunctionInstruction * aggregate_instructions, |
1061 | bool no_more_keys, |
1062 | AggregateDataPtr overflow_row) const; |
1063 | |
1064 | /// Specialization for a particular value no_more_keys. |
1065 | template <bool no_more_keys, typename Method> |
1066 | void executeImplCase( |
1067 | Method & method, |
1068 | typename Method::State & state, |
1069 | Arena * aggregates_pool, |
1070 | size_t rows, |
1071 | AggregateFunctionInstruction * aggregate_instructions, |
1072 | AggregateDataPtr overflow_row) const; |
1073 | |
1074 | template <typename Method> |
1075 | void executeImplBatch( |
1076 | Method & method, |
1077 | typename Method::State & state, |
1078 | Arena * aggregates_pool, |
1079 | size_t rows, |
1080 | AggregateFunctionInstruction * aggregate_instructions) const; |
1081 | |
1082 | /// For case when there are no keys (all aggregate into one row). |
1083 | void executeWithoutKeyImpl( |
1084 | AggregatedDataWithoutKey & res, |
1085 | size_t rows, |
1086 | AggregateFunctionInstruction * aggregate_instructions, |
1087 | Arena * arena) const; |
1088 | |
1089 | template <typename Method> |
1090 | void writeToTemporaryFileImpl( |
1091 | AggregatedDataVariants & data_variants, |
1092 | Method & method, |
1093 | IBlockOutputStream & out); |
1094 | |
1095 | protected: |
1096 | /// Merge NULL key data from hash table `src` into `dst`. |
1097 | template <typename Method, typename Table> |
1098 | void mergeDataNullKey( |
1099 | Table & table_dst, |
1100 | Table & table_src, |
1101 | Arena * arena) const; |
1102 | |
1103 | /// Merge data from hash table `src` into `dst`. |
1104 | template <typename Method, typename Table> |
1105 | void mergeDataImpl( |
1106 | Table & table_dst, |
1107 | Table & table_src, |
1108 | Arena * arena) const; |
1109 | |
1110 | /// Merge data from hash table `src` into `dst`, but only for keys that already exist in dst. In other cases, merge the data into `overflows`. |
1111 | template <typename Method, typename Table> |
1112 | void mergeDataNoMoreKeysImpl( |
1113 | Table & table_dst, |
1114 | AggregatedDataWithoutKey & overflows, |
1115 | Table & table_src, |
1116 | Arena * arena) const; |
1117 | |
1118 | /// Same, but ignores the rest of the keys. |
1119 | template <typename Method, typename Table> |
1120 | void mergeDataOnlyExistingKeysImpl( |
1121 | Table & table_dst, |
1122 | Table & table_src, |
1123 | Arena * arena) const; |
1124 | |
1125 | void mergeWithoutKeyDataImpl( |
1126 | ManyAggregatedDataVariants & non_empty_data) const; |
1127 | |
1128 | template <typename Method> |
1129 | void mergeSingleLevelDataImpl( |
1130 | ManyAggregatedDataVariants & non_empty_data) const; |
1131 | |
1132 | template <typename Method, typename Table> |
1133 | void convertToBlockImpl( |
1134 | Method & method, |
1135 | Table & data, |
1136 | MutableColumns & key_columns, |
1137 | AggregateColumnsData & aggregate_columns, |
1138 | MutableColumns & final_aggregate_columns, |
1139 | bool final) const; |
1140 | |
1141 | template <typename Method, typename Table> |
1142 | void convertToBlockImplFinal( |
1143 | Method & method, |
1144 | Table & data, |
1145 | MutableColumns & key_columns, |
1146 | MutableColumns & final_aggregate_columns) const; |
1147 | |
1148 | template <typename Method, typename Table> |
1149 | void convertToBlockImplNotFinal( |
1150 | Method & method, |
1151 | Table & data, |
1152 | MutableColumns & key_columns, |
1153 | AggregateColumnsData & aggregate_columns) const; |
1154 | |
1155 | template <typename Filler> |
1156 | Block prepareBlockAndFill( |
1157 | AggregatedDataVariants & data_variants, |
1158 | bool final, |
1159 | size_t rows, |
1160 | Filler && filler) const; |
1161 | |
1162 | template <typename Method> |
1163 | Block convertOneBucketToBlock( |
1164 | AggregatedDataVariants & data_variants, |
1165 | Method & method, |
1166 | bool final, |
1167 | size_t bucket) const; |
1168 | |
1169 | Block mergeAndConvertOneBucketToBlock( |
1170 | ManyAggregatedDataVariants & variants, |
1171 | Arena * arena, |
1172 | bool final, |
1173 | size_t bucket) const; |
1174 | |
1175 | Block prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const; |
1176 | Block prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const; |
1177 | BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const; |
1178 | |
1179 | template <typename Method> |
1180 | BlocksList prepareBlocksAndFillTwoLevelImpl( |
1181 | AggregatedDataVariants & data_variants, |
1182 | Method & method, |
1183 | bool final, |
1184 | ThreadPool * thread_pool) const; |
1185 | |
1186 | template <bool no_more_keys, typename Method, typename Table> |
1187 | void mergeStreamsImplCase( |
1188 | Block & block, |
1189 | Arena * aggregates_pool, |
1190 | Method & method, |
1191 | Table & data, |
1192 | AggregateDataPtr overflow_row) const; |
1193 | |
1194 | template <typename Method, typename Table> |
1195 | void mergeStreamsImpl( |
1196 | Block & block, |
1197 | Arena * aggregates_pool, |
1198 | Method & method, |
1199 | Table & data, |
1200 | AggregateDataPtr overflow_row, |
1201 | bool no_more_keys) const; |
1202 | |
1203 | void mergeWithoutKeyStreamsImpl( |
1204 | Block & block, |
1205 | AggregatedDataVariants & result) const; |
1206 | |
1207 | template <typename Method> |
1208 | void mergeBucketImpl( |
1209 | ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena) const; |
1210 | |
1211 | template <typename Method> |
1212 | void convertBlockToTwoLevelImpl( |
1213 | Method & method, |
1214 | Arena * pool, |
1215 | ColumnRawPtrs & key_columns, |
1216 | const Block & source, |
1217 | std::vector<Block> & destinations) const; |
1218 | |
1219 | template <typename Method, typename Table> |
1220 | void destroyImpl(Table & table) const; |
1221 | |
1222 | void destroyWithoutKey( |
1223 | AggregatedDataVariants & result) const; |
1224 | |
1225 | |
1226 | /** Checks constraints on the maximum number of keys for aggregation. |
1227 | * If it is exceeded, then, depending on the group_by_overflow_mode, either |
1228 | * - throws an exception; |
1229 | * - returns false, which means that execution must be aborted; |
1230 | * - sets the variable no_more_keys to true. |
1231 | */ |
1232 | bool checkLimits(size_t result_size, bool & no_more_keys) const; |
1233 | }; |
1234 | |
1235 | |
1236 | /** Get the aggregation variant by its type. */ |
1237 | template <typename Method> Method & getDataVariant(AggregatedDataVariants & variants); |
1238 | |
1239 | #define M(NAME, IS_TWO_LEVEL) \ |
1240 | template <> inline decltype(AggregatedDataVariants::NAME)::element_type & getDataVariant<decltype(AggregatedDataVariants::NAME)::element_type>(AggregatedDataVariants & variants) { return *variants.NAME; } |
1241 | |
1242 | APPLY_FOR_AGGREGATED_VARIANTS(M) |
1243 | |
1244 | #undef M |
1245 | |
1246 | } |
1247 | |