1 | #include <Common/FieldVisitors.h> |
2 | #include <Common/assert_cast.h> |
3 | #include <DataStreams/CollapsingSortedBlockInputStream.h> |
4 | #include <Columns/ColumnsNumber.h> |
5 | |
/// Maximum number of "incorrect data" warnings written to the log;
/// occurrences beyond this limit are still counted but no longer logged.
/// A typed constant instead of a macro: scoped, type-checked, and visible to debuggers.
static constexpr size_t MAX_ERROR_MESSAGES = 10;
8 | |
9 | |
10 | namespace DB |
11 | { |
12 | |
/// Error codes used by this stream when throwing exceptions.
/// Only declared here (extern); the definitions live elsewhere in the project.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;    /// thrown by readImpl() when collations are requested
    extern const int INCORRECT_DATA;   /// thrown by merge() on a sign value other than 1 / -1
}
18 | |
19 | |
20 | void CollapsingSortedBlockInputStream::reportIncorrectData() |
21 | { |
22 | std::stringstream s; |
23 | s << "Incorrect data: number of rows with sign = 1 (" << count_positive |
24 | << ") differs with number of rows with sign = -1 (" << count_negative |
25 | << ") by more than one (for key: " ; |
26 | |
27 | for (size_t i = 0, size = current_key.size(); i < size; ++i) |
28 | { |
29 | if (i != 0) |
30 | s << ", " ; |
31 | s << applyVisitor(FieldVisitorToString(), (*(*current_key.columns)[i])[current_key.row_num]); |
32 | } |
33 | |
34 | s << ")." ; |
35 | |
36 | /** Fow now we limit ourselves to just logging such situations, |
37 | * since the data is generated by external programs. |
38 | * With inconsistent data, this is an unavoidable error that can not be easily corrected by admins. Therefore Warning. |
39 | */ |
40 | LOG_WARNING(log, s.rdbuf()); |
41 | } |
42 | |
43 | |
44 | void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_columns, size_t block_size, MergeStopCondition & condition) |
45 | { |
46 | if (count_positive == 0 && count_negative == 0) |
47 | { |
48 | /// No input rows have been read. |
49 | return; |
50 | } |
51 | |
52 | if (last_is_positive || count_positive != count_negative) |
53 | { |
54 | if (count_positive <= count_negative) |
55 | { |
56 | condition.addRowWithGranularity(block_size); |
57 | for (size_t i = 0; i < num_columns; ++i) |
58 | merged_columns[i]->insertFrom(*(*first_negative.columns)[i], first_negative.row_num); |
59 | |
60 | if (out_row_sources_buf) |
61 | current_row_sources[first_negative_pos].setSkipFlag(false); |
62 | } |
63 | |
64 | if (count_positive >= count_negative) |
65 | { |
66 | condition.addRowWithGranularity(block_size); |
67 | for (size_t i = 0; i < num_columns; ++i) |
68 | merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num); |
69 | |
70 | if (out_row_sources_buf) |
71 | current_row_sources[last_positive_pos].setSkipFlag(false); |
72 | } |
73 | |
74 | if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1)) |
75 | { |
76 | if (count_incorrect_data < MAX_ERROR_MESSAGES) |
77 | reportIncorrectData(); |
78 | ++count_incorrect_data; |
79 | } |
80 | } |
81 | |
82 | if (out_row_sources_buf) |
83 | out_row_sources_buf->write( |
84 | reinterpret_cast<const char *>(current_row_sources.data()), |
85 | current_row_sources.size() * sizeof(RowSourcePart)); |
86 | } |
87 | |
88 | |
89 | Block CollapsingSortedBlockInputStream::readImpl() |
90 | { |
91 | if (finished) |
92 | return {}; |
93 | |
94 | MutableColumns merged_columns; |
95 | init(merged_columns); |
96 | |
97 | if (has_collation) |
98 | throw Exception("Logical error: " + getName() + " does not support collations" , ErrorCodes::LOGICAL_ERROR); |
99 | |
100 | if (merged_columns.empty()) |
101 | return {}; |
102 | |
103 | merge(merged_columns, queue_without_collation); |
104 | return header.cloneWithColumns(std::move(merged_columns)); |
105 | } |
106 | |
107 | |
/** Main merge loop: takes rows from `queue` in sort order and collapses rows that share a primary key.
  *
  * For the run of rows with the key in `current_key`, only counters and row references are kept:
  * count_positive / count_negative, first_negative, last_positive (and last_negative), plus their
  * positions in `current_row_sources`. When the key changes, insertRows() writes the surviving rows
  * of the previous key into `merged_columns` and this per-key state is reset. After the queue is
  * drained, the last key is flushed the same way and `finished` is set.
  *
  * Returns early, without popping the current row, when the key changes and
  * `stop_condition.checkStop()` is true. The pending key's state is kept in member fields, so it is
  * flushed on a subsequent call.
  *
  * Throws INCORRECT_DATA if a row's sign is neither 1 nor -1.
  */
