1#include <iomanip>
2#include <thread>
3#include <future>
4#include <Poco/Version.h>
5#include <Poco/Util/Application.h>
6#include <Common/Stopwatch.h>
7#include <Common/setThreadName.h>
8#include <DataTypes/DataTypeAggregateFunction.h>
9#include <DataTypes/DataTypeNullable.h>
10#include <DataTypes/DataTypeLowCardinality.h>
11#include <Columns/ColumnsNumber.h>
12#include <Columns/ColumnArray.h>
13#include <Columns/ColumnTuple.h>
14#include <Columns/ColumnLowCardinality.h>
15#include <DataStreams/IBlockInputStream.h>
16#include <DataStreams/NativeBlockOutputStream.h>
17#include <DataStreams/NullBlockInputStream.h>
18#include <DataStreams/materializeBlock.h>
19#include <IO/WriteBufferFromFile.h>
20#include <Compression/CompressedWriteBuffer.h>
21#include <Interpreters/Aggregator.h>
22#include <Common/ClickHouseRevision.h>
23#include <Common/MemoryTracker.h>
24#include <Common/CurrentThread.h>
25#include <Common/typeid_cast.h>
26#include <Common/assert_cast.h>
27#include <common/demangle.h>
28#include <common/config_common.h>
29#include <AggregateFunctions/AggregateFunctionArray.h>
30#include <AggregateFunctions/AggregateFunctionState.h>
31
32
33namespace ProfileEvents
34{
35 extern const Event ExternalAggregationWritePart;
36 extern const Event ExternalAggregationCompressedBytes;
37 extern const Event ExternalAggregationUncompressedBytes;
38}
39
40namespace CurrentMetrics
41{
42 extern const Metric QueryThread;
43}
44
45namespace DB
46{
47
namespace ErrorCodes
{
    extern const int TOO_MANY_ROWS;
    extern const int EMPTY_DATA_PASSED;
    extern const int CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS;
    extern const int LOGICAL_ERROR;
    /// Both codes below are thrown in this file (free-space check before external aggregation,
    /// and dispatch over two-level variants). Re-declaring an extern constant is harmless if a
    /// header already declares it, and keeps this file compiling if none does.
    extern const int NOT_ENOUGH_SPACE;
    extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
}
55
56
57AggregatedDataVariants::~AggregatedDataVariants()
58{
59 if (aggregator && !aggregator->all_aggregates_has_trivial_destructor)
60 {
61 try
62 {
63 aggregator->destroyAllAggregateStates(*this);
64 }
65 catch (...)
66 {
67 tryLogCurrentException(__PRETTY_FUNCTION__);
68 }
69 }
70}
71
72
/** Replaces the current single-level hash table with its two-level counterpart.
  * For each variant listed in APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL, the `<NAME>_two_level`
  * table is constructed from the existing `<NAME>` table, the single-level table is released,
  * and `type` is switched to the two-level type.
  * Throws LOGICAL_ERROR for any variant that has no two-level counterpart.
  */
void AggregatedDataVariants::convertToTwoLevel()
{
    if (aggregator)
        LOG_TRACE(aggregator->log, "Converting aggregation data to two-level.");

    switch (type)
    {
    /// One `case` per convertible variant: build the two-level table from the old one, then drop the old one.
    #define M(NAME) \
        case Type::NAME: \
            NAME ## _two_level = std::make_unique<decltype(NAME ## _two_level)::element_type>(*NAME); \
            NAME.reset(); \
            type = Type::NAME ## _two_level; \
            break;

        APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)

    #undef M

        default:
            throw Exception("Wrong data variant passed.", ErrorCodes::LOGICAL_ERROR);
    }
}
95
96
97Block Aggregator::getHeader(bool final) const
98{
99 Block res;
100
101 if (params.src_header)
102 {
103 for (size_t i = 0; i < params.keys_size; ++i)
104 res.insert(params.src_header.safeGetByPosition(params.keys[i]).cloneEmpty());
105
106 for (size_t i = 0; i < params.aggregates_size; ++i)
107 {
108 size_t arguments_size = params.aggregates[i].arguments.size();
109 DataTypes argument_types(arguments_size);
110 for (size_t j = 0; j < arguments_size; ++j)
111 argument_types[j] = params.src_header.safeGetByPosition(params.aggregates[i].arguments[j]).type;
112
113 DataTypePtr type;
114 if (final)
115 type = params.aggregates[i].function->getReturnType();
116 else
117 type = std::make_shared<DataTypeAggregateFunction>(params.aggregates[i].function, argument_types, params.aggregates[i].parameters);
118
119 res.insert({ type, params.aggregates[i].column_name });
120 }
121 }
122 else if (params.intermediate_header)
123 {
124 res = params.intermediate_header.cloneEmpty();
125
126 if (final)
127 {
128 for (size_t i = 0; i < params.aggregates_size; ++i)
129 {
130 auto & elem = res.getByPosition(params.keys_size + i);
131
132 elem.type = params.aggregates[i].function->getReturnType();
133 elem.column = elem.type->createColumn();
134 }
135 }
136 }
137
138 return materializeBlock(res);
139}
140
141
142Aggregator::Aggregator(const Params & params_)
143 : params(params_),
144 isCancelled([]() { return false; })
145{
146 /// Use query-level memory tracker
147 if (auto memory_tracker_child = CurrentThread::getMemoryTracker())
148 if (auto memory_tracker = memory_tracker_child->getParent())
149 memory_usage_before_aggregation = memory_tracker->get();
150
151 aggregate_functions.resize(params.aggregates_size);
152 for (size_t i = 0; i < params.aggregates_size; ++i)
153 aggregate_functions[i] = params.aggregates[i].function.get();
154
155 /// Initialize sizes of aggregation states and its offsets.
156 offsets_of_aggregate_states.resize(params.aggregates_size);
157 total_size_of_aggregate_states = 0;
158 all_aggregates_has_trivial_destructor = true;
159
160 // aggreate_states will be aligned as below:
161 // |<-- state_1 -->|<-- pad_1 -->|<-- state_2 -->|<-- pad_2 -->| .....
162 //
163 // pad_N will be used to match alignment requirement for each next state.
164 // The address of state_1 is aligned based on maximum alignment requirements in states
165 for (size_t i = 0; i < params.aggregates_size; ++i)
166 {
167 offsets_of_aggregate_states[i] = total_size_of_aggregate_states;
168
169 total_size_of_aggregate_states += params.aggregates[i].function->sizeOfData();
170
171 // aggreate states are aligned based on maximum requirement
172 align_aggregate_states = std::max(align_aggregate_states, params.aggregates[i].function->alignOfData());
173
174 // If not the last aggregate_state, we need pad it so that next aggregate_state will be aligned.
175 if (i + 1 < params.aggregates_size)
176 {
177 size_t alignment_of_next_state = params.aggregates[i + 1].function->alignOfData();
178 if ((alignment_of_next_state & (alignment_of_next_state - 1)) != 0)
179 throw Exception("Logical error: alignOfData is not 2^N", ErrorCodes::LOGICAL_ERROR);
180
181 /// Extend total_size to next alignment requirement
182 /// Add padding by rounding up 'total_size_of_aggregate_states' to be a multiplier of alignment_of_next_state.
183 total_size_of_aggregate_states = (total_size_of_aggregate_states + alignment_of_next_state - 1) / alignment_of_next_state * alignment_of_next_state;
184 }
185
186 if (!params.aggregates[i].function->hasTrivialDestructor())
187 all_aggregates_has_trivial_destructor = false;
188 }
189
190 method_chosen = chooseAggregationMethod();
191 HashMethodContext::Settings cache_settings;
192 cache_settings.max_threads = params.max_threads;
193 aggregation_state_cache = AggregatedDataVariants::createCache(method_chosen, cache_settings);
194}
195
196
/** Chooses the hash-table variant used for GROUP BY from the number of keys and their types
  * (inspected after stripping LowCardinality and Nullable wrappers).
  * Side effect: resizes `key_sizes` to params.keys_size and stores the in-memory size of every key
  * whose type is unambiguously represented in a fixed-size contiguous memory region; entries for
  * other keys are not written here.
  * The checks run in order and the first match returns, so the order of the branches is significant.
  */
AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
{
    /// If no keys. All aggregating to single row.
    if (params.keys_size == 0)
        return AggregatedDataVariants::Type::without_key;

    /// Check if at least one of the specified keys is nullable.
    /// types_removed_nullable[j] is the type of key j with the LowCardinality / Nullable wrappers removed.
    DataTypes types_removed_nullable;
    types_removed_nullable.reserve(params.keys.size());
    bool has_nullable_key = false;
    bool has_low_cardinality = false;

    for (const auto & pos : params.keys)
    {
        /// Key positions refer to src_header when it is set, otherwise to intermediate_header.
        DataTypePtr type = (params.src_header ? params.src_header : params.intermediate_header).safeGetByPosition(pos).type;

        /// LowCardinality is unwrapped before Nullable is checked.
        if (type->lowCardinality())
        {
            has_low_cardinality = true;
            type = removeLowCardinality(type);
        }

        if (type->isNullable())
        {
            has_nullable_key = true;
            type = removeNullable(type);
        }

        types_removed_nullable.push_back(type);
    }

    /** Returns ordinary (not two-level) methods, because we start from them.
      * Later, during aggregation process, data may be converted (partitioned) to two-level structure, if cardinality is high.
      */

    /// Count the keys that can be packed into a fixed-width composite key and sum their sizes.
    size_t keys_bytes = 0;
    size_t num_fixed_contiguous_keys = 0;

    key_sizes.resize(params.keys_size);
    for (size_t j = 0; j < params.keys_size; ++j)
    {
        if (types_removed_nullable[j]->isValueUnambiguouslyRepresentedInContiguousMemoryRegion())
        {
            if (types_removed_nullable[j]->isValueUnambiguouslyRepresentedInFixedSizeContiguousMemoryRegion())
            {
                ++num_fixed_contiguous_keys;
                key_sizes[j] = types_removed_nullable[j]->getSizeOfValueInMemory();
                keys_bytes += key_sizes[j];
            }
        }
    }

    /// With at least one Nullable key, every path below returns.
    if (has_nullable_key)
    {
        if (params.keys_size == num_fixed_contiguous_keys && !has_low_cardinality)
        {
            /// Pack if possible all the keys along with information about which key values are nulls
            /// into a fixed 16- or 32-byte blob.
            /// The null map occupies std::tuple_size<KeysNullMap<...>> bytes of that blob.
            if (std::tuple_size<KeysNullMap<UInt128>>::value + keys_bytes <= 16)
                return AggregatedDataVariants::Type::nullable_keys128;
            if (std::tuple_size<KeysNullMap<UInt256>>::value + keys_bytes <= 32)
                return AggregatedDataVariants::Type::nullable_keys256;
        }

        /// A single LowCardinality key (possibly Nullable) gets a dedicated method when its type is supported.
        if (has_low_cardinality && params.keys_size == 1)
        {
            if (types_removed_nullable[0]->isValueRepresentedByNumber())
            {
                size_t size_of_field = types_removed_nullable[0]->getSizeOfValueInMemory();

                if (size_of_field == 1)
                    return AggregatedDataVariants::Type::low_cardinality_key8;
                if (size_of_field == 2)
                    return AggregatedDataVariants::Type::low_cardinality_key16;
                if (size_of_field == 4)
                    return AggregatedDataVariants::Type::low_cardinality_key32;
                if (size_of_field == 8)
                    return AggregatedDataVariants::Type::low_cardinality_key64;
            }
            else if (isString(types_removed_nullable[0]))
                return AggregatedDataVariants::Type::low_cardinality_key_string;
            else if (isFixedString(types_removed_nullable[0]))
                return AggregatedDataVariants::Type::low_cardinality_key_fixed_string;
        }

        /// Fallback case.
        return AggregatedDataVariants::Type::serialized;
    }

    /// No key has been found to be nullable.

    /// Single numeric key.
    if (params.keys_size == 1 && types_removed_nullable[0]->isValueRepresentedByNumber())
    {
        size_t size_of_field = types_removed_nullable[0]->getSizeOfValueInMemory();

        if (has_low_cardinality)
        {
            if (size_of_field == 1)
                return AggregatedDataVariants::Type::low_cardinality_key8;
            if (size_of_field == 2)
                return AggregatedDataVariants::Type::low_cardinality_key16;
            if (size_of_field == 4)
                return AggregatedDataVariants::Type::low_cardinality_key32;
            if (size_of_field == 8)
                return AggregatedDataVariants::Type::low_cardinality_key64;
        }

        if (size_of_field == 1)
            return AggregatedDataVariants::Type::key8;
        if (size_of_field == 2)
            return AggregatedDataVariants::Type::key16;
        if (size_of_field == 4)
            return AggregatedDataVariants::Type::key32;
        if (size_of_field == 8)
            return AggregatedDataVariants::Type::key64;
        /// A 16-byte numeric key reuses the packed 128-bit keys table.
        if (size_of_field == 16)
            return AggregatedDataVariants::Type::keys128;
        throw Exception("Logical error: numeric column has sizeOfField not in 1, 2, 4, 8, 16.", ErrorCodes::LOGICAL_ERROR);
    }

    /// If all keys fits in N bits, will use hash table with all keys packed (placed contiguously) to single N-bit key.
    if (params.keys_size == num_fixed_contiguous_keys)
    {
        if (has_low_cardinality)
        {
            if (keys_bytes <= 16)
                return AggregatedDataVariants::Type::low_cardinality_keys128;
            if (keys_bytes <= 32)
                return AggregatedDataVariants::Type::low_cardinality_keys256;
        }

        if (keys_bytes <= 16)
            return AggregatedDataVariants::Type::keys128;
        if (keys_bytes <= 32)
            return AggregatedDataVariants::Type::keys256;
    }

    /// If single string key - will use hash table with references to it. Strings itself are stored separately in Arena.
    if (params.keys_size == 1 && isString(types_removed_nullable[0]))
    {
        if (has_low_cardinality)
            return AggregatedDataVariants::Type::low_cardinality_key_string;
        else
            return AggregatedDataVariants::Type::key_string;
    }

    if (params.keys_size == 1 && isFixedString(types_removed_nullable[0]))
    {
        if (has_low_cardinality)
            return AggregatedDataVariants::Type::low_cardinality_key_fixed_string;
        else
            return AggregatedDataVariants::Type::key_fixed_string;
    }

    /// Anything else: keys are serialized into a single blob per row.
    return AggregatedDataVariants::Type::serialized;
}
354
355
356void Aggregator::createAggregateStates(AggregateDataPtr & aggregate_data) const
357{
358 for (size_t j = 0; j < params.aggregates_size; ++j)
359 {
360 try
361 {
362 /** An exception may occur if there is a shortage of memory.
363 * In order that then everything is properly destroyed, we "roll back" some of the created states.
364 * The code is not very convenient.
365 */
366 aggregate_functions[j]->create(aggregate_data + offsets_of_aggregate_states[j]);
367 }
368 catch (...)
369 {
370 for (size_t rollback_j = 0; rollback_j < j; ++rollback_j)
371 aggregate_functions[rollback_j]->destroy(aggregate_data + offsets_of_aggregate_states[rollback_j]);
372
373 throw;
374 }
375 }
376}
377
378
379/** It's interesting - if you remove `noinline`, then gcc for some reason will inline this function, and the performance decreases (~ 10%).
380 * (Probably because after the inline of this function, more internal functions no longer be inlined.)
381 * Inline does not make sense, since the inner loop is entirely inside this function.
382 */
383template <typename Method>
384void NO_INLINE Aggregator::executeImpl(
385 Method & method,
386 Arena * aggregates_pool,
387 size_t rows,
388 ColumnRawPtrs & key_columns,
389 AggregateFunctionInstruction * aggregate_instructions,
390 bool no_more_keys,
391 AggregateDataPtr overflow_row) const
392{
393 typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
394
395 if (!no_more_keys)
396 //executeImplCase<false>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
397 executeImplBatch(method, state, aggregates_pool, rows, aggregate_instructions);
398 else
399 executeImplCase<true>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
400}
401
402
403template <bool no_more_keys, typename Method>
404void NO_INLINE Aggregator::executeImplCase(
405 Method & method,
406 typename Method::State & state,
407 Arena * aggregates_pool,
408 size_t rows,
409 AggregateFunctionInstruction * aggregate_instructions,
410 AggregateDataPtr overflow_row) const
411{
412 /// NOTE When editing this code, also pay attention to SpecializedAggregator.h.
413
414 /// For all rows.
415 for (size_t i = 0; i < rows; ++i)
416 {
417 AggregateDataPtr aggregate_data = nullptr;
418
419 if constexpr (!no_more_keys) /// Insert.
420 {
421 auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool);
422
423 /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
424 if (emplace_result.isInserted())
425 {
426 /// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
427 emplace_result.setMapped(nullptr);
428
429 aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
430 createAggregateStates(aggregate_data);
431
432 emplace_result.setMapped(aggregate_data);
433 }
434 else
435 aggregate_data = emplace_result.getMapped();
436 }
437 else
438 {
439 /// Add only if the key already exists.
440 auto find_result = state.findKey(method.data, i, *aggregates_pool);
441 if (find_result.isFound())
442 aggregate_data = find_result.getMapped();
443 }
444
445 /// aggregate_date == nullptr means that the new key did not fit in the hash table because of no_more_keys.
446
447 /// If the key does not fit, and the data does not need to be aggregated in a separate row, then there's nothing to do.
448 if (!aggregate_data && !overflow_row)
449 continue;
450
451 AggregateDataPtr value = aggregate_data ? aggregate_data : overflow_row;
452
453 /// Add values to the aggregate functions.
454 for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
455 (*inst->func)(inst->that, value + inst->state_offset, inst->arguments, i, aggregates_pool);
456 }
457}
458
459
460template <typename Method>
461void NO_INLINE Aggregator::executeImplBatch(
462 Method & method,
463 typename Method::State & state,
464 Arena * aggregates_pool,
465 size_t rows,
466 AggregateFunctionInstruction * aggregate_instructions) const
467{
468 PODArray<AggregateDataPtr> places(rows);
469
470 /// For all rows.
471 for (size_t i = 0; i < rows; ++i)
472 {
473 AggregateDataPtr aggregate_data = nullptr;
474
475 auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool);
476
477 /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
478 if (emplace_result.isInserted())
479 {
480 /// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
481 emplace_result.setMapped(nullptr);
482
483 aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
484 createAggregateStates(aggregate_data);
485
486 emplace_result.setMapped(aggregate_data);
487 }
488 else
489 aggregate_data = emplace_result.getMapped();
490
491 places[i] = aggregate_data;
492 assert(places[i] != nullptr);
493 }
494
495 /// Add values to the aggregate functions.
496 for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
497 {
498 if (inst->offsets)
499 inst->batch_that->addBatchArray(rows, places.data(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
500 else
501 inst->batch_that->addBatch(rows, places.data(), inst->state_offset, inst->batch_arguments, aggregates_pool);
502 }
503}
504
505
506void NO_INLINE Aggregator::executeWithoutKeyImpl(
507 AggregatedDataWithoutKey & res,
508 size_t rows,
509 AggregateFunctionInstruction * aggregate_instructions,
510 Arena * arena) const
511{
512 /// Adding values
513 for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
514 {
515 if (inst->offsets)
516 inst->batch_that->addBatchSinglePlace(
517 inst->offsets[static_cast<ssize_t>(rows - 1)], res + inst->state_offset, inst->batch_arguments, arena);
518 else
519 inst->batch_that->addBatchSinglePlace(rows, res + inst->state_offset, inst->batch_arguments, arena);
520 }
521}
522
523
524bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result,
525 ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
526{
527 UInt64 num_rows = block.rows();
528 return executeOnBlock(block.getColumns(), num_rows, result, key_columns, aggregate_columns, no_more_keys);
529}
530
/** Aggregates `num_rows` rows of `columns` into `result`.
  * `key_columns` and `aggregate_columns` are scratch buffers supplied by the caller (execute() sizes them to
  * keys_size / aggregates_size). They are filled here with raw column pointers, some of which point into
  * columns materialized locally in this call, so they should not be relied upon after it returns.
  * `no_more_keys` may be switched to true by checkLimits (group_by_overflow_mode = ANY).
  * Returns false when checkLimits says to stop (BREAK mode), true otherwise, including on cancellation.
  * May convert `result` to a two-level structure and, above max_bytes_before_external_group_by,
  * spill it to a temporary file (throws NOT_ENOUGH_SPACE if the directory lacks free space).
  */
bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
    ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
{
    if (isCancelled())
        return true;

    /// `result` will destroy the states of aggregate functions in the destructor
    result.aggregator = this;

    /// How to perform the aggregation?
    if (result.empty())
    {
        result.init(method_chosen);
        result.keys_size = params.keys_size;
        result.key_sizes = key_sizes;
        LOG_TRACE(log, "Aggregation method: " << result.getMethodName());
    }

    if (isCancelled())
        return true;

    for (size_t i = 0; i < params.aggregates_size; ++i)
        aggregate_columns[i].resize(params.aggregates[i].arguments.size());

    /** Constant columns are not supported directly during aggregation.
      * To make them work anyway, we materialize them.
      */
    /// Also owns any column converted in this call, keeping the raw pointers below valid during the call.
    Columns materialized_columns;

    /// Remember the columns we will work with
    for (size_t i = 0; i < params.keys_size; ++i)
    {
        materialized_columns.push_back(columns.at(params.keys[i])->convertToFullColumnIfConst());
        key_columns[i] = materialized_columns.back().get();

        /// Methods that are not LowCardinality-aware get keys with LowCardinality removed.
        if (!result.isLowCardinality())
        {
            auto column_no_lc = recursiveRemoveLowCardinality(key_columns[i]->getPtr());
            if (column_no_lc.get() != key_columns[i])
            {
                materialized_columns.emplace_back(std::move(column_no_lc));
                key_columns[i] = materialized_columns.back().get();
            }
        }
    }

    /// One instruction per aggregate function plus a terminator whose `that` is nullptr
    /// (the execute* loops stop on it).
    AggregateFunctionInstructions aggregate_functions_instructions(params.aggregates_size + 1);
    aggregate_functions_instructions[params.aggregates_size].that = nullptr;

    /// Owns the nested-column pointer arrays referenced by batch_arguments for -Array functions.
    std::vector<std::vector<const IColumn *>> nested_columns_holder;
    for (size_t i = 0; i < params.aggregates_size; ++i)
    {
        for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
        {
            materialized_columns.push_back(columns.at(params.aggregates[i].arguments[j])->convertToFullColumnIfConst());
            aggregate_columns[i][j] = materialized_columns.back().get();

            /// Aggregate function arguments are always passed with LowCardinality removed.
            auto column_no_lc = recursiveRemoveLowCardinality(aggregate_columns[i][j]->getPtr());
            if (column_no_lc.get() != aggregate_columns[i][j])
            {
                materialized_columns.emplace_back(std::move(column_no_lc));
                aggregate_columns[i][j] = materialized_columns.back().get();
            }
        }

        aggregate_functions_instructions[i].arguments = aggregate_columns[i].data();
        aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];
        auto that = aggregate_functions[i];
        /// Unnest consecutive trailing -State combinators
        while (auto func = typeid_cast<const AggregateFunctionState *>(that))
            that = func->getNestedFunction().get();
        aggregate_functions_instructions[i].that = that;
        aggregate_functions_instructions[i].func = that->getAddressOfAddFunction();

        /// For -Array, the batch path calls the nested function directly on the flattened
        /// array elements, passing the array offsets alongside.
        if (auto func = typeid_cast<const AggregateFunctionArray *>(that))
        {
            /// Unnest consecutive -State combinators before -Array
            that = func->getNestedFunction().get();
            while (auto nested_func = typeid_cast<const AggregateFunctionState *>(that))
                that = nested_func->getNestedFunction().get();
            auto [nested_columns, offsets] = checkAndGetNestedArrayOffset(aggregate_columns[i].data(), that->getArgumentTypes().size());
            nested_columns_holder.push_back(std::move(nested_columns));
            aggregate_functions_instructions[i].batch_arguments = nested_columns_holder.back().data();
            aggregate_functions_instructions[i].offsets = offsets;
        }
        else
            aggregate_functions_instructions[i].batch_arguments = aggregate_columns[i].data();

        aggregate_functions_instructions[i].batch_that = that;
    }

    if (isCancelled())
        return true;

    /// A separate states block (`without_key`) is needed for keyless aggregation and for the overflow row.
    if ((params.overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
    {
        AggregateDataPtr place = result.aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
        createAggregateStates(place);
        result.without_key = place;
    }

    /// We select one of the aggregation methods and call it.

    /// For the case when there are no keys (all aggregate into one row).
    if (result.type == AggregatedDataVariants::Type::without_key)
    {
        executeWithoutKeyImpl(result.without_key, num_rows, aggregate_functions_instructions.data(), result.aggregates_pool);
    }
    else
    {
        /// This is where data is written that does not fit in `max_rows_to_group_by` with `group_by_overflow_mode = any`.
        AggregateDataPtr overflow_row_ptr = params.overflow_row ? result.without_key : nullptr;

        /// Dispatch to executeImpl for the variant currently held by `result`.
        #define M(NAME, IS_TWO_LEVEL) \
            else if (result.type == AggregatedDataVariants::Type::NAME) \
                executeImpl(*result.NAME, result.aggregates_pool, num_rows, key_columns, aggregate_functions_instructions.data(), \
                    no_more_keys, overflow_row_ptr);

        if (false) {}
        APPLY_FOR_AGGREGATED_VARIANTS(M)
        #undef M
    }

    size_t result_size = result.sizeWithoutOverflowRow();
    /// Memory usage of the parent (query-level) tracker, or 0 if there is none.
    Int64 current_memory_usage = 0;
    if (auto memory_tracker_child = CurrentThread::getMemoryTracker())
        if (auto memory_tracker = memory_tracker_child->getParent())
            current_memory_usage = memory_tracker->get();

    auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation; /// Here all the results in the sum are taken into account, from different threads.

    /// Either threshold (rows or bytes) being reached is enough; a zero threshold disables that check.
    bool worth_convert_to_two_level
        = (params.group_by_two_level_threshold && result_size >= params.group_by_two_level_threshold)
        || (params.group_by_two_level_threshold_bytes && result_size_bytes >= static_cast<Int64>(params.group_by_two_level_threshold_bytes));

    /** Converting to a two-level data structure.
      * It allows you to make, in the subsequent, an effective merge - either economical from memory or parallel.
      */
    if (result.isConvertibleToTwoLevel() && worth_convert_to_two_level)
        result.convertToTwoLevel();

    /// Checking the constraints.
    if (!checkLimits(result_size, no_more_keys))
        return false;

    /** Flush data to disk if too much RAM is consumed.
      * Data can only be flushed to disk if a two-level aggregation structure is used.
      */
    if (params.max_bytes_before_external_group_by
        && result.isTwoLevel()
        && current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
        && worth_convert_to_two_level)
    {
        /// Require room for (roughly) the in-memory size of the data plus the configured reserve.
        if (!enoughSpaceInDirectory(params.tmp_path, current_memory_usage + params.min_free_disk_space))
            throw Exception("Not enough space for external aggregation in " + params.tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);

        writeToTemporaryFile(result);
    }

    return true;
}
692
693
/** Spills the current two-level aggregation state of `data_variants` to a new temporary file in
  * params.tmp_path (Native format, compressed), then resets `data_variants` to an empty table of the
  * same type with a fresh arena and no `without_key` states.
  * The file is registered in `temporary_files` (under its mutex) along with its compressed and
  * uncompressed sizes, and ProfileEvents counters are updated.
  * Throws UNKNOWN_AGGREGATED_DATA_VARIANT if the variant is not a two-level one.
  */
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
{
    Stopwatch watch;
    size_t rows = data_variants.size();

    auto file = createTemporaryFile(params.tmp_path);
    /// `path` refers into `file`; it is not used after `file` is moved into temporary_files below.
    const std::string & path = file->path();
    WriteBufferFromFile file_buf(path);
    CompressedWriteBuffer compressed_buf(file_buf);
    NativeBlockOutputStream block_out(compressed_buf, ClickHouseRevision::get(), getHeader(false));

    LOG_DEBUG(log, "Writing part of aggregation data into temporary file " << path << ".");
    ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);

    /// Flush only two-level data and possibly overflow data.

#define M(NAME) \
    else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
        writeToTemporaryFileImpl(data_variants, *data_variants.NAME, block_out);

    if (false) {}
    APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
    else
        throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);

    /// NOTE Instead of freeing up memory and creating new hash tables and arenas, you can re-use the old ones.
    data_variants.init(data_variants.type);
    data_variants.aggregates_pools = Arenas(1, std::make_shared<Arena>());
    data_variants.aggregates_pool = data_variants.aggregates_pools.back().get();
    data_variants.without_key = nullptr;

    /// Flush from the outermost layer inwards so the byte counters below are final.
    block_out.flush();
    compressed_buf.next();
    file_buf.next();

    double elapsed_seconds = watch.elapsedSeconds();
    double compressed_bytes = file_buf.count();
    double uncompressed_bytes = compressed_buf.count();

    {
        std::lock_guard lock(temporary_files.mutex);
        temporary_files.files.emplace_back(std::move(file));
        temporary_files.sum_size_uncompressed += uncompressed_bytes;
        temporary_files.sum_size_compressed += compressed_bytes;
    }

    ProfileEvents::increment(ProfileEvents::ExternalAggregationCompressedBytes, compressed_bytes);
    ProfileEvents::increment(ProfileEvents::ExternalAggregationUncompressedBytes, uncompressed_bytes);

    /// NOTE(review): with rows == 0 the per-row figures are inf/nan; this only affects the log line.
    LOG_TRACE(log, std::fixed << std::setprecision(3)
        << "Written part in " << elapsed_seconds << " sec., "
        << rows << " rows, "
        << (uncompressed_bytes / 1048576.0) << " MiB uncompressed, "
        << (compressed_bytes / 1048576.0) << " MiB compressed, "
        << (uncompressed_bytes / rows) << " uncompressed bytes per row, "
        << (compressed_bytes / rows) << " compressed bytes per row, "
        << "compression rate: " << (uncompressed_bytes / compressed_bytes)
        << " (" << (rows / elapsed_seconds) << " rows/sec., "
        << (uncompressed_bytes / elapsed_seconds / 1048576.0) << " MiB/sec. uncompressed, "
        << (compressed_bytes / elapsed_seconds / 1048576.0) << " MiB/sec. compressed)");
}
756
757
758template <typename Method>
759Block Aggregator::convertOneBucketToBlock(
760 AggregatedDataVariants & data_variants,
761 Method & method,
762 bool final,
763 size_t bucket) const
764{
765 Block block = prepareBlockAndFill(data_variants, final, method.data.impls[bucket].size(),
766 [bucket, &method, this] (
767 MutableColumns & key_columns,
768 AggregateColumnsData & aggregate_columns,
769 MutableColumns & final_aggregate_columns,
770 bool final_)
771 {
772 convertToBlockImpl(method, method.data.impls[bucket],
773 key_columns, aggregate_columns, final_aggregate_columns, final_);
774 });
775
776 block.info.bucket_num = bucket;
777 return block;
778}
779
780Block Aggregator::mergeAndConvertOneBucketToBlock(
781 ManyAggregatedDataVariants & variants,
782 Arena * arena,
783 bool final,
784 size_t bucket) const
785{
786 auto & merged_data = *variants[0];
787 auto method = merged_data.type;
788 Block block;
789
790 if (false) {}
791#define M(NAME) \
792 else if (method == AggregatedDataVariants::Type::NAME) \
793 { \
794 mergeBucketImpl<decltype(merged_data.NAME)::element_type>(variants, bucket, arena); \
795 block = convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket); \
796 }
797
798 APPLY_FOR_VARIANTS_TWO_LEVEL(M)
799#undef M
800
801 return block;
802}
803
804
805template <typename Method>
806void Aggregator::writeToTemporaryFileImpl(
807 AggregatedDataVariants & data_variants,
808 Method & method,
809 IBlockOutputStream & out)
810{
811 size_t max_temporary_block_size_rows = 0;
812 size_t max_temporary_block_size_bytes = 0;
813
814 auto update_max_sizes = [&](const Block & block)
815 {
816 size_t block_size_rows = block.rows();
817 size_t block_size_bytes = block.bytes();
818
819 if (block_size_rows > max_temporary_block_size_rows)
820 max_temporary_block_size_rows = block_size_rows;
821 if (block_size_bytes > max_temporary_block_size_bytes)
822 max_temporary_block_size_bytes = block_size_bytes;
823 };
824
825 for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
826 {
827 Block block = convertOneBucketToBlock(data_variants, method, false, bucket);
828 out.write(block);
829 update_max_sizes(block);
830 }
831
832 if (params.overflow_row)
833 {
834 Block block = prepareBlockAndFillWithoutKey(data_variants, false, true);
835 out.write(block);
836 update_max_sizes(block);
837 }
838
839 /// Pass ownership of the aggregate functions states:
840 /// `data_variants` will not destroy them in the destructor, they are now owned by ColumnAggregateFunction objects.
841 data_variants.aggregator = nullptr;
842
843 LOG_TRACE(log, std::fixed << std::setprecision(3)
844 << "Max size of temporary block: " << max_temporary_block_size_rows << " rows, "
845 << (max_temporary_block_size_bytes / 1048576.0) << " MiB.");
846}
847
848
849bool Aggregator::checkLimits(size_t result_size, bool & no_more_keys) const
850{
851 if (!no_more_keys && params.max_rows_to_group_by && result_size > params.max_rows_to_group_by)
852 {
853 switch (params.group_by_overflow_mode)
854 {
855 case OverflowMode::THROW:
856 throw Exception("Limit for rows to GROUP BY exceeded: has " + toString(result_size)
857 + " rows, maximum: " + toString(params.max_rows_to_group_by),
858 ErrorCodes::TOO_MANY_ROWS);
859
860 case OverflowMode::BREAK:
861 return false;
862
863 case OverflowMode::ANY:
864 no_more_keys = true;
865 break;
866 }
867 }
868
869 return true;
870}
871
872
873void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result)
874{
875 if (isCancelled())
876 return;
877
878 ColumnRawPtrs key_columns(params.keys_size);
879 AggregateColumns aggregate_columns(params.aggregates_size);
880
881 /** Used if there is a limit on the maximum number of rows in the aggregation,
882 * and if group_by_overflow_mode == ANY.
883 * In this case, new keys are not added to the set, but aggregation is performed only by
884 * keys that have already managed to get into the set.
885 */
886 bool no_more_keys = false;
887
888 LOG_TRACE(log, "Aggregating");
889
890 Stopwatch watch;
891
892 size_t src_rows = 0;
893 size_t src_bytes = 0;
894
895 /// Read all the data
896 while (Block block = stream->read())
897 {
898 if (isCancelled())
899 return;
900
901 src_rows += block.rows();
902 src_bytes += block.bytes();
903
904 if (!executeOnBlock(block, result, key_columns, aggregate_columns, no_more_keys))
905 break;
906 }
907
908 /// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation.
909 /// To do this, we pass a block with zero rows to aggregate.
910 if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
911 executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns, no_more_keys);
912
913 double elapsed_seconds = watch.elapsedSeconds();
914 size_t rows = result.sizeWithoutOverflowRow();
915 LOG_TRACE(log, std::fixed << std::setprecision(3)
916 << "Aggregated. " << src_rows << " to " << rows << " rows (from " << src_bytes / 1048576.0 << " MiB)"
917 << " in " << elapsed_seconds << " sec."
918 << " (" << src_rows / elapsed_seconds << " rows/sec., " << src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");
919}
920
921
922template <typename Method, typename Table>
923void Aggregator::convertToBlockImpl(
924 Method & method,
925 Table & data,
926 MutableColumns & key_columns,
927 AggregateColumnsData & aggregate_columns,
928 MutableColumns & final_aggregate_columns,
929 bool final) const
930{
931 if (data.empty())
932 return;
933
934 if (key_columns.size() != params.keys_size)
935 throw Exception{"Aggregate. Unexpected key columns size.", ErrorCodes::LOGICAL_ERROR};
936
937 if (final)
938 convertToBlockImplFinal(method, data, key_columns, final_aggregate_columns);
939 else
940 convertToBlockImplNotFinal(method, data, key_columns, aggregate_columns);
941 /// In order to release memory early.
942 data.clearAndShrink();
943}
944
/** Converts a hash table into "final" form: for every key, the key is inserted into `key_columns`
  * and the result of every aggregate function is inserted into `final_aggregate_columns`.
  * Afterwards destroyImpl<Method>(data) is called; presumably this destroys the aggregate states,
  * whose results have now been materialized (NOTE(review): confirm against destroyImpl).
  */
