1#include <Common/FieldVisitors.h>
2#include <Common/assert_cast.h>
3#include <DataStreams/VersionedCollapsingSortedBlockInputStream.h>
4#include <Columns/ColumnsNumber.h>
5
6
7namespace DB
8{
9
/// Error codes used by this translation unit. They are only declared here (`extern`);
/// their definitions are not part of this file.
namespace ErrorCodes
{
    /// NOTE(review): not referenced anywhere in the code visible in this file.
    extern const int LOGICAL_ERROR;
    /// Used by readImpl() when `has_collation` is set.
    extern const int NOT_IMPLEMENTED;
}
15
16
/** Constructs the stream on top of MergingSortedBlockInputStream.
  *
  * inputs_, description_, max_block_size_, out_row_sources_buf_ and average_block_sizes_
  * are forwarded to the base class constructor, together with the literal values 0 and false.
  * sign_column_ is the name of the column whose position in the header is remembered
  * in sign_column_number.
  */
VersionedCollapsingSortedBlockInputStream::VersionedCollapsingSortedBlockInputStream(
    const BlockInputStreams & inputs_, const SortDescription & description_,
    const String & sign_column_, size_t max_block_size_,
    WriteBuffer * out_row_sources_buf_, bool average_block_sizes_)
    : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_, false, average_block_sizes_)
    /// max_block_size_ is raised to at least 3, capped by MAX_ROWS_IN_MULTIVERSION_QUEUE, then reduced by 2.
    /// NOTE(review): the result is >= 1 only if MAX_ROWS_IN_MULTIVERSION_QUEUE >= 3 — confirm in the header.
    , max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2)
    /// merge() pushes a key first and only afterwards checks the size against max_rows_in_queue,
    /// hence one extra element. Presumably the argument is the queue capacity — verify against its type.
    /// NOTE(review): relies on max_rows_in_queue being declared before current_keys in the class.
    , current_keys(max_rows_in_queue + 1)
{
    /// Resolve the sign column name to its position in the header once, so merge() can index by number.
    sign_column_number = header.getPositionByName(sign_column_);
}
27
28
29
30inline ALWAYS_INLINE static void writeRowSourcePart(WriteBuffer & buffer, RowSourcePart row_source)
31{
32 if constexpr (sizeof(RowSourcePart) == 1)
33 buffer.write(*reinterpret_cast<const char *>(&row_source));
34 else
35 buffer.write(reinterpret_cast<const char *>(&row_source), sizeof(RowSourcePart));
36}
37
38void VersionedCollapsingSortedBlockInputStream::insertGap(size_t gap_size)
39{
40 if (out_row_sources_buf)
41 {
42 for (size_t i = 0; i < gap_size; ++i)
43 {
44 writeRowSourcePart(*out_row_sources_buf, current_row_sources.front());
45 current_row_sources.pop();
46 }
47 }
48}
49
50void VersionedCollapsingSortedBlockInputStream::insertRow(size_t skip_rows, const SharedBlockRowRef & row, MutableColumns & merged_columns)
51{
52 const auto & columns = row.shared_block->all_columns;
53 for (size_t i = 0; i < num_columns; ++i)
54 merged_columns[i]->insertFrom(*columns[i], row.row_num);
55
56 insertGap(skip_rows);
57
58 if (out_row_sources_buf)
59 {
60 current_row_sources.front().setSkipFlag(false);
61 writeRowSourcePart(*out_row_sources_buf, current_row_sources.front());
62 current_row_sources.pop();
63 }
64}
65
66Block VersionedCollapsingSortedBlockInputStream::readImpl()
67{
68 if (finished)
69 return {};
70
71 MutableColumns merged_columns;
72 init(merged_columns);
73
74 if (has_collation)
75 throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::NOT_IMPLEMENTED);
76
77 if (merged_columns.empty())
78 return {};
79
80 merge(merged_columns, queue_without_collation);
81 return header.cloneWithColumns(std::move(merged_columns));
82}
83
84
85void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
86{
87 MergeStopCondition stop_condition(average_block_sizes, max_block_size);
88
89 auto update_queue = [this, & queue](SortCursor & cursor)
90 {
91 queue.pop();
92
93 if (out_row_sources_buf)
94 current_row_sources.emplace(cursor->order, true);
95
96 if (!cursor->isLast())
97 {
98 cursor->next();
99 queue.push(cursor);
100 }
101 else
102 {
103 /// We take next block from the corresponding source, if there is one.
104 fetchNextBlock(cursor, queue);
105 }
106 };
107
108 /// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size`
109 while (!queue.empty())
110 {
111 SortCursor current = queue.top();
112 size_t current_block_granularity = current->rows;
113
114 SharedBlockRowRef next_key;
115
116 Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
117
118 setPrimaryKeyRef(next_key, current);
119
120 size_t rows_to_merge = 0;
121
122 /// Each branch either updates queue or increases rows_to_merge.
123 if (current_keys.empty())
124 {
125 sign_in_queue = sign;
126 current_keys.pushBack(next_key);
127 update_queue(current);
128 }
129 else
130 {
131 if (current_keys.back() == next_key)
132 {
133 update_queue(current);
134
135 if (sign == sign_in_queue)
136 current_keys.pushBack(next_key);
137 else
138 {
139 current_keys.popBack();
140 current_keys.pushGap(2);
141 }
142 }
143 else
144 rows_to_merge = current_keys.size();
145 }
146
147 if (current_keys.size() > max_rows_in_queue)
148 rows_to_merge = std::max(rows_to_merge, current_keys.size() - max_rows_in_queue);
149
150 while (rows_to_merge)
151 {
152 const auto & row = current_keys.front();
153 auto gap = current_keys.frontGap();
154
155 insertRow(gap, row, merged_columns);
156
157 current_keys.popFront();
158
159 stop_condition.addRowWithGranularity(current_block_granularity);
160 --rows_to_merge;
161
162 if (stop_condition.checkStop())
163 {
164 ++blocks_written;
165 return;
166 }
167 }
168 }
169
170 while (!current_keys.empty())
171 {
172 const auto & row = current_keys.front();
173 auto gap = current_keys.frontGap();
174
175 insertRow(gap, row, merged_columns);
176
177 current_keys.popFront();
178 }
179
180 /// Write information about last collapsed rows.
181 insertGap(current_keys.frontGap());
182
183 finished = true;
184}
185
186}
187