1 | #pragma once |
2 | |
3 | #include <memory> |
4 | #include <shared_mutex> |
5 | |
6 | #include <Common/LRUCache.h> |
7 | #include <Common/filesystemHelpers.h> |
8 | #include <Core/Block.h> |
9 | #include <Core/SortDescription.h> |
10 | #include <Interpreters/IJoin.h> |
11 | #include <DataStreams/SizeLimits.h> |
12 | |
13 | namespace DB |
14 | { |
15 | |
/// Types that this header names without needing their full definitions
/// (they are used here only through references / smart pointers, or by
/// out-of-line code).
struct MergeJoinEqualRange;
class MergeJoinCursor;
class AnalyzedJoin;
19 | |
20 | struct MiniLSM |
21 | { |
22 | using SortedFiles = std::vector<std::unique_ptr<TemporaryFile>>; |
23 | |
24 | const String & path; |
25 | const Block & sample_block; |
26 | const SortDescription & sort_description; |
27 | const size_t rows_in_block; |
28 | const size_t max_size; |
29 | std::vector<SortedFiles> sorted_files; |
30 | |
31 | MiniLSM(const String & path_, const Block & sample_block_, const SortDescription & description, |
32 | size_t rows_in_block_, size_t max_size_ = 16) |
33 | : path(path_) |
34 | , sample_block(sample_block_) |
35 | , sort_description(description) |
36 | , rows_in_block(rows_in_block_) |
37 | , max_size(max_size_) |
38 | {} |
39 | |
40 | void insert(const BlocksList & blocks); |
41 | void merge(std::function<void(const Block &)> callback = [](const Block &){}); |
42 | }; |
43 | |
44 | |
45 | class MergeJoin : public IJoin |
46 | { |
47 | public: |
48 | MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block); |
49 | |
50 | bool addJoinedBlock(const Block & block) override; |
51 | void joinBlock(Block &) override; |
52 | void joinTotals(Block &) const override; |
53 | void setTotals(const Block &) override; |
54 | bool hasTotals() const override { return totals; } |
55 | size_t getTotalRowCount() const override { return right_blocks_row_count; } |
56 | |
57 | private: |
58 | /// There're two size limits for right-hand table: max_rows_in_join, max_bytes_in_join. |
59 | /// max_bytes is prefered. If it isn't set we aproximate it as (max_rows * bytes/row). |
60 | struct BlockByteWeight |
61 | { |
62 | size_t operator()(const Block & block) const { return block.bytes(); } |
63 | }; |
64 | |
65 | using Cache = LRUCache<size_t, Block, std::hash<size_t>, BlockByteWeight>; |
66 | |
67 | mutable std::shared_mutex rwlock; |
68 | std::shared_ptr<AnalyzedJoin> table_join; |
69 | SizeLimits size_limits; |
70 | SortDescription left_sort_description; |
71 | SortDescription right_sort_description; |
72 | SortDescription left_merge_description; |
73 | SortDescription right_merge_description; |
74 | Block right_sample_block; |
75 | Block right_table_keys; |
76 | Block right_columns_to_add; |
77 | BlocksList right_blocks; |
78 | Blocks min_max_right_blocks; |
79 | std::unique_ptr<Cache> cached_right_blocks; |
80 | std::vector<std::shared_ptr<Block>> loaded_right_blocks; |
81 | std::unique_ptr<MiniLSM> lsm; |
82 | MiniLSM::SortedFiles flushed_right_blocks; |
83 | Block totals; |
84 | size_t right_blocks_row_count = 0; |
85 | size_t right_blocks_bytes = 0; |
86 | bool is_in_memory = true; |
87 | const bool nullable_right_side; |
88 | const bool is_all; |
89 | const bool is_inner; |
90 | const bool is_left; |
91 | const bool skip_not_intersected; |
92 | const size_t max_rows_in_right_block; |
93 | |
94 | void changeLeftColumns(Block & block, MutableColumns && columns); |
95 | void addRightColumns(Block & block, MutableColumns && columns); |
96 | |
97 | void mergeRightBlocks(); |
98 | |
99 | template <bool in_memory> |
100 | size_t rightBlocksCount(); |
101 | template <bool in_memory> |
102 | void joinSortedBlock(Block & block); |
103 | template <bool in_memory> |
104 | std::shared_ptr<Block> loadRightBlock(size_t pos); |
105 | |
106 | void leftJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block, |
107 | MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail); |
108 | void innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block, |
109 | MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail); |
110 | |
111 | bool saveRightBlock(Block && block); |
112 | void flushRightBlocks(); |
113 | |
114 | void mergeInMemoryRightBlocks(); |
115 | void mergeFlushedRightBlocks(); |
116 | |
117 | void clearRightBlocksList() |
118 | { |
119 | right_blocks.clear(); |
120 | right_blocks_row_count = 0; |
121 | right_blocks_bytes = 0; |
122 | } |
123 | |
124 | void countBlockSize(const Block & block) |
125 | { |
126 | right_blocks_row_count += block.rows(); |
127 | right_blocks_bytes += block.bytes(); |
128 | } |
129 | }; |
130 | |
131 | } |
132 | |