1#include <Common/FieldVisitors.h>
2#include <Common/assert_cast.h>
3#include <DataStreams/CollapsingSortedBlockInputStream.h>
4#include <Columns/ColumnsNumber.h>
5
/// Upper bound on how many "incorrect data" warnings insertRows() writes to the log;
/// occurrences beyond this limit are still counted in count_incorrect_data, just not logged.
#define MAX_ERROR_MESSAGES (10)
8
9
10namespace DB
11{
12
/// Error codes thrown from this file. They are only declared (`extern`) here; their values are defined elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int INCORRECT_DATA;
}
18
19
20void CollapsingSortedBlockInputStream::reportIncorrectData()
21{
22 std::stringstream s;
23 s << "Incorrect data: number of rows with sign = 1 (" << count_positive
24 << ") differs with number of rows with sign = -1 (" << count_negative
25 << ") by more than one (for key: ";
26
27 for (size_t i = 0, size = current_key.size(); i < size; ++i)
28 {
29 if (i != 0)
30 s << ", ";
31 s << applyVisitor(FieldVisitorToString(), (*(*current_key.columns)[i])[current_key.row_num]);
32 }
33
34 s << ").";
35
36 /** Fow now we limit ourselves to just logging such situations,
37 * since the data is generated by external programs.
38 * With inconsistent data, this is an unavoidable error that can not be easily corrected by admins. Therefore Warning.
39 */
40 LOG_WARNING(log, s.rdbuf());
41}
42
43
44void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_columns, size_t block_size, MergeStopCondition & condition)
45{
46 if (count_positive == 0 && count_negative == 0)
47 {
48 /// No input rows have been read.
49 return;
50 }
51
52 if (last_is_positive || count_positive != count_negative)
53 {
54 if (count_positive <= count_negative)
55 {
56 condition.addRowWithGranularity(block_size);
57 for (size_t i = 0; i < num_columns; ++i)
58 merged_columns[i]->insertFrom(*(*first_negative.columns)[i], first_negative.row_num);
59
60 if (out_row_sources_buf)
61 current_row_sources[first_negative_pos].setSkipFlag(false);
62 }
63
64 if (count_positive >= count_negative)
65 {
66 condition.addRowWithGranularity(block_size);
67 for (size_t i = 0; i < num_columns; ++i)
68 merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num);
69
70 if (out_row_sources_buf)
71 current_row_sources[last_positive_pos].setSkipFlag(false);
72 }
73
74 if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
75 {
76 if (count_incorrect_data < MAX_ERROR_MESSAGES)
77 reportIncorrectData();
78 ++count_incorrect_data;
79 }
80 }
81
82 if (out_row_sources_buf)
83 out_row_sources_buf->write(
84 reinterpret_cast<const char *>(current_row_sources.data()),
85 current_row_sources.size() * sizeof(RowSourcePart));
86}
87
88
89Block CollapsingSortedBlockInputStream::readImpl()
90{
91 if (finished)
92 return {};
93
94 MutableColumns merged_columns;
95 init(merged_columns);
96
97 if (has_collation)
98 throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
99
100 if (merged_columns.empty())
101 return {};
102
103 merge(merged_columns, queue_without_collation);
104 return header.cloneWithColumns(std::move(merged_columns));
105}
106
107
108void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
109{
110
111 MergeStopCondition stop_condition(average_block_sizes, max_block_size);
112 size_t current_block_granularity;
113 /// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size`
114 for (; !queue.empty(); ++current_pos)
115 {
116 SortCursor current = queue.top();
117 current_block_granularity = current->rows;
118
119 if (current_key.empty())
120 setPrimaryKeyRef(current_key, current);
121
122 Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
123 setPrimaryKeyRef(next_key, current);
124
125 bool key_differs = next_key != current_key;
126
127 /// if there are enough rows and the last one is calculated completely
128 if (key_differs && stop_condition.checkStop())
129 {
130 ++blocks_written;
131 return;
132 }
133
134 queue.pop();
135
136 if (key_differs)
137 {
138 /// We write data for the previous primary key.
139 insertRows(merged_columns, current_block_granularity, stop_condition);
140
141 current_key.swap(next_key);
142
143 count_negative = 0;
144 count_positive = 0;
145
146 current_pos = 0;
147 first_negative_pos = 0;
148 last_positive_pos = 0;
149 last_negative_pos = 0;
150 current_row_sources.resize(0);
151 }
152
153 /// Initially, skip all rows. On insert, unskip "corner" rows.
154 if (out_row_sources_buf)
155 current_row_sources.emplace_back(current.impl->order, true);
156
157 if (sign == 1)
158 {
159 ++count_positive;
160 last_is_positive = true;
161
162 setRowRef(last_positive, current);
163 last_positive_pos = current_pos;
164 }
165 else if (sign == -1)
166 {
167 if (!count_negative)
168 {
169 setRowRef(first_negative, current);
170 first_negative_pos = current_pos;
171 }
172
173 if (!blocks_written && stop_condition.empty())
174 {
175 setRowRef(last_negative, current);
176 last_negative_pos = current_pos;
177 }
178
179 ++count_negative;
180 last_is_positive = false;
181 }
182 else
183 throw Exception("Incorrect data: Sign = " + toString(sign) + " (must be 1 or -1).",
184 ErrorCodes::INCORRECT_DATA);
185
186 if (!current->isLast())
187 {
188 current->next();
189 queue.push(current);
190 }
191 else
192 {
193 /// We take next block from the corresponding source, if there is one.
194 fetchNextBlock(current, queue);
195 }
196 }
197
198 /// Write data for last primary key.
199 insertRows(merged_columns, /*some_granularity*/ 0, stop_condition);
200
201 finished = true;
202}
203
204}
205