1 | #include <DataStreams/GraphiteRollupSortedBlockInputStream.h> |
2 | #include <type_traits> |
3 | |
4 | |
5 | namespace DB |
6 | { |
7 | |
/// Error codes used by this file; they are declared `extern` here and defined elsewhere in the project.
/// NOTE(review): NO_SUCH_COLUMN_IN_TABLE does not appear to be referenced in this file — confirm before removing.
namespace ErrorCodes
{
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int LOGICAL_ERROR;
}
13 | |
14 | |
15 | GraphiteRollupSortedBlockInputStream::GraphiteRollupSortedBlockInputStream( |
16 | const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_, |
17 | const Graphite::Params & params_, time_t time_of_merge_) |
18 | : MergingSortedBlockInputStream(inputs_, description_, max_block_size_), |
19 | params(params_), time_of_merge(time_of_merge_) |
20 | { |
21 | size_t max_size_of_aggregate_state = 0; |
22 | size_t max_alignment_of_aggregate_state = 1; |
23 | |
24 | for (const auto & pattern : params.patterns) |
25 | { |
26 | if (pattern.function) |
27 | { |
28 | max_size_of_aggregate_state = std::max(max_size_of_aggregate_state, pattern.function->sizeOfData()); |
29 | max_alignment_of_aggregate_state = std::max(max_alignment_of_aggregate_state, pattern.function->alignOfData()); |
30 | } |
31 | } |
32 | |
33 | place_for_aggregate_state.reset(max_size_of_aggregate_state, max_alignment_of_aggregate_state); |
34 | |
35 | /// Memoize column numbers in block. |
36 | path_column_num = header.getPositionByName(params.path_column_name); |
37 | time_column_num = header.getPositionByName(params.time_column_name); |
38 | value_column_num = header.getPositionByName(params.value_column_name); |
39 | version_column_num = header.getPositionByName(params.version_column_name); |
40 | |
41 | for (size_t i = 0; i < num_columns; ++i) |
42 | if (i != time_column_num && i != value_column_num && i != version_column_num) |
43 | unmodified_column_numbers.push_back(i); |
44 | } |
45 | |
46 | |
47 | Graphite::RollupRule GraphiteRollupSortedBlockInputStream::selectPatternForPath(StringRef path) const |
48 | { |
49 | const Graphite::Pattern * first_match = &undef_pattern; |
50 | |
51 | for (const auto & pattern : params.patterns) |
52 | { |
53 | if (!pattern.regexp) |
54 | { |
55 | /// Default pattern |
56 | if (first_match->type == first_match->TypeUndef && pattern.type == pattern.TypeAll) |
57 | { |
58 | /// There is only default pattern for both retention and aggregation |
59 | return std::pair(&pattern, &pattern); |
60 | } |
61 | if (pattern.type != first_match->type) |
62 | { |
63 | if (first_match->type == first_match->TypeRetention) |
64 | { |
65 | return std::pair(first_match, &pattern); |
66 | } |
67 | if (first_match->type == first_match->TypeAggregation) |
68 | { |
69 | return std::pair(&pattern, first_match); |
70 | } |
71 | } |
72 | } |
73 | else if (pattern.regexp->match(path.data, path.size)) |
74 | { |
75 | /// General pattern with matched path |
76 | if (pattern.type == pattern.TypeAll) |
77 | { |
78 | /// Only for not default patterns with both function and retention parameters |
79 | return std::pair(&pattern, &pattern); |
80 | } |
81 | if (first_match->type == first_match->TypeUndef) |
82 | { |
83 | first_match = &pattern; |
84 | continue; |
85 | } |
86 | if (pattern.type != first_match->type) |
87 | { |
88 | if (first_match->type == first_match->TypeRetention) |
89 | { |
90 | return std::pair(first_match, &pattern); |
91 | } |
92 | if (first_match->type == first_match->TypeAggregation) |
93 | { |
94 | return std::pair(&pattern, first_match); |
95 | } |
96 | } |
97 | } |
98 | } |
99 | |
100 | return {nullptr, nullptr}; |
101 | } |
102 | |
103 | |
104 | UInt32 GraphiteRollupSortedBlockInputStream::selectPrecision(const Graphite::Retentions & retentions, time_t time) const |
105 | { |
106 | static_assert(is_signed_v<time_t>, "time_t must be signed type" ); |
107 | |
108 | for (const auto & retention : retentions) |
109 | { |
110 | if (time_of_merge - time >= static_cast<time_t>(retention.age)) |
111 | return retention.precision; |
112 | } |
113 | |
114 | /// No rounding. |
115 | return 1; |
116 | } |
117 | |
118 | |
119 | /** Round the unix timestamp to seconds precision. |
120 | * In this case, the date should not change. The date is calculated using the local time zone. |
121 | * |
122 | * If the rounding value is less than an hour, |
123 | * then, assuming that time zones that differ from UTC by a non-integer number of hours are not supported, |
124 | * just simply round the unix timestamp down to a multiple of 3600. |
125 | * And if the rounding value is greater, |
126 | * then we will round down the number of seconds from the beginning of the day in the local time zone. |
127 | * |
128 | * Rounding to more than a day is not supported. |
129 | */ |
130 | static time_t roundTimeToPrecision(const DateLUTImpl & date_lut, time_t time, UInt32 precision) |
131 | { |
132 | if (precision <= 3600) |
133 | { |
134 | return time / precision * precision; |
135 | } |
136 | else |
137 | { |
138 | time_t date = date_lut.toDate(time); |
139 | time_t remainder = time - date; |
140 | return date + remainder / precision * precision; |
141 | } |
142 | } |
143 | |
144 | |
145 | Block GraphiteRollupSortedBlockInputStream::readImpl() |
146 | { |
147 | if (finished) |
148 | return Block(); |
149 | |
150 | MutableColumns merged_columns; |
151 | init(merged_columns); |
152 | |
153 | if (has_collation) |
154 | throw Exception("Logical error: " + getName() + " does not support collations" , ErrorCodes::LOGICAL_ERROR); |
155 | |
156 | if (merged_columns.empty()) |
157 | return Block(); |
158 | |
159 | merge(merged_columns, queue_without_collation); |
160 | return header.cloneWithColumns(std::move(merged_columns)); |
161 | } |
162 | |
163 | |
/** Core merge loop: pops rows from `queue` in sort order and appends rolled-up rows to `merged_columns`.
  *
  * Rows are grouped on two levels:
  *  - a "subgroup" is a run of rows with the same path and the same raw time; only its row with the
  *    maximum version (the last one among equal versions) is kept and fed into the aggregate state;
  *  - a "group" is a run of rows with the same path and the same time after rounding by the retention
  *    precision; each group produces exactly one output row.
  *
  * Once `max_block_size` output rows have been written in this call, the function returns without consuming
  * the row at the top of the queue, so the next call resumes from it. When the queue is exhausted,
  * the last group is written and `finished` is set.
  */
void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
{
    /// Local time zone calendar, needed to round times by precisions longer than an hour.
    const DateLUTImpl & date_lut = DateLUT::instance();

    size_t started_rows = 0; /// Number of times startNextGroup() has been called during this call of merge().

    /// Take rows in needed order and put them into `merged_columns` until we get `max_block_size` rows.
    ///
    /// Variables starting with current_* refer to the rows previously popped from the queue that will
    /// contribute towards current output row.
    /// Variables starting with next_* refer to the row at the top of the queue.

    while (!queue.empty())
    {
        SortCursor next_cursor = queue.top();

        StringRef next_path = next_cursor->all_columns[path_column_num]->getDataAt(next_cursor->pos);
        /// The very first row of the stream always starts a new path: `current_group_path` is not set yet.
        bool new_path = is_first || next_path != current_group_path;

        is_first = false;

        time_t next_row_time = next_cursor->all_columns[time_column_num]->getUInt(next_cursor->pos);
        /// Is new key before rounding.
        bool is_new_key = new_path || next_row_time != current_time;

        if (is_new_key)
        {
            /// Accumulate the row that has maximum version in the previous group of rows with the same key:
            /// `started_rows` is zero at the beginning of each call. If the previous call returned early, this
            /// subgroup was already accumulated just before that return, so it must not be added twice.
            if (started_rows)
                accumulateRow(current_subgroup_newest_row);

            /// The rule depends only on the path, so it is looked up again only when the path changes.
            Graphite::RollupRule next_rule = current_rule;
            if (new_path)
                next_rule = selectPatternForPath(next_path);

            const Graphite::RetentionPattern * retention_pattern = std::get<0>(next_rule);
            time_t next_time_rounded;
            if (retention_pattern)
            {
                UInt32 precision = selectPrecision(retention_pattern->retentions, next_row_time);
                next_time_rounded = roundTimeToPrecision(date_lut, next_row_time, precision);
            }
            else
            {
                /// If no pattern has matched - take the value as-is.
                next_time_rounded = next_row_time;
            }

            /// Key will be new after rounding. It means new result row.
            bool will_be_new_key = new_path || next_time_rounded != current_time_rounded;

            if (will_be_new_key)
            {
                if (started_rows)
                {
                    finishCurrentGroup(merged_columns);

                    /// We have enough rows - return, but don't advance the loop. At the beginning of the
                    /// next call to merge() the same next_cursor will be processed once more and
                    /// the next output row will be created from it.
                    if (started_rows >= max_block_size)
                        return;
                }

                /// At this point previous row has been fully processed, so we can advance the loop
                /// (substitute current_* values for next_*, advance the cursor).

                startNextGroup(merged_columns, next_cursor, next_rule);
                ++started_rows;

                current_time_rounded = next_time_rounded;
            }

            current_time = next_row_time;
        }

        /// Within all rows with same key, we should leave only one row with maximum version;
        /// and for rows with same maximum version - only last row.
        if (is_new_key
            || next_cursor->all_columns[version_column_num]->compareAt(
                next_cursor->pos, current_subgroup_newest_row.row_num,
                *(*current_subgroup_newest_row.columns)[version_column_num],
                /* nan_direction_hint = */ 1) >= 0)
        {
            setRowRef(current_subgroup_newest_row, next_cursor);

            /// Small hack: group and subgroups have the same path, so we can set current_group_path here instead of startNextGroup
            /// But since we keep in memory current_subgroup_newest_row's block, we could use StringRef for current_group_path and don't
            /// make deep copy of the path.
            current_group_path = next_path;
        }

        /// The row at the top has been consumed: advance its cursor and put it back, or refill it from its source.
        queue.pop();

        if (!next_cursor->isLast())
        {
            next_cursor->next();
            queue.push(next_cursor);
        }
        else
        {
            /// We get the next block from the appropriate source, if there is one.
            fetchNextBlock(next_cursor, queue);
        }
    }

    /// Write result row for the last group.
    if (started_rows)
    {
        accumulateRow(current_subgroup_newest_row);
        finishCurrentGroup(merged_columns);
    }

    finished = true;
}
279 | |
280 | |
281 | template <typename TSortCursor> |
282 | void GraphiteRollupSortedBlockInputStream::startNextGroup(MutableColumns & merged_columns, TSortCursor & cursor, |
283 | Graphite::RollupRule next_rule) |
284 | { |
285 | const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(next_rule); |
286 | |
287 | /// Copy unmodified column values (including path column). |
288 | for (size_t i = 0, size = unmodified_column_numbers.size(); i < size; ++i) |
289 | { |
290 | size_t j = unmodified_column_numbers[i]; |
291 | merged_columns[j]->insertFrom(*cursor->all_columns[j], cursor->pos); |
292 | } |
293 | |
294 | if (aggregation_pattern) |
295 | { |
296 | aggregation_pattern->function->create(place_for_aggregate_state.data()); |
297 | aggregate_state_created = true; |
298 | } |
299 | |
300 | current_rule = next_rule; |
301 | } |
302 | |
303 | |
304 | void GraphiteRollupSortedBlockInputStream::finishCurrentGroup(MutableColumns & merged_columns) |
305 | { |
306 | /// Insert calculated values of the columns `time`, `value`, `version`. |
307 | merged_columns[time_column_num]->insert(current_time_rounded); |
308 | merged_columns[version_column_num]->insertFrom( |
309 | *(*current_subgroup_newest_row.columns)[version_column_num], current_subgroup_newest_row.row_num); |
310 | |
311 | const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(current_rule); |
312 | if (aggregate_state_created) |
313 | { |
314 | aggregation_pattern->function->insertResultInto(place_for_aggregate_state.data(), *merged_columns[value_column_num]); |
315 | aggregation_pattern->function->destroy(place_for_aggregate_state.data()); |
316 | aggregate_state_created = false; |
317 | } |
318 | else |
319 | merged_columns[value_column_num]->insertFrom( |
320 | *(*current_subgroup_newest_row.columns)[value_column_num], current_subgroup_newest_row.row_num); |
321 | } |
322 | |
323 | |
324 | void GraphiteRollupSortedBlockInputStream::accumulateRow(SharedBlockRowRef & row) |
325 | { |
326 | const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(current_rule); |
327 | if (aggregate_state_created) |
328 | aggregation_pattern->function->add(place_for_aggregate_state.data(), &(*row.columns)[value_column_num], row.row_num, nullptr); |
329 | } |
330 | |
331 | } |
332 | |