1#include <limits>
2
3#include <Core/NamesAndTypes.h>
4#include <Core/SortCursor.h>
5#include <Columns/ColumnNullable.h>
6#include <Interpreters/MergeJoin.h>
7#include <Interpreters/AnalyzedJoin.h>
8#include <Interpreters/sortBlock.h>
9#include <Interpreters/join_common.h>
10#include <DataStreams/materializeBlock.h>
11#include <DataStreams/MergeSortingBlockInputStream.h>
12#include <DataStreams/MergingSortedBlockInputStream.h>
13#include <DataStreams/OneBlockInputStream.h>
14#include <DataStreams/TemporaryFileStream.h>
15#include <DataStreams/ConcatBlockInputStream.h>
16
17namespace DB
18{
19
20namespace ErrorCodes
21{
22 extern const int SET_SIZE_LIMIT_EXCEEDED;
23 extern const int NOT_IMPLEMENTED;
24 extern const int PARAMETER_OUT_OF_BOUND;
25 extern const int NOT_ENOUGH_SPACE;
26 extern const int LOGICAL_ERROR;
27}
28
29namespace
30{
31
32template <bool has_nulls>
33int nullableCompareAt(const IColumn & left_column, const IColumn & right_column, size_t lhs_pos, size_t rhs_pos)
34{
35 static constexpr int null_direction_hint = 1;
36
37 if constexpr (has_nulls)
38 {
39 auto * left_nullable = checkAndGetColumn<ColumnNullable>(left_column);
40 auto * right_nullable = checkAndGetColumn<ColumnNullable>(right_column);
41
42 if (left_nullable && right_nullable)
43 {
44 int res = left_column.compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
45 if (res)
46 return res;
47
48 /// NULL != NULL case
49 if (left_column.isNullAt(lhs_pos))
50 return null_direction_hint;
51 }
52
53 if (left_nullable && !right_nullable)
54 {
55 if (left_column.isNullAt(lhs_pos))
56 return null_direction_hint;
57 return left_nullable->getNestedColumn().compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
58 }
59
60 if (!left_nullable && right_nullable)
61 {
62 if (right_column.isNullAt(rhs_pos))
63 return -null_direction_hint;
64 return left_column.compareAt(lhs_pos, rhs_pos, right_nullable->getNestedColumn(), null_direction_hint);
65 }
66 }
67
68 /// !left_nullable && !right_nullable
69 return left_column.compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
70}
71
72Block extractMinMax(const Block & block, const Block & keys)
73{
74 if (block.rows() == 0)
75 throw Exception("Unexpected empty block", ErrorCodes::LOGICAL_ERROR);
76
77 Block min_max = keys.cloneEmpty();
78 MutableColumns columns = min_max.mutateColumns();
79
80 for (size_t i = 0; i < columns.size(); ++i)
81 {
82 auto & src_column = block.getByName(keys.getByPosition(i).name);
83
84 columns[i]->insertFrom(*src_column.column, 0);
85 columns[i]->insertFrom(*src_column.column, block.rows() - 1);
86 }
87
88 min_max.setColumns(std::move(columns));
89 return min_max;
90}
91
92}
93
/// Start positions and lengths of a run of rows in the left and in the right block.
/// Kept as a plain aggregate: it is brace-initialized as {left_start, right_start, left_length, right_length}.
struct MergeJoinEqualRange
{
    size_t left_start = 0;
    size_t right_start = 0;
    size_t left_length = 0;
    size_t right_length = 0;

    /// True when both lengths are zero.
    bool empty() const { return left_length == 0 && right_length == 0; }
};

using Range = MergeJoinEqualRange;
105
106
107class MergeJoinCursor
108{
109public:
110 MergeJoinCursor(const Block & block, const SortDescription & desc_)
111 : impl(SortCursorImpl(block, desc_))
112 {}
113
114 size_t position() const { return impl.pos; }
115 size_t end() const { return impl.rows; }
116 bool atEnd() const { return impl.pos >= impl.rows; }
117 void nextN(size_t num) { impl.pos += num; }
118
119 void setCompareNullability(const MergeJoinCursor & rhs)
120 {
121 has_nullable_columns = false;
122
123 for (size_t i = 0; i < impl.sort_columns_size; ++i)
124 {
125 bool is_left_nullable = isColumnNullable(*impl.sort_columns[i]);
126 bool is_right_nullable = isColumnNullable(*rhs.impl.sort_columns[i]);
127
128 if (is_left_nullable || is_right_nullable)
129 {
130 has_nullable_columns = true;
131 break;
132 }
133 }
134 }
135
136 Range getNextEqualRange(MergeJoinCursor & rhs)
137 {
138 if (has_nullable_columns)
139 return getNextEqualRangeImpl<true>(rhs);
140 return getNextEqualRangeImpl<false>(rhs);
141 }
142
143 int intersect(const Block & min_max, const Names & key_names)
144 {
145 if (end() == 0 || min_max.rows() != 2)
146 throw Exception("Unexpected block size", ErrorCodes::LOGICAL_ERROR);
147
148 size_t last_position = end() - 1;
149 int first_vs_max = 0;
150 int last_vs_min = 0;
151
152 for (size_t i = 0; i < impl.sort_columns.size(); ++i)
153 {
154 auto & left_column = *impl.sort_columns[i];
155 auto & right_column = *min_max.getByName(key_names[i]).column; /// cannot get by position cause of possible duplicates
156
157 if (!first_vs_max)
158 first_vs_max = nullableCompareAt<true>(left_column, right_column, position(), 1);
159
160 if (!last_vs_min)
161 last_vs_min = nullableCompareAt<true>(left_column, right_column, last_position, 0);
162 }
163
164 if (first_vs_max > 0)
165 return 1;
166 if (last_vs_min < 0)
167 return -1;
168 return 0;
169 }
170
171private:
172 SortCursorImpl impl;
173 bool has_nullable_columns = false;
174
175 template <bool has_nulls>
176 Range getNextEqualRangeImpl(MergeJoinCursor & rhs)
177 {
178 while (!atEnd() && !rhs.atEnd())
179 {
180 int cmp = compareAt<has_nulls>(rhs, impl.pos, rhs.impl.pos);
181 if (cmp < 0)
182 impl.next();
183 if (cmp > 0)
184 rhs.impl.next();
185 if (!cmp)
186 {
187 Range range{impl.pos, rhs.impl.pos, 0, 0};
188 range.left_length = getEqualLength();
189 range.right_length = rhs.getEqualLength();
190 return range;
191 }
192 }
193
194 return Range{impl.pos, rhs.impl.pos, 0, 0};
195 }
196
197 template <bool has_nulls>
198 int compareAt(const MergeJoinCursor & rhs, size_t lhs_pos, size_t rhs_pos) const
199 {
200 int res = 0;
201 for (size_t i = 0; i < impl.sort_columns_size; ++i)
202 {
203 auto * left_column = impl.sort_columns[i];
204 auto * right_column = rhs.impl.sort_columns[i];
205
206 res = nullableCompareAt<has_nulls>(*left_column, *right_column, lhs_pos, rhs_pos);
207 if (res)
208 break;
209 }
210 return res;
211 }
212
213 size_t getEqualLength()
214 {
215 if (atEnd())
216 return 0;
217
218 size_t pos = impl.pos;
219 while (sameNext(pos))
220 ++pos;
221 return pos - impl.pos + 1;
222 }
223
224 bool sameNext(size_t lhs_pos) const
225 {
226 if (lhs_pos + 1 >= impl.rows)
227 return false;
228
229 for (size_t i = 0; i < impl.sort_columns_size; ++i)
230 if (impl.sort_columns[i]->compareAt(lhs_pos, lhs_pos + 1, *(impl.sort_columns[i]), 1) != 0)
231 return false;
232 return true;
233 }
234};
235
236namespace
237{
238
239MutableColumns makeMutableColumns(const Block & block, size_t rows_to_reserve = 0)
240{
241 MutableColumns columns;
242 columns.reserve(block.columns());
243
244 for (const auto & src_column : block)
245 {
246 columns.push_back(src_column.column->cloneEmpty());
247 columns.back()->reserve(rows_to_reserve);
248 }
249 return columns;
250}
251
252void makeSortAndMerge(const Names & keys, SortDescription & sort, SortDescription & merge)
253{
254 NameSet unique_keys;
255 for (auto & key_name : keys)
256 {
257 merge.emplace_back(SortColumnDescription(key_name, 1, 1));
258
259 if (!unique_keys.count(key_name))
260 {
261 unique_keys.insert(key_name);
262 sort.emplace_back(SortColumnDescription(key_name, 1, 1));
263 }
264 }
265}
266
267void copyLeftRange(const Block & block, MutableColumns & columns, size_t start, size_t rows_to_add)
268{
269 for (size_t i = 0; i < block.columns(); ++i)
270 {
271 const auto & src_column = block.getByPosition(i).column;
272 columns[i]->insertRangeFrom(*src_column, start, rows_to_add);
273 }
274}
275
276void copyRightRange(const Block & right_block, const Block & right_columns_to_add, MutableColumns & columns,
277 size_t row_position, size_t rows_to_add)
278{
279 for (size_t i = 0; i < right_columns_to_add.columns(); ++i)
280 {
281 const auto & src_column = right_block.getByName(right_columns_to_add.getByPosition(i).name).column;
282 auto & dst_column = columns[i];
283 auto * dst_nullable = typeid_cast<ColumnNullable *>(dst_column.get());
284
285 if (dst_nullable && !isColumnNullable(*src_column))
286 dst_nullable->insertManyFromNotNullable(*src_column, row_position, rows_to_add);
287 else
288 dst_column->insertManyFrom(*src_column, row_position, rows_to_add);
289 }
290}
291
292void joinEqualsAnyLeft(const Block & right_block, const Block & right_columns_to_add, MutableColumns & right_columns, const Range & range)
293{
294 copyRightRange(right_block, right_columns_to_add, right_columns, range.right_start, range.left_length);
295}
296
297void joinEquals(const Block & left_block, const Block & right_block, const Block & right_columns_to_add,
298 MutableColumns & left_columns, MutableColumns & right_columns, const Range & range, bool is_all)
299{
300 size_t left_rows_to_add = range.left_length;
301 size_t right_rows_to_add = is_all ? range.right_length : 1;
302
303 size_t row_position = range.right_start;
304 for (size_t right_row = 0; right_row < right_rows_to_add; ++right_row, ++row_position)
305 {
306 copyLeftRange(left_block, left_columns, range.left_start, left_rows_to_add);
307 copyRightRange(right_block, right_columns_to_add, right_columns, row_position, left_rows_to_add);
308 }
309}
310
311void appendNulls(MutableColumns & right_columns, size_t rows_to_add)
312{
313 for (auto & column : right_columns)
314 column->insertManyDefaults(rows_to_add);
315}
316
317void joinInequalsLeft(const Block & left_block, MutableColumns & left_columns, MutableColumns & right_columns,
318 size_t start, size_t end, bool copy_left)
319{
320 if (end <= start)
321 return;
322
323 size_t rows_to_add = end - start;
324 if (copy_left)
325 copyLeftRange(left_block, left_columns, start, rows_to_add);
326 appendNulls(right_columns, rows_to_add);
327}
328
329Blocks blocksListToBlocks(const BlocksList & in_blocks)
330{
331 Blocks out_blocks;
332 out_blocks.reserve(in_blocks.size());
333 for (const auto & block : in_blocks)
334 out_blocks.push_back(block);
335 return out_blocks;
336}
337
338std::unique_ptr<TemporaryFile> flushBlockToFile(const String & tmp_path, const Block & header, Block && block)
339{
340 auto tmp_file = createTemporaryFile(tmp_path);
341
342 OneBlockInputStream stream(block);
343 std::atomic<bool> is_cancelled{false};
344 TemporaryFileStream::write(tmp_file->path(), header, stream, &is_cancelled);
345 if (is_cancelled)
346 throw Exception("Cannot flush MergeJoin data on disk. No space at " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
347
348 return tmp_file;
349}
350
351void flushStreamToFiles(const String & tmp_path, const Block & header, IBlockInputStream & stream,
352 std::vector<std::unique_ptr<TemporaryFile>> & files,
353 std::function<void(const Block &)> callback = [](const Block &){})
354{
355 while (Block block = stream.read())
356 {
357 if (!block.rows())
358 continue;
359
360 callback(block);
361 auto tmp_file = flushBlockToFile(tmp_path, header, std::move(block));
362 files.emplace_back(std::move(tmp_file));
363 }
364}
365
366BlockInputStreams makeSortedInputStreams(std::vector<MiniLSM::SortedFiles> & sorted_files, const Block & header)
367{
368 BlockInputStreams inputs;
369
370 for (const auto & track : sorted_files)
371 {
372 BlockInputStreams sequence;
373 for (const auto & file : track)
374 sequence.emplace_back(std::make_shared<TemporaryFileLazyInputStream>(file->path(), header));
375 inputs.emplace_back(std::make_shared<ConcatBlockInputStream>(sequence));
376 }
377
378 return inputs;
379}
380
381}
382
383
384void MiniLSM::insert(const BlocksList & blocks)
385{
386 if (blocks.empty())
387 return;
388
389 SortedFiles sorted_blocks;
390 if (blocks.size() > 1)
391 {
392 BlockInputStreams inputs;
393 inputs.reserve(blocks.size());
394 for (auto & block : blocks)
395 inputs.push_back(std::make_shared<OneBlockInputStream>(block));
396
397 MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block);
398 flushStreamToFiles(path, sample_block, sorted_input, sorted_blocks);
399 }
400 else
401 {
402 OneBlockInputStream sorted_input(blocks.front());
403 flushStreamToFiles(path, sample_block, sorted_input, sorted_blocks);
404 }
405
406 sorted_files.emplace_back(std::move(sorted_blocks));
407 if (sorted_files.size() >= max_size)
408 merge();
409}
410
411/// TODO: better merge strategy
412void MiniLSM::merge(std::function<void(const Block &)> callback)
413{
414 BlockInputStreams inputs = makeSortedInputStreams(sorted_files, sample_block);
415 MergingSortedBlockInputStream sorted_stream(inputs, sort_description, rows_in_block);
416
417 SortedFiles out;
418 flushStreamToFiles(path, sample_block, sorted_stream, out, callback);
419
420 sorted_files.clear();
421 sorted_files.emplace_back(std::move(out));
422}
423
424
425MergeJoin::MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block_)
426 : table_join(table_join_)
427 , size_limits(table_join->sizeLimits())
428 , right_sample_block(right_sample_block_)
429 , nullable_right_side(table_join->forceNullableRight())
430 , is_all(table_join->strictness() == ASTTableJoin::Strictness::All)
431 , is_inner(isInner(table_join->kind()))
432 , is_left(isLeft(table_join->kind()))
433 , skip_not_intersected(table_join->enablePartialMergeJoinOptimizations())
434 , max_rows_in_right_block(table_join->maxRowsInRightBlock())
435{
436 if (!isLeft(table_join->kind()) && !isInner(table_join->kind()))
437 throw Exception("Partial merge supported for LEFT and INNER JOINs only", ErrorCodes::NOT_IMPLEMENTED);
438
439 if (!max_rows_in_right_block)
440 throw Exception("partial_merge_join_rows_in_right_blocks cannot be zero", ErrorCodes::PARAMETER_OUT_OF_BOUND);
441
442 if (!size_limits.hasLimits())
443 {
444 size_limits.max_bytes = table_join->defaultMaxBytes();
445 if (!size_limits.max_bytes)
446 throw Exception("No limit for MergeJoin (max_rows_in_join, max_bytes_in_join or default_max_bytes_in_join have to be set)",
447 ErrorCodes::PARAMETER_OUT_OF_BOUND);
448 }
449
450 JoinCommon::extractKeysForJoin(table_join->keyNamesRight(), right_sample_block, right_table_keys, right_columns_to_add);
451
452 const NameSet required_right_keys = table_join->requiredRightKeys();
453 for (const auto & column : right_table_keys)
454 if (required_right_keys.count(column.name))
455 right_columns_to_add.insert(ColumnWithTypeAndName{nullptr, column.type, column.name});
456
457 JoinCommon::removeLowCardinalityInplace(right_columns_to_add);
458 JoinCommon::createMissedColumns(right_columns_to_add);
459
460 if (nullable_right_side)
461 JoinCommon::convertColumnsToNullable(right_columns_to_add);
462
463 makeSortAndMerge(table_join->keyNamesLeft(), left_sort_description, left_merge_description);
464 makeSortAndMerge(table_join->keyNamesRight(), right_sort_description, right_merge_description);
465
466 lsm = std::make_unique<MiniLSM>(table_join->getTemporaryPath(), right_sample_block, right_sort_description, max_rows_in_right_block);
467}
468
469void MergeJoin::setTotals(const Block & totals_block)
470{
471 totals = totals_block;
472 mergeRightBlocks();
473}
474
475void MergeJoin::joinTotals(Block & block) const
476{
477 JoinCommon::joinTotals(totals, right_columns_to_add, table_join->keyNamesRight(), block);
478}
479
480void MergeJoin::mergeRightBlocks()
481{
482 if (is_in_memory)
483 mergeInMemoryRightBlocks();
484 else
485 mergeFlushedRightBlocks();
486}
487
488void MergeJoin::mergeInMemoryRightBlocks()
489{
490 std::unique_lock lock(rwlock);
491
492 if (right_blocks.empty())
493 return;
494
495 Blocks blocks_to_merge = blocksListToBlocks(right_blocks);
496 clearRightBlocksList();
497
498 /// TODO: there should be no splitted keys by blocks for RIGHT|FULL JOIN
499 MergeSortingBlocksBlockInputStream sorted_input(blocks_to_merge, right_sort_description, max_rows_in_right_block);
500
501 while (Block block = sorted_input.read())
502 {
503 if (!block.rows())
504 continue;
505
506 if (skip_not_intersected)
507 min_max_right_blocks.emplace_back(extractMinMax(block, right_table_keys));
508 countBlockSize(block);
509 loaded_right_blocks.emplace_back(std::make_shared<Block>(std::move(block)));
510 }
511}
512
513void MergeJoin::mergeFlushedRightBlocks()
514{
515 std::unique_lock lock(rwlock);
516
517 lsm->insert(right_blocks);
518 clearRightBlocksList();
519
520 auto callback = [&](const Block & block)
521 {
522 if (skip_not_intersected)
523 min_max_right_blocks.emplace_back(extractMinMax(block, right_table_keys));
524 countBlockSize(block);
525 };
526
527 lsm->merge(callback);
528 flushed_right_blocks.swap(lsm->sorted_files.front());
529
530 /// Get memory limit or aproximate it from row limit and bytes per row factor
531 UInt64 memory_limit = size_limits.max_bytes;
532 UInt64 rows_limit = size_limits.max_rows;
533 if (!memory_limit && rows_limit)
534 memory_limit = right_blocks_bytes * rows_limit / right_blocks_row_count;
535
536 cached_right_blocks = std::make_unique<Cache>(memory_limit);
537}
538
539void MergeJoin::flushRightBlocks()
540{
541 /// it's under unique_lock(rwlock)
542
543 is_in_memory = false;
544 lsm->insert(right_blocks);
545 clearRightBlocksList();
546}
547
548bool MergeJoin::saveRightBlock(Block && block)
549{
550 std::unique_lock lock(rwlock);
551
552 countBlockSize(block);
553 right_blocks.emplace_back(std::move(block));
554
555 bool has_memory = size_limits.softCheck(right_blocks_row_count, right_blocks_bytes);
556 if (!has_memory)
557 flushRightBlocks();
558 return true;
559}
560
561bool MergeJoin::addJoinedBlock(const Block & src_block)
562{
563 Block block = materializeBlock(src_block);
564 JoinCommon::removeLowCardinalityInplace(block);
565
566 sortBlock(block, right_sort_description);
567 return saveRightBlock(std::move(block));
568}
569
570void MergeJoin::joinBlock(Block & block)
571{
572 JoinCommon::checkTypesOfKeys(block, table_join->keyNamesLeft(), right_table_keys, table_join->keyNamesRight());
573 materializeBlockInplace(block);
574 JoinCommon::removeLowCardinalityInplace(block);
575
576 sortBlock(block, left_sort_description);
577 if (is_in_memory)
578 joinSortedBlock<true>(block);
579 else
580 joinSortedBlock<false>(block);
581}
582
583template <bool in_memory>
584void MergeJoin::joinSortedBlock(Block & block)
585{
586 std::shared_lock lock(rwlock);
587
588 size_t rows_to_reserve = is_left ? block.rows() : 0;
589 MutableColumns left_columns = makeMutableColumns(block, (is_all ? rows_to_reserve : 0));
590 MutableColumns right_columns = makeMutableColumns(right_columns_to_add, rows_to_reserve);
591 MergeJoinCursor left_cursor(block, left_merge_description);
592 size_t left_key_tail = 0;
593 size_t right_blocks_count = rightBlocksCount<in_memory>();
594
595 if (is_left)
596 {
597 for (size_t i = 0; i < right_blocks_count; ++i)
598 {
599 if (left_cursor.atEnd())
600 break;
601
602 if (skip_not_intersected)
603 {
604 int intersection = left_cursor.intersect(min_max_right_blocks[i], table_join->keyNamesRight());
605 if (intersection < 0)
606 break; /// (left) ... (right)
607 if (intersection > 0)
608 continue; /// (right) ... (left)
609 }
610
611 std::shared_ptr<Block> right_block = loadRightBlock<in_memory>(i);
612
613 leftJoin(left_cursor, block, *right_block, left_columns, right_columns, left_key_tail);
614 }
615
616 left_cursor.nextN(left_key_tail);
617 joinInequalsLeft(block, left_columns, right_columns, left_cursor.position(), left_cursor.end(), is_all);
618 //left_cursor.nextN(left_cursor.end() - left_cursor.position());
619
620 changeLeftColumns(block, std::move(left_columns));
621 addRightColumns(block, std::move(right_columns));
622 }
623 else if (is_inner)
624 {
625 for (size_t i = 0; i < right_blocks_count; ++i)
626 {
627 if (left_cursor.atEnd())
628 break;
629
630 if (skip_not_intersected)
631 {
632 int intersection = left_cursor.intersect(min_max_right_blocks[i], table_join->keyNamesRight());
633 if (intersection < 0)
634 break; /// (left) ... (right)
635 if (intersection > 0)
636 continue; /// (right) ... (left)
637 }
638
639 std::shared_ptr<Block> right_block = loadRightBlock<in_memory>(i);
640
641 innerJoin(left_cursor, block, *right_block, left_columns, right_columns, left_key_tail);
642 }
643
644 left_cursor.nextN(left_key_tail);
645 changeLeftColumns(block, std::move(left_columns));
646 addRightColumns(block, std::move(right_columns));
647 }
648}
649
650void MergeJoin::leftJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
651 MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail)
652{
653 MergeJoinCursor right_cursor(right_block, right_merge_description);
654 left_cursor.setCompareNullability(right_cursor);
655
656 while (!left_cursor.atEnd() && !right_cursor.atEnd())
657 {
658 /// Not zero left_key_tail means there were equality for the last left key in previous leftJoin() call.
659 /// Do not join it twice: join only if it's equal with a first right key of current leftJoin() call and skip otherwise.
660 size_t left_unequal_position = left_cursor.position() + left_key_tail;
661 left_key_tail = 0;
662
663 Range range = left_cursor.getNextEqualRange(right_cursor);
664
665 joinInequalsLeft(left_block, left_columns, right_columns, left_unequal_position, range.left_start, is_all);
666
667 if (range.empty())
668 break;
669
670 if (is_all)
671 joinEquals(left_block, right_block, right_columns_to_add, left_columns, right_columns, range, is_all);
672 else
673 joinEqualsAnyLeft(right_block, right_columns_to_add, right_columns, range);
674
675 right_cursor.nextN(range.right_length);
676
677 /// Do not run over last left keys for ALL JOIN (cause of possible duplicates in next right block)
678 if (is_all && right_cursor.atEnd())
679 {
680 left_key_tail = range.left_length;
681 break;
682 }
683 left_cursor.nextN(range.left_length);
684 }
685}
686
687void MergeJoin::innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
688 MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail)
689{
690 MergeJoinCursor right_cursor(right_block, right_merge_description);
691 left_cursor.setCompareNullability(right_cursor);
692
693 while (!left_cursor.atEnd() && !right_cursor.atEnd())
694 {
695 Range range = left_cursor.getNextEqualRange(right_cursor);
696 if (range.empty())
697 break;
698
699 joinEquals(left_block, right_block, right_columns_to_add, left_columns, right_columns, range, is_all);
700 right_cursor.nextN(range.right_length);
701
702 /// Do not run over last left keys for ALL JOIN (cause of possible duplicates in next right block)
703 if (is_all && right_cursor.atEnd())
704 {
705 left_key_tail = range.left_length;
706 break;
707 }
708 left_cursor.nextN(range.left_length);
709 }
710}
711
712void MergeJoin::changeLeftColumns(Block & block, MutableColumns && columns)
713{
714 if (is_left && !is_all)
715 return;
716 block.setColumns(std::move(columns));
717}
718
719void MergeJoin::addRightColumns(Block & block, MutableColumns && right_columns)
720{
721 for (size_t i = 0; i < right_columns_to_add.columns(); ++i)
722 {
723 const auto & column = right_columns_to_add.getByPosition(i);
724 block.insert(ColumnWithTypeAndName{std::move(right_columns[i]), column.type, column.name});
725 }
726}
727
728template <bool in_memory>
729size_t MergeJoin::rightBlocksCount()
730{
731 if constexpr (!in_memory)
732 return flushed_right_blocks.size();
733 else
734 return loaded_right_blocks.size();
735}
736
737template <bool in_memory>
738std::shared_ptr<Block> MergeJoin::loadRightBlock(size_t pos)
739{
740 if constexpr (!in_memory)
741 {
742 auto load_func = [&]() -> std::shared_ptr<Block>
743 {
744 TemporaryFileStream input(flushed_right_blocks[pos]->path(), right_sample_block);
745 return std::make_shared<Block>(input.block_in->read());
746 };
747
748 return cached_right_blocks->getOrSet(pos, load_func).first;
749 }
750 else
751 return loaded_right_blocks[pos];
752}
753
754}
755