1#pragma once
2
3#include <queue>
4
5#include <boost/smart_ptr/intrusive_ptr.hpp>
6
7#include <common/logger_useful.h>
8#include <Common/SharedBlockRowRef.h>
9
10#include <Core/Row.h>
11#include <Core/SortDescription.h>
12#include <Core/SortCursor.h>
13
14#include <IO/WriteHelpers.h>
15
16#include <DataStreams/IBlockInputStream.h>
17
18
19namespace DB
20{
21
namespace ErrorCodes
{
    /// Only declared here (no initializer), so the value comes from a definition elsewhere.
    /// MergingSortedBlockInputStream::setRow() throws with this code when a row cannot be read.
    extern const int CORRUPTED_DATA;
}
26
27
28/** Merges several sorted streams into one sorted stream.
29 */
30class MergingSortedBlockInputStream : public IBlockInputStream
31{
32public:
33 /** limit - if isn't 0, then we can produce only first limit rows in sorted order.
34 * out_row_sources - if isn't nullptr, then at the end of execution it should contain part numbers of each readed row (and needed flag)
35 * quiet - don't log profiling info
36 */
37 MergingSortedBlockInputStream(
38 const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_,
39 UInt64 limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false, bool average_block_sizes_ = false);
40
41 String getName() const override { return "MergingSorted"; }
42
43 bool isSortedOutput() const override { return true; }
44 const SortDescription & getSortDescription() const override { return description; }
45
46 Block getHeader() const override { return header; }
47
48protected:
49 /// Simple class, which allows to check stop condition during merge process
50 /// in simple case it just compare amount of merged rows with max_block_size
51 /// in `count_average` case it compares amount of merged rows with linear combination
52 /// of block sizes from which these rows were taken.
53 struct MergeStopCondition
54 {
55 size_t sum_blocks_granularity = 0;
56 size_t sum_rows_count = 0;
57 bool count_average;
58 size_t max_block_size;
59
60 MergeStopCondition(bool count_average_, size_t max_block_size_)
61 : count_average(count_average_)
62 , max_block_size(max_block_size_)
63 {}
64
65 /// add single row from block size `granularity`
66 void addRowWithGranularity(size_t granularity)
67 {
68 sum_blocks_granularity += granularity;
69 sum_rows_count++;
70 }
71
72 /// check that sum_rows_count is enough
73 bool checkStop() const;
74
75 bool empty() const
76 {
77 return sum_blocks_granularity == 0;
78 }
79 };
80
81 Block readImpl() override;
82
83 void readSuffixImpl() override;
84
85 /// Initializes the queue and the columns of next result block.
86 void init(MutableColumns & merged_columns);
87
88 /// Gets the next block from the source corresponding to the `current`.
89 template <typename TSortCursor>
90 void fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue);
91
92
93 Block header;
94
95 const SortDescription description;
96 const size_t max_block_size;
97 UInt64 limit;
98 UInt64 total_merged_rows = 0;
99
100 bool first = true;
101 bool has_collation = false;
102 bool quiet = false;
103 bool average_block_sizes = false;
104
105 /// May be smaller or equal to max_block_size. To do 'reserve' for columns.
106 size_t expected_block_size = 0;
107
108 /// Blocks currently being merged.
109 size_t num_columns = 0;
110 std::vector<SharedBlockPtr> source_blocks;
111
112 using CursorImpls = std::vector<SortCursorImpl>;
113 CursorImpls cursors;
114
115 using Queue = std::priority_queue<SortCursor>;
116 Queue queue_without_collation;
117
118 using QueueWithCollation = std::priority_queue<SortCursorWithCollation>;
119 QueueWithCollation queue_with_collation;
120
121 /// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step)
122 /// If it is not nullptr then it should be populated during execution
123 WriteBuffer * out_row_sources_buf;
124
125
126 /// These methods are used in Collapsing/Summing/Aggregating... SortedBlockInputStream-s.
127
128 /// Save the row pointed to by cursor in `row`.
129 template <typename TSortCursor>
130 void setRow(Row & row, TSortCursor & cursor)
131 {
132 for (size_t i = 0; i < num_columns; ++i)
133 {
134 try
135 {
136 cursor->all_columns[i]->get(cursor->pos, row[i]);
137 }
138 catch (...)
139 {
140 tryLogCurrentException(__PRETTY_FUNCTION__);
141
142 /// Find out the name of the column and throw more informative exception.
143
144 String column_name;
145 for (const auto & block : source_blocks)
146 {
147 if (i < block->columns())
148 {
149 column_name = block->safeGetByPosition(i).name;
150 break;
151 }
152 }
153
154 throw Exception("MergingSortedBlockInputStream failed to read row " + toString(cursor->pos)
155 + " of column " + toString(i) + (column_name.empty() ? "" : " (" + column_name + ")"),
156 ErrorCodes::CORRUPTED_DATA);
157 }
158 }
159 }
160
161 template <typename TSortCursor>
162 void setRowRef(SharedBlockRowRef & row_ref, TSortCursor & cursor)
163 {
164 row_ref.row_num = cursor.impl->pos;
165 row_ref.shared_block = source_blocks[cursor.impl->order];
166 row_ref.columns = &row_ref.shared_block->all_columns;
167 }
168
169 template <typename TSortCursor>
170 void setPrimaryKeyRef(SharedBlockRowRef & row_ref, TSortCursor & cursor)
171 {
172 row_ref.row_num = cursor.impl->pos;
173 row_ref.shared_block = source_blocks[cursor.impl->order];
174 row_ref.columns = &row_ref.shared_block->sort_columns;
175 }
176
177private:
178
179 /** We support two different cursors - with Collation and without.
180 * Templates are used instead of polymorphic SortCursor and calls to virtual functions.
181 */
182 template <typename TSortCursor>
183 void initQueue(std::priority_queue<TSortCursor> & queue);
184
185 template <typename TSortCursor>
186 void merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue);
187
188 Logger * log = &Logger::get("MergingSortedBlockInputStream");
189
190 /// Read is finished.
191 bool finished = false;
192};
193
194}
195