1 | #include <Storages/MergeTree/MergeTreeIndexSet.h> |
2 | |
3 | #include <Interpreters/ExpressionActions.h> |
4 | #include <Interpreters/ExpressionAnalyzer.h> |
5 | #include <Interpreters/SyntaxAnalyzer.h> |
6 | |
7 | #include <Parsers/ASTIdentifier.h> |
8 | #include <Parsers/ASTFunction.h> |
9 | #include <Parsers/ASTLiteral.h> |
10 | |
11 | |
12 | namespace DB |
13 | { |
14 | |
/// Error codes referenced in this file. Only declared here; the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int INCORRECT_QUERY;
}
19 | |
20 | /// 0b11 -- can be true and false at the same time |
21 | const Field UNKNOWN_FIELD(3u); |
22 | |
23 | |
24 | MergeTreeIndexGranuleSet::MergeTreeIndexGranuleSet(const MergeTreeIndexSet & index_) |
25 | : IMergeTreeIndexGranule() |
26 | , index(index_) |
27 | , block(index.header.cloneEmpty()) {} |
28 | |
29 | MergeTreeIndexGranuleSet::MergeTreeIndexGranuleSet( |
30 | const MergeTreeIndexSet & index_, MutableColumns && mutable_columns_) |
31 | : IMergeTreeIndexGranule() |
32 | , index(index_) |
33 | , block(index.header.cloneWithColumns(std::move(mutable_columns_))) {} |
34 | |
35 | void MergeTreeIndexGranuleSet::serializeBinary(WriteBuffer & ostr) const |
36 | { |
37 | if (empty()) |
38 | throw Exception( |
39 | "Attempt to write empty set index " + backQuote(index.name), ErrorCodes::LOGICAL_ERROR); |
40 | |
41 | const auto & size_type = DataTypePtr(std::make_shared<DataTypeUInt64>()); |
42 | |
43 | if (index.max_rows && size() > index.max_rows) |
44 | { |
45 | size_type->serializeBinary(0, ostr); |
46 | return; |
47 | } |
48 | |
49 | size_type->serializeBinary(size(), ostr); |
50 | |
51 | for (size_t i = 0; i < index.columns.size(); ++i) |
52 | { |
53 | const auto & type = index.data_types[i]; |
54 | |
55 | IDataType::SerializeBinaryBulkSettings settings; |
56 | settings.getter = [&ostr](IDataType::SubstreamPath) -> WriteBuffer * { return &ostr; }; |
57 | settings.position_independent_encoding = false; |
58 | settings.low_cardinality_max_dictionary_size = 0; |
59 | |
60 | IDataType::SerializeBinaryBulkStatePtr state; |
61 | type->serializeBinaryBulkStatePrefix(settings, state); |
62 | type->serializeBinaryBulkWithMultipleStreams(*block.getByPosition(i).column, 0, size(), settings, state); |
63 | type->serializeBinaryBulkStateSuffix(settings, state); |
64 | } |
65 | } |
66 | |
67 | void MergeTreeIndexGranuleSet::deserializeBinary(ReadBuffer & istr) |
68 | { |
69 | block.clear(); |
70 | |
71 | Field field_rows; |
72 | const auto & size_type = DataTypePtr(std::make_shared<DataTypeUInt64>()); |
73 | size_type->deserializeBinary(field_rows, istr); |
74 | size_t rows_to_read = field_rows.get<size_t>(); |
75 | |
76 | if (rows_to_read == 0) |
77 | return; |
78 | |
79 | for (size_t i = 0; i < index.columns.size(); ++i) |
80 | { |
81 | const auto & type = index.data_types[i]; |
82 | auto new_column = type->createColumn(); |
83 | |
84 | IDataType::DeserializeBinaryBulkSettings settings; |
85 | settings.getter = [&](IDataType::SubstreamPath) -> ReadBuffer * { return &istr; }; |
86 | settings.position_independent_encoding = false; |
87 | |
88 | IDataType::DeserializeBinaryBulkStatePtr state; |
89 | type->deserializeBinaryBulkStatePrefix(settings, state); |
90 | type->deserializeBinaryBulkWithMultipleStreams(*new_column, rows_to_read, settings, state); |
91 | |
92 | block.insert(ColumnWithTypeAndName(new_column->getPtr(), type, index.columns[i])); |
93 | } |
94 | } |
95 | |
96 | |
97 | MergeTreeIndexAggregatorSet::MergeTreeIndexAggregatorSet(const MergeTreeIndexSet & index_) |
98 | : index(index_), columns(index.header.cloneEmptyColumns()) |
99 | { |
100 | ColumnRawPtrs column_ptrs; |
101 | column_ptrs.reserve(index.columns.size()); |
102 | Columns materialized_columns; |
103 | for (const auto & column : index.header.getColumns()) |
104 | { |
105 | materialized_columns.emplace_back(column->convertToFullColumnIfConst()->convertToFullColumnIfLowCardinality()); |
106 | column_ptrs.emplace_back(materialized_columns.back().get()); |
107 | } |
108 | |
109 | data.init(ClearableSetVariants::chooseMethod(column_ptrs, key_sizes)); |
110 | |
111 | columns = index.header.cloneEmptyColumns(); |
112 | } |
113 | |
114 | void MergeTreeIndexAggregatorSet::update(const Block & block, size_t * pos, size_t limit) |
115 | { |
116 | if (*pos >= block.rows()) |
117 | throw Exception( |
118 | "The provided position is not less than the number of block rows. Position: " |
119 | + toString(*pos) + ", Block rows: " + toString(block.rows()) + "." , ErrorCodes::LOGICAL_ERROR); |
120 | |
121 | size_t rows_read = std::min(limit, block.rows() - *pos); |
122 | |
123 | if (index.max_rows && size() > index.max_rows) |
124 | { |
125 | *pos += rows_read; |
126 | return; |
127 | } |
128 | |
129 | ColumnRawPtrs index_column_ptrs; |
130 | index_column_ptrs.reserve(index.columns.size()); |
131 | Columns materialized_columns; |
132 | for (const auto & column_name : index.columns) |
133 | { |
134 | materialized_columns.emplace_back( |
135 | block.getByName(column_name).column->convertToFullColumnIfConst()->convertToFullColumnIfLowCardinality()); |
136 | index_column_ptrs.emplace_back(materialized_columns.back().get()); |
137 | } |
138 | |
139 | IColumn::Filter filter(block.rows(), 0); |
140 | |
141 | bool has_new_data = false; |
142 | switch (data.type) |
143 | { |
144 | case ClearableSetVariants::Type::EMPTY: |
145 | break; |
146 | #define M(NAME) \ |
147 | case ClearableSetVariants::Type::NAME: \ |
148 | has_new_data = buildFilter(*data.NAME, index_column_ptrs, filter, *pos, rows_read, data); \ |
149 | break; |
150 | APPLY_FOR_SET_VARIANTS(M) |
151 | #undef M |
152 | } |
153 | |
154 | if (has_new_data) |
155 | { |
156 | for (size_t i = 0; i < columns.size(); ++i) |
157 | { |
158 | auto filtered_column = block.getByName(index.columns[i]).column->filter(filter, block.rows()); |
159 | columns[i]->insertRangeFrom(*filtered_column, 0, filtered_column->size()); |
160 | } |
161 | } |
162 | |
163 | *pos += rows_read; |
164 | } |
165 | |
166 | template <typename Method> |
167 | bool MergeTreeIndexAggregatorSet::buildFilter( |
168 | Method & method, |
169 | const ColumnRawPtrs & column_ptrs, |
170 | IColumn::Filter & filter, |
171 | size_t pos, |
172 | size_t limit, |
173 | ClearableSetVariants & variants) const |
174 | { |
175 | /// Like DistinctSortedBlockInputStream. |
176 | typename Method::State state(column_ptrs, key_sizes, nullptr); |
177 | |
178 | bool has_new_data = false; |
179 | for (size_t i = 0; i < limit; ++i) |
180 | { |
181 | auto emplace_result = state.emplaceKey(method.data, pos + i, variants.string_pool); |
182 | |
183 | if (emplace_result.isInserted()) |
184 | has_new_data = true; |
185 | |
186 | /// Emit the record if there is no such key in the current set yet. |
187 | /// Skip it otherwise. |
188 | filter[pos + i] = emplace_result.isInserted(); |
189 | } |
190 | return has_new_data; |
191 | } |
192 | |
193 | MergeTreeIndexGranulePtr MergeTreeIndexAggregatorSet::getGranuleAndReset() |
194 | { |
195 | auto granule = std::make_shared<MergeTreeIndexGranuleSet>(index, std::move(columns)); |
196 | |
197 | switch (data.type) |
198 | { |
199 | case ClearableSetVariants::Type::EMPTY: |
200 | break; |
201 | #define M(NAME) \ |
202 | case ClearableSetVariants::Type::NAME: \ |
203 | data.NAME->data.clear(); \ |
204 | break; |
205 | APPLY_FOR_SET_VARIANTS(M) |
206 | #undef M |
207 | } |
208 | |
209 | columns = index.header.cloneEmptyColumns(); |
210 | |
211 | return granule; |
212 | } |
213 | |
214 | |
215 | MergeTreeIndexConditionSet::MergeTreeIndexConditionSet( |
216 | const SelectQueryInfo & query, |
217 | const Context & context, |
218 | const MergeTreeIndexSet &index_) |
219 | : IMergeTreeIndexCondition(), index(index_) |
220 | { |
221 | for (size_t i = 0, size = index.columns.size(); i < size; ++i) |
222 | { |
223 | std::string name = index.columns[i]; |
224 | if (!key_columns.count(name)) |
225 | key_columns.insert(name); |
226 | } |
227 | |
228 | const auto & select = query.query->as<ASTSelectQuery &>(); |
229 | |
230 | if (select.where() && select.prewhere()) |
231 | expression_ast = makeASTFunction( |
232 | "and" , |
233 | select.where()->clone(), |
234 | select.prewhere()->clone()); |
235 | else if (select.where()) |
236 | expression_ast = select.where()->clone(); |
237 | else if (select.prewhere()) |
238 | expression_ast = select.prewhere()->clone(); |
239 | else |
240 | expression_ast = std::make_shared<ASTLiteral>(UNKNOWN_FIELD); |
241 | |
242 | useless = checkASTUseless(expression_ast); |
243 | /// Do not proceed if index is useless for this query. |
244 | if (useless) |
245 | return; |
246 | |
247 | /// Replace logical functions with bit functions. |
248 | /// Working with UInt8: last bit = can be true, previous = can be false (Like dbms/src/Storages/MergeTree/BoolMask.h). |
249 | traverseAST(expression_ast); |
250 | |
251 | auto syntax_analyzer_result = SyntaxAnalyzer(context, {}).analyze( |
252 | expression_ast, index.header.getNamesAndTypesList()); |
253 | actions = ExpressionAnalyzer(expression_ast, syntax_analyzer_result, context).getActions(true); |
254 | } |
255 | |
256 | bool MergeTreeIndexConditionSet::alwaysUnknownOrTrue() const |
257 | { |
258 | return useless; |
259 | } |
260 | |
261 | bool MergeTreeIndexConditionSet::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const |
262 | { |
263 | auto granule = std::dynamic_pointer_cast<MergeTreeIndexGranuleSet>(idx_granule); |
264 | if (!granule) |
265 | throw Exception( |
266 | "Set index condition got a granule with the wrong type." , ErrorCodes::LOGICAL_ERROR); |
267 | |
268 | if (useless || !granule->size() || (index.max_rows && granule->size() > index.max_rows)) |
269 | return true; |
270 | |
271 | Block result = granule->block; |
272 | actions->execute(result); |
273 | |
274 | auto column = result.getByName(expression_ast->getColumnName()).column->convertToFullColumnIfLowCardinality(); |
275 | auto * col_uint8 = typeid_cast<const ColumnUInt8 *>(column.get()); |
276 | |
277 | const NullMap * null_map = nullptr; |
278 | |
279 | if (auto * col_nullable = checkAndGetColumn<ColumnNullable>(*column)) |
280 | { |
281 | col_uint8 = typeid_cast<const ColumnUInt8 *>(&col_nullable->getNestedColumn()); |
282 | null_map = &col_nullable->getNullMapData(); |
283 | } |
284 | |
285 | if (!col_uint8) |
286 | throw Exception("ColumnUInt8 expected as Set index condition result." , ErrorCodes::LOGICAL_ERROR); |
287 | |
288 | auto & condition = col_uint8->getData(); |
289 | |
290 | for (size_t i = 0; i < column->size(); ++i) |
291 | if ((!null_map || (*null_map)[i] == 0) && condition[i] & 1) |
292 | return true; |
293 | |
294 | return false; |
295 | } |
296 | |
297 | void MergeTreeIndexConditionSet::traverseAST(ASTPtr & node) const |
298 | { |
299 | if (operatorFromAST(node)) |
300 | { |
301 | auto & args = node->as<ASTFunction>()->arguments->children; |
302 | |
303 | for (auto & arg : args) |
304 | traverseAST(arg); |
305 | return; |
306 | } |
307 | |
308 | if (atomFromAST(node)) |
309 | { |
310 | if (node->as<ASTIdentifier>() || node->as<ASTFunction>()) |
311 | node = makeASTFunction("__bitWrapperFunc" , node); |
312 | } |
313 | else |
314 | node = std::make_shared<ASTLiteral>(UNKNOWN_FIELD); |
315 | } |
316 | |
317 | bool MergeTreeIndexConditionSet::atomFromAST(ASTPtr & node) const |
318 | { |
319 | /// Function, literal or column |
320 | |
321 | if (node->as<ASTLiteral>()) |
322 | return true; |
323 | |
324 | if (const auto * identifier = node->as<ASTIdentifier>()) |
325 | return key_columns.count(identifier->getColumnName()) != 0; |
326 | |
327 | if (auto * func = node->as<ASTFunction>()) |
328 | { |
329 | if (key_columns.count(func->getColumnName())) |
330 | { |
331 | /// Function is already calculated. |
332 | node = std::make_shared<ASTIdentifier>(func->getColumnName()); |
333 | return true; |
334 | } |
335 | |
336 | auto & args = func->arguments->children; |
337 | |
338 | for (auto & arg : args) |
339 | if (!atomFromAST(arg)) |
340 | return false; |
341 | |
342 | return true; |
343 | } |
344 | |
345 | return false; |
346 | } |
347 | |
348 | bool MergeTreeIndexConditionSet::operatorFromAST(ASTPtr & node) const |
349 | { |
350 | /// Functions AND, OR, NOT. Replace with bit*. |
351 | auto * func = node->as<ASTFunction>(); |
352 | if (!func) |
353 | return false; |
354 | |
355 | auto & args = func->arguments->children; |
356 | |
357 | if (func->name == "not" ) |
358 | { |
359 | if (args.size() != 1) |
360 | return false; |
361 | |
362 | func->name = "__bitSwapLastTwo" ; |
363 | } |
364 | else if (func->name == "and" || func->name == "indexHint" ) |
365 | { |
366 | auto last_arg = args.back(); |
367 | args.pop_back(); |
368 | |
369 | ASTPtr new_func; |
370 | if (args.size() > 1) |
371 | new_func = makeASTFunction( |
372 | "__bitBoolMaskAnd" , |
373 | node, |
374 | last_arg); |
375 | else |
376 | new_func = makeASTFunction( |
377 | "__bitBoolMaskAnd" , |
378 | args.back(), |
379 | last_arg); |
380 | |
381 | node = new_func; |
382 | } |
383 | else if (func->name == "or" ) |
384 | { |
385 | auto last_arg = args.back(); |
386 | args.pop_back(); |
387 | |
388 | ASTPtr new_func; |
389 | if (args.size() > 1) |
390 | new_func = makeASTFunction( |
391 | "__bitBoolMaskOr" , |
392 | node, |
393 | last_arg); |
394 | else |
395 | new_func = makeASTFunction( |
396 | "__bitBoolMaskOr" , |
397 | args.back(), |
398 | last_arg); |
399 | |
400 | node = new_func; |
401 | } |
402 | else |
403 | return false; |
404 | |
405 | return true; |
406 | } |
407 | |
408 | bool MergeTreeIndexConditionSet::checkASTUseless(const ASTPtr &node, bool atomic) const |
409 | { |
410 | if (const auto * func = node->as<ASTFunction>()) |
411 | { |
412 | if (key_columns.count(func->getColumnName())) |
413 | return false; |
414 | |
415 | const ASTs & args = func->arguments->children; |
416 | |
417 | if (func->name == "and" || func->name == "indexHint" ) |
418 | return checkASTUseless(args[0], atomic) && checkASTUseless(args[1], atomic); |
419 | else if (func->name == "or" ) |
420 | return checkASTUseless(args[0], atomic) || checkASTUseless(args[1], atomic); |
421 | else if (func->name == "not" ) |
422 | return checkASTUseless(args[0], atomic); |
423 | else |
424 | return std::any_of(args.begin(), args.end(), |
425 | [this](const auto & arg) { return checkASTUseless(arg, true); }); |
426 | } |
427 | else if (const auto * literal = node->as<ASTLiteral>()) |
428 | return !atomic && literal->value.get<bool>(); |
429 | else if (const auto * identifier = node->as<ASTIdentifier>()) |
430 | return key_columns.find(identifier->getColumnName()) == std::end(key_columns); |
431 | else |
432 | return true; |
433 | } |
434 | |
435 | |
436 | MergeTreeIndexGranulePtr MergeTreeIndexSet::createIndexGranule() const |
437 | { |
438 | return std::make_shared<MergeTreeIndexGranuleSet>(*this); |
439 | } |
440 | |
441 | MergeTreeIndexAggregatorPtr MergeTreeIndexSet::createIndexAggregator() const |
442 | { |
443 | return std::make_shared<MergeTreeIndexAggregatorSet>(*this); |
444 | } |
445 | |
446 | MergeTreeIndexConditionPtr MergeTreeIndexSet::createIndexCondition( |
447 | const SelectQueryInfo & query, const Context & context) const |
448 | { |
449 | return std::make_shared<MergeTreeIndexConditionSet>(query, context, *this); |
450 | }; |
451 | |
452 | bool MergeTreeIndexSet::mayBenefitFromIndexForIn(const ASTPtr &) const |
453 | { |
454 | return false; |
455 | } |
456 | |
457 | |
458 | std::unique_ptr<IMergeTreeIndex> setIndexCreator( |
459 | const NamesAndTypesList & new_columns, |
460 | std::shared_ptr<ASTIndexDeclaration> node, |
461 | const Context & context) |
462 | { |
463 | if (node->name.empty()) |
464 | throw Exception("Index must have unique name" , ErrorCodes::INCORRECT_QUERY); |
465 | |
466 | size_t max_rows = 0; |
467 | if (!node->type->arguments || node->type->arguments->children.size() != 1) |
468 | throw Exception("Set index must have exactly one argument." , ErrorCodes::INCORRECT_QUERY); |
469 | else if (node->type->arguments->children.size() == 1) |
470 | max_rows = node->type->arguments->children[0]->as<ASTLiteral &>().value.get<size_t>(); |
471 | |
472 | |
473 | ASTPtr expr_list = MergeTreeData::extractKeyExpressionList(node->expr->clone()); |
474 | auto syntax = SyntaxAnalyzer(context, {}).analyze( |
475 | expr_list, new_columns); |
476 | auto unique_expr = ExpressionAnalyzer(expr_list, syntax, context).getActions(false); |
477 | |
478 | auto sample = ExpressionAnalyzer(expr_list, syntax, context) |
479 | .getActions(true)->getSampleBlock(); |
480 | |
481 | Block ; |
482 | |
483 | Names columns; |
484 | DataTypes data_types; |
485 | |
486 | for (size_t i = 0; i < expr_list->children.size(); ++i) |
487 | { |
488 | const auto & column = sample.getByPosition(i); |
489 | |
490 | columns.emplace_back(column.name); |
491 | data_types.emplace_back(column.type); |
492 | |
493 | header.insert(ColumnWithTypeAndName(column.type->createColumn(), column.type, column.name)); |
494 | } |
495 | |
496 | return std::make_unique<MergeTreeIndexSet>( |
497 | node->name, std::move(unique_expr), columns, data_types, header, node->granularity, max_rows); |
498 | } |
499 | |
500 | } |
501 | |