1 | #pragma once |
2 | |
3 | #include <queue> |
4 | |
5 | #include <boost/smart_ptr/intrusive_ptr.hpp> |
6 | |
7 | #include <common/logger_useful.h> |
8 | #include <Common/SharedBlockRowRef.h> |
9 | |
10 | #include <Core/Row.h> |
11 | #include <Core/SortDescription.h> |
12 | #include <Core/SortCursor.h> |
13 | |
14 | #include <IO/WriteHelpers.h> |
15 | |
16 | #include <DataStreams/IBlockInputStream.h> |
17 | |
18 | |
19 | namespace DB |
20 | { |
21 | |
namespace ErrorCodes
{
    /// Error code defined elsewhere in the project; used by setRow() below when a
    /// column value of a source block cannot be read.
    extern const int CORRUPTED_DATA;
}
26 | |
27 | |
28 | /** Merges several sorted streams into one sorted stream. |
29 | */ |
30 | class MergingSortedBlockInputStream : public IBlockInputStream |
31 | { |
32 | public: |
33 | /** limit - if isn't 0, then we can produce only first limit rows in sorted order. |
34 | * out_row_sources - if isn't nullptr, then at the end of execution it should contain part numbers of each readed row (and needed flag) |
35 | * quiet - don't log profiling info |
36 | */ |
37 | MergingSortedBlockInputStream( |
38 | const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_, |
39 | UInt64 limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false, bool average_block_sizes_ = false); |
40 | |
41 | String getName() const override { return "MergingSorted" ; } |
42 | |
43 | bool isSortedOutput() const override { return true; } |
44 | const SortDescription & getSortDescription() const override { return description; } |
45 | |
46 | Block () const override { return header; } |
47 | |
48 | protected: |
49 | /// Simple class, which allows to check stop condition during merge process |
50 | /// in simple case it just compare amount of merged rows with max_block_size |
51 | /// in `count_average` case it compares amount of merged rows with linear combination |
52 | /// of block sizes from which these rows were taken. |
53 | struct MergeStopCondition |
54 | { |
55 | size_t sum_blocks_granularity = 0; |
56 | size_t sum_rows_count = 0; |
57 | bool count_average; |
58 | size_t max_block_size; |
59 | |
60 | MergeStopCondition(bool count_average_, size_t max_block_size_) |
61 | : count_average(count_average_) |
62 | , max_block_size(max_block_size_) |
63 | {} |
64 | |
65 | /// add single row from block size `granularity` |
66 | void addRowWithGranularity(size_t granularity) |
67 | { |
68 | sum_blocks_granularity += granularity; |
69 | sum_rows_count++; |
70 | } |
71 | |
72 | /// check that sum_rows_count is enough |
73 | bool checkStop() const; |
74 | |
75 | bool empty() const |
76 | { |
77 | return sum_blocks_granularity == 0; |
78 | } |
79 | }; |
80 | |
81 | Block readImpl() override; |
82 | |
83 | void readSuffixImpl() override; |
84 | |
85 | /// Initializes the queue and the columns of next result block. |
86 | void init(MutableColumns & merged_columns); |
87 | |
88 | /// Gets the next block from the source corresponding to the `current`. |
89 | template <typename TSortCursor> |
90 | void fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue); |
91 | |
92 | |
93 | Block ; |
94 | |
95 | const SortDescription description; |
96 | const size_t max_block_size; |
97 | UInt64 limit; |
98 | UInt64 total_merged_rows = 0; |
99 | |
100 | bool first = true; |
101 | bool has_collation = false; |
102 | bool quiet = false; |
103 | bool average_block_sizes = false; |
104 | |
105 | /// May be smaller or equal to max_block_size. To do 'reserve' for columns. |
106 | size_t expected_block_size = 0; |
107 | |
108 | /// Blocks currently being merged. |
109 | size_t num_columns = 0; |
110 | std::vector<SharedBlockPtr> source_blocks; |
111 | |
112 | using CursorImpls = std::vector<SortCursorImpl>; |
113 | CursorImpls cursors; |
114 | |
115 | using Queue = std::priority_queue<SortCursor>; |
116 | Queue queue_without_collation; |
117 | |
118 | using QueueWithCollation = std::priority_queue<SortCursorWithCollation>; |
119 | QueueWithCollation queue_with_collation; |
120 | |
121 | /// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step) |
122 | /// If it is not nullptr then it should be populated during execution |
123 | WriteBuffer * out_row_sources_buf; |
124 | |
125 | |
126 | /// These methods are used in Collapsing/Summing/Aggregating... SortedBlockInputStream-s. |
127 | |
128 | /// Save the row pointed to by cursor in `row`. |
129 | template <typename TSortCursor> |
130 | void setRow(Row & row, TSortCursor & cursor) |
131 | { |
132 | for (size_t i = 0; i < num_columns; ++i) |
133 | { |
134 | try |
135 | { |
136 | cursor->all_columns[i]->get(cursor->pos, row[i]); |
137 | } |
138 | catch (...) |
139 | { |
140 | tryLogCurrentException(__PRETTY_FUNCTION__); |
141 | |
142 | /// Find out the name of the column and throw more informative exception. |
143 | |
144 | String column_name; |
145 | for (const auto & block : source_blocks) |
146 | { |
147 | if (i < block->columns()) |
148 | { |
149 | column_name = block->safeGetByPosition(i).name; |
150 | break; |
151 | } |
152 | } |
153 | |
154 | throw Exception("MergingSortedBlockInputStream failed to read row " + toString(cursor->pos) |
155 | + " of column " + toString(i) + (column_name.empty() ? "" : " (" + column_name + ")" ), |
156 | ErrorCodes::CORRUPTED_DATA); |
157 | } |
158 | } |
159 | } |
160 | |
161 | template <typename TSortCursor> |
162 | void setRowRef(SharedBlockRowRef & row_ref, TSortCursor & cursor) |
163 | { |
164 | row_ref.row_num = cursor.impl->pos; |
165 | row_ref.shared_block = source_blocks[cursor.impl->order]; |
166 | row_ref.columns = &row_ref.shared_block->all_columns; |
167 | } |
168 | |
169 | template <typename TSortCursor> |
170 | void setPrimaryKeyRef(SharedBlockRowRef & row_ref, TSortCursor & cursor) |
171 | { |
172 | row_ref.row_num = cursor.impl->pos; |
173 | row_ref.shared_block = source_blocks[cursor.impl->order]; |
174 | row_ref.columns = &row_ref.shared_block->sort_columns; |
175 | } |
176 | |
177 | private: |
178 | |
179 | /** We support two different cursors - with Collation and without. |
180 | * Templates are used instead of polymorphic SortCursor and calls to virtual functions. |
181 | */ |
182 | template <typename TSortCursor> |
183 | void initQueue(std::priority_queue<TSortCursor> & queue); |
184 | |
185 | template <typename TSortCursor> |
186 | void merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue); |
187 | |
188 | Logger * log = &Logger::get("MergingSortedBlockInputStream" ); |
189 | |
190 | /// Read is finished. |
191 | bool finished = false; |
192 | }; |
193 | |
194 | } |
195 | |