| 1 | #include <DataStreams/GraphiteRollupSortedBlockInputStream.h> |
| 2 | #include <type_traits> |
| 3 | |
| 4 | |
| 5 | namespace DB |
| 6 | { |
| 7 | |
/// Error codes declared for this file. They are only declared here (extern, no initializer);
/// the values themselves come from another translation unit.
namespace ErrorCodes
{
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int LOGICAL_ERROR;
}
| 13 | |
| 14 | |
| 15 | GraphiteRollupSortedBlockInputStream::GraphiteRollupSortedBlockInputStream( |
| 16 | const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_, |
| 17 | const Graphite::Params & params_, time_t time_of_merge_) |
| 18 | : MergingSortedBlockInputStream(inputs_, description_, max_block_size_), |
| 19 | params(params_), time_of_merge(time_of_merge_) |
| 20 | { |
| 21 | size_t max_size_of_aggregate_state = 0; |
| 22 | size_t max_alignment_of_aggregate_state = 1; |
| 23 | |
| 24 | for (const auto & pattern : params.patterns) |
| 25 | { |
| 26 | if (pattern.function) |
| 27 | { |
| 28 | max_size_of_aggregate_state = std::max(max_size_of_aggregate_state, pattern.function->sizeOfData()); |
| 29 | max_alignment_of_aggregate_state = std::max(max_alignment_of_aggregate_state, pattern.function->alignOfData()); |
| 30 | } |
| 31 | } |
| 32 | |
| 33 | place_for_aggregate_state.reset(max_size_of_aggregate_state, max_alignment_of_aggregate_state); |
| 34 | |
| 35 | /// Memoize column numbers in block. |
| 36 | path_column_num = header.getPositionByName(params.path_column_name); |
| 37 | time_column_num = header.getPositionByName(params.time_column_name); |
| 38 | value_column_num = header.getPositionByName(params.value_column_name); |
| 39 | version_column_num = header.getPositionByName(params.version_column_name); |
| 40 | |
| 41 | for (size_t i = 0; i < num_columns; ++i) |
| 42 | if (i != time_column_num && i != value_column_num && i != version_column_num) |
| 43 | unmodified_column_numbers.push_back(i); |
| 44 | } |
| 45 | |
| 46 | |
| 47 | Graphite::RollupRule GraphiteRollupSortedBlockInputStream::selectPatternForPath(StringRef path) const |
| 48 | { |
| 49 | const Graphite::Pattern * first_match = &undef_pattern; |
| 50 | |
| 51 | for (const auto & pattern : params.patterns) |
| 52 | { |
| 53 | if (!pattern.regexp) |
| 54 | { |
| 55 | /// Default pattern |
| 56 | if (first_match->type == first_match->TypeUndef && pattern.type == pattern.TypeAll) |
| 57 | { |
| 58 | /// There is only default pattern for both retention and aggregation |
| 59 | return std::pair(&pattern, &pattern); |
| 60 | } |
| 61 | if (pattern.type != first_match->type) |
| 62 | { |
| 63 | if (first_match->type == first_match->TypeRetention) |
| 64 | { |
| 65 | return std::pair(first_match, &pattern); |
| 66 | } |
| 67 | if (first_match->type == first_match->TypeAggregation) |
| 68 | { |
| 69 | return std::pair(&pattern, first_match); |
| 70 | } |
| 71 | } |
| 72 | } |
| 73 | else if (pattern.regexp->match(path.data, path.size)) |
| 74 | { |
| 75 | /// General pattern with matched path |
| 76 | if (pattern.type == pattern.TypeAll) |
| 77 | { |
| 78 | /// Only for not default patterns with both function and retention parameters |
| 79 | return std::pair(&pattern, &pattern); |
| 80 | } |
| 81 | if (first_match->type == first_match->TypeUndef) |
| 82 | { |
| 83 | first_match = &pattern; |
| 84 | continue; |
| 85 | } |
| 86 | if (pattern.type != first_match->type) |
| 87 | { |
| 88 | if (first_match->type == first_match->TypeRetention) |
| 89 | { |
| 90 | return std::pair(first_match, &pattern); |
| 91 | } |
| 92 | if (first_match->type == first_match->TypeAggregation) |
| 93 | { |
| 94 | return std::pair(&pattern, first_match); |
| 95 | } |
| 96 | } |
| 97 | } |
| 98 | } |
| 99 | |
| 100 | return {nullptr, nullptr}; |
| 101 | } |
| 102 | |
| 103 | |
| 104 | UInt32 GraphiteRollupSortedBlockInputStream::selectPrecision(const Graphite::Retentions & retentions, time_t time) const |
| 105 | { |
| 106 | static_assert(is_signed_v<time_t>, "time_t must be signed type" ); |
| 107 | |
| 108 | for (const auto & retention : retentions) |
| 109 | { |
| 110 | if (time_of_merge - time >= static_cast<time_t>(retention.age)) |
| 111 | return retention.precision; |
| 112 | } |
| 113 | |
| 114 | /// No rounding. |
| 115 | return 1; |
| 116 | } |
| 117 | |
| 118 | |
| 119 | /** Round the unix timestamp to seconds precision. |
| 120 | * In this case, the date should not change. The date is calculated using the local time zone. |
| 121 | * |
| 122 | * If the rounding value is less than an hour, |
| 123 | * then, assuming that time zones that differ from UTC by a non-integer number of hours are not supported, |
| 124 | * just simply round the unix timestamp down to a multiple of 3600. |
| 125 | * And if the rounding value is greater, |
| 126 | * then we will round down the number of seconds from the beginning of the day in the local time zone. |
| 127 | * |
| 128 | * Rounding to more than a day is not supported. |
| 129 | */ |
| 130 | static time_t roundTimeToPrecision(const DateLUTImpl & date_lut, time_t time, UInt32 precision) |
| 131 | { |
| 132 | if (precision <= 3600) |
| 133 | { |
| 134 | return time / precision * precision; |
| 135 | } |
| 136 | else |
| 137 | { |
| 138 | time_t date = date_lut.toDate(time); |
| 139 | time_t remainder = time - date; |
| 140 | return date + remainder / precision * precision; |
| 141 | } |
| 142 | } |
| 143 | |
| 144 | |
| 145 | Block GraphiteRollupSortedBlockInputStream::readImpl() |
| 146 | { |
| 147 | if (finished) |
| 148 | return Block(); |
| 149 | |
| 150 | MutableColumns merged_columns; |
| 151 | init(merged_columns); |
| 152 | |
| 153 | if (has_collation) |
| 154 | throw Exception("Logical error: " + getName() + " does not support collations" , ErrorCodes::LOGICAL_ERROR); |
| 155 | |
| 156 | if (merged_columns.empty()) |
| 157 | return Block(); |
| 158 | |
| 159 | merge(merged_columns, queue_without_collation); |
| 160 | return header.cloneWithColumns(std::move(merged_columns)); |
| 161 | } |
| 162 | |
| 163 | |
| 164 | void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue) |
| 165 | { |
| 166 | const DateLUTImpl & date_lut = DateLUT::instance(); |
| 167 | |
| 168 | size_t started_rows = 0; /// Number of times startNextRow() has been called. |
| 169 | |
| 170 | /// Take rows in needed order and put them into `merged_columns` until we get `max_block_size` rows. |
| 171 | /// |
| 172 | /// Variables starting with current_* refer to the rows previously popped from the queue that will |
| 173 | /// contribute towards current output row. |
| 174 | /// Variables starting with next_* refer to the row at the top of the queue. |
| 175 | |
| 176 | while (!queue.empty()) |
| 177 | { |
| 178 | SortCursor next_cursor = queue.top(); |
| 179 | |
| 180 | StringRef next_path = next_cursor->all_columns[path_column_num]->getDataAt(next_cursor->pos); |
| 181 | bool new_path = is_first || next_path != current_group_path; |
| 182 | |
| 183 | is_first = false; |
| 184 | |
| 185 | time_t next_row_time = next_cursor->all_columns[time_column_num]->getUInt(next_cursor->pos); |
| 186 | /// Is new key before rounding. |
| 187 | bool is_new_key = new_path || next_row_time != current_time; |
| 188 | |
| 189 | if (is_new_key) |
| 190 | { |
| 191 | /// Accumulate the row that has maximum version in the previous group of rows with the same key: |
| 192 | if (started_rows) |
| 193 | accumulateRow(current_subgroup_newest_row); |
| 194 | |
| 195 | Graphite::RollupRule next_rule = current_rule; |
| 196 | if (new_path) |
| 197 | next_rule = selectPatternForPath(next_path); |
| 198 | |
| 199 | const Graphite::RetentionPattern * retention_pattern = std::get<0>(next_rule); |
| 200 | time_t next_time_rounded; |
| 201 | if (retention_pattern) |
| 202 | { |
| 203 | UInt32 precision = selectPrecision(retention_pattern->retentions, next_row_time); |
| 204 | next_time_rounded = roundTimeToPrecision(date_lut, next_row_time, precision); |
| 205 | } |
| 206 | else |
| 207 | { |
| 208 | /// If no pattern has matched - take the value as-is. |
| 209 | next_time_rounded = next_row_time; |
| 210 | } |
| 211 | |
| 212 | /// Key will be new after rounding. It means new result row. |
| 213 | bool will_be_new_key = new_path || next_time_rounded != current_time_rounded; |
| 214 | |
| 215 | if (will_be_new_key) |
| 216 | { |
| 217 | if (started_rows) |
| 218 | { |
| 219 | finishCurrentGroup(merged_columns); |
| 220 | |
| 221 | /// We have enough rows - return, but don't advance the loop. At the beginning of the |
| 222 | /// next call to merge() the same next_cursor will be processed once more and |
| 223 | /// the next output row will be created from it. |
| 224 | if (started_rows >= max_block_size) |
| 225 | return; |
| 226 | } |
| 227 | |
| 228 | /// At this point previous row has been fully processed, so we can advance the loop |
| 229 | /// (substitute current_* values for next_*, advance the cursor). |
| 230 | |
| 231 | startNextGroup(merged_columns, next_cursor, next_rule); |
| 232 | ++started_rows; |
| 233 | |
| 234 | current_time_rounded = next_time_rounded; |
| 235 | } |
| 236 | |
| 237 | current_time = next_row_time; |
| 238 | } |
| 239 | |
| 240 | /// Within all rows with same key, we should leave only one row with maximum version; |
| 241 | /// and for rows with same maximum version - only last row. |
| 242 | if (is_new_key |
| 243 | || next_cursor->all_columns[version_column_num]->compareAt( |
| 244 | next_cursor->pos, current_subgroup_newest_row.row_num, |
| 245 | *(*current_subgroup_newest_row.columns)[version_column_num], |
| 246 | /* nan_direction_hint = */ 1) >= 0) |
| 247 | { |
| 248 | setRowRef(current_subgroup_newest_row, next_cursor); |
| 249 | |
| 250 | /// Small hack: group and subgroups have the same path, so we can set current_group_path here instead of startNextGroup |
| 251 | /// But since we keep in memory current_subgroup_newest_row's block, we could use StringRef for current_group_path and don't |
| 252 | /// make deep copy of the path. |
| 253 | current_group_path = next_path; |
| 254 | } |
| 255 | |
| 256 | queue.pop(); |
| 257 | |
| 258 | if (!next_cursor->isLast()) |
| 259 | { |
| 260 | next_cursor->next(); |
| 261 | queue.push(next_cursor); |
| 262 | } |
| 263 | else |
| 264 | { |
| 265 | /// We get the next block from the appropriate source, if there is one. |
| 266 | fetchNextBlock(next_cursor, queue); |
| 267 | } |
| 268 | } |
| 269 | |
| 270 | /// Write result row for the last group. |
| 271 | if (started_rows) |
| 272 | { |
| 273 | accumulateRow(current_subgroup_newest_row); |
| 274 | finishCurrentGroup(merged_columns); |
| 275 | } |
| 276 | |
| 277 | finished = true; |
| 278 | } |
| 279 | |
| 280 | |
| 281 | template <typename TSortCursor> |
| 282 | void GraphiteRollupSortedBlockInputStream::startNextGroup(MutableColumns & merged_columns, TSortCursor & cursor, |
| 283 | Graphite::RollupRule next_rule) |
| 284 | { |
| 285 | const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(next_rule); |
| 286 | |
| 287 | /// Copy unmodified column values (including path column). |
| 288 | for (size_t i = 0, size = unmodified_column_numbers.size(); i < size; ++i) |
| 289 | { |
| 290 | size_t j = unmodified_column_numbers[i]; |
| 291 | merged_columns[j]->insertFrom(*cursor->all_columns[j], cursor->pos); |
| 292 | } |
| 293 | |
| 294 | if (aggregation_pattern) |
| 295 | { |
| 296 | aggregation_pattern->function->create(place_for_aggregate_state.data()); |
| 297 | aggregate_state_created = true; |
| 298 | } |
| 299 | |
| 300 | current_rule = next_rule; |
| 301 | } |
| 302 | |
| 303 | |
| 304 | void GraphiteRollupSortedBlockInputStream::finishCurrentGroup(MutableColumns & merged_columns) |
| 305 | { |
| 306 | /// Insert calculated values of the columns `time`, `value`, `version`. |
| 307 | merged_columns[time_column_num]->insert(current_time_rounded); |
| 308 | merged_columns[version_column_num]->insertFrom( |
| 309 | *(*current_subgroup_newest_row.columns)[version_column_num], current_subgroup_newest_row.row_num); |
| 310 | |
| 311 | const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(current_rule); |
| 312 | if (aggregate_state_created) |
| 313 | { |
| 314 | aggregation_pattern->function->insertResultInto(place_for_aggregate_state.data(), *merged_columns[value_column_num]); |
| 315 | aggregation_pattern->function->destroy(place_for_aggregate_state.data()); |
| 316 | aggregate_state_created = false; |
| 317 | } |
| 318 | else |
| 319 | merged_columns[value_column_num]->insertFrom( |
| 320 | *(*current_subgroup_newest_row.columns)[value_column_num], current_subgroup_newest_row.row_num); |
| 321 | } |
| 322 | |
| 323 | |
| 324 | void GraphiteRollupSortedBlockInputStream::accumulateRow(SharedBlockRowRef & row) |
| 325 | { |
| 326 | const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(current_rule); |
| 327 | if (aggregate_state_created) |
| 328 | aggregation_pattern->function->add(place_for_aggregate_state.data(), &(*row.columns)[value_column_num], row.row_num, nullptr); |
| 329 | } |
| 330 | |
| 331 | } |
| 332 | |