1 | #include <Storages/MergeTree/MergeTreeIndexMinMax.h> |
2 | |
3 | #include <Interpreters/ExpressionActions.h> |
4 | #include <Interpreters/ExpressionAnalyzer.h> |
5 | #include <Interpreters/SyntaxAnalyzer.h> |
6 | |
7 | #include <Poco/Logger.h> |
8 | |
9 | namespace DB |
10 | { |
11 | |
/// Error codes thrown from this file. They are only declared here (extern);
/// the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;    /// Used for internal invariant violations (empty granule, bad position, wrong granule type).
    extern const int INCORRECT_QUERY;  /// Used when the index declaration itself is invalid.
}
17 | |
18 | |
19 | MergeTreeIndexGranuleMinMax::MergeTreeIndexGranuleMinMax(const MergeTreeIndexMinMax & index_) |
20 | : IMergeTreeIndexGranule(), index(index_), parallelogram() {} |
21 | |
22 | MergeTreeIndexGranuleMinMax::MergeTreeIndexGranuleMinMax( |
23 | const MergeTreeIndexMinMax & index_, std::vector<Range> && parallelogram_) |
24 | : IMergeTreeIndexGranule(), index(index_), parallelogram(std::move(parallelogram_)) {} |
25 | |
26 | void MergeTreeIndexGranuleMinMax::serializeBinary(WriteBuffer & ostr) const |
27 | { |
28 | if (empty()) |
29 | throw Exception( |
30 | "Attempt to write empty minmax index " + backQuote(index.name), ErrorCodes::LOGICAL_ERROR); |
31 | |
32 | for (size_t i = 0; i < index.columns.size(); ++i) |
33 | { |
34 | const DataTypePtr & type = index.data_types[i]; |
35 | if (!type->isNullable()) |
36 | { |
37 | type->serializeBinary(parallelogram[i].left, ostr); |
38 | type->serializeBinary(parallelogram[i].right, ostr); |
39 | } |
40 | else |
41 | { |
42 | bool is_null = parallelogram[i].left.isNull() || parallelogram[i].right.isNull(); // one is enough |
43 | writeBinary(is_null, ostr); |
44 | if (!is_null) |
45 | { |
46 | type->serializeBinary(parallelogram[i].left, ostr); |
47 | type->serializeBinary(parallelogram[i].right, ostr); |
48 | } |
49 | } |
50 | } |
51 | } |
52 | |
53 | void MergeTreeIndexGranuleMinMax::deserializeBinary(ReadBuffer & istr) |
54 | { |
55 | parallelogram.clear(); |
56 | Field min_val; |
57 | Field max_val; |
58 | for (size_t i = 0; i < index.columns.size(); ++i) |
59 | { |
60 | const DataTypePtr & type = index.data_types[i]; |
61 | if (!type->isNullable()) |
62 | { |
63 | type->deserializeBinary(min_val, istr); |
64 | type->deserializeBinary(max_val, istr); |
65 | } |
66 | else |
67 | { |
68 | bool is_null; |
69 | readBinary(is_null, istr); |
70 | if (!is_null) |
71 | { |
72 | type->deserializeBinary(min_val, istr); |
73 | type->deserializeBinary(max_val, istr); |
74 | } |
75 | else |
76 | { |
77 | min_val = Null(); |
78 | max_val = Null(); |
79 | } |
80 | } |
81 | parallelogram.emplace_back(min_val, true, max_val, true); |
82 | } |
83 | } |
84 | |
85 | |
86 | MergeTreeIndexAggregatorMinMax::MergeTreeIndexAggregatorMinMax(const MergeTreeIndexMinMax & index_) |
87 | : index(index_) {} |
88 | |
89 | MergeTreeIndexGranulePtr MergeTreeIndexAggregatorMinMax::getGranuleAndReset() |
90 | { |
91 | return std::make_shared<MergeTreeIndexGranuleMinMax>(index, std::move(parallelogram)); |
92 | } |
93 | |
94 | void MergeTreeIndexAggregatorMinMax::update(const Block & block, size_t * pos, size_t limit) |
95 | { |
96 | if (*pos >= block.rows()) |
97 | throw Exception( |
98 | "The provided position is not less than the number of block rows. Position: " |
99 | + toString(*pos) + ", Block rows: " + toString(block.rows()) + "." , ErrorCodes::LOGICAL_ERROR); |
100 | |
101 | size_t rows_read = std::min(limit, block.rows() - *pos); |
102 | |
103 | Field field_min; |
104 | Field field_max; |
105 | for (size_t i = 0; i < index.columns.size(); ++i) |
106 | { |
107 | const auto & column = block.getByName(index.columns[i]).column; |
108 | column->cut(*pos, rows_read)->getExtremes(field_min, field_max); |
109 | |
110 | if (parallelogram.size() <= i) |
111 | { |
112 | parallelogram.emplace_back(field_min, true, field_max, true); |
113 | } |
114 | else |
115 | { |
116 | parallelogram[i].left = std::min(parallelogram[i].left, field_min); |
117 | parallelogram[i].right = std::max(parallelogram[i].right, field_max); |
118 | } |
119 | } |
120 | |
121 | *pos += rows_read; |
122 | } |
123 | |
124 | |
125 | MergeTreeIndexConditionMinMax::MergeTreeIndexConditionMinMax( |
126 | const SelectQueryInfo &query, |
127 | const Context &context, |
128 | const MergeTreeIndexMinMax &index_) |
129 | : IMergeTreeIndexCondition(), index(index_), condition(query, context, index.columns, index.expr) {} |
130 | |
131 | bool MergeTreeIndexConditionMinMax::alwaysUnknownOrTrue() const |
132 | { |
133 | return condition.alwaysUnknownOrTrue(); |
134 | } |
135 | |
136 | bool MergeTreeIndexConditionMinMax::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const |
137 | { |
138 | std::shared_ptr<MergeTreeIndexGranuleMinMax> granule |
139 | = std::dynamic_pointer_cast<MergeTreeIndexGranuleMinMax>(idx_granule); |
140 | if (!granule) |
141 | throw Exception( |
142 | "Minmax index condition got a granule with the wrong type." , ErrorCodes::LOGICAL_ERROR); |
143 | for (const auto & range : granule->parallelogram) |
144 | if (range.left.isNull() || range.right.isNull()) |
145 | return true; |
146 | return condition.mayBeTrueInParallelogram(granule->parallelogram, index.data_types); |
147 | } |
148 | |
149 | |
150 | MergeTreeIndexGranulePtr MergeTreeIndexMinMax::createIndexGranule() const |
151 | { |
152 | return std::make_shared<MergeTreeIndexGranuleMinMax>(*this); |
153 | } |
154 | |
155 | |
156 | MergeTreeIndexAggregatorPtr MergeTreeIndexMinMax::createIndexAggregator() const |
157 | { |
158 | return std::make_shared<MergeTreeIndexAggregatorMinMax>(*this); |
159 | } |
160 | |
161 | |
162 | MergeTreeIndexConditionPtr MergeTreeIndexMinMax::createIndexCondition( |
163 | const SelectQueryInfo & query, const Context & context) const |
164 | { |
165 | return std::make_shared<MergeTreeIndexConditionMinMax>(query, context, *this); |
166 | }; |
167 | |
168 | bool MergeTreeIndexMinMax::mayBenefitFromIndexForIn(const ASTPtr & node) const |
169 | { |
170 | const String column_name = node->getColumnName(); |
171 | |
172 | for (const auto & cname : columns) |
173 | if (column_name == cname) |
174 | return true; |
175 | |
176 | if (const auto * func = typeid_cast<const ASTFunction *>(node.get())) |
177 | if (func->arguments->children.size() == 1) |
178 | return mayBenefitFromIndexForIn(func->arguments->children.front()); |
179 | |
180 | return false; |
181 | } |
182 | |
183 | std::unique_ptr<IMergeTreeIndex> minmaxIndexCreator( |
184 | const NamesAndTypesList & new_columns, |
185 | std::shared_ptr<ASTIndexDeclaration> node, |
186 | const Context & context) |
187 | { |
188 | if (node->name.empty()) |
189 | throw Exception("Index must have unique name" , ErrorCodes::INCORRECT_QUERY); |
190 | |
191 | if (node->type->arguments) |
192 | throw Exception("Minmax index have not any arguments" , ErrorCodes::INCORRECT_QUERY); |
193 | |
194 | ASTPtr expr_list = MergeTreeData::extractKeyExpressionList(node->expr->clone()); |
195 | auto syntax = SyntaxAnalyzer(context, {}).analyze( |
196 | expr_list, new_columns); |
197 | auto minmax_expr = ExpressionAnalyzer(expr_list, syntax, context).getActions(false); |
198 | |
199 | auto sample = ExpressionAnalyzer(expr_list, syntax, context) |
200 | .getActions(true)->getSampleBlock(); |
201 | |
202 | Names columns; |
203 | DataTypes data_types; |
204 | |
205 | for (size_t i = 0; i < expr_list->children.size(); ++i) |
206 | { |
207 | const auto & column = sample.getByPosition(i); |
208 | |
209 | columns.emplace_back(column.name); |
210 | data_types.emplace_back(column.type); |
211 | } |
212 | |
213 | return std::make_unique<MergeTreeIndexMinMax>( |
214 | node->name, std::move(minmax_expr), columns, data_types, sample, node->granularity); |
215 | } |
216 | |
217 | } |
218 | |