#include <future>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
#include <Common/CurrentThread.h>


namespace CurrentMetrics
{
    extern const Metric QueryThread;
}


namespace DB
{

/** Scheme of operation:
  *
  * We have to output blocks in a specific order: by bucket number:
  *
  *  o   o   o   o  ...  o
  *  0   1   2   3       255
  *
  * Each block is the result of merging blocks with the same bucket number from several sources:
  *
  * src1    o   o  ...
  *         |   |
  * src2    o   o
  *
  *         |   |
  *         v   v
  *
  * result  o   o
  *         0   1
  *
  * (we must merge the 0th block from src1 with the 0th block from src2 to form the 0th result block, and so on)
  *
  * We may read (request over network) blocks from different sources in parallel.
  * This is done by the 'getNextBlocksToMerge' method. The number of threads is 'reading_threads'.
  *
  * We may also do merges for different buckets in parallel.
  * For example, we may
  *  merge the 1st block from src1 with the 1st block from src2 in one thread,
  *  and merge the 2nd block from src1 with the 2nd block from src2 in another thread.
  * The number of threads is 'merging_threads'.
  * We must keep only 'merging_threads' buckets of blocks in memory simultaneously,
  *  because our goal is to limit memory usage: not to keep the whole result in memory, but to return it in streaming form.
  *
  * So, we return the result sequentially, but perform the calculation of resulting blocks in parallel.
  *  (a calculation is the merge of source blocks for the same bucket)
  *
  * Example:
  *
  * src1    . . o o . . .
  *             | |
  * src2        o o
  *
  *             | |
  *             v v
  *
  * result  . . o o . . .
  *
  * In this picture, we do only two merges in parallel.
  * When a merge is done, the 'getNextBlocksToMerge' method is called to get blocks from the sources for the next bucket.
  * Then the next merge is performed.
  *
  * The main method ('readImpl') waits for the merged block for the next bucket and returns it.
  */
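
/// Example usage (a minimal sketch, not taken from this file; how 'inputs' and 'params'
///  are obtained is up to the caller, e.g. one RemoteBlockInputStream per shard):
///
///     BlockInputStreams inputs = ...;
///     Aggregator::Params params = ...;
///     MergingAggregatedMemoryEfficientBlockInputStream stream(
///         inputs, params, true /* final */, 4 /* reading_threads */, 4 /* merging_threads */);
///     stream.readPrefix();
///     while (Block block = stream.read())
///         ;   /// Consume merged blocks, bucket by bucket, in increasing order of bucket number.
///     stream.readSuffix();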


MergingAggregatedMemoryEfficientBlockInputStream::MergingAggregatedMemoryEfficientBlockInputStream(
    BlockInputStreams inputs_, const Aggregator::Params & params, bool final_, size_t reading_threads_, size_t merging_threads_)
    : aggregator(params), final(final_),
    reading_threads(std::min(reading_threads_, inputs_.size())), merging_threads(merging_threads_),
    inputs(inputs_.begin(), inputs_.end())
{
    children = inputs_;

    /** Create threads that will request and read data from remote servers.
      */
    if (reading_threads > 1)
        reading_pool = std::make_unique<ThreadPool>(reading_threads);

    /** Create threads. Each of them will pull the next set of blocks to merge in a loop,
      *  then merge them and place the result in a queue (in fact, an ordered map), from where we will read ready result blocks.
      */
    if (merging_threads > 1)
        parallel_merge_data = std::make_unique<ParallelMergeData>(merging_threads);
}


Block MergingAggregatedMemoryEfficientBlockInputStream::getHeader() const
{
    return aggregator.getHeader(final);
}


void MergingAggregatedMemoryEfficientBlockInputStream::readPrefix()
{
    start();
}


void MergingAggregatedMemoryEfficientBlockInputStream::readSuffix()
{
    if (!all_read && !isCancelled())
        throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR);

    finalize();

    for (size_t i = 0; i < children.size(); ++i)
        children[i]->readSuffix();
}


void MergingAggregatedMemoryEfficientBlockInputStream::cancel(bool kill)
{
    if (kill)
        is_killed = true;

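    /// Ensure cancellation runs only once: only the first caller of cancel() proceeds past this point.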
    bool old_val = false;
    if (!is_cancelled.compare_exchange_strong(old_val, true))
        return;

    if (parallel_merge_data)
    {
        {
            std::unique_lock lock(parallel_merge_data->merged_blocks_mutex);
            parallel_merge_data->finish = true;
        }
        parallel_merge_data->merged_blocks_changed.notify_one();    /// The readImpl method must stop waiting and exit.
        parallel_merge_data->have_space.notify_all();               /// Merging threads must stop waiting and exit.
    }

    for (auto & input : inputs)
    {
        try
        {
            input.stream->cancel(kill);
        }
        catch (...)
        {
            /** If we failed to ask one or more sources to stop processing
              *  (example: connection reset during distributed query execution),
              *  then we don't care.
              */
            LOG_ERROR(log, "Exception while cancelling " << input.stream->getName());
        }
    }
}


void MergingAggregatedMemoryEfficientBlockInputStream::start()
{
    if (started)
        return;

    started = true;

    /// If a child is a RemoteBlockInputStream, then child->readPrefix() will send the query to the remote server, initiating calculations.

    if (reading_threads == 1)
    {
        for (auto & child : children)
            child->readPrefix();
    }
    else
    {
        size_t num_children = children.size();
        try
        {
            for (size_t i = 0; i < num_children; ++i)
            {
                auto & child = children[i];

                auto thread_group = CurrentThread::getGroup();
                reading_pool->scheduleOrThrowOnError([&child, thread_group]
                {
                    setThreadName("MergeAggReadThr");
                    if (thread_group)
                        CurrentThread::attachToIfDetached(thread_group);
                    CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
                    child->readPrefix();
                });
            }
        }
        catch (...)
        {
            reading_pool->wait();
            throw;
        }
        reading_pool->wait();
    }

    if (merging_threads > 1)
    {
        auto & pool = parallel_merge_data->pool;

        /** Create threads that will receive and merge blocks.
          */

        for (size_t i = 0; i < merging_threads; ++i)
            pool.scheduleOrThrowOnError([this, thread_group = CurrentThread::getGroup()]() { mergeThread(thread_group); });
    }
}


Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
{
    start();

    if (!parallel_merge_data)
    {
        if (BlocksToMerge blocks_to_merge = getNextBlocksToMerge())
            return aggregator.mergeBlocks(*blocks_to_merge, final);
        return {};
    }
    else
    {
        Block res;

        while (true)
        {
            std::unique_lock lock(parallel_merge_data->merged_blocks_mutex);

            parallel_merge_data->merged_blocks_changed.wait(lock, [this]
            {
                return parallel_merge_data->finish                      /// Requested to finish early.
                    || parallel_merge_data->exception                   /// An error in a merging thread.
                    || parallel_merge_data->exhausted                   /// No more data in the sources.
                    || !parallel_merge_data->merged_blocks.empty();     /// Have another merged block.
            });

            if (parallel_merge_data->exception)
                std::rethrow_exception(parallel_merge_data->exception);

            if (parallel_merge_data->finish)
                break;

            bool have_merged_block_or_merging_in_progress = !parallel_merge_data->merged_blocks.empty();

            if (parallel_merge_data->exhausted && !have_merged_block_or_merging_in_progress)
                break;

            if (have_merged_block_or_merging_in_progress)
            {
                auto it = parallel_merge_data->merged_blocks.begin();

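                /// An empty Block at the head of the map is a placeholder: its merge is still in progress.
                /// In that case, loop again until the merging thread fills it and notifies 'merged_blocks_changed'.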
                if (it->second)
                {
                    res.swap(it->second);
                    parallel_merge_data->merged_blocks.erase(it);

                    lock.unlock();
                    parallel_merge_data->have_space.notify_one();    /// We consumed a block. A merging thread may merge the next block for us.
                    break;
                }
            }
        }

        if (!res)
            all_read = true;

        return res;
    }
}


MergingAggregatedMemoryEfficientBlockInputStream::~MergingAggregatedMemoryEfficientBlockInputStream()
{
    try
    {
        if (!all_read)
            cancel(false);

        finalize();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}


void MergingAggregatedMemoryEfficientBlockInputStream::finalize()
{
    if (!started)
        return;

    LOG_TRACE(log, "Waiting for threads to finish");

    if (parallel_merge_data)
        parallel_merge_data->pool.wait();

    LOG_TRACE(log, "Waited for threads to finish");
}


void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(ThreadGroupStatusPtr thread_group)
{
    CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};

    try
    {
        if (thread_group)
            CurrentThread::attachToIfDetached(thread_group);
        setThreadName("MergeAggMergThr");

        while (!parallel_merge_data->finish)
        {
            /** Receiving the next blocks is handled by one thread pool, and merging by another.
              * This is a rather complex interaction.
              * Each time:
              *  - 'reading_threads' read one next block from each source;
              *  - a group of blocks to merge is created from them;
              *  - one of the 'merging_threads' merges this group of blocks.
              */
            BlocksToMerge blocks_to_merge;
            int output_order = -1;

            /** Synchronously:
              *  - fetch the next blocks from the sources,
              *     wait for space in 'merged_blocks',
              *     and reserve a place in 'merged_blocks' for the result of their merge;
              *  - or, if there are no next blocks, set the 'exhausted' flag.
              */
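            /// Both steps happen under 'get_next_blocks_mutex', so slots in 'merged_blocks' are
            ///  reserved in the same order in which buckets are fetched, and the main thread can
            ///  return results strictly in 'output_order'.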
            {
                std::lock_guard lock_next_blocks(parallel_merge_data->get_next_blocks_mutex);

                if (parallel_merge_data->exhausted || parallel_merge_data->finish)
                    break;

                blocks_to_merge = getNextBlocksToMerge();

                if (!blocks_to_merge || blocks_to_merge->empty())
                {
                    {
                        std::unique_lock lock_merged_blocks(parallel_merge_data->merged_blocks_mutex);
                        parallel_merge_data->exhausted = true;
                    }

                    /// No new blocks have been read from the sources. (But maybe, in another mergeThread, some previous block is still being prepared.)
                    parallel_merge_data->merged_blocks_changed.notify_one();
                    break;
                }

                output_order = blocks_to_merge->front().info.is_overflows
                    ? NUM_BUCKETS   /// "Overflow" blocks are returned by 'getNextBlocksToMerge' after all other blocks.
                    : blocks_to_merge->front().info.bucket_num;

                {
                    std::unique_lock lock_merged_blocks(parallel_merge_data->merged_blocks_mutex);

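                    /// Backpressure: allow at most 'merging_threads' pending results in 'merged_blocks',
                    ///  so that the whole result is never kept in memory at once.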
                    parallel_merge_data->have_space.wait(lock_merged_blocks, [this]
                    {
                        return parallel_merge_data->merged_blocks.size() < merging_threads
                            || parallel_merge_data->finish;
                    });

                    if (parallel_merge_data->finish)
                        break;

                    /** Place an empty block (operator[] default-constructs it). It is a promise to do the merge and fill it.
                      * The main thread knows that there will be a result for the 'output_order' place.
                      * The main thread must return results exactly in 'output_order', so this is important.
                      */
                    parallel_merge_data->merged_blocks[output_order]; //-V607
                }
            }

            /// At this point, several merge threads may work in parallel.
            Block res = aggregator.mergeBlocks(*blocks_to_merge, final);

            {
                std::lock_guard lock(parallel_merge_data->merged_blocks_mutex);

                if (parallel_merge_data->finish)
                    break;

                parallel_merge_data->merged_blocks[output_order] = res;
            }

            /// Notify that we have another merged block.
            parallel_merge_data->merged_blocks_changed.notify_one();
        }
    }
    catch (...)
    {
        {
            std::lock_guard lock(parallel_merge_data->merged_blocks_mutex);
            parallel_merge_data->exception = std::current_exception();
            parallel_merge_data->finish = true;
        }

        parallel_merge_data->merged_blocks_changed.notify_one();
        parallel_merge_data->have_space.notify_all();
    }
}


MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregatedMemoryEfficientBlockInputStream::getNextBlocksToMerge()
{
    /** There are several input sources.
      * From each of them, data may be received in one of the following forms:
      *
      * 1. A block with a specified 'bucket_num'.
      *  It means that on the remote server, the data was partitioned by buckets,
      *  and the data for each 'bucket_num' from different servers may be merged independently,
      *  because the data in different buckets contains different aggregation keys.
      *  Data for different 'bucket_num's is received in increasing order of 'bucket_num'.
      *
      * 2. A block without a specified 'bucket_num'.
      *  It means that on the remote server, the data was not partitioned by buckets.
      *  If all servers send non-partitioned data, we may just merge it.
      *  But if some servers send partitioned data,
      *  then we must first partition the non-partitioned data, and then merge the data within each partition.
      *
      * 3. Blocks with 'is_overflows' = true.
      *  This is additional data for rows that did not pass the 'max_rows_to_group_by' threshold.
      *  It must be merged together, independently of ordinary data.
      */
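    /// In terms of BlockInfo, the three forms above are distinguished as follows
    ///  (this matches the dispatch order in 'read_from_input' below):
    ///  - bucket_num >= 0, is_overflows = false: case 1 (two-level, partitioned data);
    ///  - bucket_num == -1, is_overflows = false: case 2 (single-level, non-partitioned data);
    ///  - bucket_num == -1, is_overflows = true: case 3 (overflow data).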
    ++current_bucket_num;

    /// Read the next block from each source whose current block has a bucket number less than 'current_bucket_num'.

    auto need_that_input = [this] (Input & input)
    {
        return !input.is_exhausted
            && input.block.info.bucket_num < current_bucket_num;
    };

    auto read_from_input = [this] (Input & input)
    {
        /// If a block with 'overflows' (not ordinary data) is received, remember that block and repeat.
        while (true)
        {
//            std::cerr << "reading block\n";
            Block block = input.stream->read();

            if (!block)
            {
//                std::cerr << "input is exhausted\n";
                input.is_exhausted = true;
                break;
            }

            if (block.info.bucket_num != -1)
            {
                /// One of the partitioned blocks of two-level data.
//                std::cerr << "block for bucket " << block.info.bucket_num << "\n";

                has_two_level = true;
                input.block = block;
            }
            else if (block.info.is_overflows)
            {
//                std::cerr << "block for overflows\n";

                has_overflows = true;
                input.overflow_block = block;

                continue;
            }
            else
            {
                /// A block of non-partitioned (single-level) data.
//                std::cerr << "block without bucket\n";

                input.block = block;
            }

            break;
        }
    };

    if (reading_threads == 1)
    {
        for (auto & input : inputs)
            if (need_that_input(input))
                read_from_input(input);
    }
    else
    {
        try
        {
            for (auto & input : inputs)
            {
                if (need_that_input(input))
                {
                    auto thread_group = CurrentThread::getGroup();
                    reading_pool->scheduleOrThrowOnError([&input, &read_from_input, thread_group]
                    {
                        setThreadName("MergeAggReadThr");
                        if (thread_group)
                            CurrentThread::attachToIfDetached(thread_group);
                        CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
                        read_from_input(input);
                    });
                }
            }
        }
        catch (...)
        {
            reading_pool->wait();
            throw;
        }
        reading_pool->wait();
    }

    while (true)
    {
        if (current_bucket_num >= NUM_BUCKETS)
        {
            /// All ordinary data has been processed. Maybe there are also 'overflows' blocks.
//            std::cerr << "at end\n";

            if (has_overflows)
            {
//                std::cerr << "merging overflows\n";

                has_overflows = false;
                BlocksToMerge blocks_to_merge = std::make_unique<BlocksList>();

                for (auto & input : inputs)
                    if (input.overflow_block)
                        blocks_to_merge->emplace_back(std::move(input.overflow_block));

                return blocks_to_merge;
            }
            else
                return {};
        }
        else if (has_two_level)
        {
            /** We have two-level (partitioned) data.
              * We will process bucket numbers in increasing order.
              * Find the minimum bucket number for which there is data
              *  - this will be the data to merge.
              */
//            std::cerr << "has two level\n";

            int min_bucket_num = NUM_BUCKETS;

            for (auto & input : inputs)
            {
                /// Blocks of already partitioned (two-level) data.
                if (input.block.info.bucket_num != -1 && input.block.info.bucket_num < min_bucket_num)
                    min_bucket_num = input.block.info.bucket_num;

                /// A block that is not yet partitioned (split into buckets). Partition it and place the result into 'splitted_blocks'.
                if (input.block.info.bucket_num == -1 && input.block && input.splitted_blocks.empty())
                {
                    LOG_TRACE(&Logger::get("MergingAggregatedMemoryEfficient"), "Having block without bucket: will split.");

                    input.splitted_blocks = aggregator.convertBlockToTwoLevel(input.block);
                    input.block = Block();
                }

                /// Blocks we got by splitting non-partitioned blocks.
                if (!input.splitted_blocks.empty())
                {
                    for (const auto & block : input.splitted_blocks)
                    {
                        if (block && block.info.bucket_num < min_bucket_num)
                        {
                            min_bucket_num = block.info.bucket_num;
                            break;
                        }
                    }
                }
            }

            current_bucket_num = min_bucket_num;

//            std::cerr << "current_bucket_num = " << current_bucket_num << "\n";

            /// No more blocks with ordinary data.
            if (current_bucket_num == NUM_BUCKETS)
                continue;

            /// Collect all blocks for 'current_bucket_num' to do the merge.
            BlocksToMerge blocks_to_merge = std::make_unique<BlocksList>();

            for (auto & input : inputs)
            {
                if (input.block.info.bucket_num == current_bucket_num)
                {
//                    std::cerr << "having block for current_bucket_num\n";

                    blocks_to_merge->emplace_back(std::move(input.block));
                    input.block = Block();
                }
                else if (!input.splitted_blocks.empty() && input.splitted_blocks[min_bucket_num])
                {
//                    std::cerr << "having splitted data for bucket\n";

                    blocks_to_merge->emplace_back(std::move(input.splitted_blocks[min_bucket_num]));
                    input.splitted_blocks[min_bucket_num] = Block();
                }
            }

            return blocks_to_merge;
        }
        else
        {
            /// There is only non-partitioned (single-level) data. Just merge it.
//            std::cerr << "don't have two level\n";

            BlocksToMerge blocks_to_merge = std::make_unique<BlocksList>();

            for (auto & input : inputs)
                if (input.block)
                    blocks_to_merge->emplace_back(std::move(input.block));

            current_bucket_num = NUM_BUCKETS;
            return blocks_to_merge;
        }
    }
}

}