1 | #include <limits> |
2 | |
3 | #include <Core/NamesAndTypes.h> |
4 | #include <Core/SortCursor.h> |
5 | #include <Columns/ColumnNullable.h> |
6 | #include <Interpreters/MergeJoin.h> |
7 | #include <Interpreters/AnalyzedJoin.h> |
8 | #include <Interpreters/sortBlock.h> |
9 | #include <Interpreters/join_common.h> |
10 | #include <DataStreams/materializeBlock.h> |
11 | #include <DataStreams/MergeSortingBlockInputStream.h> |
12 | #include <DataStreams/MergingSortedBlockInputStream.h> |
13 | #include <DataStreams/OneBlockInputStream.h> |
14 | #include <DataStreams/TemporaryFileStream.h> |
15 | #include <DataStreams/ConcatBlockInputStream.h> |
16 | |
17 | namespace DB |
18 | { |
19 | |
/// Error codes used by this file. They are only declared here (`extern`); the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int SET_SIZE_LIMIT_EXCEEDED; /// NOTE(review): not referenced in this file — confirm whether it is still needed.
    extern const int NOT_IMPLEMENTED;
    extern const int PARAMETER_OUT_OF_BOUND;
    extern const int NOT_ENOUGH_SPACE;
    extern const int LOGICAL_ERROR;
}
28 | |
29 | namespace |
30 | { |
31 | |
32 | template <bool has_nulls> |
33 | int nullableCompareAt(const IColumn & left_column, const IColumn & right_column, size_t lhs_pos, size_t rhs_pos) |
34 | { |
35 | static constexpr int null_direction_hint = 1; |
36 | |
37 | if constexpr (has_nulls) |
38 | { |
39 | auto * left_nullable = checkAndGetColumn<ColumnNullable>(left_column); |
40 | auto * right_nullable = checkAndGetColumn<ColumnNullable>(right_column); |
41 | |
42 | if (left_nullable && right_nullable) |
43 | { |
44 | int res = left_column.compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint); |
45 | if (res) |
46 | return res; |
47 | |
48 | /// NULL != NULL case |
49 | if (left_column.isNullAt(lhs_pos)) |
50 | return null_direction_hint; |
51 | } |
52 | |
53 | if (left_nullable && !right_nullable) |
54 | { |
55 | if (left_column.isNullAt(lhs_pos)) |
56 | return null_direction_hint; |
57 | return left_nullable->getNestedColumn().compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint); |
58 | } |
59 | |
60 | if (!left_nullable && right_nullable) |
61 | { |
62 | if (right_column.isNullAt(rhs_pos)) |
63 | return -null_direction_hint; |
64 | return left_column.compareAt(lhs_pos, rhs_pos, right_nullable->getNestedColumn(), null_direction_hint); |
65 | } |
66 | } |
67 | |
68 | /// !left_nullable && !right_nullable |
69 | return left_column.compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint); |
70 | } |
71 | |
72 | Block (const Block & block, const Block & keys) |
73 | { |
74 | if (block.rows() == 0) |
75 | throw Exception("Unexpected empty block" , ErrorCodes::LOGICAL_ERROR); |
76 | |
77 | Block min_max = keys.cloneEmpty(); |
78 | MutableColumns columns = min_max.mutateColumns(); |
79 | |
80 | for (size_t i = 0; i < columns.size(); ++i) |
81 | { |
82 | auto & src_column = block.getByName(keys.getByPosition(i).name); |
83 | |
84 | columns[i]->insertFrom(*src_column.column, 0); |
85 | columns[i]->insertFrom(*src_column.column, block.rows() - 1); |
86 | } |
87 | |
88 | min_max.setColumns(std::move(columns)); |
89 | return min_max; |
90 | } |
91 | |
92 | } |
93 | |
/// A pair of runs of rows with equal join keys:
/// rows [left_start, left_start + left_length) of the left block and
/// rows [right_start, right_start + right_length) of the right block.
/// When both lengths are zero, no equal keys were found (the starts then only record cursor positions).
struct MergeJoinEqualRange
{
    size_t left_start{0};
    size_t right_start{0};
    size_t left_length{0};
    size_t right_length{0};

