1#pragma once
2
#include <functional>
#include <memory>
#include <shared_mutex>
#include <vector>

#include <Common/LRUCache.h>
#include <Common/filesystemHelpers.h>
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <Interpreters/IJoin.h>
#include <DataStreams/SizeLimits.h>
12
13namespace DB
14{
15
/// Forward declarations. In this header AnalyzedJoin is only held through std::shared_ptr and
/// MergeJoinCursor is only passed by reference, so their full definitions are not needed here.
/// NOTE(review): MergeJoinEqualRange is not referenced in this header; presumably it is used by the
/// implementation file — confirm before removing.
struct MergeJoinEqualRange;
class MergeJoinCursor;
class AnalyzedJoin;
19
20struct MiniLSM
21{
22 using SortedFiles = std::vector<std::unique_ptr<TemporaryFile>>;
23
24 const String & path;
25 const Block & sample_block;
26 const SortDescription & sort_description;
27 const size_t rows_in_block;
28 const size_t max_size;
29 std::vector<SortedFiles> sorted_files;
30
31 MiniLSM(const String & path_, const Block & sample_block_, const SortDescription & description,
32 size_t rows_in_block_, size_t max_size_ = 16)
33 : path(path_)
34 , sample_block(sample_block_)
35 , sort_description(description)
36 , rows_in_block(rows_in_block_)
37 , max_size(max_size_)
38 {}
39
40 void insert(const BlocksList & blocks);
41 void merge(std::function<void(const Block &)> callback = [](const Block &){});
42};
43
44
/// Join that keeps the right-hand table sorted (see the *_sort_description / *_merge_description members)
/// and matches left-hand blocks against it. Right-hand data arrives through addJoinedBlock();
/// left-hand blocks are joined in place by joinBlock(). Row matching is implemented for LEFT (leftJoin)
/// and INNER (innerJoin) semantics.
/// While `is_in_memory` is true, right-hand blocks are kept in `right_blocks`; otherwise they are
/// presumably spilled to temporary files through `lsm` — NOTE(review): confirm the policy in the .cpp.
class MergeJoin : public IJoin
{
public:
    MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block);

    bool addJoinedBlock(const Block & block) override;
    void joinBlock(Block &) override;
    void joinTotals(Block &) const override;
    void setTotals(const Block &) override;
    /// Converts `totals` to bool; presumably true once setTotals() has stored a non-empty block.
    bool hasTotals() const override { return totals; }
    /// Returns right_blocks_row_count, maintained by countBlockSize() and reset by clearRightBlocksList().
    /// NOTE(review): if clearRightBlocksList() runs after blocks are flushed to disk, rows already on disk
    /// are not included in this value — confirm whether that is intended.
    size_t getTotalRowCount() const override { return right_blocks_row_count; }

private:
    /// There are two size limits for the right-hand table: max_rows_in_join and max_bytes_in_join.
    /// max_bytes is preferred. If it isn't set, we approximate it as (max_rows * bytes/row).
    struct BlockByteWeight
    {
        /// Weight of a block for the LRU cache below: its size in bytes.
        size_t operator()(const Block & block) const { return block.bytes(); }
    };

    /// LRU cache of blocks keyed by size_t (presumably the right block position), weighted by byte size.
    using Cache = LRUCache<size_t, Block, std::hash<size_t>, BlockByteWeight>;

    /// NOTE(review): keep the declaration order of the data members below. C++ initializes members in
    /// declaration order, and the constructor (defined out of view) may rely on it.

    /// `mutable` so it can be locked from const member functions.
    mutable std::shared_mutex rwlock;
    std::shared_ptr<AnalyzedJoin> table_join;
    SizeLimits size_limits;
    /// Sort/merge descriptions for the left and right join keys.
    SortDescription left_sort_description;
    SortDescription right_sort_description;
    SortDescription left_merge_description;
    SortDescription right_merge_description;
    /// Right-hand sample block, its key columns, and the columns added to the joined result.
    Block right_sample_block;
    Block right_table_keys;
    Block right_columns_to_add;
    /// Right-hand blocks held in memory; their totals are tracked in right_blocks_row_count / right_blocks_bytes.
    BlocksList right_blocks;
    /// NOTE(review): presumably min/max key bounds per right block (used with skip_not_intersected) — confirm.
    Blocks min_max_right_blocks;
    std::unique_ptr<Cache> cached_right_blocks;
    std::vector<std::shared_ptr<Block>> loaded_right_blocks;
    /// Temporary-file storage helper and the sorted files taken from it.
    std::unique_ptr<MiniLSM> lsm;
    MiniLSM::SortedFiles flushed_right_blocks;
    Block totals;
    size_t right_blocks_row_count = 0;
    size_t right_blocks_bytes = 0;
    /// Starts as true; selects between the in-memory and the flushed code paths.
    bool is_in_memory = true;
    const bool nullable_right_side;
    const bool is_all;
    const bool is_inner;
    const bool is_left;
    const bool skip_not_intersected;
    const size_t max_rows_in_right_block;

    void changeLeftColumns(Block & block, MutableColumns && columns);
    void addRightColumns(Block & block, MutableColumns && columns);

    void mergeRightBlocks();

    /// Each template below has an in-memory (`in_memory == true`) and a flushed (`false`) variant.
    template <bool in_memory>
    size_t rightBlocksCount();
    template <bool in_memory>
    void joinSortedBlock(Block & block);
    template <bool in_memory>
    std::shared_ptr<Block> loadRightBlock(size_t pos);

    /// Row-matching routines for LEFT and INNER joins of one left block against one right block.
    void leftJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
                  MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail);
    void innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block,
                   MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail);

    bool saveRightBlock(Block && block);
    void flushRightBlocks();

    void mergeInMemoryRightBlocks();
    void mergeFlushedRightBlocks();

    /// Drops all in-memory right blocks and resets their row/byte counters.
    void clearRightBlocksList()
    {
        right_blocks.clear();
        right_blocks_row_count = 0;
        right_blocks_bytes = 0;
    }

    /// Adds the block's row count and byte size to the right-hand counters.
    void countBlockSize(const Block & block)
    {
        right_blocks_row_count += block.rows();
        right_blocks_bytes += block.bytes();
    }
};
130
131}
132