template <typename Method, typename Table>
void NO_INLINE Aggregator::convertToBlockImplFinal(
    Method & method,
    Table & data,
    MutableColumns & key_columns,
    MutableColumns & final_aggregate_columns) const
{
    if constexpr (Method::low_cardinality_optimization)
    {
        /// The NULL key is kept separately from the regular keys (hasNullKeyData/getNullKeyData);
        /// emit it as a default value in the first key column.
        if (data.hasNullKeyData())
        {
            key_columns[0]->insertDefault();

            for (size_t i = 0; i < params.aggregates_size; ++i)
                aggregate_functions[i]->insertResultInto(
                    data.getNullKeyData() + offsets_of_aggregate_states[i],
                    *final_aggregate_columns[i]);
        }
    }

    /// Regular keys: write the key, then the result of each aggregate function from its state.
    data.forEachValue([&](const auto & key, auto & mapped)
    {
        method.insertKeyIntoColumns(key, key_columns, key_sizes);

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->insertResultInto(
                mapped + offsets_of_aggregate_states[i],
                *final_aggregate_columns[i]);
    });

    destroyImpl<Method>(data);
}
977
/** Converts a hash table into "not final" form: for every key, the key is inserted into `key_columns`
  * and a pointer to each aggregate function's state is pushed into `aggregate_columns`
  * (the data arrays of ColumnAggregateFunction columns). The states themselves are neither copied nor destroyed.
  */