    bool empty() const { return left_length == 0 && right_length == 0; }
};

using Range = MergeJoinEqualRange;
105 | |
106 | |
107 | class MergeJoinCursor |
108 | { |
109 | public: |
110 | MergeJoinCursor(const Block & block, const SortDescription & desc_) |
111 | : impl(SortCursorImpl(block, desc_)) |
112 | {} |
113 | |
114 | size_t position() const { return impl.pos; } |
115 | size_t end() const { return impl.rows; } |
116 | bool atEnd() const { return impl.pos >= impl.rows; } |
117 | void nextN(size_t num) { impl.pos += num; } |
118 | |
119 | void setCompareNullability(const MergeJoinCursor & rhs) |
120 | { |
121 | has_nullable_columns = false; |
122 | |
123 | for (size_t i = 0; i < impl.sort_columns_size; ++i) |
124 | { |
125 | bool is_left_nullable = isColumnNullable(*impl.sort_columns[i]); |
126 | bool is_right_nullable = isColumnNullable(*rhs.impl.sort_columns[i]); |
127 | |
128 | if (is_left_nullable || is_right_nullable) |
129 | { |
130 | has_nullable_columns = true; |
131 | break; |
132 | } |
133 | } |
134 | } |
135 | |
136 | Range getNextEqualRange(MergeJoinCursor & rhs) |
137 | { |
138 | if (has_nullable_columns) |
139 | return getNextEqualRangeImpl<true>(rhs); |
140 | return getNextEqualRangeImpl<false>(rhs); |
141 | } |
142 | |
143 | int intersect(const Block & min_max, const Names & key_names) |
144 | { |
145 | if (end() == 0 || min_max.rows() != 2) |
146 | throw Exception("Unexpected block size" , ErrorCodes::LOGICAL_ERROR); |
147 | |
148 | size_t last_position = end() - 1; |
149 | int first_vs_max = 0; |
150 | int last_vs_min = 0; |
151 | |
152 | for (size_t i = 0; i < impl.sort_columns.size(); ++i) |
153 | { |
154 | auto & left_column = *impl.sort_columns[i]; |
155 | auto & right_column = *min_max.getByName(key_names[i]).column; /// cannot get by position cause of possible duplicates |
156 | |
157 | if (!first_vs_max) |
158 | first_vs_max = nullableCompareAt<true>(left_column, right_column, position(), 1); |
159 | |
160 | if (!last_vs_min) |
161 | last_vs_min = nullableCompareAt<true>(left_column, right_column, last_position, 0); |
162 | } |
163 | |
164 | if (first_vs_max > 0) |
165 | return 1; |
166 | if (last_vs_min < 0) |
167 | return -1; |
168 | return 0; |
169 | } |
170 | |
171 | private: |
172 | SortCursorImpl impl; |
173 | bool has_nullable_columns = false; |
174 | |
175 | template <bool has_nulls> |
176 | Range getNextEqualRangeImpl(MergeJoinCursor & rhs) |
177 | { |
178 | while (!atEnd() && !rhs.atEnd()) |
179 | { |
180 | int cmp = compareAt<has_nulls>(rhs, impl.pos, rhs.impl.pos); |
181 | if (cmp < 0) |
182 | impl.next(); |
183 | if (cmp > 0) |
184 | rhs.impl.next(); |
185 | if (!cmp) |
186 | { |
187 | Range range{impl.pos, rhs.impl.pos, 0, 0}; |
188 | range.left_length = getEqualLength(); |
189 | range.right_length = rhs.getEqualLength(); |
190 | return range; |
191 | } |
192 | } |
193 | |
194 | return Range{impl.pos, rhs.impl.pos, 0, 0}; |
195 | } |
196 | |
197 | template <bool has_nulls> |
198 | int compareAt(const MergeJoinCursor & rhs, size_t lhs_pos, size_t rhs_pos) const |
199 | { |
200 | int res = 0; |
201 | for (size_t i = 0; i < impl.sort_columns_size; ++i) |
202 | { |
203 | auto * left_column = impl.sort_columns[i]; |
204 | auto * right_column = rhs.impl.sort_columns[i]; |
205 | |
206 | res = nullableCompareAt<has_nulls>(*left_column, *right_column, lhs_pos, rhs_pos); |
207 | if (res) |
208 | break; |
209 | } |
210 | return res; |
211 | } |
212 | |
213 | size_t getEqualLength() |
214 | { |
215 | if (atEnd()) |
216 | return 0; |
217 | |
218 | size_t pos = impl.pos; |
219 | while (sameNext(pos)) |
220 | ++pos; |
221 | return pos - impl.pos + 1; |
222 | } |
223 | |
224 | bool sameNext(size_t lhs_pos) const |
225 | { |
226 | if (lhs_pos + 1 >= impl.rows) |
227 | return false; |
228 | |
229 | for (size_t i = 0; i < impl.sort_columns_size; ++i) |
230 | if (impl.sort_columns[i]->compareAt(lhs_pos, lhs_pos + 1, *(impl.sort_columns[i]), 1) != 0) |
231 | return false; |
232 | return true; |
233 | } |
234 | }; |
235 | |
236 | namespace |
237 | { |
238 | |
239 | MutableColumns makeMutableColumns(const Block & block, size_t rows_to_reserve = 0) |
240 | { |
241 | MutableColumns columns; |
242 | columns.reserve(block.columns()); |
243 | |
244 | for (const auto & src_column : block) |
245 | { |
246 | columns.push_back(src_column.column->cloneEmpty()); |
247 | columns.back()->reserve(rows_to_reserve); |
248 | } |
249 | return columns; |
250 | } |
251 | |
252 | void makeSortAndMerge(const Names & keys, SortDescription & sort, SortDescription & merge) |
253 | { |
254 | NameSet unique_keys; |
255 | for (auto & key_name : keys) |
256 | { |
257 | merge.emplace_back(SortColumnDescription(key_name, 1, 1)); |
258 | |
259 | if (!unique_keys.count(key_name)) |
260 | { |
261 | unique_keys.insert(key_name); |
262 | sort.emplace_back(SortColumnDescription(key_name, 1, 1)); |
263 | } |
264 | } |
265 | } |
266 | |
267 | void copyLeftRange(const Block & block, MutableColumns & columns, size_t start, size_t rows_to_add) |
268 | { |
269 | for (size_t i = 0; i < block.columns(); ++i) |
270 | { |
271 | const auto & src_column = block.getByPosition(i).column; |
272 | columns[i]->insertRangeFrom(*src_column, start, rows_to_add); |
273 | } |
274 | } |
275 | |
276 | void copyRightRange(const Block & right_block, const Block & right_columns_to_add, MutableColumns & columns, |
277 | size_t row_position, size_t rows_to_add) |
278 | { |
279 | for (size_t i = 0; i < right_columns_to_add.columns(); ++i) |
280 | { |
281 | const auto & src_column = right_block.getByName(right_columns_to_add.getByPosition(i).name).column; |
282 | auto & dst_column = columns[i]; |
283 | auto * dst_nullable = typeid_cast<ColumnNullable *>(dst_column.get()); |
284 | |
285 | if (dst_nullable && !isColumnNullable(*src_column)) |
286 | dst_nullable->insertManyFromNotNullable(*src_column, row_position, rows_to_add); |
287 | else |
288 | dst_column->insertManyFrom(*src_column, row_position, rows_to_add); |
289 | } |
290 | } |
291 | |
292 | void joinEqualsAnyLeft(const Block & right_block, const Block & right_columns_to_add, MutableColumns & right_columns, const Range & range) |
293 | { |
294 | copyRightRange(right_block, right_columns_to_add, right_columns, range.right_start, range.left_length); |
295 | } |
296 | |
297 | void joinEquals(const Block & left_block, const Block & right_block, const Block & right_columns_to_add, |
298 | MutableColumns & left_columns, MutableColumns & right_columns, const Range & range, bool is_all) |
299 | { |
300 | size_t left_rows_to_add = range.left_length; |
301 | size_t right_rows_to_add = is_all ? range.right_length : 1; |
302 | |
303 | size_t row_position = range.right_start; |
304 | for (size_t right_row = 0; right_row < right_rows_to_add; ++right_row, ++row_position) |
305 | { |
306 | copyLeftRange(left_block, left_columns, range.left_start, left_rows_to_add); |
307 | copyRightRange(right_block, right_columns_to_add, right_columns, row_position, left_rows_to_add); |
308 | } |
309 | } |
310 | |
311 | void appendNulls(MutableColumns & right_columns, size_t rows_to_add) |
312 | { |
313 | for (auto & column : right_columns) |
314 | column->insertManyDefaults(rows_to_add); |
315 | } |
316 | |
317 | void joinInequalsLeft(const Block & left_block, MutableColumns & left_columns, MutableColumns & right_columns, |
318 | size_t start, size_t end, bool copy_left) |
319 | { |
320 | if (end <= start) |
321 | return; |
322 | |
323 | size_t rows_to_add = end - start; |
324 | if (copy_left) |
325 | copyLeftRange(left_block, left_columns, start, rows_to_add); |
326 | appendNulls(right_columns, rows_to_add); |
327 | } |
328 | |
329 | Blocks blocksListToBlocks(const BlocksList & in_blocks) |
330 | { |
331 | Blocks out_blocks; |
332 | out_blocks.reserve(in_blocks.size()); |
333 | for (const auto & block : in_blocks) |
334 | out_blocks.push_back(block); |
335 | return out_blocks; |
336 | } |
337 | |
338 | std::unique_ptr<TemporaryFile> flushBlockToFile(const String & tmp_path, const Block & , Block && block) |
339 | { |
340 | auto tmp_file = createTemporaryFile(tmp_path); |
341 | |
342 | OneBlockInputStream stream(block); |
343 | std::atomic<bool> is_cancelled{false}; |
344 | TemporaryFileStream::write(tmp_file->path(), header, stream, &is_cancelled); |
345 | if (is_cancelled) |
346 | throw Exception("Cannot flush MergeJoin data on disk. No space at " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE); |
347 | |
348 | return tmp_file; |
349 | } |
350 | |
351 | void flushStreamToFiles(const String & tmp_path, const Block & , IBlockInputStream & stream, |
352 | std::vector<std::unique_ptr<TemporaryFile>> & files, |
353 | std::function<void(const Block &)> callback = [](const Block &){}) |
354 | { |
355 | while (Block block = stream.read()) |
356 | { |
357 | if (!block.rows()) |
358 | continue; |
359 | |
360 | callback(block); |
361 | auto tmp_file = flushBlockToFile(tmp_path, header, std::move(block)); |
362 | files.emplace_back(std::move(tmp_file)); |
363 | } |
364 | } |
365 | |
366 | BlockInputStreams makeSortedInputStreams(std::vector<MiniLSM::SortedFiles> & sorted_files, const Block & ) |
367 | { |
368 | BlockInputStreams inputs; |
369 | |
370 | for (const auto & track : sorted_files) |
371 | { |
372 | BlockInputStreams sequence; |
373 | for (const auto & file : track) |
374 | sequence.emplace_back(std::make_shared<TemporaryFileLazyInputStream>(file->path(), header)); |
375 | inputs.emplace_back(std::make_shared<ConcatBlockInputStream>(sequence)); |
376 | } |
377 | |
378 | return inputs; |
379 | } |
380 | |
381 | } |
382 | |
383 | |
384 | void MiniLSM::insert(const BlocksList & blocks) |
385 | { |
386 | if (blocks.empty()) |
387 | return; |
388 | |
389 | SortedFiles sorted_blocks; |
390 | if (blocks.size() > 1) |
391 | { |
392 | BlockInputStreams inputs; |
393 | inputs.reserve(blocks.size()); |
394 | for (auto & block : blocks) |
395 | inputs.push_back(std::make_shared<OneBlockInputStream>(block)); |
396 | |
397 | MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block); |
398 | flushStreamToFiles(path, sample_block, sorted_input, sorted_blocks); |
399 | } |
400 | else |
401 | { |
402 | OneBlockInputStream sorted_input(blocks.front()); |
403 | flushStreamToFiles(path, sample_block, sorted_input, sorted_blocks); |
404 | } |
405 | |
406 | sorted_files.emplace_back(std::move(sorted_blocks)); |
407 | if (sorted_files.size() >= max_size) |
408 | merge(); |
409 | } |
410 | |
411 | /// TODO: better merge strategy |
412 | void MiniLSM::merge(std::function<void(const Block &)> callback) |
413 | { |
414 | BlockInputStreams inputs = makeSortedInputStreams(sorted_files, sample_block); |
415 | MergingSortedBlockInputStream sorted_stream(inputs, sort_description, rows_in_block); |
416 | |
417 | SortedFiles out; |
418 | flushStreamToFiles(path, sample_block, sorted_stream, out, callback); |
419 | |
420 | sorted_files.clear(); |
421 | sorted_files.emplace_back(std::move(out)); |
422 | } |
423 | |
424 | |
425 | MergeJoin::MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block_) |
426 | : table_join(table_join_) |
427 | , size_limits(table_join->sizeLimits()) |
428 | , right_sample_block(right_sample_block_) |
429 | , nullable_right_side(table_join->forceNullableRight()) |
430 | , is_all(table_join->strictness() == ASTTableJoin::Strictness::All) |
431 | , is_inner(isInner(table_join->kind())) |
432 | , is_left(isLeft(table_join->kind())) |
433 | , skip_not_intersected(table_join->enablePartialMergeJoinOptimizations()) |
434 | , max_rows_in_right_block(table_join->maxRowsInRightBlock()) |
435 | { |
436 | if (!isLeft(table_join->kind()) && !isInner(table_join->kind())) |
437 | throw Exception("Partial merge supported for LEFT and INNER JOINs only" , ErrorCodes::NOT_IMPLEMENTED); |
438 | |
439 | if (!max_rows_in_right_block) |
440 | throw Exception("partial_merge_join_rows_in_right_blocks cannot be zero" , ErrorCodes::PARAMETER_OUT_OF_BOUND); |
441 | |
442 | if (!size_limits.hasLimits()) |
443 | { |
444 | size_limits.max_bytes = table_join->defaultMaxBytes(); |
445 | if (!size_limits.max_bytes) |
446 | throw Exception("No limit for MergeJoin (max_rows_in_join, max_bytes_in_join or default_max_bytes_in_join have to be set)" , |
447 | ErrorCodes::PARAMETER_OUT_OF_BOUND); |
448 | } |
449 | |
450 | JoinCommon::extractKeysForJoin(table_join->keyNamesRight(), right_sample_block, right_table_keys, right_columns_to_add); |
451 | |
452 | const NameSet required_right_keys = table_join->requiredRightKeys(); |
453 | for (const auto & column : right_table_keys) |
454 | if (required_right_keys.count(column.name)) |
455 | right_columns_to_add.insert(ColumnWithTypeAndName{nullptr, column.type, column.name}); |
456 | |
457 | JoinCommon::removeLowCardinalityInplace(right_columns_to_add); |
458 | JoinCommon::createMissedColumns(right_columns_to_add); |
459 | |
460 | if (nullable_right_side) |
461 | JoinCommon::convertColumnsToNullable(right_columns_to_add); |
462 | |
463 | makeSortAndMerge(table_join->keyNamesLeft(), left_sort_description, left_merge_description); |
464 | makeSortAndMerge(table_join->keyNamesRight(), right_sort_description, right_merge_description); |
465 | |
466 | lsm = std::make_unique<MiniLSM>(table_join->getTemporaryPath(), right_sample_block, right_sort_description, max_rows_in_right_block); |
467 | } |
468 | |
/// Stores the totals block of the right table, then merges the right blocks collected so far
/// (mergeRightBlocks). NOTE(review): this presumably marks the end of right-side input — confirm with the caller.
void MergeJoin::setTotals(const Block & totals_block)
{
    totals = totals_block;
    mergeRightBlocks();
}
474 | |
475 | void MergeJoin::joinTotals(Block & block) const |
476 | { |
477 | JoinCommon::joinTotals(totals, right_columns_to_add, table_join->keyNamesRight(), block); |
478 | } |
479 | |
480 | void MergeJoin::mergeRightBlocks() |
481 | { |
482 | if (is_in_memory) |
483 | mergeInMemoryRightBlocks(); |
484 | else |
485 | mergeFlushedRightBlocks(); |
486 | } |
487 | |
488 | void MergeJoin::mergeInMemoryRightBlocks() |
489 | { |
490 | std::unique_lock lock(rwlock); |
491 | |
492 | if (right_blocks.empty()) |
493 | return; |
494 | |
495 | Blocks blocks_to_merge = blocksListToBlocks(right_blocks); |
496 | clearRightBlocksList(); |
497 | |
498 | /// TODO: there should be no splitted keys by blocks for RIGHT|FULL JOIN |
499 | MergeSortingBlocksBlockInputStream sorted_input(blocks_to_merge, right_sort_description, max_rows_in_right_block); |
500 | |
501 | while (Block block = sorted_input.read()) |
502 | { |
503 | if (!block.rows()) |
504 | continue; |
505 | |
506 | if (skip_not_intersected) |
507 | min_max_right_blocks.emplace_back(extractMinMax(block, right_table_keys)); |
508 | countBlockSize(block); |
509 | loaded_right_blocks.emplace_back(std::make_shared<Block>(std::move(block))); |
510 | } |
511 | } |
512 | |
513 | void MergeJoin::mergeFlushedRightBlocks() |
514 | { |
515 | std::unique_lock lock(rwlock); |
516 | |
517 | lsm->insert(right_blocks); |
518 | clearRightBlocksList(); |
519 | |
520 | auto callback = [&](const Block & block) |
521 | { |
522 | if (skip_not_intersected) |
523 | min_max_right_blocks.emplace_back(extractMinMax(block, right_table_keys)); |
524 | countBlockSize(block); |
525 | }; |
526 | |
527 | lsm->merge(callback); |
528 | flushed_right_blocks.swap(lsm->sorted_files.front()); |
529 | |
530 | /// Get memory limit or aproximate it from row limit and bytes per row factor |
531 | UInt64 memory_limit = size_limits.max_bytes; |
532 | UInt64 rows_limit = size_limits.max_rows; |
533 | if (!memory_limit && rows_limit) |
534 | memory_limit = right_blocks_bytes * rows_limit / right_blocks_row_count; |
535 | |
536 | cached_right_blocks = std::make_unique<Cache>(memory_limit); |
537 | } |
538 | |
/// Spills all right blocks collected in memory into the on-disk MiniLSM and switches the join into
/// the flushed (on-disk) mode. The flag is cleared before the insert, so it stays cleared even if the insert throws.
void MergeJoin::flushRightBlocks()
{
    /// it's under unique_lock(rwlock)

    is_in_memory = false;
    lsm->insert(right_blocks);
    clearRightBlocksList();
}
547 | |
548 | bool MergeJoin::saveRightBlock(Block && block) |
549 | { |
550 | std::unique_lock lock(rwlock); |
551 | |
552 | countBlockSize(block); |
553 | right_blocks.emplace_back(std::move(block)); |
554 | |
555 | bool has_memory = size_limits.softCheck(right_blocks_row_count, right_blocks_bytes); |
556 | if (!has_memory) |
557 | flushRightBlocks(); |
558 | return true; |
559 | } |
560 | |
561 | bool MergeJoin::addJoinedBlock(const Block & src_block) |
562 | { |
563 | Block block = materializeBlock(src_block); |
564 | JoinCommon::removeLowCardinalityInplace(block); |
565 | |
566 | sortBlock(block, right_sort_description); |
567 | return saveRightBlock(std::move(block)); |
568 | } |
569 | |
570 | void MergeJoin::joinBlock(Block & block) |
571 | { |
572 | JoinCommon::checkTypesOfKeys(block, table_join->keyNamesLeft(), right_table_keys, table_join->keyNamesRight()); |
573 | materializeBlockInplace(block); |
574 | JoinCommon::removeLowCardinalityInplace(block); |
575 | |
576 | sortBlock(block, left_sort_description); |
577 | if (is_in_memory) |
578 | joinSortedBlock<true>(block); |
579 | else |
580 | joinSortedBlock<false>(block); |
581 | } |
582 | |
583 | template <bool in_memory> |
584 | void MergeJoin::joinSortedBlock(Block & block) |
585 | { |
586 | std::shared_lock lock(rwlock); |
587 | |
588 | size_t rows_to_reserve = is_left ? block.rows() : 0; |
589 | MutableColumns left_columns = makeMutableColumns(block, (is_all ? rows_to_reserve : 0)); |
590 | MutableColumns right_columns = makeMutableColumns(right_columns_to_add, rows_to_reserve); |
591 | MergeJoinCursor left_cursor(block, left_merge_description); |
592 | size_t left_key_tail = 0; |
593 | size_t right_blocks_count = rightBlocksCount<in_memory>(); |
594 | |
595 | if (is_left) |
596 | { |
597 | for (size_t i = 0; i < right_blocks_count; ++i) |
598 | { |
599 | if (left_cursor.atEnd()) |
600 | break; |
601 | |
602 | if (skip_not_intersected) |
603 | { |
604 | int intersection = left_cursor.intersect(min_max_right_blocks[i], table_join->keyNamesRight()); |
605 | if (intersection < 0) |
606 | break; /// (left) ... (right) |
607 | if (intersection > 0) |
608 | continue; /// (right) ... (left) |
609 | } |
610 | |
611 | std::shared_ptr<Block> right_block = loadRightBlock<in_memory>(i); |
612 | |
613 | leftJoin(left_cursor, block, *right_block, left_columns, right_columns, left_key_tail); |
614 | } |
615 | |
616 | left_cursor.nextN(left_key_tail); |
617 | joinInequalsLeft(block, left_columns, right_columns, left_cursor.position(), left_cursor.end(), is_all); |
618 | //left_cursor.nextN(left_cursor.end() - left_cursor.position()); |
619 | |
620 | changeLeftColumns(block, std::move(left_columns)); |
621 | addRightColumns(block, std::move(right_columns)); |
622 | } |
623 | else if (is_inner) |
624 | { |
625 | for (size_t i = 0; i < right_blocks_count; ++i) |
626 | { |
627 | if (left_cursor.atEnd()) |
628 | break; |
629 | |
630 | if (skip_not_intersected) |
631 | { |
632 | int intersection = left_cursor.intersect(min_max_right_blocks[i], table_join->keyNamesRight()); |
633 | if (intersection < 0) |
634 | break; /// (left) ... (right) |
635 | if (intersection > 0) |
636 | continue; /// (right) ... (left) |
637 | } |
638 | |
639 | std::shared_ptr<Block> right_block = loadRightBlock<in_memory>(i); |
640 | |
641 | innerJoin(left_cursor, block, *right_block, left_columns, right_columns, left_key_tail); |
642 | } |
643 | |
644 | left_cursor.nextN(left_key_tail); |
645 | changeLeftColumns(block, std::move(left_columns)); |
646 | addRightColumns(block, std::move(right_columns)); |
647 | } |
648 | } |
649 | |
/// LEFT JOIN of the left block, starting at the current position of `left_cursor`, with one sorted right block.
/// Left rows before each equal range get default right values (left columns are copied only for ALL).
/// Equal ranges are joined: every pair for ALL, or the first right row per left row for ANY.
/// `left_key_tail` is carried between calls. It holds the number of left rows of the last key that were
/// already joined with the end of the previous right block (ALL only), so that they are neither
/// re-emitted as unmatched nor skipped when the next right block starts with the same key.
void MergeJoin::leftJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
                         MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail)
{
    MergeJoinCursor right_cursor(right_block, right_merge_description);
    left_cursor.setCompareNullability(right_cursor);

    while (!left_cursor.atEnd() && !right_cursor.atEnd())
    {
        /// A non-zero left_key_tail means the last left key was already matched in the previous leftJoin() call.
        /// Do not join it twice: join it only if it equals the first right key of this call, skip it otherwise.
        size_t left_unequal_position = left_cursor.position() + left_key_tail;
        left_key_tail = 0;

        Range range = left_cursor.getNextEqualRange(right_cursor);

        /// Rows skipped by getNextEqualRange (minus the already-joined tail) have no match in this block.
        joinInequalsLeft(left_block, left_columns, right_columns, left_unequal_position, range.left_start, is_all);

        if (range.empty())
            break;

        if (is_all)
            joinEquals(left_block, right_block, right_columns_to_add, left_columns, right_columns, range, is_all);
        else
            joinEqualsAnyLeft(right_block, right_columns_to_add, right_columns, range);

        right_cursor.nextN(range.right_length);

        /// Do not move past the last left keys for ALL JOIN (the next right block may contain the same key)
        if (is_all && right_cursor.atEnd())
        {
            left_key_tail = range.left_length;
            break;
        }
        left_cursor.nextN(range.left_length);
    }
}
686 | |
/// INNER JOIN of the left block, starting at the current position of `left_cursor`, with one sorted right block.
/// Only equal ranges are emitted (via joinEquals).
/// For ALL JOIN, when the right block ends right after an equal range, the left cursor stays on that key
/// and its run length is stored in `left_key_tail`. The same left rows can then be joined with equal
/// keys at the start of the next right block.
/// NOTE(review): unlike leftJoin, this function never resets left_key_tail; the value is only consumed by
/// the final nextN() in joinSortedBlock.
void MergeJoin::innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
                          MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail)
{
    MergeJoinCursor right_cursor(right_block, right_merge_description);
    left_cursor.setCompareNullability(right_cursor);

    while (!left_cursor.atEnd() && !right_cursor.atEnd())
    {
        Range range = left_cursor.getNextEqualRange(right_cursor);
        if (range.empty())
            break;

        joinEquals(left_block, right_block, right_columns_to_add, left_columns, right_columns, range, is_all);
        right_cursor.nextN(range.right_length);

        /// Do not move past the last left keys for ALL JOIN (the next right block may contain the same key)
        if (is_all && right_cursor.atEnd())
        {
            left_key_tail = range.left_length;
            break;
        }
        left_cursor.nextN(range.left_length);
    }
}
711 | |
712 | void MergeJoin::changeLeftColumns(Block & block, MutableColumns && columns) |
713 | { |
714 | if (is_left && !is_all) |
715 | return; |
716 | block.setColumns(std::move(columns)); |
717 | } |
718 | |
719 | void MergeJoin::addRightColumns(Block & block, MutableColumns && right_columns) |
720 | { |
721 | for (size_t i = 0; i < right_columns_to_add.columns(); ++i) |
722 | { |
723 | const auto & column = right_columns_to_add.getByPosition(i); |
724 | block.insert(ColumnWithTypeAndName{std::move(right_columns[i]), column.type, column.name}); |
725 | } |
726 | } |
727 | |
728 | template <bool in_memory> |
729 | size_t MergeJoin::rightBlocksCount() |
730 | { |
731 | if constexpr (!in_memory) |
732 | return flushed_right_blocks.size(); |
733 | else |
734 | return loaded_right_blocks.size(); |
735 | } |
736 | |
737 | template <bool in_memory> |
738 | std::shared_ptr<Block> MergeJoin::loadRightBlock(size_t pos) |
739 | { |
740 | if constexpr (!in_memory) |
741 | { |
742 | auto load_func = [&]() -> std::shared_ptr<Block> |
743 | { |
744 | TemporaryFileStream input(flushed_right_blocks[pos]->path(), right_sample_block); |
745 | return std::make_shared<Block>(input.block_in->read()); |
746 | }; |
747 | |
748 | return cached_right_blocks->getOrSet(pos, load_func).first; |
749 | } |
750 | else |
751 | return loaded_right_blocks[pos]; |
752 | } |
753 | |
754 | } |
755 | |