| 1 | #pragma once |
| 2 | |
| 3 | #include <queue> |
| 4 | |
| 5 | #include <boost/smart_ptr/intrusive_ptr.hpp> |
| 6 | |
| 7 | #include <common/logger_useful.h> |
| 8 | #include <Common/SharedBlockRowRef.h> |
| 9 | |
| 10 | #include <Core/Row.h> |
| 11 | #include <Core/SortDescription.h> |
| 12 | #include <Core/SortCursor.h> |
| 13 | |
| 14 | #include <IO/WriteHelpers.h> |
| 15 | |
| 16 | #include <DataStreams/IBlockInputStream.h> |
| 17 | |
| 18 | |
| 19 | namespace DB |
| 20 | { |
| 21 | |
/// Error codes used by this header. Only declared here (extern); the definition is provided elsewhere.
namespace ErrorCodes
{
    extern const int CORRUPTED_DATA;
}
| 26 | |
| 27 | |
| 28 | /** Merges several sorted streams into one sorted stream. |
| 29 | */ |
| 30 | class MergingSortedBlockInputStream : public IBlockInputStream |
| 31 | { |
| 32 | public: |
| 33 | /** limit - if isn't 0, then we can produce only first limit rows in sorted order. |
| 34 | * out_row_sources - if isn't nullptr, then at the end of execution it should contain part numbers of each readed row (and needed flag) |
| 35 | * quiet - don't log profiling info |
| 36 | */ |
| 37 | MergingSortedBlockInputStream( |
| 38 | const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_, |
| 39 | UInt64 limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false, bool average_block_sizes_ = false); |
| 40 | |
| 41 | String getName() const override { return "MergingSorted" ; } |
| 42 | |
| 43 | bool isSortedOutput() const override { return true; } |
| 44 | const SortDescription & getSortDescription() const override { return description; } |
| 45 | |
| 46 | Block () const override { return header; } |
| 47 | |
| 48 | protected: |
| 49 | /// Simple class, which allows to check stop condition during merge process |
| 50 | /// in simple case it just compare amount of merged rows with max_block_size |
| 51 | /// in `count_average` case it compares amount of merged rows with linear combination |
| 52 | /// of block sizes from which these rows were taken. |
| 53 | struct MergeStopCondition |
| 54 | { |
| 55 | size_t sum_blocks_granularity = 0; |
| 56 | size_t sum_rows_count = 0; |
| 57 | bool count_average; |
| 58 | size_t max_block_size; |
| 59 | |
| 60 | MergeStopCondition(bool count_average_, size_t max_block_size_) |
| 61 | : count_average(count_average_) |
| 62 | , max_block_size(max_block_size_) |
| 63 | {} |
| 64 | |
| 65 | /// add single row from block size `granularity` |
| 66 | void addRowWithGranularity(size_t granularity) |
| 67 | { |
| 68 | sum_blocks_granularity += granularity; |
| 69 | sum_rows_count++; |
| 70 | } |
| 71 | |
| 72 | /// check that sum_rows_count is enough |
| 73 | bool checkStop() const; |
| 74 | |
| 75 | bool empty() const |
| 76 | { |
| 77 | return sum_blocks_granularity == 0; |
| 78 | } |
| 79 | }; |
| 80 | |
| 81 | Block readImpl() override; |
| 82 | |
| 83 | void readSuffixImpl() override; |
| 84 | |
| 85 | /// Initializes the queue and the columns of next result block. |
| 86 | void init(MutableColumns & merged_columns); |
| 87 | |
| 88 | /// Gets the next block from the source corresponding to the `current`. |
| 89 | template <typename TSortCursor> |
| 90 | void fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue); |
| 91 | |
| 92 | |
| 93 | Block ; |
| 94 | |
| 95 | const SortDescription description; |
| 96 | const size_t max_block_size; |
| 97 | UInt64 limit; |
| 98 | UInt64 total_merged_rows = 0; |
| 99 | |
| 100 | bool first = true; |
| 101 | bool has_collation = false; |
| 102 | bool quiet = false; |
| 103 | bool average_block_sizes = false; |
| 104 | |
| 105 | /// May be smaller or equal to max_block_size. To do 'reserve' for columns. |
| 106 | size_t expected_block_size = 0; |
| 107 | |
| 108 | /// Blocks currently being merged. |
| 109 | size_t num_columns = 0; |
| 110 | std::vector<SharedBlockPtr> source_blocks; |
| 111 | |
| 112 | using CursorImpls = std::vector<SortCursorImpl>; |
| 113 | CursorImpls cursors; |
| 114 | |
| 115 | using Queue = std::priority_queue<SortCursor>; |
| 116 | Queue queue_without_collation; |
| 117 | |
| 118 | using QueueWithCollation = std::priority_queue<SortCursorWithCollation>; |
| 119 | QueueWithCollation queue_with_collation; |
| 120 | |
| 121 | /// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step) |
| 122 | /// If it is not nullptr then it should be populated during execution |
| 123 | WriteBuffer * out_row_sources_buf; |
| 124 | |
| 125 | |
| 126 | /// These methods are used in Collapsing/Summing/Aggregating... SortedBlockInputStream-s. |
| 127 | |
| 128 | /// Save the row pointed to by cursor in `row`. |
| 129 | template <typename TSortCursor> |
| 130 | void setRow(Row & row, TSortCursor & cursor) |
| 131 | { |
| 132 | for (size_t i = 0; i < num_columns; ++i) |
| 133 | { |
| 134 | try |
| 135 | { |
| 136 | cursor->all_columns[i]->get(cursor->pos, row[i]); |
| 137 | } |
| 138 | catch (...) |
| 139 | { |
| 140 | tryLogCurrentException(__PRETTY_FUNCTION__); |
| 141 | |
| 142 | /// Find out the name of the column and throw more informative exception. |
| 143 | |
| 144 | String column_name; |
| 145 | for (const auto & block : source_blocks) |
| 146 | { |
| 147 | if (i < block->columns()) |
| 148 | { |
| 149 | column_name = block->safeGetByPosition(i).name; |
| 150 | break; |
| 151 | } |
| 152 | } |
| 153 | |
| 154 | throw Exception("MergingSortedBlockInputStream failed to read row " + toString(cursor->pos) |
| 155 | + " of column " + toString(i) + (column_name.empty() ? "" : " (" + column_name + ")" ), |
| 156 | ErrorCodes::CORRUPTED_DATA); |
| 157 | } |
| 158 | } |
| 159 | } |
| 160 | |
| 161 | template <typename TSortCursor> |
| 162 | void setRowRef(SharedBlockRowRef & row_ref, TSortCursor & cursor) |
| 163 | { |
| 164 | row_ref.row_num = cursor.impl->pos; |
| 165 | row_ref.shared_block = source_blocks[cursor.impl->order]; |
| 166 | row_ref.columns = &row_ref.shared_block->all_columns; |
| 167 | } |
| 168 | |
| 169 | template <typename TSortCursor> |
| 170 | void setPrimaryKeyRef(SharedBlockRowRef & row_ref, TSortCursor & cursor) |
| 171 | { |
| 172 | row_ref.row_num = cursor.impl->pos; |
| 173 | row_ref.shared_block = source_blocks[cursor.impl->order]; |
| 174 | row_ref.columns = &row_ref.shared_block->sort_columns; |
| 175 | } |
| 176 | |
| 177 | private: |
| 178 | |
| 179 | /** We support two different cursors - with Collation and without. |
| 180 | * Templates are used instead of polymorphic SortCursor and calls to virtual functions. |
| 181 | */ |
| 182 | template <typename TSortCursor> |
| 183 | void initQueue(std::priority_queue<TSortCursor> & queue); |
| 184 | |
| 185 | template <typename TSortCursor> |
| 186 | void merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue); |
| 187 | |
| 188 | Logger * log = &Logger::get("MergingSortedBlockInputStream" ); |
| 189 | |
| 190 | /// Read is finished. |
| 191 | bool finished = false; |
| 192 | }; |
| 193 | |
| 194 | } |
| 195 | |