template <typename Method, typename Table>
void NO_INLINE Aggregator::convertToBlockImplNotFinal(
    Method & method,
    Table & data,
    MutableColumns & key_columns,
    AggregateColumnsData & aggregate_columns) const
{
    if constexpr (Method::low_cardinality_optimization)
    {
        /// The NULL key is kept separately from the regular keys; emit it as a default value in the first key column.
        if (data.hasNullKeyData())
        {
            key_columns[0]->insertDefault();

            for (size_t i = 0; i < params.aggregates_size; ++i)
                aggregate_columns[i]->push_back(data.getNullKeyData() + offsets_of_aggregate_states[i]);
        }
    }

    data.forEachValue([&](const auto & key, auto & mapped)
    {
        method.insertKeyIntoColumns(key, key_columns, key_sizes);

        /// reserved, so push_back does not throw exceptions
        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_columns[i]->push_back(mapped + offsets_of_aggregate_states[i]);

        /// The column now references this state; the hash table forgets it, presumably so that
        /// the state is not destroyed a second time through the table.
        mapped = nullptr;
    });
}
1007
1008
/** Builds a result block with the layout of getHeader(final): key columns first, then one column per aggregate function.
  * Empty columns are created (with `rows` elements reserved), `filler` populates them, and the block is assembled.
  *  - Not final: the aggregate columns are ColumnAggregateFunction. `filler` receives their state arrays
  *    (`aggregate_columns_data`) and pushes state pointers into them.
  *  - Final: `filler` inserts results into `final_aggregate_columns`. For functions with isState() the result
  *    column is itself a ColumnAggregateFunction.
  * Every ColumnAggregateFunction column takes shared ownership of all arenas of `data_variants`, so the states it
  * references stay alive together with the column.
  * Constant columns of the header are cut to `rows` rows.
  */
template <typename Filler>
Block Aggregator::prepareBlockAndFill(
    AggregatedDataVariants & data_variants,
    bool final,
    size_t rows,
    Filler && filler) const
{
    MutableColumns key_columns(params.keys_size);
    MutableColumns aggregate_columns(params.aggregates_size);
    MutableColumns final_aggregate_columns(params.aggregates_size);
    AggregateColumnsData aggregate_columns_data(params.aggregates_size);

    Block header = getHeader(final);

    for (size_t i = 0; i < params.keys_size; ++i)
    {
        key_columns[i] = header.safeGetByPosition(i).type->createColumn();
        key_columns[i]->reserve(rows);
    }

    for (size_t i = 0; i < params.aggregates_size; ++i)
    {
        if (!final)
        {
            aggregate_columns[i] = header.safeGetByPosition(i + params.keys_size).type->createColumn();

            /// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states.
            ColumnAggregateFunction & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);

            for (size_t j = 0; j < data_variants.aggregates_pools.size(); ++j)
                column_aggregate_func.addArena(data_variants.aggregates_pools[j]);

            /// Reserving here is what lets the fillers push_back state pointers without reallocating.
            aggregate_columns_data[i] = &column_aggregate_func.getData();
            aggregate_columns_data[i]->reserve(rows);
        }
        else
        {
            final_aggregate_columns[i] = aggregate_functions[i]->getReturnType()->createColumn();
            final_aggregate_columns[i]->reserve(rows);

            if (aggregate_functions[i]->isState())
            {
                /// The ColumnAggregateFunction column captures the shared ownership of the arena with aggregate function states.
                ColumnAggregateFunction & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*final_aggregate_columns[i]);

                for (size_t j = 0; j < data_variants.aggregates_pools.size(); ++j)
                    column_aggregate_func.addArena(data_variants.aggregates_pools[j]);
            }
        }
    }

    filler(key_columns, aggregate_columns_data, final_aggregate_columns, final);

    Block res = header.cloneEmpty();

    for (size_t i = 0; i < params.keys_size; ++i)
        res.getByPosition(i).column = std::move(key_columns[i]);

    for (size_t i = 0; i < params.aggregates_size; ++i)
    {
        if (final)
            res.getByPosition(i + params.keys_size).column = std::move(final_aggregate_columns[i]);
        else
            res.getByPosition(i + params.keys_size).column = std::move(aggregate_columns[i]);
    }

    /// Change the size of the columns-constants in the block.
    size_t columns = header.columns();
    for (size_t i = 0; i < columns; ++i)
        if (isColumnConst(*res.getByPosition(i).column))
            res.getByPosition(i).column = res.getByPosition(i).column->cut(0, rows);

    return res;
}
1083
1084
/** Builds a one-row block from the key-less aggregation state `data_variants.without_key`.
  * This is used for aggregation without keys and for the overflow row (`params.overflow_row`).
  * For the overflow row, the key columns get default values.
  * `is_overflows` sets block.info.is_overflows.
  * Not final: the state pointer is handed over to the ColumnAggregateFunction columns and `without_key` is reset to nullptr.
  * Final: the results are inserted and then destroyWithoutKey() is called (presumably destroying the key-less state).
  */
Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const
{
    size_t rows = 1;

    auto filler = [&data_variants, this](
        MutableColumns & key_columns,
        AggregateColumnsData & aggregate_columns,
        MutableColumns & final_aggregate_columns,
        bool final_)
    {
        if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row)
        {
            AggregatedDataWithoutKey & data = data_variants.without_key;

            for (size_t i = 0; i < params.aggregates_size; ++i)
            {
                if (!final_)
                    aggregate_columns[i]->push_back(data + offsets_of_aggregate_states[i]);
                else
                    aggregate_functions[i]->insertResultInto(data + offsets_of_aggregate_states[i], *final_aggregate_columns[i]);
            }

            /// The column now references the state; forget it here so data_variants no longer points to it.
            if (!final_)
                data = nullptr;

            /// The overflow row has no real key: fill the key columns with default values.
            if (params.overflow_row)
                for (size_t i = 0; i < params.keys_size; ++i)
                    key_columns[i]->insertDefault();
        }
    };

    Block block = prepareBlockAndFill(data_variants, final, rows, filler);

    if (is_overflows)
        block.info.is_overflows = true;

    if (final)
        destroyWithoutKey(data_variants);

    return block;
}
1126
/** Builds one block from single-level keyed data (the overflow row is not included).
  * The filler dispatches on `data_variants.type` over APPLY_FOR_VARIANTS_SINGLE_LEVEL and calls
  * convertToBlockImpl() for the matching hash table. An unknown type throws UNKNOWN_AGGREGATED_DATA_VARIANT.
  */