void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
{

    MergeStopCondition stop_condition(average_block_sizes, max_block_size);
    /// Row count of the block under the cursor being processed; passed to insertRows() as the
    /// granularity of the rows written for the previous key.
    size_t current_block_granularity;
    /// Take rows in sorted order and accumulate them into `merged_columns` until `stop_condition` says to stop.
    /// `current_pos` is the index of the current row within the current key's run (it is reset to 0 on a
    /// key change and advanced by the loop header); the *_pos members recorded below index `current_row_sources`.
    for (; !queue.empty(); ++current_pos)
    {
        SortCursor current = queue.top();
        current_block_granularity = current->rows;

        /// On the very first row there is no current key yet: start with this row's key.
        if (current_key.empty())
            setPrimaryKeyRef(current_key, current);

        /// The sign column is read as Int8 (checked by assert_cast).
        Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
        setPrimaryKeyRef(next_key, current);

        bool key_differs = next_key != current_key;

        /// Stop only at a key boundary, i.e. when the run of the previous key is complete.
        /// The current row stays in the queue and the previous key is flushed on a later call.
        if (key_differs && stop_condition.checkStop())
        {
            ++blocks_written;
            return;
        }

        queue.pop();

        if (key_differs)
        {
            /// We write data for the previous primary key.
            insertRows(merged_columns, current_block_granularity, stop_condition);

            current_key.swap(next_key);

            /// Reset the per-key state for the new key.
            count_negative = 0;
            count_positive = 0;

            current_pos = 0;
            first_negative_pos = 0;
            last_positive_pos = 0;
            last_negative_pos = 0;
            current_row_sources.resize(0);
        }

        /// Initially, skip all rows. On insert, unskip "corner" rows.
        /// NOTE(review): `current.impl->order` presumably identifies the source stream of the row — confirm.
        if (out_row_sources_buf)
            current_row_sources.emplace_back(current.impl->order, true);

        if (sign == 1)
        {
            ++count_positive;
            last_is_positive = true;

            /// Only the most recent positive row of the key is remembered.
            setRowRef(last_positive, current);
            last_positive_pos = current_pos;
        }
        else if (sign == -1)
        {
            /// Only the first negative row of the key is remembered for output.
            if (!count_negative)
            {
                setRowRef(first_negative, current);
                first_negative_pos = current_pos;
            }

            /// `last_negative` is only updated before any block has been written and while `stop_condition`
            /// reports empty; it is not read anywhere in this file.
            /// NOTE(review): confirm the purpose of `last_negative` against the header / other users.
            if (!blocks_written && stop_condition.empty())
            {
                setRowRef(last_negative, current);
                last_negative_pos = current_pos;
            }

            ++count_negative;
            last_is_positive = false;
        }
        else
            throw Exception("Incorrect data: Sign = " + toString(sign) + " (must be 1 or -1).",
                ErrorCodes::INCORRECT_DATA);

        /// Advance the cursor: re-queue it if its block has more rows, otherwise pull the next block.
        if (!current->isLast())
        {
            current->next();
            queue.push(current);
        }
        else
        {
            /// We take next block from the corresponding source, if there is one.
            fetchNextBlock(current, queue);
        }
    }

    /// Write data for last primary key.
    insertRows(merged_columns, /*some_granularity*/ 0, stop_condition);

    finished = true;
}
203 | |
204 | } |
205 | |