1#include <DataStreams/GraphiteRollupSortedBlockInputStream.h>
2#include <type_traits>
3
4
5namespace DB
6{
7
namespace ErrorCodes
{
    /// Error code constants defined elsewhere in the project.
    /// NOTE(review): NO_SUCH_COLUMN_IN_TABLE is not referenced in this file.
    extern const int LOGICAL_ERROR;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
}
13
14
15GraphiteRollupSortedBlockInputStream::GraphiteRollupSortedBlockInputStream(
16 const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_,
17 const Graphite::Params & params_, time_t time_of_merge_)
18 : MergingSortedBlockInputStream(inputs_, description_, max_block_size_),
19 params(params_), time_of_merge(time_of_merge_)
20{
21 size_t max_size_of_aggregate_state = 0;
22 size_t max_alignment_of_aggregate_state = 1;
23
24 for (const auto & pattern : params.patterns)
25 {
26 if (pattern.function)
27 {
28 max_size_of_aggregate_state = std::max(max_size_of_aggregate_state, pattern.function->sizeOfData());
29 max_alignment_of_aggregate_state = std::max(max_alignment_of_aggregate_state, pattern.function->alignOfData());
30 }
31 }
32
33 place_for_aggregate_state.reset(max_size_of_aggregate_state, max_alignment_of_aggregate_state);
34
35 /// Memoize column numbers in block.
36 path_column_num = header.getPositionByName(params.path_column_name);
37 time_column_num = header.getPositionByName(params.time_column_name);
38 value_column_num = header.getPositionByName(params.value_column_name);
39 version_column_num = header.getPositionByName(params.version_column_name);
40
41 for (size_t i = 0; i < num_columns; ++i)
42 if (i != time_column_num && i != value_column_num && i != version_column_num)
43 unmodified_column_numbers.push_back(i);
44}
45
46
47Graphite::RollupRule GraphiteRollupSortedBlockInputStream::selectPatternForPath(StringRef path) const
48{
49 const Graphite::Pattern * first_match = &undef_pattern;
50
51 for (const auto & pattern : params.patterns)
52 {
53 if (!pattern.regexp)
54 {
55 /// Default pattern
56 if (first_match->type == first_match->TypeUndef && pattern.type == pattern.TypeAll)
57 {
58 /// There is only default pattern for both retention and aggregation
59 return std::pair(&pattern, &pattern);
60 }
61 if (pattern.type != first_match->type)
62 {
63 if (first_match->type == first_match->TypeRetention)
64 {
65 return std::pair(first_match, &pattern);
66 }
67 if (first_match->type == first_match->TypeAggregation)
68 {
69 return std::pair(&pattern, first_match);
70 }
71 }
72 }
73 else if (pattern.regexp->match(path.data, path.size))
74 {
75 /// General pattern with matched path
76 if (pattern.type == pattern.TypeAll)
77 {
78 /// Only for not default patterns with both function and retention parameters
79 return std::pair(&pattern, &pattern);
80 }
81 if (first_match->type == first_match->TypeUndef)
82 {
83 first_match = &pattern;
84 continue;
85 }
86 if (pattern.type != first_match->type)
87 {
88 if (first_match->type == first_match->TypeRetention)
89 {
90 return std::pair(first_match, &pattern);
91 }
92 if (first_match->type == first_match->TypeAggregation)
93 {
94 return std::pair(&pattern, first_match);
95 }
96 }
97 }
98 }
99
100 return {nullptr, nullptr};
101}
102
103
104UInt32 GraphiteRollupSortedBlockInputStream::selectPrecision(const Graphite::Retentions & retentions, time_t time) const
105{
106 static_assert(is_signed_v<time_t>, "time_t must be signed type");
107
108 for (const auto & retention : retentions)
109 {
110 if (time_of_merge - time >= static_cast<time_t>(retention.age))
111 return retention.precision;
112 }
113
114 /// No rounding.
115 return 1;
116}
117
118
119/** Round the unix timestamp to seconds precision.
120 * In this case, the date should not change. The date is calculated using the local time zone.
121 *
122 * If the rounding value is less than an hour,
123 * then, assuming that time zones that differ from UTC by a non-integer number of hours are not supported,
124 * just simply round the unix timestamp down to a multiple of 3600.
125 * And if the rounding value is greater,
126 * then we will round down the number of seconds from the beginning of the day in the local time zone.
127 *
128 * Rounding to more than a day is not supported.
129 */
130static time_t roundTimeToPrecision(const DateLUTImpl & date_lut, time_t time, UInt32 precision)
131{
132 if (precision <= 3600)
133 {
134 return time / precision * precision;
135 }
136 else
137 {
138 time_t date = date_lut.toDate(time);
139 time_t remainder = time - date;
140 return date + remainder / precision * precision;
141 }
142}
143
144
145Block GraphiteRollupSortedBlockInputStream::readImpl()
146{
147 if (finished)
148 return Block();
149
150 MutableColumns merged_columns;
151 init(merged_columns);
152
153 if (has_collation)
154 throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
155
156 if (merged_columns.empty())
157 return Block();
158
159 merge(merged_columns, queue_without_collation);
160 return header.cloneWithColumns(std::move(merged_columns));
161}
162
163
/** Core of the rollup: pops rows from `queue` in sort order and writes collapsed rows into `merged_columns`.
  *
  * Two levels of grouping are tracked:
  *  - a "subgroup" is a run of consecutive rows with the same path and the same raw time. Within it only the row
  *    with the maximum version survives (for equal versions - the last one); see current_subgroup_newest_row.
  *  - a "group" is a run of consecutive rows with the same path and the same time after rounding by the retention
  *    rule. Each group produces one output row: startNextGroup() opens it and finishCurrentGroup() writes it.
  *    The surviving row of every subgroup inside the group is passed to accumulateRow().
  *
  * Grouping relies on such rows being adjacent in queue order.
  * NOTE(review): presumably the sort description starts with (path, time) — confirm against the table's sorting key.
  *
  * Once max_block_size groups have been written in this call and the next row starts a new group, the function
  * returns without consuming that row. The state kept in members (is_first, current_*) lets the next call resume
  * from it. When the queue is exhausted, the last group is flushed and `finished` is set.
  */
void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
{
    const DateLUTImpl & date_lut = DateLUT::instance();

    size_t started_rows = 0; /// Number of times startNextRow() has been called.

    /// Take rows in needed order and put them into `merged_columns` until we get `max_block_size` rows.
    ///
    /// Variables starting with current_* refer to the rows previously popped from the queue that will
    /// contribute towards current output row.
    /// Variables starting with next_* refer to the row at the top of the queue.

    while (!queue.empty())
    {
        SortCursor next_cursor = queue.top();

        StringRef next_path = next_cursor->all_columns[path_column_num]->getDataAt(next_cursor->pos);
        /// The very first row ever seen has nothing to compare its path against.
        bool new_path = is_first || next_path != current_group_path;

        is_first = false;

        time_t next_row_time = next_cursor->all_columns[time_column_num]->getUInt(next_cursor->pos);
        /// Is new key before rounding.
        bool is_new_key = new_path || next_row_time != current_time;

        if (is_new_key)
        {
            /// Accumulate the row that has maximum version in the previous group of rows with the same key:
            /// started_rows == 0 means either nothing was read yet, or an earlier call already accumulated
            /// and finished the previous group before returning early.
            if (started_rows)
                accumulateRow(current_subgroup_newest_row);

            /// The rule only depends on the path, so it is recomputed only when the path changes.
            Graphite::RollupRule next_rule = current_rule;
            if (new_path)
                next_rule = selectPatternForPath(next_path);

            const Graphite::RetentionPattern * retention_pattern = std::get<0>(next_rule);
            time_t next_time_rounded;
            if (retention_pattern)
            {
                UInt32 precision = selectPrecision(retention_pattern->retentions, next_row_time);
                next_time_rounded = roundTimeToPrecision(date_lut, next_row_time, precision);
            }
            else
            {
                /// If no pattern has matched - take the value as-is.
                next_time_rounded = next_row_time;
            }

            /// Key will be new after rounding. It means new result row.
            bool will_be_new_key = new_path || next_time_rounded != current_time_rounded;

            if (will_be_new_key)
            {
                if (started_rows)
                {
                    finishCurrentGroup(merged_columns);

                    /// We have enough rows - return, but don't advance the loop. At the beginning of the
                    /// next call to merge() the same next_cursor will be processed once more and
                    /// the next output row will be created from it.
                    if (started_rows >= max_block_size)
                        return;
                }

                /// At this point previous row has been fully processed, so we can advance the loop
                /// (substitute current_* values for next_*, advance the cursor).

                startNextGroup(merged_columns, next_cursor, next_rule);
                ++started_rows;

                current_time_rounded = next_time_rounded;
            }

            current_time = next_row_time;
        }

        /// Within all rows with same key, we should leave only one row with maximum version;
        /// and for rows with same maximum version - only last row.
        if (is_new_key
            || next_cursor->all_columns[version_column_num]->compareAt(
                next_cursor->pos, current_subgroup_newest_row.row_num,
                *(*current_subgroup_newest_row.columns)[version_column_num],
                /* nan_direction_hint = */ 1) >= 0)
        {
            setRowRef(current_subgroup_newest_row, next_cursor);

            /// Small hack: group and subgroups have the same path, so we can set current_group_path here instead of startNextGroup
            /// But since we keep in memory current_subgroup_newest_row's block, we could use StringRef for current_group_path and don't
            /// make deep copy of the path.
            current_group_path = next_path;
        }

        queue.pop();

        if (!next_cursor->isLast())
        {
            next_cursor->next();
            queue.push(next_cursor);
        }
        else
        {
            /// We get the next block from the appropriate source, if there is one.
            fetchNextBlock(next_cursor, queue);
        }
    }

    /// Write result row for the last group.
    if (started_rows)
    {
        accumulateRow(current_subgroup_newest_row);
        finishCurrentGroup(merged_columns);
    }

    finished = true;
}
279
280
281template <typename TSortCursor>
282void GraphiteRollupSortedBlockInputStream::startNextGroup(MutableColumns & merged_columns, TSortCursor & cursor,
283 Graphite::RollupRule next_rule)
284{
285 const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(next_rule);
286
287 /// Copy unmodified column values (including path column).
288 for (size_t i = 0, size = unmodified_column_numbers.size(); i < size; ++i)
289 {
290 size_t j = unmodified_column_numbers[i];
291 merged_columns[j]->insertFrom(*cursor->all_columns[j], cursor->pos);
292 }
293
294 if (aggregation_pattern)
295 {
296 aggregation_pattern->function->create(place_for_aggregate_state.data());
297 aggregate_state_created = true;
298 }
299
300 current_rule = next_rule;
301}
302
303
304void GraphiteRollupSortedBlockInputStream::finishCurrentGroup(MutableColumns & merged_columns)
305{
306 /// Insert calculated values of the columns `time`, `value`, `version`.
307 merged_columns[time_column_num]->insert(current_time_rounded);
308 merged_columns[version_column_num]->insertFrom(
309 *(*current_subgroup_newest_row.columns)[version_column_num], current_subgroup_newest_row.row_num);
310
311 const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(current_rule);
312 if (aggregate_state_created)
313 {
314 aggregation_pattern->function->insertResultInto(place_for_aggregate_state.data(), *merged_columns[value_column_num]);
315 aggregation_pattern->function->destroy(place_for_aggregate_state.data());
316 aggregate_state_created = false;
317 }
318 else
319 merged_columns[value_column_num]->insertFrom(
320 *(*current_subgroup_newest_row.columns)[value_column_num], current_subgroup_newest_row.row_num);
321}
322
323
324void GraphiteRollupSortedBlockInputStream::accumulateRow(SharedBlockRowRef & row)
325{
326 const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(current_rule);
327 if (aggregate_state_created)
328 aggregation_pattern->function->add(place_for_aggregate_state.data(), &(*row.columns)[value_column_num], row.row_num, nullptr);
329}
330
331}
332