Block Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const
{
    size_t rows = data_variants.sizeWithoutOverflowRow();

    auto filler = [&data_variants, this](
        MutableColumns & key_columns,
        AggregateColumnsData & aggregate_columns,
        MutableColumns & final_aggregate_columns,
        bool final_)
    {
        /// Expands to an `else if` branch per single-level variant; the chain is started by `if (false) {}`.
    #define M(NAME) \
        else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
            convertToBlockImpl(*data_variants.NAME, data_variants.NAME->data, \
                key_columns, aggregate_columns, final_aggregate_columns, final_);

        if (false) {}
        APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
    #undef M
        else
            throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
    };

    return prepareBlockAndFill(data_variants, final, rows, filler);
}
1151
1152
/** Dispatches on the two-level variant type of `data_variants` (APPLY_FOR_VARIANTS_TWO_LEVEL) and
  * converts it into one block per non-empty bucket via prepareBlocksAndFillTwoLevelImpl().
  * `thread_pool` may be nullptr, in which case buckets are converted in the calling thread.
  * An unknown type throws UNKNOWN_AGGREGATED_DATA_VARIANT.
  */
BlocksList Aggregator::prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const
{
#define M(NAME) \
    else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
        return prepareBlocksAndFillTwoLevelImpl(data_variants, *data_variants.NAME, final, thread_pool);

    if (false) {}
    APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
    else
        throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}
1165
1166
/** Converts every non-empty bucket of a two-level hash table into its own block.
  * Buckets are converted as tasks in `thread_pool` when it is given, otherwise one after another in the calling thread.
  * The returned list is in bucket order; empty buckets are skipped.
  * An exception from any conversion is rethrown here by future::get().
  */
template <typename Method>
BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
    AggregatedDataVariants & data_variants,
    Method & method,
    bool final,
    ThreadPool * thread_pool) const
{
    /// Worker threads attach to the caller's thread group (if any) before converting a bucket.
    auto converter = [&](size_t bucket, ThreadGroupStatusPtr thread_group)
    {
        if (thread_group)
            CurrentThread::attachToIfDetached(thread_group);
        return convertOneBucketToBlock(data_variants, method, final, bucket);
    };

    /// packaged_task is used so that an exception thrown in a worker is stored and rethrown in the calling thread by get().

    std::vector<std::packaged_task<Block()>> tasks(Method::Data::NUM_BUCKETS);

    try
    {
        for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
        {
            if (method.data.impls[bucket].empty())
                continue;

            tasks[bucket] = std::packaged_task<Block()>(std::bind(converter, bucket, CurrentThread::getGroup()));

            if (thread_pool)
                thread_pool->scheduleOrThrowOnError([bucket, &tasks] { tasks[bucket](); });
            else
                tasks[bucket]();
        }
    }
    catch (...)
    {
        /// If this is not done, then in case of an exception, tasks will be destroyed before the threads are completed, and it will be bad.
        if (thread_pool)
            thread_pool->wait();

        throw;
    }

    if (thread_pool)
        thread_pool->wait();

    BlocksList blocks;

    /// Tasks that were never assigned (empty buckets) are not valid and are skipped.
    for (auto & task : tasks)
    {
        if (!task.valid())
            continue;

        blocks.emplace_back(task.get_future().get());
    }

    return blocks;
}
1224
1225
/** Converts all aggregated data in `data_variants` into a list of blocks.
  * Order of the blocks:
  *  1. The key-less / overflow-row block, if `without_key` is set.
  *  2. The keyed data: one block for single-level data, or one block per non-empty bucket for two-level data.
  * Returns an empty list if isCancelled() is true at one of the checkpoints, or if there is no data.
  * Two-level data is converted in a private thread pool when max_threads > 1 and there are more than 100000 rows.
  * With final == false, `data_variants.aggregator` is reset: the ColumnAggregateFunction columns now own the states.
  */
BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const
{
    if (isCancelled())
        return BlocksList();

    LOG_TRACE(log, "Converting aggregated data to blocks");

    Stopwatch watch;

    BlocksList blocks;

    /// In what data structure is the data aggregated?
    if (data_variants.empty())
        return blocks;

    std::unique_ptr<ThreadPool> thread_pool;
    if (max_threads > 1 && data_variants.sizeWithoutOverflowRow() > 100000  /// TODO Make a custom threshold.
        && data_variants.isTwoLevel())                      /// TODO Use the shared thread pool with the `merge` function.
        thread_pool = std::make_unique<ThreadPool>(max_threads);

    if (isCancelled())
        return BlocksList();

    if (data_variants.without_key)
        blocks.emplace_back(prepareBlockAndFillWithoutKey(
            data_variants, final, data_variants.type != AggregatedDataVariants::Type::without_key));

    if (isCancelled())
        return BlocksList();

    if (data_variants.type != AggregatedDataVariants::Type::without_key)
    {
        if (!data_variants.isTwoLevel())
            blocks.emplace_back(prepareBlockAndFillSingleLevel(data_variants, final));
        else
            blocks.splice(blocks.end(), prepareBlocksAndFillTwoLevel(data_variants, final, thread_pool.get()));
    }

    if (!final)
    {
        /// data_variants will not destroy the states of aggregate functions in the destructor.
        /// Now ColumnAggregateFunction owns the states.
        data_variants.aggregator = nullptr;
    }

    if (isCancelled())
        return BlocksList();

    size_t rows = 0;
    size_t bytes = 0;

    for (const auto & block : blocks)
    {
        rows += block.rows();
        bytes += block.bytes();
    }

    double elapsed_seconds = watch.elapsedSeconds();
    LOG_TRACE(log, std::fixed << std::setprecision(3)
        << "Converted aggregated data to blocks. "
        << rows << " rows, " << bytes / 1048576.0 << " MiB"
        << " in " << elapsed_seconds << " sec."
        << " (" << rows / elapsed_seconds << " rows/sec., " << bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");

    return blocks;
}
1292
1293
/** Moves or merges the separately stored NULL-key state of `table_src` into `table_dst`
  * (only for methods with the low-cardinality optimization; otherwise this is a no-op).
  * If `table_dst` has no NULL key yet, it takes over the source pointer.
  * Otherwise the states are merged into the destination and the source states are destroyed.
  * In both cases `table_src` no longer has a NULL key afterwards.
  */
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataNullKey(
    Table & table_dst,
    Table & table_src,
    Arena * arena) const
{
    if constexpr (Method::low_cardinality_optimization)
    {
        if (table_src.hasNullKeyData())
        {
            if (!table_dst.hasNullKeyData())
            {
                table_dst.hasNullKeyData() = true;
                table_dst.getNullKeyData() = table_src.getNullKeyData();
            }
            else
            {
                for (size_t i = 0; i < params.aggregates_size; ++i)
                    aggregate_functions[i]->merge(
                        table_dst.getNullKeyData() + offsets_of_aggregate_states[i],
                        table_src.getNullKeyData() + offsets_of_aggregate_states[i],
                        arena);

                for (size_t i = 0; i < params.aggregates_size; ++i)
                    aggregate_functions[i]->destroy(
                        table_src.getNullKeyData() + offsets_of_aggregate_states[i]);
            }

            table_src.hasNullKeyData() = false;
            table_src.getNullKeyData() = nullptr;
        }
    }
}
1327
1328
/** Merges `table_src` into `table_dst`, inserting keys that `table_dst` does not have yet.
  * For a new key, `table_dst` takes over the source state pointer as is.
  * For an existing key, the states are merged into the destination and the source states are destroyed.
  * Afterwards `table_src` is cleared and its memory released.
  */
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataImpl(
    Table & table_dst,
    Table & table_src,
    Arena * arena) const
{
    if constexpr (Method::low_cardinality_optimization)
        mergeDataNullKey<Method, Table>(table_dst, table_src, arena);

    table_src.mergeToViaEmplace(table_dst,
        [&](AggregateDataPtr & dst, AggregateDataPtr & src, bool inserted)
    {
        if (!inserted)
        {
            for (size_t i = 0; i < params.aggregates_size; ++i)
                aggregate_functions[i]->merge(
                    dst + offsets_of_aggregate_states[i],
                    src + offsets_of_aggregate_states[i],
                    arena);

            for (size_t i = 0; i < params.aggregates_size; ++i)
                aggregate_functions[i]->destroy(src + offsets_of_aggregate_states[i]);
        }
        else
        {
            dst = src;
        }

        /// The source table no longer refers to the state (it was moved or destroyed).
        src = nullptr;
    });
    table_src.clearAndShrink();
}
1361
1362
/** Merges `table_src` into `table_dst` without adding new keys to `table_dst`.
  * States of keys that exist in `table_dst` are merged into them.
  * States of keys that are absent are merged into the `overflows` (key-less) state.
  * All source states are destroyed, and `table_src` is cleared afterwards.
  */
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataNoMoreKeysImpl(
    Table & table_dst,
    AggregatedDataWithoutKey & overflows,
    Table & table_src,
    Arena * arena) const
{
    /// Note : will create data for NULL key if not exist
    /// (mergeDataNullKey moves the NULL-key state into table_dst even though no new keys should be added).
    if constexpr (Method::low_cardinality_optimization)
        mergeDataNullKey<Method, Table>(table_dst, table_src, arena);

    table_src.mergeToViaFind(table_dst, [&](AggregateDataPtr dst, AggregateDataPtr & src, bool found)
    {
        AggregateDataPtr res_data = found ? dst : overflows;

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->merge(
                res_data + offsets_of_aggregate_states[i],
                src + offsets_of_aggregate_states[i],
                arena);

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->destroy(src + offsets_of_aggregate_states[i]);

        src = nullptr;
    });
    table_src.clearAndShrink();
}
1391
1392template <typename Method, typename Table>
1393void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl(
1394 Table & table_dst,
1395 Table & table_src,
1396 Arena * arena) const
1397{
1398 /// Note : will create data for NULL key if not exist
1399 if constexpr (Method::low_cardinality_optimization)
1400 mergeDataNullKey<Method, Table>(table_dst, table_src, arena);
1401
1402 table_src.mergeToViaFind(table_dst,
1403 [&](AggregateDataPtr dst, AggregateDataPtr & src, bool found)
1404 {
1405 if (!found)
1406 return;
1407
1408 for (size_t i = 0; i < params.aggregates_size; ++i)
1409 aggregate_functions[i]->merge(
1410 dst + offsets_of_aggregate_states[i],
1411 src + offsets_of_aggregate_states[i],
1412 arena);
1413
1414 for (size_t i = 0; i < params.aggregates_size; ++i)
1415 aggregate_functions[i]->destroy(src + offsets_of_aggregate_states[i]);
1416
1417 src = nullptr;
1418 });
1419 table_src.clearAndShrink();
1420}
1421
1422
1423void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(
1424 ManyAggregatedDataVariants & non_empty_data) const
1425{
1426 AggregatedDataVariantsPtr & res = non_empty_data[0];
1427
1428 /// We merge all aggregation results to the first.
1429 for (size_t result_num = 1, size = non_empty_data.size(); result_num < size; ++result_num)
1430 {
1431 AggregatedDataWithoutKey & res_data = res->without_key;
1432 AggregatedDataWithoutKey & current_data = non_empty_data[result_num]->without_key;
1433
1434 for (size_t i = 0; i < params.aggregates_size; ++i)
1435 aggregate_functions[i]->merge(res_data + offsets_of_aggregate_states[i], current_data + offsets_of_aggregate_states[i], res->aggregates_pool);
1436
1437 for (size_t i = 0; i < params.aggregates_size; ++i)
1438 aggregate_functions[i]->destroy(current_data + offsets_of_aggregate_states[i]);
1439
1440 current_data = nullptr;
1441 }
1442}
1443
1444
/** Merges the hash tables of all single-level variants (of type Method) into non_empty_data[0].
  * Before each merge the GROUP BY row limit is checked with checkLimits():
  *  - If it returns false ('break' mode), merging stops.
  *  - Once `no_more_keys` is set ('any' mode), the remaining variants only contribute to keys that already exist.
  *    Their other keys go to the overflow row if there is one; otherwise they are dropped.
  * Each merged variant is detached from the aggregator so that it does not destroy the states again.
  */
template <typename Method>
void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
    ManyAggregatedDataVariants & non_empty_data) const
{
    AggregatedDataVariantsPtr & res = non_empty_data[0];
    bool no_more_keys = false;

    /// We merge all aggregation results to the first.
    for (size_t result_num = 1, size = non_empty_data.size(); result_num < size; ++result_num)
    {
        if (!checkLimits(res->sizeWithoutOverflowRow(), no_more_keys))
            break;

        AggregatedDataVariants & current = *non_empty_data[result_num];

        if (!no_more_keys)
            mergeDataImpl<Method>(
                getDataVariant<Method>(*res).data,
                getDataVariant<Method>(current).data,
                res->aggregates_pool);
        else if (res->without_key)
            mergeDataNoMoreKeysImpl<Method>(
                getDataVariant<Method>(*res).data,
                res->without_key,
                getDataVariant<Method>(current).data,
                res->aggregates_pool);
        else
            mergeDataOnlyExistingKeysImpl<Method>(
                getDataVariant<Method>(*res).data,
                getDataVariant<Method>(current).data,
                res->aggregates_pool);

        /// `current` will not destroy the states of aggregate functions in the destructor
        current.aggregator = nullptr;
    }
}
1481
1482
1483template <typename Method>
1484void NO_INLINE Aggregator::mergeBucketImpl(
1485 ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena) const
1486{
1487 /// We merge all aggregation results to the first.
1488 AggregatedDataVariantsPtr & res = data[0];
1489 for (size_t result_num = 1, size = data.size(); result_num < size; ++result_num)
1490 {
1491 AggregatedDataVariants & current = *data[result_num];
1492
1493 mergeDataImpl<Method>(
1494 getDataVariant<Method>(*res).data.impls[bucket],
1495 getDataVariant<Method>(current).data.impls[bucket],
1496 arena);
1497 }
1498}
1499
1500
1501/** Combines aggregation states together, turns them into blocks, and outputs streams.
1502 * If the aggregation states are two-level, then it produces blocks strictly in order of 'bucket_num'.
1503 * (This is important for distributed processing.)
1504 * In doing so, it can handle different buckets in parallel, using up to `threads` threads.
1505 */
1506class MergingAndConvertingBlockInputStream : public IBlockInputStream
1507{
1508public:
1509 /** The input is a set of non-empty sets of partially aggregated data,
1510 * which are all either single-level, or are two-level.
1511 */
1512 MergingAndConvertingBlockInputStream(const Aggregator & aggregator_, ManyAggregatedDataVariants & data_, bool final_, size_t threads_)
1513 : aggregator(aggregator_), data(data_), final(final_), threads(threads_)
1514 {
1515 /// At least we need one arena in first data item per thread
1516 if (!data.empty() && threads > data[0]->aggregates_pools.size())
1517 {
1518 Arenas & first_pool = data[0]->aggregates_pools;
1519 for (size_t j = first_pool.size(); j < threads; j++)
1520 first_pool.emplace_back(std::make_shared<Arena>());
1521 }
1522 }
1523
1524 String getName() const override { return "MergingAndConverting"; }
1525
1526 Block getHeader() const override { return aggregator.getHeader(final); }
1527
1528 ~MergingAndConvertingBlockInputStream() override
1529 {
1530 LOG_TRACE(&Logger::get(__PRETTY_FUNCTION__), "Waiting for threads to finish");
1531
1532 /// We need to wait for threads to finish before destructor of 'parallel_merge_data',
1533 /// because the threads access 'parallel_merge_data'.
1534 if (parallel_merge_data)
1535 parallel_merge_data->pool.wait();
1536 }
1537
1538protected:
1539 Block readImpl() override
1540 {
1541 if (data.empty())
1542 return {};
1543
1544 if (current_bucket_num >= NUM_BUCKETS)
1545 return {};
1546
1547 AggregatedDataVariantsPtr & first = data[0];
1548
1549 if (current_bucket_num == -1)
1550 {
1551 ++current_bucket_num;
1552
1553 if (first->type == AggregatedDataVariants::Type::without_key || aggregator.params.overflow_row)
1554 {
1555 aggregator.mergeWithoutKeyDataImpl(data);
1556 return aggregator.prepareBlockAndFillWithoutKey(
1557 *first, final, first->type != AggregatedDataVariants::Type::without_key);
1558 }
1559 }
1560
1561 if (!first->isTwoLevel())
1562 {
1563 if (current_bucket_num > 0)
1564 return {};
1565
1566 if (first->type == AggregatedDataVariants::Type::without_key)
1567 return {};
1568
1569 ++current_bucket_num;
1570
1571 #define M(NAME) \
1572 else if (first->type == AggregatedDataVariants::Type::NAME) \
1573 aggregator.mergeSingleLevelDataImpl<decltype(first->NAME)::element_type>(data);
1574 if (false) {}
1575 APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
1576 #undef M
1577 else
1578 throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
1579
1580 return aggregator.prepareBlockAndFillSingleLevel(*first, final);
1581 }
1582 else
1583 {
1584 if (!parallel_merge_data)
1585 {
1586 parallel_merge_data = std::make_unique<ParallelMergeData>(threads);
1587 for (size_t i = 0; i < threads; ++i)
1588 scheduleThreadForNextBucket();
1589 }
1590
1591 Block res;
1592
1593 while (true)
1594 {
1595 std::unique_lock lock(parallel_merge_data->mutex);
1596
1597 if (parallel_merge_data->exception)
1598 std::rethrow_exception(parallel_merge_data->exception);
1599
1600 auto it = parallel_merge_data->ready_blocks.find(current_bucket_num);
1601 if (it != parallel_merge_data->ready_blocks.end())
1602 {
1603 ++current_bucket_num;
1604 scheduleThreadForNextBucket();
1605
1606 if (it->second)
1607 {
1608 res.swap(it->second);
1609 break;
1610 }
1611 else if (current_bucket_num >= NUM_BUCKETS)
1612 break;
1613 }
1614
1615 parallel_merge_data->condvar.wait(lock);
1616 }
1617
1618 return res;
1619 }
1620 }
1621
1622private:
1623 const Aggregator & aggregator;
1624 ManyAggregatedDataVariants data;
1625 bool final;
1626 size_t threads;
1627
1628 Int32 current_bucket_num = -1;
1629 Int32 max_scheduled_bucket_num = -1;
1630 static constexpr Int32 NUM_BUCKETS = 256;
1631
1632 struct ParallelMergeData
1633 {
1634 std::map<Int32, Block> ready_blocks;
1635 std::exception_ptr exception;
1636 std::mutex mutex;
1637 std::condition_variable condvar;
1638 ThreadPool pool;
1639
1640 explicit ParallelMergeData(size_t threads_) : pool(threads_) {}
1641 };
1642
1643 std::unique_ptr<ParallelMergeData> parallel_merge_data;
1644
1645 void scheduleThreadForNextBucket()
1646 {
1647 ++max_scheduled_bucket_num;
1648 if (max_scheduled_bucket_num >= NUM_BUCKETS)
1649 return;
1650
1651 parallel_merge_data->pool.scheduleOrThrowOnError(std::bind(&MergingAndConvertingBlockInputStream::thread, this,
1652 max_scheduled_bucket_num, CurrentThread::getGroup()));
1653 }
1654
1655 void thread(Int32 bucket_num, ThreadGroupStatusPtr thread_group)
1656 {
1657 try
1658 {
1659 setThreadName("MergingAggregtd");
1660 if (thread_group)
1661 CurrentThread::attachToIfDetached(thread_group);
1662 CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
1663
1664 /// TODO: add no_more_keys support maybe
1665
1666 auto & merged_data = *data[0];
1667 auto method = merged_data.type;
1668 Block block;
1669
1670 /// Select Arena to avoid race conditions
1671 size_t thread_number = static_cast<size_t>(bucket_num) % threads;
1672 Arena * arena = merged_data.aggregates_pools.at(thread_number).get();
1673
1674 if (false) {}
1675 #define M(NAME) \
1676 else if (method == AggregatedDataVariants::Type::NAME) \
1677 { \
1678 aggregator.mergeBucketImpl<decltype(merged_data.NAME)::element_type>(data, bucket_num, arena); \
1679 block = aggregator.convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket_num); \
1680 }
1681
1682 APPLY_FOR_VARIANTS_TWO_LEVEL(M)
1683 #undef M
1684
1685 std::lock_guard lock(parallel_merge_data->mutex);
1686 parallel_merge_data->ready_blocks[bucket_num] = std::move(block);
1687 }
1688 catch (...)
1689 {
1690 std::lock_guard lock(parallel_merge_data->mutex);
1691 if (!parallel_merge_data->exception)
1692 parallel_merge_data->exception = std::current_exception();
1693 }
1694
1695 parallel_merge_data->condvar.notify_all();
1696 }
1697};
1698
1699ManyAggregatedDataVariants Aggregator::prepareVariantsToMerge(ManyAggregatedDataVariants & data_variants) const
1700{
1701 if (data_variants.empty())
1702 throw Exception("Empty data passed to Aggregator::mergeAndConvertToBlocks.", ErrorCodes::EMPTY_DATA_PASSED);
1703
1704 LOG_TRACE(log, "Merging aggregated data");
1705
1706 ManyAggregatedDataVariants non_empty_data;
1707 non_empty_data.reserve(data_variants.size());
1708 for (auto & data : data_variants)
1709 if (!data->empty())
1710 non_empty_data.push_back(data);
1711
1712 if (non_empty_data.empty())
1713 return {};
1714
1715 if (non_empty_data.size() > 1)
1716 {
1717 /// Sort the states in descending order so that the merge is more efficient (since all states are merged into the first).
1718 std::sort(non_empty_data.begin(), non_empty_data.end(),
1719 [](const AggregatedDataVariantsPtr & lhs, const AggregatedDataVariantsPtr & rhs)
1720 {
1721 return lhs->sizeWithoutOverflowRow() > rhs->sizeWithoutOverflowRow();
1722 });
1723 }
1724
1725 /// If at least one of the options is two-level, then convert all the options into two-level ones, if there are not such.
1726 /// Note - perhaps it would be more optimal not to convert single-level versions before the merge, but merge them separately, at the end.
1727
1728 bool has_at_least_one_two_level = false;
1729 for (const auto & variant : non_empty_data)
1730 {
1731 if (variant->isTwoLevel())
1732 {
1733 has_at_least_one_two_level = true;
1734 break;
1735 }
1736 }
1737
1738 if (has_at_least_one_two_level)
1739 for (auto & variant : non_empty_data)
1740 if (!variant->isTwoLevel())
1741 variant->convertToTwoLevel();
1742
1743 AggregatedDataVariantsPtr & first = non_empty_data[0];
1744
1745 for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
1746 {
1747 if (first->type != non_empty_data[i]->type)
1748 throw Exception("Cannot merge different aggregated data variants.", ErrorCodes::CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS);
1749
1750 /** Elements from the remaining sets can be moved to the first data set.
1751 * Therefore, it must own all the arenas of all other sets.
1752 */
1753 first->aggregates_pools.insert(first->aggregates_pools.end(),
1754 non_empty_data[i]->aggregates_pools.begin(), non_empty_data[i]->aggregates_pools.end());
1755 }
1756
1757 return non_empty_data;
1758}
1759
1760std::unique_ptr<IBlockInputStream> Aggregator::mergeAndConvertToBlocks(
1761 ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const
1762{
1763 ManyAggregatedDataVariants non_empty_data = prepareVariantsToMerge(data_variants);
1764
1765 if (non_empty_data.empty())
1766 return std::make_unique<NullBlockInputStream>(getHeader(final));
1767
1768 return std::make_unique<MergingAndConvertingBlockInputStream>(*this, non_empty_data, final, max_threads);
1769}
1770
1771
/** Merges the partially aggregated states from `block` into the hash table `data`.
  * The block layout is: the first params.keys_size columns are keys, followed by one
  * ColumnAggregateFunction column per aggregate function.
  * no_more_keys == false: missing keys are inserted with freshly created states (allocated in `aggregates_pool`).
  * no_more_keys == true: keys are only looked up. Rows with unknown keys are merged into `overflow_row` if it is set,
  * otherwise they are skipped.
  * `block` is cleared at the end to release memory early.
  */
template <bool no_more_keys, typename Method, typename Table>
void NO_INLINE Aggregator::mergeStreamsImplCase(
    Block & block,
    Arena * aggregates_pool,
    Method & method [[maybe_unused]],
    Table & data,
    AggregateDataPtr overflow_row) const
{
    ColumnRawPtrs key_columns(params.keys_size);
    AggregateColumnsConstData aggregate_columns(params.aggregates_size);

    /// Remember the columns we will work with
    for (size_t i = 0; i < params.keys_size; ++i)
        key_columns[i] = block.safeGetByPosition(i).column.get();

    for (size_t i = 0; i < params.aggregates_size; ++i)
        aggregate_columns[i] = &typeid_cast<const ColumnAggregateFunction &>(*block.safeGetByPosition(params.keys_size + i).column).getData();

    typename Method::State state(key_columns, key_sizes, aggregation_state_cache);

    /// For all rows.
    size_t rows = block.rows();
    for (size_t i = 0; i < rows; ++i)
    {
        AggregateDataPtr aggregate_data = nullptr;

        if (!no_more_keys)
        {
            auto emplace_result = state.emplaceKey(data, i, *aggregates_pool);
            if (emplace_result.isInserted())
            {
                /// Presumably so the cell never holds an uninitialized pointer if allocation or
                /// state creation below throws (NOTE(review): confirm).
                emplace_result.setMapped(nullptr);

                aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
                createAggregateStates(aggregate_data);

                emplace_result.setMapped(aggregate_data);
            }
            else
                aggregate_data = emplace_result.getMapped();
        }
        else
        {
            auto find_result = state.findKey(data, i, *aggregates_pool);
            if (find_result.isFound())
                aggregate_data = find_result.getMapped();
        }

        /// aggregate_data == nullptr means that the new key did not fit in the hash table because of no_more_keys.

        /// If the key does not fit, and the data does not need to be aggregated into a separate row, then there's nothing to do.
        if (!aggregate_data && !overflow_row)
            continue;

        AggregateDataPtr value = aggregate_data ? aggregate_data : overflow_row;

        /// Merge state of aggregate functions.
        for (size_t j = 0; j < params.aggregates_size; ++j)
            aggregate_functions[j]->merge(
                value + offsets_of_aggregate_states[j],
                (*aggregate_columns[j])[i],
                aggregates_pool);
    }

    /// Early release memory.
    block.clear();
}
1839
1840template <typename Method, typename Table>
1841void NO_INLINE Aggregator::mergeStreamsImpl(
1842 Block & block,
1843 Arena * aggregates_pool,
1844 Method & method,
1845 Table & data,
1846 AggregateDataPtr overflow_row,
1847 bool no_more_keys) const
1848{
1849 if (!no_more_keys)
1850 mergeStreamsImplCase<false>(block, aggregates_pool, method, data, overflow_row);
1851 else
1852 mergeStreamsImplCase<true>(block, aggregates_pool, method, data, overflow_row);
1853}
1854
1855
1856void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
1857 Block & block,
1858 AggregatedDataVariants & result) const
1859{
1860 AggregateColumnsConstData aggregate_columns(params.aggregates_size);
1861
1862 /// Remember the columns we will work with
1863 for (size_t i = 0; i < params.aggregates_size; ++i)
1864 aggregate_columns[i] = &typeid_cast<const ColumnAggregateFunction &>(*block.safeGetByPosition(params.keys_size + i).column).getData();
1865
1866 AggregatedDataWithoutKey & res = result.without_key;
1867 if (!res)
1868 {
1869 AggregateDataPtr place = result.aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
1870 createAggregateStates(place);
1871 res = place;
1872 }
1873
1874 /// Adding Values
1875 for (size_t i = 0; i < params.aggregates_size; ++i)
1876 aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns[i])[0], result.aggregates_pool);
1877
1878 /// Early release memory.
1879 block.clear();
1880}
1881
1882
1883void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataVariants & result, size_t max_threads)
1884{
1885 if (isCancelled())
1886 return;
1887
1888 /** If the remote servers used a two-level aggregation method,
1889 * then blocks will contain information about the number of the bucket.
1890 * Then the calculations can be parallelized by buckets.
1891 * We decompose the blocks to the bucket numbers indicated in them.
1892 */
1893 BucketToBlocks bucket_to_blocks;
1894
1895 /// Read all the data.
1896 LOG_TRACE(log, "Reading blocks of partially aggregated data.");
1897
1898 size_t total_input_rows = 0;
1899 size_t total_input_blocks = 0;
1900 while (Block block = stream->read())
1901 {
1902 if (isCancelled())
1903 return;
1904
1905 total_input_rows += block.rows();
1906 ++total_input_blocks;
1907 bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block));
1908 }
1909
1910 LOG_TRACE(log, "Read " << total_input_blocks << " blocks of partially aggregated data, total " << total_input_rows
1911 << " rows.");
1912
1913 mergeBlocks(bucket_to_blocks, result, max_threads);
1914}
1915
1916void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads)
1917{
1918 if (bucket_to_blocks.empty())
1919 return;
1920
1921 UInt64 total_input_rows = 0;
1922 for (auto & bucket : bucket_to_blocks)
1923 for (auto & block : bucket.second)
1924 total_input_rows += block.rows();
1925
1926 /** `minus one` means the absence of information about the bucket
1927 * - in the case of single-level aggregation, as well as for blocks with "overflowing" values.
1928 * If there is at least one block with a bucket number greater or equal than zero, then there was a two-level aggregation.
1929 */
1930 auto max_bucket = bucket_to_blocks.rbegin()->first;
1931 bool has_two_level = max_bucket >= 0;
1932
1933 if (has_two_level)
1934 {
1935 #define M(NAME) \
1936 if (method_chosen == AggregatedDataVariants::Type::NAME) \
1937 method_chosen = AggregatedDataVariants::Type::NAME ## _two_level;
1938
1939 APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
1940
1941 #undef M
1942 }
1943
1944 if (isCancelled())
1945 return;
1946
1947 /// result will destroy the states of aggregate functions in the destructor
1948 result.aggregator = this;
1949
1950 result.init(method_chosen);
1951 result.keys_size = params.keys_size;
1952 result.key_sizes = key_sizes;
1953
1954 bool has_blocks_with_unknown_bucket = bucket_to_blocks.count(-1);
1955
1956 /// First, parallel the merge for the individual buckets. Then we continue merge the data not allocated to the buckets.
1957 if (has_two_level)
1958 {
1959 /** In this case, no_more_keys is not supported due to the fact that
1960 * from different threads it is difficult to update the general state for "other" keys (overflows).
1961 * That is, the keys in the end can be significantly larger than max_rows_to_group_by.
1962 */
1963
1964 LOG_TRACE(log, "Merging partially aggregated two-level data.");
1965
1966 auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, ThreadGroupStatusPtr thread_group)
1967 {
1968 if (thread_group)
1969 CurrentThread::attachToIfDetached(thread_group);
1970
1971 for (Block & block : bucket_to_blocks[bucket])
1972 {
1973 if (isCancelled())
1974 return;
1975
1976 #define M(NAME) \
1977 else if (result.type == AggregatedDataVariants::Type::NAME) \
1978 mergeStreamsImpl(block, aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);
1979
1980 if (false) {}
1981 APPLY_FOR_VARIANTS_TWO_LEVEL(M)
1982 #undef M
1983 else
1984 throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
1985 }
1986 };
1987
1988 std::unique_ptr<ThreadPool> thread_pool;
1989 if (max_threads > 1 && total_input_rows > 100000) /// TODO Make a custom threshold.
1990 thread_pool = std::make_unique<ThreadPool>(max_threads);
1991
1992 for (const auto & bucket_blocks : bucket_to_blocks)
1993 {
1994 const auto bucket = bucket_blocks.first;
1995
1996 if (bucket == -1)
1997 continue;
1998
1999 result.aggregates_pools.push_back(std::make_shared<Arena>());
2000 Arena * aggregates_pool = result.aggregates_pools.back().get();
2001
2002 auto task = std::bind(merge_bucket, bucket, aggregates_pool, CurrentThread::getGroup());
2003
2004 if (thread_pool)
2005 thread_pool->scheduleOrThrowOnError(task);
2006 else
2007 task();
2008 }
2009
2010 if (thread_pool)
2011 thread_pool->wait();
2012
2013 LOG_TRACE(log, "Merged partially aggregated two-level data.");
2014 }
2015
2016 if (isCancelled())
2017 {
2018 result.invalidate();
2019 return;
2020 }
2021
2022 if (has_blocks_with_unknown_bucket)
2023 {
2024 LOG_TRACE(log, "Merging partially aggregated single-level data.");
2025
2026 bool no_more_keys = false;
2027
2028 BlocksList & blocks = bucket_to_blocks[-1];
2029 for (Block & block : blocks)
2030 {
2031 if (isCancelled())
2032 {
2033 result.invalidate();
2034 return;
2035 }
2036
2037 if (!checkLimits(result.sizeWithoutOverflowRow(), no_more_keys))
2038 break;
2039
2040 if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows)
2041 mergeWithoutKeyStreamsImpl(block, result);
2042
2043 #define M(NAME, IS_TWO_LEVEL) \
2044 else if (result.type == AggregatedDataVariants::Type::NAME) \
2045 mergeStreamsImpl(block, result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
2046
2047 APPLY_FOR_AGGREGATED_VARIANTS(M)
2048 #undef M
2049 else if (result.type != AggregatedDataVariants::Type::without_key)
2050 throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
2051 }
2052
2053 LOG_TRACE(log, "Merged partially aggregated single-level data.");
2054 }
2055}
2056
2057
/** Merges all `blocks` of partially aggregated data into a single block and returns it.
  *
  * - Returns an empty Block if `blocks` is empty, or if cancellation is detected at any of the checks below.
  * - If the first block is an "overflows" block, every block is merged into the key-less row.
  * - The returned block's bucket_num is the bucket number shared by all input blocks. It is -1 if the
  *   blocks disagree or the first block has no bucket.
  * - `final` is forwarded to prepareBlockAndFill*(). When it is false, ownership of the aggregate states
  *   is handed over to the ColumnAggregateFunction columns of the returned block.
  * Each input block is cleared by the merge helpers after it is merged, to release memory early.
  */
Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
{
    if (blocks.empty())
        return {};

    /// Taken from the first block; bucket_num may be downgraded to -1 while merging.
    auto bucket_num = blocks.front().info.bucket_num;
    bool is_overflows = blocks.front().info.is_overflows;

    LOG_TRACE(log, "Merging partially aggregated blocks (bucket = " << bucket_num << ").");
    Stopwatch watch;

    /** If possible, change 'method' to some_hash64. Otherwise, leave as is.
      * Better hash function is needed because during external aggregation,
      * we may merge partitions of data with total number of keys far greater than 4 billion.
      */
    auto merge_method = method_chosen;

#define APPLY_FOR_VARIANTS_THAT_MAY_USE_BETTER_HASH_FUNCTION(M) \
        M(key64)            \
        M(key_string)       \
        M(key_fixed_string) \
        M(keys128)          \
        M(keys256)          \
        M(serialized)       \

#define M(NAME) \
    if (merge_method == AggregatedDataVariants::Type::NAME) \
        merge_method = AggregatedDataVariants::Type::NAME ## _hash64; \

    APPLY_FOR_VARIANTS_THAT_MAY_USE_BETTER_HASH_FUNCTION(M)
#undef M

#undef APPLY_FOR_VARIANTS_THAT_MAY_USE_BETTER_HASH_FUNCTION

    /// Temporary data for aggregation.
    AggregatedDataVariants result;

    /// result will destroy the states of aggregate functions in the destructor
    result.aggregator = this;

    result.init(merge_method);
    result.keys_size = params.keys_size;
    result.key_sizes = key_sizes;

    for (Block & block : blocks)
    {
        if (isCancelled())
            return {};

        /// Blocks from different buckets: the merged block no longer belongs to a single bucket.
        if (bucket_num >= 0 && block.info.bucket_num != bucket_num)
            bucket_num = -1;

        /// Overflow data (and key-less aggregation) is merged into the single key-less row;
        /// otherwise the block is merged into the hash table of the selected variant.
        if (result.type == AggregatedDataVariants::Type::without_key || is_overflows)
            mergeWithoutKeyStreamsImpl(block, result);

    #define M(NAME, IS_TWO_LEVEL) \
        else if (result.type == AggregatedDataVariants::Type::NAME) \
            mergeStreamsImpl(block, result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false);

        APPLY_FOR_AGGREGATED_VARIANTS(M)
    #undef M
        else if (result.type != AggregatedDataVariants::Type::without_key)
            throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
    }

    if (isCancelled())
        return {};

    Block block;
    if (result.type == AggregatedDataVariants::Type::without_key || is_overflows)
        block = prepareBlockAndFillWithoutKey(result, final, is_overflows);
    else
        block = prepareBlockAndFillSingleLevel(result, final);
    /// NOTE: two-level data is not possible here - chooseAggregationMethod chooses only among single-level methods.

    if (!final)
    {
        /// Pass ownership of aggregate function states from result to ColumnAggregateFunction objects in the resulting block.
        /// Clearing result.aggregator stops result's destructor from destroying those states.
        result.aggregator = nullptr;
    }

    size_t rows = block.rows();
    size_t bytes = block.bytes();
    double elapsed_seconds = watch.elapsedSeconds();
    LOG_TRACE(log, std::fixed << std::setprecision(3)
        << "Merged partially aggregated blocks. "
        << rows << " rows, " << bytes / 1048576.0 << " MiB."
        << " in " << elapsed_seconds << " sec."
        << " (" << rows / elapsed_seconds << " rows/sec., " << bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");

    if (isCancelled())
        return {};

    block.info.bucket_num = bucket_num;
    return block;
}
2154
2155
2156template <typename Method>
2157void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
2158 Method & method,
2159 Arena * pool,
2160 ColumnRawPtrs & key_columns,
2161 const Block & source,
2162 std::vector<Block> & destinations) const
2163{
2164 typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
2165
2166 size_t rows = source.rows();
2167 size_t columns = source.columns();
2168
2169 /// Create a 'selector' that will contain bucket index for every row. It will be used to scatter rows to buckets.
2170 IColumn::Selector selector(rows);
2171
2172 /// For every row.
2173 for (size_t i = 0; i < rows; ++i)
2174 {
2175 if constexpr (Method::low_cardinality_optimization)
2176 {
2177 if (state.isNullAt(i))
2178 {
2179 selector[i] = 0;
2180 continue;
2181 }
2182 }
2183
2184 /// Calculate bucket number from row hash.
2185 auto hash = state.getHash(method.data, i, *pool);
2186 auto bucket = method.data.getBucketFromHash(hash);
2187
2188 selector[i] = bucket;
2189 }
2190
2191 size_t num_buckets = destinations.size();
2192
2193 for (size_t column_idx = 0; column_idx < columns; ++column_idx)
2194 {
2195 const ColumnWithTypeAndName & src_col = source.getByPosition(column_idx);
2196 MutableColumns scattered_columns = src_col.column->scatter(num_buckets, selector);
2197
2198 for (size_t bucket = 0, size = num_buckets; bucket < size; ++bucket)
2199 {
2200 if (!scattered_columns[bucket]->empty())
2201 {
2202 Block & dst = destinations[bucket];
2203 dst.info.bucket_num = bucket;
2204 dst.insert({std::move(scattered_columns[bucket]), src_col.type, src_col.name});
2205 }
2206
2207 /** Inserted columns of type ColumnAggregateFunction will own states of aggregate functions
2208 * by holding shared_ptr to source column. See ColumnAggregateFunction.h
2209 */
2210 }
2211 }
2212}
2213
2214
2215std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
2216{
2217 if (!block)
2218 return {};
2219
2220 AggregatedDataVariants data;
2221
2222 ColumnRawPtrs key_columns(params.keys_size);
2223
2224 /// Remember the columns we will work with
2225 for (size_t i = 0; i < params.keys_size; ++i)
2226 key_columns[i] = block.safeGetByPosition(i).column.get();
2227
2228 AggregatedDataVariants::Type type = method_chosen;
2229 data.keys_size = params.keys_size;
2230 data.key_sizes = key_sizes;
2231
2232#define M(NAME) \
2233 else if (type == AggregatedDataVariants::Type::NAME) \
2234 type = AggregatedDataVariants::Type::NAME ## _two_level;
2235
2236 if (false) {}
2237 APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
2238#undef M
2239 else
2240 throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
2241
2242 data.init(type);
2243
2244 size_t num_buckets = 0;
2245
2246#define M(NAME) \
2247 else if (data.type == AggregatedDataVariants::Type::NAME) \
2248 num_buckets = data.NAME->data.NUM_BUCKETS;
2249
2250 if (false) {}
2251 APPLY_FOR_VARIANTS_TWO_LEVEL(M)
2252#undef M
2253 else
2254 throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
2255
2256 std::vector<Block> splitted_blocks(num_buckets);
2257
2258#define M(NAME) \
2259 else if (data.type == AggregatedDataVariants::Type::NAME) \
2260 convertBlockToTwoLevelImpl(*data.NAME, data.aggregates_pool, \
2261 key_columns, block, splitted_blocks);
2262
2263 if (false) {}
2264 APPLY_FOR_VARIANTS_TWO_LEVEL(M)
2265#undef M
2266 else
2267 throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
2268
2269 return splitted_blocks;
2270}
2271
2272
2273template <typename Method, typename Table>
2274void NO_INLINE Aggregator::destroyImpl(Table & table) const
2275{
2276 table.forEachMapped([&](AggregateDataPtr & data)
2277 {
2278 /** If an exception (usually a lack of memory, the MemoryTracker throws) arose
2279 * after inserting the key into a hash table, but before creating all states of aggregate functions,
2280 * then data will be equal nullptr.
2281 */
2282 if (nullptr == data)
2283 return;
2284
2285 for (size_t i = 0; i < params.aggregates_size; ++i)
2286 if (!aggregate_functions[i]->isState())
2287 aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]);
2288
2289 data = nullptr;
2290 });
2291}
2292
2293
2294void Aggregator::destroyWithoutKey(AggregatedDataVariants & result) const
2295{
2296 AggregatedDataWithoutKey & res_data = result.without_key;
2297
2298 if (nullptr != res_data)
2299 {
2300 for (size_t i = 0; i < params.aggregates_size; ++i)
2301 if (!aggregate_functions[i]->isState())
2302 aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);
2303
2304 res_data = nullptr;
2305 }
2306}
2307
2308
2309void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
2310{
2311 if (result.size() == 0)
2312 return;
2313
2314 LOG_TRACE(log, "Destroying aggregate states");
2315
2316 /// In what data structure is the data aggregated?
2317 if (result.type == AggregatedDataVariants::Type::without_key || params.overflow_row)
2318 destroyWithoutKey(result);
2319
2320#define M(NAME, IS_TWO_LEVEL) \
2321 else if (result.type == AggregatedDataVariants::Type::NAME) \
2322 destroyImpl<decltype(result.NAME)::element_type>(result.NAME->data);
2323
2324 if (false) {}
2325 APPLY_FOR_AGGREGATED_VARIANTS(M)
2326#undef M
2327 else if (result.type != AggregatedDataVariants::Type::without_key)
2328 throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
2329}
2330
2331
2332void Aggregator::setCancellationHook(const CancellationHook cancellation_hook)
2333{
2334 isCancelled = cancellation_hook;
2335}
2336
2337
2338}
2339