#include <Storages/MergeTree/MergeTreeIndexSet.h>

#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/SyntaxAnalyzer.h>

#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>

#include <algorithm>
10
11
12namespace DB
13{
14
15namespace ErrorCodes
16{
17 extern const int INCORRECT_QUERY;
18}
19
20/// 0b11 -- can be true and false at the same time
21const Field UNKNOWN_FIELD(3u);
22
23
24MergeTreeIndexGranuleSet::MergeTreeIndexGranuleSet(const MergeTreeIndexSet & index_)
25 : IMergeTreeIndexGranule()
26 , index(index_)
27 , block(index.header.cloneEmpty()) {}
28
29MergeTreeIndexGranuleSet::MergeTreeIndexGranuleSet(
30 const MergeTreeIndexSet & index_, MutableColumns && mutable_columns_)
31 : IMergeTreeIndexGranule()
32 , index(index_)
33 , block(index.header.cloneWithColumns(std::move(mutable_columns_))) {}
34
35void MergeTreeIndexGranuleSet::serializeBinary(WriteBuffer & ostr) const
36{
37 if (empty())
38 throw Exception(
39 "Attempt to write empty set index " + backQuote(index.name), ErrorCodes::LOGICAL_ERROR);
40
41 const auto & size_type = DataTypePtr(std::make_shared<DataTypeUInt64>());
42
43 if (index.max_rows && size() > index.max_rows)
44 {
45 size_type->serializeBinary(0, ostr);
46 return;
47 }
48
49 size_type->serializeBinary(size(), ostr);
50
51 for (size_t i = 0; i < index.columns.size(); ++i)
52 {
53 const auto & type = index.data_types[i];
54
55 IDataType::SerializeBinaryBulkSettings settings;
56 settings.getter = [&ostr](IDataType::SubstreamPath) -> WriteBuffer * { return &ostr; };
57 settings.position_independent_encoding = false;
58 settings.low_cardinality_max_dictionary_size = 0;
59
60 IDataType::SerializeBinaryBulkStatePtr state;
61 type->serializeBinaryBulkStatePrefix(settings, state);
62 type->serializeBinaryBulkWithMultipleStreams(*block.getByPosition(i).column, 0, size(), settings, state);
63 type->serializeBinaryBulkStateSuffix(settings, state);
64 }
65}
66
67void MergeTreeIndexGranuleSet::deserializeBinary(ReadBuffer & istr)
68{
69 block.clear();
70
71 Field field_rows;
72 const auto & size_type = DataTypePtr(std::make_shared<DataTypeUInt64>());
73 size_type->deserializeBinary(field_rows, istr);
74 size_t rows_to_read = field_rows.get<size_t>();
75
76 if (rows_to_read == 0)
77 return;
78
79 for (size_t i = 0; i < index.columns.size(); ++i)
80 {
81 const auto & type = index.data_types[i];
82 auto new_column = type->createColumn();
83
84 IDataType::DeserializeBinaryBulkSettings settings;
85 settings.getter = [&](IDataType::SubstreamPath) -> ReadBuffer * { return &istr; };
86 settings.position_independent_encoding = false;
87
88 IDataType::DeserializeBinaryBulkStatePtr state;
89 type->deserializeBinaryBulkStatePrefix(settings, state);
90 type->deserializeBinaryBulkWithMultipleStreams(*new_column, rows_to_read, settings, state);
91
92 block.insert(ColumnWithTypeAndName(new_column->getPtr(), type, index.columns[i]));
93 }
94}
95
96
97MergeTreeIndexAggregatorSet::MergeTreeIndexAggregatorSet(const MergeTreeIndexSet & index_)
98 : index(index_), columns(index.header.cloneEmptyColumns())
99{
100 ColumnRawPtrs column_ptrs;
101 column_ptrs.reserve(index.columns.size());
102 Columns materialized_columns;
103 for (const auto & column : index.header.getColumns())
104 {
105 materialized_columns.emplace_back(column->convertToFullColumnIfConst()->convertToFullColumnIfLowCardinality());
106 column_ptrs.emplace_back(materialized_columns.back().get());
107 }
108
109 data.init(ClearableSetVariants::chooseMethod(column_ptrs, key_sizes));
110
111 columns = index.header.cloneEmptyColumns();
112}
113
114void MergeTreeIndexAggregatorSet::update(const Block & block, size_t * pos, size_t limit)
115{
116 if (*pos >= block.rows())
117 throw Exception(
118 "The provided position is not less than the number of block rows. Position: "
119 + toString(*pos) + ", Block rows: " + toString(block.rows()) + ".", ErrorCodes::LOGICAL_ERROR);
120
121 size_t rows_read = std::min(limit, block.rows() - *pos);
122
123 if (index.max_rows && size() > index.max_rows)
124 {
125 *pos += rows_read;
126 return;
127 }
128
129 ColumnRawPtrs index_column_ptrs;
130 index_column_ptrs.reserve(index.columns.size());
131 Columns materialized_columns;
132 for (const auto & column_name : index.columns)
133 {
134 materialized_columns.emplace_back(
135 block.getByName(column_name).column->convertToFullColumnIfConst()->convertToFullColumnIfLowCardinality());
136 index_column_ptrs.emplace_back(materialized_columns.back().get());
137 }
138
139 IColumn::Filter filter(block.rows(), 0);
140
141 bool has_new_data = false;
142 switch (data.type)
143 {
144 case ClearableSetVariants::Type::EMPTY:
145 break;
146#define M(NAME) \
147 case ClearableSetVariants::Type::NAME: \
148 has_new_data = buildFilter(*data.NAME, index_column_ptrs, filter, *pos, rows_read, data); \
149 break;
150 APPLY_FOR_SET_VARIANTS(M)
151#undef M
152 }
153
154 if (has_new_data)
155 {
156 for (size_t i = 0; i < columns.size(); ++i)
157 {
158 auto filtered_column = block.getByName(index.columns[i]).column->filter(filter, block.rows());
159 columns[i]->insertRangeFrom(*filtered_column, 0, filtered_column->size());
160 }
161 }
162
163 *pos += rows_read;
164}
165
166template <typename Method>
167bool MergeTreeIndexAggregatorSet::buildFilter(
168 Method & method,
169 const ColumnRawPtrs & column_ptrs,
170 IColumn::Filter & filter,
171 size_t pos,
172 size_t limit,
173 ClearableSetVariants & variants) const
174{
175 /// Like DistinctSortedBlockInputStream.
176 typename Method::State state(column_ptrs, key_sizes, nullptr);
177
178 bool has_new_data = false;
179 for (size_t i = 0; i < limit; ++i)
180 {
181 auto emplace_result = state.emplaceKey(method.data, pos + i, variants.string_pool);
182
183 if (emplace_result.isInserted())
184 has_new_data = true;
185
186 /// Emit the record if there is no such key in the current set yet.
187 /// Skip it otherwise.
188 filter[pos + i] = emplace_result.isInserted();
189 }
190 return has_new_data;
191}
192
193MergeTreeIndexGranulePtr MergeTreeIndexAggregatorSet::getGranuleAndReset()
194{
195 auto granule = std::make_shared<MergeTreeIndexGranuleSet>(index, std::move(columns));
196
197 switch (data.type)
198 {
199 case ClearableSetVariants::Type::EMPTY:
200 break;
201#define M(NAME) \
202 case ClearableSetVariants::Type::NAME: \
203 data.NAME->data.clear(); \
204 break;
205 APPLY_FOR_SET_VARIANTS(M)
206#undef M
207 }
208
209 columns = index.header.cloneEmptyColumns();
210
211 return granule;
212}
213
214
215MergeTreeIndexConditionSet::MergeTreeIndexConditionSet(
216 const SelectQueryInfo & query,
217 const Context & context,
218 const MergeTreeIndexSet &index_)
219 : IMergeTreeIndexCondition(), index(index_)
220{
221 for (size_t i = 0, size = index.columns.size(); i < size; ++i)
222 {
223 std::string name = index.columns[i];
224 if (!key_columns.count(name))
225 key_columns.insert(name);
226 }
227
228 const auto & select = query.query->as<ASTSelectQuery &>();
229
230 if (select.where() && select.prewhere())
231 expression_ast = makeASTFunction(
232 "and",
233 select.where()->clone(),
234 select.prewhere()->clone());
235 else if (select.where())
236 expression_ast = select.where()->clone();
237 else if (select.prewhere())
238 expression_ast = select.prewhere()->clone();
239 else
240 expression_ast = std::make_shared<ASTLiteral>(UNKNOWN_FIELD);
241
242 useless = checkASTUseless(expression_ast);
243 /// Do not proceed if index is useless for this query.
244 if (useless)
245 return;
246
247 /// Replace logical functions with bit functions.
248 /// Working with UInt8: last bit = can be true, previous = can be false (Like dbms/src/Storages/MergeTree/BoolMask.h).
249 traverseAST(expression_ast);
250
251 auto syntax_analyzer_result = SyntaxAnalyzer(context, {}).analyze(
252 expression_ast, index.header.getNamesAndTypesList());
253 actions = ExpressionAnalyzer(expression_ast, syntax_analyzer_result, context).getActions(true);
254}
255
256bool MergeTreeIndexConditionSet::alwaysUnknownOrTrue() const
257{
258 return useless;
259}
260
261bool MergeTreeIndexConditionSet::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const
262{
263 auto granule = std::dynamic_pointer_cast<MergeTreeIndexGranuleSet>(idx_granule);
264 if (!granule)
265 throw Exception(
266 "Set index condition got a granule with the wrong type.", ErrorCodes::LOGICAL_ERROR);
267
268 if (useless || !granule->size() || (index.max_rows && granule->size() > index.max_rows))
269 return true;
270
271 Block result = granule->block;
272 actions->execute(result);
273
274 auto column = result.getByName(expression_ast->getColumnName()).column->convertToFullColumnIfLowCardinality();
275 auto * col_uint8 = typeid_cast<const ColumnUInt8 *>(column.get());
276
277 const NullMap * null_map = nullptr;
278
279 if (auto * col_nullable = checkAndGetColumn<ColumnNullable>(*column))
280 {
281 col_uint8 = typeid_cast<const ColumnUInt8 *>(&col_nullable->getNestedColumn());
282 null_map = &col_nullable->getNullMapData();
283 }
284
285 if (!col_uint8)
286 throw Exception("ColumnUInt8 expected as Set index condition result.", ErrorCodes::LOGICAL_ERROR);
287
288 auto & condition = col_uint8->getData();
289
290 for (size_t i = 0; i < column->size(); ++i)
291 if ((!null_map || (*null_map)[i] == 0) && condition[i] & 1)
292 return true;
293
294 return false;
295}
296
297void MergeTreeIndexConditionSet::traverseAST(ASTPtr & node) const
298{
299 if (operatorFromAST(node))
300 {
301 auto & args = node->as<ASTFunction>()->arguments->children;
302
303 for (auto & arg : args)
304 traverseAST(arg);
305 return;
306 }
307
308 if (atomFromAST(node))
309 {
310 if (node->as<ASTIdentifier>() || node->as<ASTFunction>())
311 node = makeASTFunction("__bitWrapperFunc", node);
312 }
313 else
314 node = std::make_shared<ASTLiteral>(UNKNOWN_FIELD);
315}
316
317bool MergeTreeIndexConditionSet::atomFromAST(ASTPtr & node) const
318{
319 /// Function, literal or column
320
321 if (node->as<ASTLiteral>())
322 return true;
323
324 if (const auto * identifier = node->as<ASTIdentifier>())
325 return key_columns.count(identifier->getColumnName()) != 0;
326
327 if (auto * func = node->as<ASTFunction>())
328 {
329 if (key_columns.count(func->getColumnName()))
330 {
331 /// Function is already calculated.
332 node = std::make_shared<ASTIdentifier>(func->getColumnName());
333 return true;
334 }
335
336 auto & args = func->arguments->children;
337
338 for (auto & arg : args)
339 if (!atomFromAST(arg))
340 return false;
341
342 return true;
343 }
344
345 return false;
346}
347
348bool MergeTreeIndexConditionSet::operatorFromAST(ASTPtr & node) const
349{
350 /// Functions AND, OR, NOT. Replace with bit*.
351 auto * func = node->as<ASTFunction>();
352 if (!func)
353 return false;
354
355 auto & args = func->arguments->children;
356
357 if (func->name == "not")
358 {
359 if (args.size() != 1)
360 return false;
361
362 func->name = "__bitSwapLastTwo";
363 }
364 else if (func->name == "and" || func->name == "indexHint")
365 {
366 auto last_arg = args.back();
367 args.pop_back();
368
369 ASTPtr new_func;
370 if (args.size() > 1)
371 new_func = makeASTFunction(
372 "__bitBoolMaskAnd",
373 node,
374 last_arg);
375 else
376 new_func = makeASTFunction(
377 "__bitBoolMaskAnd",
378 args.back(),
379 last_arg);
380
381 node = new_func;
382 }
383 else if (func->name == "or")
384 {
385 auto last_arg = args.back();
386 args.pop_back();
387
388 ASTPtr new_func;
389 if (args.size() > 1)
390 new_func = makeASTFunction(
391 "__bitBoolMaskOr",
392 node,
393 last_arg);
394 else
395 new_func = makeASTFunction(
396 "__bitBoolMaskOr",
397 args.back(),
398 last_arg);
399
400 node = new_func;
401 }
402 else
403 return false;
404
405 return true;
406}
407
408bool MergeTreeIndexConditionSet::checkASTUseless(const ASTPtr &node, bool atomic) const
409{
410 if (const auto * func = node->as<ASTFunction>())
411 {
412 if (key_columns.count(func->getColumnName()))
413 return false;
414
415 const ASTs & args = func->arguments->children;
416
417 if (func->name == "and" || func->name == "indexHint")
418 return checkASTUseless(args[0], atomic) && checkASTUseless(args[1], atomic);
419 else if (func->name == "or")
420 return checkASTUseless(args[0], atomic) || checkASTUseless(args[1], atomic);
421 else if (func->name == "not")
422 return checkASTUseless(args[0], atomic);
423 else
424 return std::any_of(args.begin(), args.end(),
425 [this](const auto & arg) { return checkASTUseless(arg, true); });
426 }
427 else if (const auto * literal = node->as<ASTLiteral>())
428 return !atomic && literal->value.get<bool>();
429 else if (const auto * identifier = node->as<ASTIdentifier>())
430 return key_columns.find(identifier->getColumnName()) == std::end(key_columns);
431 else
432 return true;
433}
434
435
436MergeTreeIndexGranulePtr MergeTreeIndexSet::createIndexGranule() const
437{
438 return std::make_shared<MergeTreeIndexGranuleSet>(*this);
439}
440
441MergeTreeIndexAggregatorPtr MergeTreeIndexSet::createIndexAggregator() const
442{
443 return std::make_shared<MergeTreeIndexAggregatorSet>(*this);
444}
445
446MergeTreeIndexConditionPtr MergeTreeIndexSet::createIndexCondition(
447 const SelectQueryInfo & query, const Context & context) const
448{
449 return std::make_shared<MergeTreeIndexConditionSet>(query, context, *this);
450};
451
452bool MergeTreeIndexSet::mayBenefitFromIndexForIn(const ASTPtr &) const
453{
454 return false;
455}
456
457
458std::unique_ptr<IMergeTreeIndex> setIndexCreator(
459 const NamesAndTypesList & new_columns,
460 std::shared_ptr<ASTIndexDeclaration> node,
461 const Context & context)
462{
463 if (node->name.empty())
464 throw Exception("Index must have unique name", ErrorCodes::INCORRECT_QUERY);
465
466 size_t max_rows = 0;
467 if (!node->type->arguments || node->type->arguments->children.size() != 1)
468 throw Exception("Set index must have exactly one argument.", ErrorCodes::INCORRECT_QUERY);
469 else if (node->type->arguments->children.size() == 1)
470 max_rows = node->type->arguments->children[0]->as<ASTLiteral &>().value.get<size_t>();
471
472
473 ASTPtr expr_list = MergeTreeData::extractKeyExpressionList(node->expr->clone());
474 auto syntax = SyntaxAnalyzer(context, {}).analyze(
475 expr_list, new_columns);
476 auto unique_expr = ExpressionAnalyzer(expr_list, syntax, context).getActions(false);
477
478 auto sample = ExpressionAnalyzer(expr_list, syntax, context)
479 .getActions(true)->getSampleBlock();
480
481 Block header;
482
483 Names columns;
484 DataTypes data_types;
485
486 for (size_t i = 0; i < expr_list->children.size(); ++i)
487 {
488 const auto & column = sample.getByPosition(i);
489
490 columns.emplace_back(column.name);
491 data_types.emplace_back(column.type);
492
493 header.insert(ColumnWithTypeAndName(column.type->createColumn(), column.type, column.name));
494 }
495
496 return std::make_unique<MergeTreeIndexSet>(
497 node->name, std::move(unique_expr), columns, data_types, header, node->granularity, max_rows);
498}
499
500}
501