1#pragma once
2
3#include <common/logger_useful.h>
4
5#include <Core/ColumnNumbers.h>
6#include <DataStreams/MergingSortedBlockInputStream.h>
7#include <AggregateFunctions/IAggregateFunction.h>
8#include <Columns/ColumnAggregateFunction.h>
9#include <Common/OptimizedRegularExpression.h>
10#include <Common/AlignedBuffer.h>
11
12
13namespace DB
14{
15
16/** Intended for implementation of "rollup" - aggregation (rounding) of older data
17 * for a table with Graphite data (Graphite is the system for time series monitoring).
18 *
 * A table with Graphite data has at least the following columns (up to naming: the actual column names are configurable):
20 * Path, Time, Value, Version
21 *
22 * Path - name of metric (sensor);
23 * Time - time of measurement;
24 * Value - value of measurement;
 * Version - a number; among rows with equal Path and Time, only the row with the maximum version is kept.
26 *
 * Each row in the table corresponds to one value of one sensor.
28 *
 * Each pattern should contain a function, a retention scheme, or both. The order of patterns matters as well:
 * * patterns with only an aggregation function OR only a retention scheme should come first;
 * * then full patterns with both aggregation AND retention have to be placed;
 * * the default pattern (without regexp) must be the last one.
33 *
 * Rollup rules are specified in the following way:
 *
 * pattern
 *     regexp
 *     function
 * pattern
 *     regexp
 *     age -> precision
 *     age -> precision
 *     ...
 * pattern
 *     regexp
 *     function
 *     age -> precision
 *     age -> precision
 *     ...
 * pattern
 *     ...
 * default
 *     function
 *     age -> precision
 *     ...
56 *
57 * regexp - pattern for sensor name
58 * default - if no pattern has matched
59 *
 * age - minimal data age (in seconds) at which rounding with the specified precision starts.
 * precision - rounding precision (in seconds).
 *
 * function - name of the aggregate function applied to values whose times were rounded to the same value.
64 *
65 * Example:
66 *
 * <graphite_rollup>
 *     <pattern>
 *         <regexp>\.max$</regexp>
 *         <function>max</function>
 *     </pattern>
 *     <pattern>
 *         <regexp>click_cost</regexp>
 *         <function>any</function>
 *         <retention>
 *             <age>0</age>
 *             <precision>5</precision>
 *         </retention>
 *         <retention>
 *             <age>86400</age>
 *             <precision>60</precision>
 *         </retention>
 *     </pattern>
 *     <default>
 *         <function>max</function>
 *         <retention>
 *             <age>0</age>
 *             <precision>60</precision>
 *         </retention>
 *         <retention>
 *             <age>3600</age>
 *             <precision>300</precision>
 *         </retention>
 *         <retention>
 *             <age>86400</age>
 *             <precision>3600</precision>
 *         </retention>
 *     </default>
 * </graphite_rollup>
100 */
101
102namespace Graphite
103{
104 struct Retention
105 {
106 UInt32 age;
107 UInt32 precision;
108 };
109
110 using Retentions = std::vector<Retention>;
111
112 struct Pattern
113 {
114 std::shared_ptr<OptimizedRegularExpression> regexp;
115 std::string regexp_str;
116 AggregateFunctionPtr function;
117 Retentions retentions; /// Must be ordered by 'age' descending.
118 enum { TypeUndef, TypeRetention, TypeAggregation, TypeAll } type = TypeAll; /// The type of defined pattern, filled automatically
119 };
120
121 using Patterns = std::vector<Pattern>;
122 using RetentionPattern = Pattern;
123 using AggregationPattern = Pattern;
124
125 struct Params
126 {
127 String config_name;
128 String path_column_name;
129 String time_column_name;
130 String value_column_name;
131 String version_column_name;
132 Graphite::Patterns patterns;
133 };
134
135 using RollupRule = std::pair<const RetentionPattern *, const AggregationPattern *>;
136}
137
138/** Merges several sorted streams into one.
139 *
140 * For each group of consecutive identical values of the `path` column,
141 * and the same `time` values, rounded to some precision
142 * (where rounding accuracy depends on the template set for `path`
143 * and the amount of time elapsed from `time` to the specified time),
144 * keeps one line,
145 * performing the rounding of time,
146 * merge `value` values using the specified aggregate functions,
147 * as well as keeping the maximum value of the `version` column.
148 */
149class GraphiteRollupSortedBlockInputStream : public MergingSortedBlockInputStream
150{
151public:
152 GraphiteRollupSortedBlockInputStream(
153 const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_,
154 const Graphite::Params & params_, time_t time_of_merge_);
155
156 String getName() const override { return "GraphiteRollupSorted"; }
157
158 ~GraphiteRollupSortedBlockInputStream() override
159 {
160 if (aggregate_state_created)
161 std::get<1>(current_rule)->function->destroy(place_for_aggregate_state.data());
162 }
163
164protected:
165 Block readImpl() override;
166
167private:
168 Logger * log = &Logger::get("GraphiteRollupSortedBlockInputStream");
169
170 const Graphite::Params params;
171
172 size_t path_column_num;
173 size_t time_column_num;
174 size_t value_column_num;
175 size_t version_column_num;
176
177 /// All columns other than 'time', 'value', 'version'. They are unmodified during rollup.
178 ColumnNumbers unmodified_column_numbers;
179
180 time_t time_of_merge;
181
182 /// No data has been read.
183 bool is_first = true;
184
185 /// All data has been read.
186 bool finished = false;
187
188 /* | path | time | rounded_time | version | value | unmodified |
189 * -----------------------------------------------------------------------------------
190 * | A | 11 | 10 | 1 | 1 | a | |
191 * | A | 11 | 10 | 3 | 2 | b |> subgroup(A, 11) |
192 * | A | 11 | 10 | 2 | 3 | c | |> group(A, 10)
193 * ----------------------------------------------------------------------------------|>
194 * | A | 12 | 10 | 0 | 4 | d | |> Outputs (A, 10, avg(2, 5), a)
195 * | A | 12 | 10 | 1 | 5 | e |> subgroup(A, 12) |
196 * -----------------------------------------------------------------------------------
197 * | A | 21 | 20 | 1 | 6 | f |
198 * | B | 11 | 10 | 1 | 7 | g |
199 * ...
200 */
201
202 /// Path name of current bucket
203 StringRef current_group_path;
204
205 /// Last row with maximum version for current primary key (time bucket).
206 SharedBlockRowRef current_subgroup_newest_row;
207
208 /// Time of last read row
209 time_t current_time = 0;
210 time_t current_time_rounded = 0;
211
212 Graphite::RollupRule current_rule = {nullptr, nullptr};
213 AlignedBuffer place_for_aggregate_state;
214 bool aggregate_state_created = false; /// Invariant: if true then current_rule is not NULL.
215
216 const Graphite::Pattern undef_pattern =
217 { /// temporary empty pattern for selectPatternForPath
218 nullptr,
219 "",
220 nullptr,
221 DB::Graphite::Retentions(),
222 undef_pattern.TypeUndef,
223 };
224 Graphite::RollupRule selectPatternForPath(StringRef path) const;
225 UInt32 selectPrecision(const Graphite::Retentions & retentions, time_t time) const;
226
227
228 void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
229
230 /// Insert the values into the resulting columns, which will not be changed in the future.
231 template <typename TSortCursor>
232 void startNextGroup(MutableColumns & merged_columns, TSortCursor & cursor, Graphite::RollupRule next_pattern);
233
234 /// Insert the calculated `time`, `value`, `version` values into the resulting columns by the last group of rows.
235 void finishCurrentGroup(MutableColumns & merged_columns);
236
237 /// Update the state of the aggregate function with the new `value`.
238 void accumulateRow(SharedBlockRowRef & row);
239};
240
241}
242