| 1 | #pragma once | 
|---|
| 2 |  | 
|---|
| 3 | #include <memory> | 
|---|
| 4 | #include <shared_mutex> | 
|---|
| 5 |  | 
|---|
| 6 | #include <Common/LRUCache.h> | 
|---|
| 7 | #include <Common/filesystemHelpers.h> | 
|---|
| 8 | #include <Core/Block.h> | 
|---|
| 9 | #include <Core/SortDescription.h> | 
|---|
| 10 | #include <Interpreters/IJoin.h> | 
|---|
| 11 | #include <DataStreams/SizeLimits.h> | 
|---|
| 12 |  | 
|---|
| 13 | namespace DB | 
|---|
| 14 | { | 
|---|
| 15 |  | 
|---|
| 16 | class AnalyzedJoin; | 
|---|
| 17 | class MergeJoinCursor; | 
|---|
| 18 | struct MergeJoinEqualRange; | 
|---|
| 19 |  | 
|---|
| 20 | struct MiniLSM | 
|---|
| 21 | { | 
|---|
| 22 | using SortedFiles = std::vector<std::unique_ptr<TemporaryFile>>; | 
|---|
| 23 |  | 
|---|
| 24 | const String & path; | 
|---|
| 25 | const Block & sample_block; | 
|---|
| 26 | const SortDescription & sort_description; | 
|---|
| 27 | const size_t rows_in_block; | 
|---|
| 28 | const size_t max_size; | 
|---|
| 29 | std::vector<SortedFiles> sorted_files; | 
|---|
| 30 |  | 
|---|
| 31 | MiniLSM(const String & path_, const Block & sample_block_, const SortDescription & description, | 
|---|
| 32 | size_t rows_in_block_, size_t max_size_ = 16) | 
|---|
| 33 | : path(path_) | 
|---|
| 34 | , sample_block(sample_block_) | 
|---|
| 35 | , sort_description(description) | 
|---|
| 36 | , rows_in_block(rows_in_block_) | 
|---|
| 37 | , max_size(max_size_) | 
|---|
| 38 | {} | 
|---|
| 39 |  | 
|---|
| 40 | void insert(const BlocksList & blocks); | 
|---|
| 41 | void merge(std::function<void(const Block &)> callback = [](const Block &){}); | 
|---|
| 42 | }; | 
|---|
| 43 |  | 
|---|
| 44 |  | 
|---|
| 45 | class MergeJoin : public IJoin | 
|---|
| 46 | { | 
|---|
| 47 | public: | 
|---|
| 48 | MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block); | 
|---|
| 49 |  | 
|---|
| 50 | bool addJoinedBlock(const Block & block) override; | 
|---|
| 51 | void joinBlock(Block &) override; | 
|---|
| 52 | void joinTotals(Block &) const override; | 
|---|
| 53 | void setTotals(const Block &) override; | 
|---|
| 54 | bool hasTotals() const override { return totals; } | 
|---|
| 55 | size_t getTotalRowCount() const override { return right_blocks_row_count; } | 
|---|
| 56 |  | 
|---|
| 57 | private: | 
|---|
| 58 | /// There're two size limits for right-hand table: max_rows_in_join, max_bytes_in_join. | 
|---|
| 59 | /// max_bytes is prefered. If it isn't set we aproximate it as (max_rows * bytes/row). | 
|---|
| 60 | struct BlockByteWeight | 
|---|
| 61 | { | 
|---|
| 62 | size_t operator()(const Block & block) const { return block.bytes(); } | 
|---|
| 63 | }; | 
|---|
| 64 |  | 
|---|
| 65 | using Cache = LRUCache<size_t, Block, std::hash<size_t>, BlockByteWeight>; | 
|---|
| 66 |  | 
|---|
| 67 | mutable std::shared_mutex rwlock; | 
|---|
| 68 | std::shared_ptr<AnalyzedJoin> table_join; | 
|---|
| 69 | SizeLimits size_limits; | 
|---|
| 70 | SortDescription left_sort_description; | 
|---|
| 71 | SortDescription right_sort_description; | 
|---|
| 72 | SortDescription left_merge_description; | 
|---|
| 73 | SortDescription right_merge_description; | 
|---|
| 74 | Block right_sample_block; | 
|---|
| 75 | Block right_table_keys; | 
|---|
| 76 | Block right_columns_to_add; | 
|---|
| 77 | BlocksList right_blocks; | 
|---|
| 78 | Blocks min_max_right_blocks; | 
|---|
| 79 | std::unique_ptr<Cache> cached_right_blocks; | 
|---|
| 80 | std::vector<std::shared_ptr<Block>> loaded_right_blocks; | 
|---|
| 81 | std::unique_ptr<MiniLSM> lsm; | 
|---|
| 82 | MiniLSM::SortedFiles flushed_right_blocks; | 
|---|
| 83 | Block totals; | 
|---|
| 84 | size_t right_blocks_row_count = 0; | 
|---|
| 85 | size_t right_blocks_bytes = 0; | 
|---|
| 86 | bool is_in_memory = true; | 
|---|
| 87 | const bool nullable_right_side; | 
|---|
| 88 | const bool is_all; | 
|---|
| 89 | const bool is_inner; | 
|---|
| 90 | const bool is_left; | 
|---|
| 91 | const bool skip_not_intersected; | 
|---|
| 92 | const size_t max_rows_in_right_block; | 
|---|
| 93 |  | 
|---|
| 94 | void changeLeftColumns(Block & block, MutableColumns && columns); | 
|---|
| 95 | void addRightColumns(Block & block, MutableColumns && columns); | 
|---|
| 96 |  | 
|---|
| 97 | void mergeRightBlocks(); | 
|---|
| 98 |  | 
|---|
| 99 | template <bool in_memory> | 
|---|
| 100 | size_t rightBlocksCount(); | 
|---|
| 101 | template <bool in_memory> | 
|---|
| 102 | void joinSortedBlock(Block & block); | 
|---|
| 103 | template <bool in_memory> | 
|---|
| 104 | std::shared_ptr<Block> loadRightBlock(size_t pos); | 
|---|
| 105 |  | 
|---|
| 106 | void leftJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block, | 
|---|
| 107 | MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail); | 
|---|
| 108 | void innerJoin(MergeJoinCursor & left_cursor, const Block & left_block, const Block & right_block, | 
|---|
| 109 | MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail); | 
|---|
| 110 |  | 
|---|
| 111 | bool saveRightBlock(Block && block); | 
|---|
| 112 | void flushRightBlocks(); | 
|---|
| 113 |  | 
|---|
| 114 | void mergeInMemoryRightBlocks(); | 
|---|
| 115 | void mergeFlushedRightBlocks(); | 
|---|
| 116 |  | 
|---|
| 117 | void clearRightBlocksList() | 
|---|
| 118 | { | 
|---|
| 119 | right_blocks.clear(); | 
|---|
| 120 | right_blocks_row_count = 0; | 
|---|
| 121 | right_blocks_bytes = 0; | 
|---|
| 122 | } | 
|---|
| 123 |  | 
|---|
| 124 | void countBlockSize(const Block & block) | 
|---|
| 125 | { | 
|---|
| 126 | right_blocks_row_count += block.rows(); | 
|---|
| 127 | right_blocks_bytes += block.bytes(); | 
|---|
| 128 | } | 
|---|
| 129 | }; | 
|---|
| 130 |  | 
|---|
| 131 | } | 
|---|
| 132 |  | 
|---|