1 | #include <Common/FieldVisitors.h> |
2 | #include <Common/assert_cast.h> |
3 | #include <DataStreams/VersionedCollapsingSortedBlockInputStream.h> |
4 | #include <Columns/ColumnsNumber.h> |
5 | |
6 | |
7 | namespace DB |
8 | { |
9 | |
/// Error codes used by this translation unit; their definitions live in another translation unit.
/// NOTE(review): LOGICAL_ERROR is not referenced by the code visible in this file.
namespace ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
    extern const int LOGICAL_ERROR;
}
15 | |
16 | |
17 | VersionedCollapsingSortedBlockInputStream::VersionedCollapsingSortedBlockInputStream( |
18 | const BlockInputStreams & inputs_, const SortDescription & description_, |
19 | const String & sign_column_, size_t max_block_size_, |
20 | WriteBuffer * out_row_sources_buf_, bool average_block_sizes_) |
21 | : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_, false, average_block_sizes_) |
22 | , max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2) |
23 | , current_keys(max_rows_in_queue + 1) |
24 | { |
25 | sign_column_number = header.getPositionByName(sign_column_); |
26 | } |
27 | |
28 | |
29 | |
30 | inline ALWAYS_INLINE static void writeRowSourcePart(WriteBuffer & buffer, RowSourcePart row_source) |
31 | { |
32 | if constexpr (sizeof(RowSourcePart) == 1) |
33 | buffer.write(*reinterpret_cast<const char *>(&row_source)); |
34 | else |
35 | buffer.write(reinterpret_cast<const char *>(&row_source), sizeof(RowSourcePart)); |
36 | } |
37 | |
38 | void VersionedCollapsingSortedBlockInputStream::insertGap(size_t gap_size) |
39 | { |
40 | if (out_row_sources_buf) |
41 | { |
42 | for (size_t i = 0; i < gap_size; ++i) |
43 | { |
44 | writeRowSourcePart(*out_row_sources_buf, current_row_sources.front()); |
45 | current_row_sources.pop(); |
46 | } |
47 | } |
48 | } |
49 | |
50 | void VersionedCollapsingSortedBlockInputStream::insertRow(size_t skip_rows, const SharedBlockRowRef & row, MutableColumns & merged_columns) |
51 | { |
52 | const auto & columns = row.shared_block->all_columns; |
53 | for (size_t i = 0; i < num_columns; ++i) |
54 | merged_columns[i]->insertFrom(*columns[i], row.row_num); |
55 | |
56 | insertGap(skip_rows); |
57 | |
58 | if (out_row_sources_buf) |
59 | { |
60 | current_row_sources.front().setSkipFlag(false); |
61 | writeRowSourcePart(*out_row_sources_buf, current_row_sources.front()); |
62 | current_row_sources.pop(); |
63 | } |
64 | } |
65 | |
66 | Block VersionedCollapsingSortedBlockInputStream::readImpl() |
67 | { |
68 | if (finished) |
69 | return {}; |
70 | |
71 | MutableColumns merged_columns; |
72 | init(merged_columns); |
73 | |
74 | if (has_collation) |
75 | throw Exception("Logical error: " + getName() + " does not support collations" , ErrorCodes::NOT_IMPLEMENTED); |
76 | |
77 | if (merged_columns.empty()) |
78 | return {}; |
79 | |
80 | merge(merged_columns, queue_without_collation); |
81 | return header.cloneWithColumns(std::move(merged_columns)); |
82 | } |
83 | |
84 | |
/// Core merge loop. Takes rows from `queue` in sort order and buffers, in `current_keys`, a run of
/// rows that share the same primary key and the same sign (`sign_in_queue`).
///  - A row whose key equals the last buffered key but whose sign differs cancels that last
///    buffered row. Both rows are dropped and recorded as a gap of 2 row-source entries.
///  - Buffered rows are flushed into `merged_columns` when the key changes, or when the buffer
///    grows beyond `max_rows_in_queue`.
/// If `stop_condition` reports a full block, the function returns early and leaves the remaining
/// buffered rows in `current_keys` for the next readImpl() call. Otherwise it drains `queue`,
/// flushes everything left and sets `finished`.
void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
{
    MergeStopCondition stop_condition(average_block_sizes, max_block_size);

    /// Consumes the row `cursor` points at, in three steps:
    ///  1. Pops the cursor from the queue.
    ///  2. If a row-sources buffer is attached, queues a row-source entry for the row, built with
    ///     `true`. insertRow() clears that flag for rows that are actually emitted.
    ///  3. Advances the cursor within its block, or fetches the source's next block.
    auto update_queue = [this, & queue](SortCursor & cursor)
    {
        queue.pop();

        if (out_row_sources_buf)
            current_row_sources.emplace(cursor->order, true);

        if (!cursor->isLast())
        {
            cursor->next();
            queue.push(cursor);
        }
        else
        {
            /// We take next block from the corresponding source, if there is one.
            fetchNextBlock(cursor, queue);
        }
    };

    /// Take rows in sort order and put them into `merged_columns` until the stop condition
    /// (driven by `max_block_size`) says the block is full.
    while (!queue.empty())
    {
        SortCursor current = queue.top();
        /// NOTE(review): this is the row count of the block `current` points into. The rows flushed
        /// below may come from other blocks, so the granularity fed to stop_condition is approximate.
        size_t current_block_granularity = current->rows;

        SharedBlockRowRef next_key;

        /// The sign column is expected to be a ColumnInt8.
        Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];

        setPrimaryKeyRef(next_key, current);

        size_t rows_to_merge = 0;

        /// Each branch either updates queue or increases rows_to_merge.
        if (current_keys.empty())
        {
            /// Start a new run; its sign becomes the reference sign for the run.
            sign_in_queue = sign;
            current_keys.pushBack(next_key);
            update_queue(current);
        }
        else
        {
            if (current_keys.back() == next_key)
            {
                /// `sign` and `next_key` were captured before the cursor moves.
                update_queue(current);

                /// Same key and same sign: extend the run.
                if (sign == sign_in_queue)
                    current_keys.pushBack(next_key);
                else
                {
                    /// Opposite sign: this row and the last buffered row cancel out. Both of
                    /// their row-source entries become a gap of 2.
                    current_keys.popBack();
                    current_keys.pushGap(2);
                }
            }
            else
                /// Key changed: flush the whole buffer. The queue is not advanced, so this
                /// row is examined again on the next iteration.
                rows_to_merge = current_keys.size();
        }

        /// Never keep more than max_rows_in_queue rows buffered: force out the oldest extra ones.
        if (current_keys.size() > max_rows_in_queue)
            rows_to_merge = std::max(rows_to_merge, current_keys.size() - max_rows_in_queue);

        /// Emit buffered rows oldest first. Each row is preceded by the row-source entries of the
        /// collapsed rows (the gap) recorded in front of it.
        while (rows_to_merge)
        {
            const auto & row = current_keys.front();
            auto gap = current_keys.frontGap();

            insertRow(gap, row, merged_columns);

            current_keys.popFront();

            stop_condition.addRowWithGranularity(current_block_granularity);
            --rows_to_merge;

            /// Block is full: stop here. Rows still buffered in current_keys are flushed by the
            /// next call.
            if (stop_condition.checkStop())
            {
                ++blocks_written;
                return;
            }
        }
    }

    /// All inputs are exhausted: flush whatever is still buffered.
    /// NOTE(review): stop_condition is not consulted here, so this last block may exceed
    /// max_block_size (by at most max_rows_in_queue rows, since the loop above trims the buffer).
    while (!current_keys.empty())
    {
        const auto & row = current_keys.front();
        auto gap = current_keys.frontGap();

        insertRow(gap, row, merged_columns);

        current_keys.popFront();
    }

    /// Write information about last collapsed rows.
    /// (The buffer is empty now, so this is the gap of rows collapsed after the last emitted row.)
    insertGap(current_keys.frontGap());

    finished = true;
}
185 | |
186 | } |
187 | |