1#include <boost/rational.hpp> /// For calculations related to sampling coefficients.
2#include <optional>
3
4#include <Poco/File.h>
5
6#include <Common/FieldVisitors.h>
7#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
8#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
9#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
10#include <Storages/MergeTree/MergeTreeReadPool.h>
11#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h>
12#include <Storages/MergeTree/MergeTreeIndices.h>
13#include <Storages/MergeTree/MergeTreeIndexReader.h>
14#include <Storages/MergeTree/KeyCondition.h>
15#include <Storages/ReadInOrderOptimizer.h>
16#include <Parsers/ASTIdentifier.h>
17#include <Parsers/ASTLiteral.h>
18#include <Parsers/ASTFunction.h>
19#include <Parsers/ASTSampleRatio.h>
20#include <Interpreters/ExpressionAnalyzer.h>
21
/// Allows __uint128_t to be used as a template parameter for boost::rational.
23// https://stackoverflow.com/questions/41198673/uint128-t-not-working-with-clang-and-libstdc
24#if !defined(__GLIBCXX_BITSIZE_INT_N_0) && defined(__SIZEOF_INT128__)
namespace std
{
    /// Partial std::numeric_limits specialization for __uint128_t.
    /// Only the members listed below are provided; it is compiled only when the enclosing
    /// preprocessor condition holds (__SIZEOF_INT128__ is defined and __GLIBCXX_BITSIZE_INT_N_0 is not).
    template <>
    struct numeric_limits<__uint128_t>
    {
        static constexpr bool is_specialized = true;
        static constexpr bool is_signed = false;
        static constexpr bool is_integer = true;
        static constexpr int radix = 2;
        static constexpr int digits = 128;
        static constexpr __uint128_t min () { return 0; } // used in boost 1.65.1+
        /// Unsigned wrap-around of 0 - 1 yields the value with all 128 bits set.
        static constexpr __uint128_t max () { return __uint128_t(0) - 1; } // used in boost 1.68.0+
    };
}
39#endif
40
41#include <DataStreams/ExpressionBlockInputStream.h>
42#include <DataStreams/FilterBlockInputStream.h>
43#include <DataStreams/CollapsingFinalBlockInputStream.h>
44#include <DataStreams/AddingConstColumnBlockInputStream.h>
45#include <DataStreams/CreatingSetsBlockInputStream.h>
46#include <DataStreams/MergingSortedBlockInputStream.h>
47#include <DataStreams/NullBlockInputStream.h>
48#include <DataStreams/SummingSortedBlockInputStream.h>
49#include <DataStreams/ReplacingSortedBlockInputStream.h>
50#include <DataStreams/ReverseBlockInputStream.h>
51#include <DataStreams/AggregatingSortedBlockInputStream.h>
52#include <DataStreams/VersionedCollapsingSortedBlockInputStream.h>
53#include <DataTypes/DataTypesNumber.h>
54#include <DataTypes/DataTypeDate.h>
55#include <DataTypes/DataTypeEnum.h>
56#include <Storages/VirtualColumnUtils.h>
57#include <Processors/Transforms/FilterTransform.h>
58#include <Processors/Transforms/AddingConstColumnTransform.h>
59#include <Processors/Transforms/ExpressionTransform.h>
60#include <Processors/Transforms/ReverseTransform.h>
61#include <Processors/Transforms/MergingSortedTransform.h>
62#include <Processors/Executors/TreeExecutorBlockInputStream.h>
63#include <Processors/Sources/SourceFromInputStream.h>
64
/// Profile counters incremented in readFromParts() once mark ranges have been selected.
/// They are only declared here (`extern`); the definitions are elsewhere.
namespace ProfileEvents
{
    extern const Event SelectedParts;   /// Incremented by the number of parts that have at least one range to read.
    extern const Event SelectedRanges;  /// Incremented by the total number of selected mark ranges.
    extern const Event SelectedMarks;   /// Incremented by the total number of selected marks.
}
71
72
73namespace DB
74{
75
/// Error codes referenced by this file. They are only declared here (`extern`); the definitions are elsewhere.
namespace ErrorCodes
{
    extern const int INDEX_NOT_USED;                     /// 'force_primary_key' / 'force_index_by_date' is set, but the index cannot be used.
    extern const int SAMPLING_NOT_SUPPORTED;
    extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;  /// The sampling column is not UInt8 / UInt16 / UInt32 / UInt64.
    extern const int ILLEGAL_COLUMN;                     /// The sampling column is not in the primary key.
    extern const int ARGUMENT_OUT_OF_BOUND;              /// Negative sample size or offset, or an offset without sampling.
}
84
85
86MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & data_)
87 : data(data_), log(&Logger::get(data.getLogName() + " (SelectExecutor)"))
88{
89}
90
91
92/// Construct a block consisting only of possible values of virtual columns
93static Block getBlockWithPartColumn(const MergeTreeData::DataPartsVector & parts)
94{
95 auto column = ColumnString::create();
96
97 for (const auto & part : parts)
98 column->insert(part->name);
99
100 return Block{ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "_part")};
101}
102
103
104size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
105 const MergeTreeData::DataPartsVector & parts, const KeyCondition & key_condition, const Settings & settings) const
106{
107 size_t rows_count = 0;
108
109 /// We will find out how many rows we would have read without sampling.
110 LOG_DEBUG(log, "Preliminary index scan with condition: " << key_condition.toString());
111
112 for (size_t i = 0; i < parts.size(); ++i)
113 {
114 const MergeTreeData::DataPartPtr & part = parts[i];
115 MarkRanges ranges = markRangesFromPKRange(part, key_condition, settings);
116
117 /** In order to get a lower bound on the number of rows that match the condition on PK,
118 * consider only guaranteed full marks.
119 * That is, do not take into account the first and last marks, which may be incomplete.
120 */
121 for (size_t j = 0; j < ranges.size(); ++j)
122 if (ranges[j].end - ranges[j].begin > 2)
123 rows_count += part->index_granularity.getRowsCountInRange({ranges[j].begin + 1, ranges[j].end - 1});
124
125 }
126
127 return rows_count;
128}
129
130
131using RelativeSize = boost::rational<ASTSampleRatio::BigNum>;
132
133static std::string toString(const RelativeSize & x)
134{
135 return ASTSampleRatio::toString(x.numerator()) + "/" + ASTSampleRatio::toString(x.denominator());
136}
137
138/// Converts sample size to an approximate number of rows (ex. `SAMPLE 1000000`) to relative value (ex. `SAMPLE 0.1`).
139static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTPtr & node, size_t approx_total_rows)
140{
141 if (approx_total_rows == 0)
142 return 1;
143
144 const auto & node_sample = node->as<ASTSampleRatio &>();
145
146 auto absolute_sample_size = node_sample.ratio.numerator / node_sample.ratio.denominator;
147 return std::min(RelativeSize(1), RelativeSize(absolute_sample_size) / RelativeSize(approx_total_rows));
148}
149
150
151Pipes MergeTreeDataSelectExecutor::read(
152 const Names & column_names_to_return,
153 const SelectQueryInfo & query_info,
154 const Context & context,
155 const UInt64 max_block_size,
156 const unsigned num_streams,
157 const PartitionIdToMaxBlock * max_block_numbers_to_read) const
158{
159 return readFromParts(
160 data.getDataPartsVector(), column_names_to_return, query_info, context,
161 max_block_size, num_streams, max_block_numbers_to_read);
162}
163
/** Builds the pipes that read the given `parts` for one SELECT query.
  *
  * Steps, in order:
  *  1. Split `column_names_to_return` into virtual columns (`_part`, `_part_index`, `_partition_id`,
  *     `_sample_factor`) and real columns.
  *  2. Drop parts that are empty, do not match the condition on `_part`, cannot satisfy the minmax index
  *     condition, or are excluded by `max_block_numbers_to_read`.
  *  3. Translate SAMPLE / OFFSET and the `parallel_replicas_count` / `parallel_replica_offset` settings
  *     into a half-interval `[lower, upper)` of sampling key values; add it to the key condition
  *     and build a filter expression for it.
  *  4. Select mark ranges in every remaining part using the primary key and the useful skip indices.
  *  5. Spread the ranges among streams (FINAL, read-in-order or the plain variant) and append the
  *     sampling filter, the `_sample_factor` constant column and the PREWHERE column removal actions.
  *
  * Returns empty Pipes if sampling leaves no data or no part has any range to read.
  *
  * Throws:
  *  - INDEX_NOT_USED if `force_primary_key` / `force_index_by_date` is set and the respective index is not used;
  *  - ARGUMENT_OUT_OF_BOUND for a negative sample size or offset, or an offset without sampling;
  *  - ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER if the sampling column is not UInt8 / UInt16 / UInt32 / UInt64;
  *  - ILLEGAL_COLUMN if the sampling column is not in the primary key.
  */
Pipes MergeTreeDataSelectExecutor::readFromParts(
    MergeTreeData::DataPartsVector parts,
    const Names & column_names_to_return,
    const SelectQueryInfo & query_info,
    const Context & context,
    const UInt64 max_block_size,
    const unsigned num_streams,
    const PartitionIdToMaxBlock * max_block_numbers_to_read) const
{
    /// Running number given to each part in the mark range selection loop below.
    size_t part_index = 0;

    /// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it.
    /// The virtual column `_sample_factor` (which is equal to 1 / used sample rate) can be requested in the query.
    Names virt_column_names;
    Names real_column_names;

    bool part_column_queried = false;

    bool sample_factor_column_queried = false;
    Float64 used_sample_factor = 1;

    /// Classify the requested columns: known virtual columns go to `virt_column_names`, everything else is a real column.
    for (const String & name : column_names_to_return)
    {
        if (name == "_part")
        {
            part_column_queried = true;
            virt_column_names.push_back(name);
        }
        else if (name == "_part_index")
        {
            virt_column_names.push_back(name);
        }
        else if (name == "_partition_id")
        {
            virt_column_names.push_back(name);
        }
        else if (name == "_sample_factor")
        {
            sample_factor_column_queried = true;
            virt_column_names.push_back(name);
        }
        else
        {
            real_column_names.push_back(name);
        }
    }

    NamesAndTypesList available_real_columns = data.getColumns().getAllPhysical();

    /// If there are only virtual columns in the query, you must request at least one non-virtual one.
    if (real_column_names.empty())
        real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns));

    /// If `_part` virtual column is requested, we try to use it as an index.
    Block virtual_columns_block = getBlockWithPartColumn(parts);
    if (part_column_queried)
        VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context);

    /// Names of the parts that remain in the `_part` column after the optional filtering above.
    std::multiset<String> part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");

    data.check(real_column_names);

    const Settings & settings = context.getSettingsRef();
    Names primary_key_columns = data.primary_key_columns;

    KeyCondition key_condition(query_info, context, primary_key_columns, data.primary_key_expr);

    if (settings.force_primary_key && key_condition.alwaysUnknownOrTrue())
    {
        std::stringstream exception_message;
        exception_message << "Primary key (";
        for (size_t i = 0, size = primary_key_columns.size(); i < size; ++i)
            exception_message << (i == 0 ? "" : ", ") << primary_key_columns[i];
        exception_message << ") is not used and setting 'force_primary_key' is set.";

        throw Exception(exception_message.str(), ErrorCodes::INDEX_NOT_USED);
    }

    /// The minmax index condition is built only if the table has a minmax index expression.
    std::optional<KeyCondition> minmax_idx_condition;
    if (data.minmax_idx_expr)
    {
        minmax_idx_condition.emplace(query_info, context, data.minmax_idx_columns, data.minmax_idx_expr);

        if (settings.force_index_by_date && minmax_idx_condition->alwaysUnknownOrTrue())
        {
            String msg = "MinMax index by columns (";
            bool first = true;
            for (const String & col : data.minmax_idx_columns)
            {
                if (first)
                    first = false;
                else
                    msg += ", ";
                msg += col;
            }
            msg += ") is not used and setting 'force_index_by_date' is set";

            throw Exception(msg, ErrorCodes::INDEX_NOT_USED);
        }
    }

    /// Select the parts in which there can be data that satisfy `minmax_idx_condition` and that match the condition on `_part`,
    ///  as well as `max_block_number_to_read`.
    {
        auto prev_parts = parts;
        parts.clear();

        for (const auto & part : prev_parts)
        {
            if (part_values.find(part->name) == part_values.end())
                continue;

            if (part->isEmpty())
                continue;

            if (minmax_idx_condition && !minmax_idx_condition->mayBeTrueInParallelogram(
                    part->minmax_idx.parallelogram, data.minmax_idx_column_types))
                continue;

            /// When limits are given, a part is skipped if its partition has no entry
            /// or if its max block number exceeds the limit for that partition.
            if (max_block_numbers_to_read)
            {
                auto blocks_iterator = max_block_numbers_to_read->find(part->info.partition_id);
                if (blocks_iterator == max_block_numbers_to_read->end() || part->info.max_block > blocks_iterator->second)
                    continue;
            }

            parts.push_back(part);
        }
    }

    /// Sampling.
    Names column_names_to_read = real_column_names;
    std::shared_ptr<ASTFunction> filter_function;
    ExpressionActionsPtr filter_expression;

    RelativeSize relative_sample_size = 0;
    RelativeSize relative_sample_offset = 0;

    const auto & select = query_info.query->as<ASTSelectQuery &>();

    auto select_sample_size = select.sample_size();
    auto select_sample_offset = select.sample_offset();

    if (select_sample_size)
    {
        relative_sample_size.assign(
            select_sample_size->as<ASTSampleRatio &>().ratio.numerator,
            select_sample_size->as<ASTSampleRatio &>().ratio.denominator);

        if (relative_sample_size < 0)
            throw Exception("Negative sample size", ErrorCodes::ARGUMENT_OUT_OF_BOUND);

        relative_sample_offset = 0;
        if (select_sample_offset)
            relative_sample_offset.assign(
                select_sample_offset->as<ASTSampleRatio &>().ratio.numerator,
                select_sample_offset->as<ASTSampleRatio &>().ratio.denominator);

        if (relative_sample_offset < 0)
            throw Exception("Negative sample offset", ErrorCodes::ARGUMENT_OUT_OF_BOUND);

        /// Convert absolute value of the sampling (in form `SAMPLE 1000000` - how many rows to read) into the relative `SAMPLE 0.1` (how much data to read).
        /// A value greater than 1 is treated as absolute; the row estimate is computed only if at least one value is absolute.
        size_t approx_total_rows = 0;
        if (relative_sample_size > 1 || relative_sample_offset > 1)
            approx_total_rows = getApproximateTotalRowsToRead(parts, key_condition, settings);

        if (relative_sample_size > 1)
        {
            relative_sample_size = convertAbsoluteSampleSizeToRelative(select_sample_size, approx_total_rows);
            LOG_DEBUG(log, "Selected relative sample size: " << toString(relative_sample_size));
        }

        /// SAMPLE 1 is the same as the absence of SAMPLE.
        if (relative_sample_size == RelativeSize(1))
            relative_sample_size = 0;

        if (relative_sample_offset > 0 && RelativeSize(0) == relative_sample_size)
            throw Exception("Sampling offset is incorrect because no sampling", ErrorCodes::ARGUMENT_OUT_OF_BOUND);

        if (relative_sample_offset > 1)
        {
            relative_sample_offset = convertAbsoluteSampleSizeToRelative(select_sample_offset, approx_total_rows);
            LOG_DEBUG(log, "Selected relative sample offset: " << toString(relative_sample_offset));
        }
    }

    /** Which range of sampling key values do I need to read?
      * First, in the whole range ("universe") we select the interval
      *  of relative `relative_sample_size` size, offset from the beginning by `relative_sample_offset`.
      *
      * Example: SAMPLE 0.4 OFFSET 0.3
      *
      * [------********------]
      *        ^ - offset
      *        <------> - size
      *
      * If the interval passes through the end of the universe, then cut its right side.
      *
      * Example: SAMPLE 0.4 OFFSET 0.8
      *
      * [----------------****]
      *                  ^ - offset
      *                  <------> - size
      *
      * Next, if the `parallel_replicas_count`, `parallel_replica_offset` settings are set,
      *  then it is necessary to break the received interval into pieces of the number `parallel_replicas_count`,
      *  and select a piece with the number `parallel_replica_offset` (from zero).
      *
      * Example: SAMPLE 0.4 OFFSET 0.3, parallel_replicas_count = 2, parallel_replica_offset = 1
      *
      * [----------****------]
      *        ^ - offset
      *        <------> - size
      *        <--><--> - pieces for different `parallel_replica_offset`, select the second one.
      *
      * It is very important that the intervals for different `parallel_replica_offset` cover the entire range without gaps and overlaps.
      * It is also important that the entire universe can be covered using SAMPLE 0.1 OFFSET 0, ... OFFSET 0.9 and similar decimals.
      */

    /// Sampling is applied either for an explicit SAMPLE clause or to split the data between parallel replicas.
    bool use_sampling = relative_sample_size > 0 || (settings.parallel_replicas_count > 1 && data.supportsSampling());
    bool no_data = false;   /// There is nothing left after sampling.

    if (use_sampling)
    {
        /// The factor is based on the sample size before it is divided among parallel replicas below.
        if (sample_factor_column_queried && relative_sample_size != RelativeSize(0))
            used_sample_factor = 1.0 / boost::rational_cast<Float64>(relative_sample_size);

        /// Number of distinct values of the sampling column type (its maximum value plus one).
        RelativeSize size_of_universum = 0;
        DataTypePtr sampling_column_type = data.primary_key_sample.getByName(data.sampling_expr_column_name).type;

        if (typeid_cast<const DataTypeUInt64 *>(sampling_column_type.get()))
            size_of_universum = RelativeSize(std::numeric_limits<UInt64>::max()) + RelativeSize(1);
        else if (typeid_cast<const DataTypeUInt32 *>(sampling_column_type.get()))
            size_of_universum = RelativeSize(std::numeric_limits<UInt32>::max()) + RelativeSize(1);
        else if (typeid_cast<const DataTypeUInt16 *>(sampling_column_type.get()))
            size_of_universum = RelativeSize(std::numeric_limits<UInt16>::max()) + RelativeSize(1);
        else if (typeid_cast<const DataTypeUInt8 *>(sampling_column_type.get()))
            size_of_universum = RelativeSize(std::numeric_limits<UInt8>::max()) + RelativeSize(1);
        else
            throw Exception("Invalid sampling column type in storage parameters: " + sampling_column_type->getName() + ". Must be unsigned integer type.",
                ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);

        if (settings.parallel_replicas_count > 1)
        {
            /// Without an explicit SAMPLE the whole universe is divided among the replicas.
            if (relative_sample_size == RelativeSize(0))
                relative_sample_size = 1;

            relative_sample_size /= settings.parallel_replicas_count.value;
            relative_sample_offset += relative_sample_size * RelativeSize(settings.parallel_replica_offset.value);
        }

        if (relative_sample_offset >= RelativeSize(1))
            no_data = true;

        /// Calculate the half-interval of `[lower, upper)` column values.
        bool has_lower_limit = false;
        bool has_upper_limit = false;

        RelativeSize lower_limit_rational = relative_sample_offset * size_of_universum;
        RelativeSize upper_limit_rational = (relative_sample_offset + relative_sample_size) * size_of_universum;

        /// The rational bounds are converted to ASTSampleRatio::BigNum first and then stored as UInt64.
        UInt64 lower = boost::rational_cast<ASTSampleRatio::BigNum>(lower_limit_rational);
        UInt64 upper = boost::rational_cast<ASTSampleRatio::BigNum>(upper_limit_rational);

        if (lower > 0)
            has_lower_limit = true;

        /// An upper bound at or beyond the end of the universe means there is no upper limit.
        if (upper_limit_rational < size_of_universum)
            has_upper_limit = true;

        /*std::cerr << std::fixed << std::setprecision(100)
            << "relative_sample_size: " << relative_sample_size << "\n"
            << "relative_sample_offset: " << relative_sample_offset << "\n"
            << "lower_limit_float: " << lower_limit_rational << "\n"
            << "upper_limit_float: " << upper_limit_rational << "\n"
            << "lower: " << lower << "\n"
            << "upper: " << upper << "\n";*/

        /// The half-interval is empty.
        if ((has_upper_limit && upper == 0)
            || (has_lower_limit && has_upper_limit && lower == upper))
            no_data = true;

        if (no_data || (!has_lower_limit && !has_upper_limit))
        {
            /// Either nothing is left to read or the interval covers the whole universe: no filter is needed.
            use_sampling = false;
        }
        else
        {
            /// Let's add the conditions to cut off something else when the index is scanned again and when the request is processed.

            std::shared_ptr<ASTFunction> lower_function;
            std::shared_ptr<ASTFunction> upper_function;

            /// If sample and final are used together no need to calculate sampling expression twice.
            /// The first time it was calculated for final, because sample key is a part of the PK.
            /// So, assume that we already have calculated column.
            ASTPtr sampling_key_ast = data.getSamplingKeyAST();
            if (select.final())
            {
                sampling_key_ast = std::make_shared<ASTIdentifier>(data.sampling_expr_column_name);

                /// We do spoil available_real_columns here, but it is not used later.
                available_real_columns.emplace_back(data.sampling_expr_column_name, std::move(sampling_column_type));
            }

            if (has_lower_limit)
            {
                if (!key_condition.addCondition(data.sampling_expr_column_name, Range::createLeftBounded(lower, true)))
                    throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);

                /// Build the AST of `sampling_key >= lower`.
                ASTPtr args = std::make_shared<ASTExpressionList>();
                args->children.push_back(sampling_key_ast);
                args->children.push_back(std::make_shared<ASTLiteral>(lower));

                lower_function = std::make_shared<ASTFunction>();
                lower_function->name = "greaterOrEquals";
                lower_function->arguments = args;
                lower_function->children.push_back(lower_function->arguments);

                filter_function = lower_function;
            }

            if (has_upper_limit)
            {
                if (!key_condition.addCondition(data.sampling_expr_column_name, Range::createRightBounded(upper, false)))
                    throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);

                /// Build the AST of `sampling_key < upper`.
                ASTPtr args = std::make_shared<ASTExpressionList>();
                args->children.push_back(sampling_key_ast);
                args->children.push_back(std::make_shared<ASTLiteral>(upper));

                upper_function = std::make_shared<ASTFunction>();
                upper_function->name = "less";
                upper_function->arguments = args;
                upper_function->children.push_back(upper_function->arguments);

                filter_function = upper_function;
            }

            /// With both limits present the filter is the conjunction of the two comparisons.
            if (has_lower_limit && has_upper_limit)
            {
                ASTPtr args = std::make_shared<ASTExpressionList>();
                args->children.push_back(lower_function);
                args->children.push_back(upper_function);

                filter_function = std::make_shared<ASTFunction>();
                filter_function->name = "and";
                filter_function->arguments = args;
                filter_function->children.push_back(filter_function->arguments);
            }

            ASTPtr query = filter_function;
            auto syntax_result = SyntaxAnalyzer(context).analyze(query, available_real_columns);
            filter_expression = ExpressionAnalyzer(filter_function, syntax_result, context).getActions(false);

            if (!select.final())
            {
                /// Add columns needed for `sample_by_ast` to `column_names_to_read`.
                /// Skip this if final was used, because such columns were already added from PK.
                std::vector<String> add_columns = filter_expression->getRequiredColumns();
                column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
                std::sort(column_names_to_read.begin(), column_names_to_read.end());
                column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()),
                                           column_names_to_read.end());
            }
        }
    }

    if (no_data)
    {
        LOG_DEBUG(log, "Sampling yields no data.");
        return {};
    }

    LOG_DEBUG(log, "Key condition: " << key_condition.toString());
    if (minmax_idx_condition)
        LOG_DEBUG(log, "MinMax index condition: " << minmax_idx_condition->toString());

    /// PREWHERE
    String prewhere_column;
    if (select.prewhere())
        prewhere_column = select.prewhere()->getColumnName();

    RangesInDataParts parts_with_ranges;

    /// Keep only the skip indices whose condition can actually filter something for this query.
    std::vector<std::pair<MergeTreeIndexPtr, MergeTreeIndexConditionPtr>> useful_indices;
    for (const auto & index : data.skip_indices)
    {
        auto condition = index->createIndexCondition(query_info, context);
        if (!condition->alwaysUnknownOrTrue())
            useful_indices.emplace_back(index, condition);
    }

    /// Let's find what range to read from each part.
    size_t sum_marks = 0;
    size_t sum_ranges = 0;
    for (auto & part : parts)
    {
        RangesInDataPart ranges(part, part_index++);

        if (data.hasPrimaryKey())
            ranges.ranges = markRangesFromPKRange(part, key_condition, settings);
        else
        {
            /// Without a primary key the whole part is one range; the final mark, if present, is not counted.
            size_t total_marks_count = part->getMarksCount();
            if (total_marks_count)
            {
                if (part->index_granularity.hasFinalMark())
                    --total_marks_count;
                ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};
            }
        }

        /// Narrow the ranges further with every useful skip index, one after another.
        for (const auto & index_and_condition : useful_indices)
            ranges.ranges = filterMarksUsingIndex(
                    index_and_condition.first, index_and_condition.second, part, ranges.ranges, settings);

        if (!ranges.ranges.empty())
        {
            parts_with_ranges.push_back(ranges);

            sum_ranges += ranges.ranges.size();
            sum_marks += ranges.getMarksCount();
        }
    }

    LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, "
        << sum_marks << " marks to read from " << sum_ranges << " ranges");

    if (parts_with_ranges.empty())
        return {};

    ProfileEvents::increment(ProfileEvents::SelectedParts, parts_with_ranges.size());
    ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges);
    ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks);

    Pipes res;

    if (select.final())
    {
        /// Add columns needed to calculate the sorting expression and the sign.
        std::vector<String> add_columns = data.sorting_key_expr->getRequiredColumns();
        column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());

        if (!data.merging_params.sign_column.empty())
            column_names_to_read.push_back(data.merging_params.sign_column);
        if (!data.merging_params.version_column.empty())
            column_names_to_read.push_back(data.merging_params.version_column);

        std::sort(column_names_to_read.begin(), column_names_to_read.end());
        column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());

        res = spreadMarkRangesAmongStreamsFinal(
            std::move(parts_with_ranges),
            column_names_to_read,
            max_block_size,
            settings.use_uncompressed_cache,
            query_info,
            virt_column_names,
            settings);
    }
    else if (settings.optimize_read_in_order && query_info.input_sorting_info)
    {
        /// Build an expression for the prefix of the sorting key that the query orders by:
        /// the cloned sorting key AST is truncated to its first `prefix_size` children.
        size_t prefix_size = query_info.input_sorting_info->order_key_prefix_descr.size();
        auto order_key_prefix_ast = data.sorting_key_expr_ast->clone();
        order_key_prefix_ast->children.resize(prefix_size);

        auto syntax_result = SyntaxAnalyzer(context).analyze(order_key_prefix_ast, data.getColumns().getAllPhysical());
        auto sorting_key_prefix_expr = ExpressionAnalyzer(order_key_prefix_ast, syntax_result, context).getActions(false);

        res = spreadMarkRangesAmongStreamsWithOrder(
            std::move(parts_with_ranges),
            num_streams,
            column_names_to_read,
            max_block_size,
            settings.use_uncompressed_cache,
            query_info,
            sorting_key_prefix_expr,
            virt_column_names,
            settings);
    }
    else
    {
        res = spreadMarkRangesAmongStreams(
            std::move(parts_with_ranges),
            num_streams,
            column_names_to_read,
            max_block_size,
            settings.use_uncompressed_cache,
            query_info,
            virt_column_names,
            settings);
    }

    /// Apply the sampling filter built above to every pipe.
    if (use_sampling)
    {
        for (auto & pipe : res)
            pipe.addSimpleTransform(std::make_shared<FilterTransform>(
                    pipe.getHeader(), filter_expression, filter_function->getColumnName(), false));
    }

    /// By the way, if a distributed query or query to a Merge table is made, then the `_sample_factor` column can have different values.
    if (sample_factor_column_queried)
    {
        for (auto & pipe : res)
            pipe.addSimpleTransform(std::make_shared<AddingConstColumnTransform<Float64>>(
                    pipe.getHeader(), std::make_shared<DataTypeFloat64>(), used_sample_factor, "_sample_factor"));
    }

    /// Apply the PREWHERE `remove_columns_actions`, if there are any, to every pipe.
    if (query_info.prewhere_info && query_info.prewhere_info->remove_columns_actions)
    {
        for (auto & pipe : res)
            pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(
                    pipe.getHeader(), query_info.prewhere_info->remove_columns_actions));
    }

    return res;
}
682
namespace
{

/** Converts a limit expressed in rows or in bytes into a number of marks, rounding up.
  *
  * If `bytes_granularity` is zero, the result is ceil(rows_setting / rows_granularity);
  * otherwise it is ceil(bytes_setting / bytes_granularity) and the row arguments are ignored.
  *
  * The rounding is computed as "quotient plus one if there is a remainder" rather than
  * `(x + g - 1) / g`: the latter wraps around when the setting is close to the maximum
  * value of size_t (for example, a limit configured as "unlimited") and would then produce
  * a tiny number of marks. For all inputs that do not overflow both forms give the same result.
  *
  * The granularity that is actually used must be non-zero.
  */
size_t roundRowsOrBytesToMarks(
    size_t rows_setting,
    size_t bytes_setting,
    size_t rows_granularity,
    size_t bytes_granularity)
{
    const auto divide_rounding_up = [](size_t value, size_t granularity) -> size_t
    {
        return value / granularity + (value % granularity != 0 ? 1 : 0);
    };

    if (bytes_granularity == 0)
        return divide_rounding_up(rows_setting, rows_granularity);
    else
        return divide_rounding_up(bytes_setting, bytes_granularity);
}

}
699
700
701Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
702 RangesInDataParts && parts,
703 size_t num_streams,
704 const Names & column_names,
705 UInt64 max_block_size,
706 bool use_uncompressed_cache,
707 const SelectQueryInfo & query_info,
708 const Names & virt_columns,
709 const Settings & settings) const
710{
711 /// Count marks for each part.
712 std::vector<size_t> sum_marks_in_parts(parts.size());
713 size_t sum_marks = 0;
714 size_t total_rows = 0;
715
716 const auto data_settings = data.getSettings();
717 size_t adaptive_parts = 0;
718 for (size_t i = 0; i < parts.size(); ++i)
719 {
720 total_rows += parts[i].getRowsCount();
721 /// Let the ranges be listed from right to left so that the leftmost range can be dropped using `pop_back()`.
722 std::reverse(parts[i].ranges.begin(), parts[i].ranges.end());
723
724 sum_marks_in_parts[i] = parts[i].getMarksCount();
725 sum_marks += sum_marks_in_parts[i];
726
727 if (parts[i].data_part->index_granularity_info.is_adaptive)
728 adaptive_parts++;
729 }
730
731 size_t index_granularity_bytes = 0;
732 if (adaptive_parts > parts.size() / 2)
733 index_granularity_bytes = data_settings->index_granularity_bytes;
734
735 const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
736 settings.merge_tree_max_rows_to_use_cache,
737 settings.merge_tree_max_bytes_to_use_cache,
738 data_settings->index_granularity,
739 index_granularity_bytes);
740
741 const size_t min_marks_for_concurrent_read = roundRowsOrBytesToMarks(
742 settings.merge_tree_min_rows_for_concurrent_read,
743 settings.merge_tree_min_bytes_for_concurrent_read,
744 data_settings->index_granularity,
745 index_granularity_bytes);
746
747 if (sum_marks > max_marks_to_use_cache)
748 use_uncompressed_cache = false;
749
750 Pipes res;
751 if (0 == sum_marks)
752 return res;
753
754 if (num_streams > 1)
755 {
756 /// Parallel query execution.
757
758 /// Reduce the number of num_streams if the data is small.
759 if (sum_marks < num_streams * min_marks_for_concurrent_read && parts.size() < num_streams)
760 num_streams = std::max((sum_marks + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, parts.size());
761
762 MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
763 num_streams, sum_marks, min_marks_for_concurrent_read, parts, data, query_info.prewhere_info, true,
764 column_names, MergeTreeReadPool::BackoffSettings(settings), settings.preferred_block_size_bytes, false);
765
766 /// Let's estimate total number of rows for progress bar.
767 LOG_TRACE(log, "Reading approx. " << total_rows << " rows with " << num_streams << " streams");
768
769 for (size_t i = 0; i < num_streams; ++i)
770 {
771 auto source = std::make_shared<MergeTreeThreadSelectBlockInputProcessor>(
772 i, pool, min_marks_for_concurrent_read, max_block_size, settings.preferred_block_size_bytes,
773 settings.preferred_max_column_in_block_size_bytes, data, use_uncompressed_cache,
774 query_info.prewhere_info, settings, virt_columns);
775
776 if (i == 0)
777 {
778 /// Set the approximate number of rows for the first source only
779 source->addTotalRowsApprox(total_rows);
780 }
781
782 res.emplace_back(std::move(source));
783 }
784 }
785 else
786 {
787 /// Sequential query execution.
788
789 for (size_t part_index = 0; part_index < parts.size(); ++part_index)
790 {
791 RangesInDataPart & part = parts[part_index];
792
793 auto source = std::make_shared<MergeTreeSelectProcessor>(
794 data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
795 settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
796 query_info.prewhere_info, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true,
797 virt_columns, part.part_index_in_query);
798
799 res.emplace_back(std::move(source));
800 }
801 }
802
803 return res;
804}
805
/// Distributes the mark ranges of `parts` among at most `num_streams` output pipes for reading in sorting key order
/// (forward or reverse, according to query_info.input_sorting_info->direction).
///
/// Every output pipe is given about sum_marks / num_streams marks (rounded up), taken part by part from the back of `parts`.
/// A part is read with MergeTreeSelectProcessor when direction == 1; otherwise with MergeTreeReverseSelectProcessor
/// followed by ReverseTransform. If one output pipe ends up reading from several parts, `sorting_key_prefix_expr`
/// is applied to each of its sources and the sources are combined with MergingSortedTransform.
///
/// `parts` is consumed: its ranges are reversed in place and parts are popped while they are being assigned.
/// `use_uncompressed_cache` is switched off when the total number of marks exceeds the limit computed from settings.
/// Returns an empty list if there are no marks to read.
/// Throws LOGICAL_ERROR if a part runs out of ranges before the requested number of marks was collected.
/// NOTE(review): query_info.input_sorting_info is dereferenced without a null check and num_streams is used as a divisor -
/// presumably the caller guarantees a non-null pointer and num_streams > 0; confirm.
Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
    RangesInDataParts && parts,
    size_t num_streams,
    const Names & column_names,
    UInt64 max_block_size,
    bool use_uncompressed_cache,
    const SelectQueryInfo & query_info,
    const ExpressionActionsPtr & sorting_key_prefix_expr,
    const Names & virt_columns,
    const Settings & settings) const
{
    size_t sum_marks = 0;
    const InputSortingInfoPtr & input_sorting_info = query_info.input_sorting_info;
    size_t adaptive_parts = 0;
    /// Number of not yet assigned marks in every part; kept parallel to `parts`.
    std::vector<size_t> sum_marks_in_parts(parts.size());
    const auto data_settings = data.getSettings();

    for (size_t i = 0; i < parts.size(); ++i)
    {
        sum_marks_in_parts[i] = parts[i].getMarksCount();
        sum_marks += sum_marks_in_parts[i];

        /// Let the ranges be listed from right to left so that the leftmost range can be dropped using `pop_back()`.
        std::reverse(parts[i].ranges.begin(), parts[i].ranges.end());

        if (parts[i].data_part->index_granularity_info.is_adaptive)
            adaptive_parts++;
    }

    /// The byte granularity is passed on only if more than a half of the parts are adaptive; otherwise it stays 0
    /// (presumably making roundRowsOrBytesToMarks use the row-based setting - its body is not visible here).
    size_t index_granularity_bytes = 0;
    if (adaptive_parts > parts.size() / 2)
        index_granularity_bytes = data_settings->index_granularity_bytes;

    const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
        settings.merge_tree_max_rows_to_use_cache,
        settings.merge_tree_max_bytes_to_use_cache,
        data_settings->index_granularity,
        index_granularity_bytes);

    const size_t min_marks_for_concurrent_read = roundRowsOrBytesToMarks(
        settings.merge_tree_min_rows_for_concurrent_read,
        settings.merge_tree_min_bytes_for_concurrent_read,
        data_settings->index_granularity,
        index_granularity_bytes);

    /// Too much data is going to be read - do not use the uncompressed cache.
    if (sum_marks > max_marks_to_use_cache)
        use_uncompressed_cache = false;

    Pipes res;

    /// Nothing to read. This also protects the `sum_marks - 1` computation below from wrapping around.
    if (sum_marks == 0)
        return res;

    /// Let's split ranges to avoid reading much data.
    /// `ranges` are expected in ascending order; the result is in ascending order as well.
    /// The piece size starts from one mark and is doubled after every cut; it is shared between all ranges of one call.
    auto split_ranges = [rows_granularity = data_settings->index_granularity, max_block_size](const auto & ranges, int direction)
    {
        MarkRanges new_ranges;
        /// Number of marks needed to cover max_block_size rows (rounded up).
        const size_t max_marks_in_range = (max_block_size + rows_granularity - 1) / rows_granularity;
        size_t marks_in_range = 1;

        if (direction == 1)
        {
            /// Split first few ranges to avoid reading much data.
            /// Pieces are cut off from the beginning; once the piece size exceeds max_marks_in_range,
            /// the splitting stops and the rest is passed through unchanged.
            bool splitted = false;
            for (auto range : ranges)
            {
                while (!splitted && range.begin + marks_in_range < range.end)
                {
                    new_ranges.emplace_back(range.begin, range.begin + marks_in_range);
                    range.begin += marks_in_range;
                    marks_in_range *= 2;

                    if (marks_in_range > max_marks_in_range)
                        splitted = true;
                }
                /// The remainder of the range (or the whole range, if nothing was cut off).
                new_ranges.emplace_back(range.begin, range.end);
            }
        }
        else
        {
            /// Split all ranges to avoid reading much data, because we have to
            /// store whole range in memory to reverse it.
            /// Ranges are visited from the last one to the first one and pieces are cut off from the end;
            /// the piece size is capped by max_marks_in_range instead of stopping the splitting.
            for (auto it = ranges.rbegin(); it != ranges.rend(); ++it)
            {
                auto range = *it;
                while (range.begin + marks_in_range < range.end)
                {
                    new_ranges.emplace_back(range.end - marks_in_range, range.end);
                    range.end -= marks_in_range;
                    marks_in_range = std::min(marks_in_range * 2, max_marks_in_range);
                }
                new_ranges.emplace_back(range.begin, range.end);
            }
            /// Pieces were collected from right to left - put them back into ascending order.
            std::reverse(new_ranges.begin(), new_ranges.end());
        }

        return new_ranges;
    };

    /// sum_marks / num_streams, rounded up.
    const size_t min_marks_per_stream = (sum_marks - 1) / num_streams + 1;

    for (size_t i = 0; i < num_streams && !parts.empty(); ++i)
    {
        size_t need_marks = min_marks_per_stream;

        /// Sources (one per part or per piece of a part) that make up the i-th output pipe.
        Pipes pipes;

        /// Loop over parts.
        /// We will iteratively take part or some subrange of a part from the back
        /// and assign a stream to read from it.
        while (need_marks > 0 && !parts.empty())
        {
            RangesInDataPart part = parts.back();
            parts.pop_back();

            /// Reference into the vector: decreasing it below updates the remaining mark count of this part.
            size_t & marks_in_part = sum_marks_in_parts.back();

            /// We will not take too few rows from a part.
            if (marks_in_part >= min_marks_for_concurrent_read &&
                need_marks < min_marks_for_concurrent_read)
                need_marks = min_marks_for_concurrent_read;

            /// Do not leave too few rows in the part.
            if (marks_in_part > need_marks &&
                marks_in_part - need_marks < min_marks_for_concurrent_read)
                need_marks = marks_in_part;

            MarkRanges ranges_to_get_from_part;

            /// We take the whole part if it is small enough.
            if (marks_in_part <= need_marks)
            {
                /// Restore the order of segments.
                std::reverse(part.ranges.begin(), part.ranges.end());

                ranges_to_get_from_part = part.ranges;

                need_marks -= marks_in_part;
                sum_marks_in_parts.pop_back();
            }
            else
            {
                /// Loop through ranges in part. Take enough ranges to cover "need_marks".
                while (need_marks > 0)
                {
                    if (part.ranges.empty())
                        throw Exception("Unexpected end of ranges while spreading marks among streams", ErrorCodes::LOGICAL_ERROR);

                    /// Ranges are stored reversed, so the back is the leftmost not yet assigned range.
                    MarkRange & range = part.ranges.back();

                    const size_t marks_in_range = range.end - range.begin;
                    const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);

                    ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range);
                    range.begin += marks_to_get_from_range;
                    marks_in_part -= marks_to_get_from_range;
                    need_marks -= marks_to_get_from_range;
                    if (range.begin == range.end)
                        part.ranges.pop_back();
                }
                /// The rest of the part goes back to be picked up by the next output pipe.
                parts.emplace_back(part);
            }

            ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_sorting_info->direction);

            if (input_sorting_info->direction == 1)
            {
                pipes.emplace_back(std::make_shared<MergeTreeSelectProcessor>(
                    data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
                    settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
                    use_uncompressed_cache, query_info.prewhere_info, true, settings.min_bytes_to_use_direct_io,
                    settings.max_read_buffer_size, true, virt_columns, part.part_index_in_query));
            }
            else
            {
                pipes.emplace_back(std::make_shared<MergeTreeReverseSelectProcessor>(
                    data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
                    settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
                    use_uncompressed_cache, query_info.prewhere_info, true, settings.min_bytes_to_use_direct_io,
                    settings.max_read_buffer_size, true, virt_columns, part.part_index_in_query));

                pipes.back().addSimpleTransform(std::make_shared<ReverseTransform>(pipes.back().getHeader()));
            }
        }

        /// Several sources have to be merged to keep the order within the output pipe.
        if (pipes.size() > 1)
        {
            /// Sort by the first columns of the sorting key - as many as there are in the order key prefix.
            SortDescription sort_description;
            for (size_t j = 0; j < input_sorting_info->order_key_prefix_descr.size(); ++j)
                sort_description.emplace_back(data.sorting_key_columns[j],
                    input_sorting_info->direction, 1);

            for (auto & pipe : pipes)
                pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), sorting_key_prefix_expr));

            auto merging_sorted = std::make_shared<MergingSortedTransform>(
                pipes.back().getHeader(), pipes.size(), sort_description, max_block_size);

            res.emplace_back(std::move(pipes), std::move(merging_sorted));
        }
        else
            /// The inner loop ran at least once (parts was not empty and need_marks > 0), so exactly one source exists here.
            res.emplace_back(std::move(pipes.front()));
    }

    return res;
}
1012
1013
1014Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
1015 RangesInDataParts && parts,
1016 const Names & column_names,
1017 UInt64 max_block_size,
1018 bool use_uncompressed_cache,
1019 const SelectQueryInfo & query_info,
1020 const Names & virt_columns,
1021 const Settings & settings) const
1022{
1023 const auto data_settings = data.getSettings();
1024 size_t sum_marks = 0;
1025 size_t adaptive_parts = 0;
1026 for (size_t i = 0; i < parts.size(); ++i)
1027 {
1028 for (size_t j = 0; j < parts[i].ranges.size(); ++j)
1029 sum_marks += parts[i].ranges[j].end - parts[i].ranges[j].begin;
1030
1031 if (parts[i].data_part->index_granularity_info.is_adaptive)
1032 adaptive_parts++;
1033 }
1034
1035 size_t index_granularity_bytes = 0;
1036 if (adaptive_parts >= parts.size() / 2)
1037 index_granularity_bytes = data_settings->index_granularity_bytes;
1038
1039 const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
1040 settings.merge_tree_max_rows_to_use_cache,
1041 settings.merge_tree_max_bytes_to_use_cache,
1042 data_settings->index_granularity,
1043 index_granularity_bytes);
1044
1045 if (sum_marks > max_marks_to_use_cache)
1046 use_uncompressed_cache = false;
1047
1048 Pipes pipes;
1049
1050 for (size_t part_index = 0; part_index < parts.size(); ++part_index)
1051 {
1052 RangesInDataPart & part = parts[part_index];
1053
1054 auto source_processor = std::make_shared<MergeTreeSelectProcessor>(
1055 data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
1056 settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
1057 query_info.prewhere_info, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true,
1058 virt_columns, part.part_index_in_query);
1059
1060 Pipe pipe(std::move(source_processor));
1061 pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), data.sorting_key_expr));
1062 pipes.emplace_back(std::move(pipe));
1063 }
1064
1065 Names sort_columns = data.sorting_key_columns;
1066 SortDescription sort_description;
1067 size_t sort_columns_size = sort_columns.size();
1068 sort_description.reserve(sort_columns_size);
1069
1070 Block header = pipes.at(0).getHeader();
1071 for (size_t i = 0; i < sort_columns_size; ++i)
1072 sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
1073
1074 /// Converts pipes to BlockInputsStreams.
1075 /// It is temporary, till not all merging streams are implemented as processors.
1076 auto streams_to_merge = [&pipes]()
1077 {
1078 size_t num_streams = pipes.size();
1079
1080 BlockInputStreams streams;
1081 streams.reserve(num_streams);
1082
1083 for (size_t i = 0; i < num_streams; ++i)
1084 streams.emplace_back(std::make_shared<TreeExecutorBlockInputStream>(std::move(pipes[i])));
1085
1086 pipes.clear();
1087 return streams;
1088 };
1089
1090 BlockInputStreamPtr merged;
1091 switch (data.merging_params.mode)
1092 {
1093 case MergeTreeData::MergingParams::Ordinary:
1094 {
1095 auto merged_processor =
1096 std::make_shared<MergingSortedTransform>(header, pipes.size(), sort_description, max_block_size);
1097 pipes.emplace_back(std::move(pipes), std::move(merged_processor));
1098 break;
1099 }
1100
1101 case MergeTreeData::MergingParams::Collapsing:
1102 merged = std::make_shared<CollapsingFinalBlockInputStream>(
1103 streams_to_merge(), sort_description, data.merging_params.sign_column);
1104 break;
1105
1106 case MergeTreeData::MergingParams::Summing:
1107 merged = std::make_shared<SummingSortedBlockInputStream>(streams_to_merge(),
1108 sort_description, data.merging_params.columns_to_sum, max_block_size);
1109 break;
1110
1111 case MergeTreeData::MergingParams::Aggregating:
1112 merged = std::make_shared<AggregatingSortedBlockInputStream>(streams_to_merge(), sort_description, max_block_size);
1113 break;
1114
1115 case MergeTreeData::MergingParams::Replacing: /// TODO Make ReplacingFinalBlockInputStream
1116 merged = std::make_shared<ReplacingSortedBlockInputStream>(streams_to_merge(),
1117 sort_description, data.merging_params.version_column, max_block_size);
1118 break;
1119
1120 case MergeTreeData::MergingParams::VersionedCollapsing: /// TODO Make VersionedCollapsingFinalBlockInputStream
1121 merged = std::make_shared<VersionedCollapsingSortedBlockInputStream>(
1122 streams_to_merge(), sort_description, data.merging_params.sign_column, max_block_size);
1123 break;
1124
1125 case MergeTreeData::MergingParams::Graphite:
1126 throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
1127 }
1128
1129 if (merged)
1130 pipes.emplace_back(std::make_shared<SourceFromInputStream>(merged));
1131
1132 return pipes;
1133}
1134
1135
1136void MergeTreeDataSelectExecutor::createPositiveSignCondition(
1137 ExpressionActionsPtr & out_expression, String & out_column, const Context & context) const
1138{
1139 auto function = std::make_shared<ASTFunction>();
1140 auto arguments = std::make_shared<ASTExpressionList>();
1141 auto sign = std::make_shared<ASTIdentifier>(data.merging_params.sign_column);
1142 auto one = std::make_shared<ASTLiteral>(1);
1143
1144 function->name = "equals";
1145 function->arguments = arguments;
1146 function->children.push_back(arguments);
1147
1148 arguments->children.push_back(sign);
1149 arguments->children.push_back(one);
1150
1151 ASTPtr query = function;
1152 auto syntax_result = SyntaxAnalyzer(context).analyze(query, data.getColumns().getAllPhysical());
1153 out_expression = ExpressionAnalyzer(query, syntax_result, context).getActions(false);
1154 out_column = function->getColumnName();
1155}
1156
1157
1158/// Calculates a set of mark ranges, that could possibly contain keys, required by condition.
1159/// In other words, it removes subranges from whole range, that definitely could not contain required keys.
1160MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
1161 const MergeTreeData::DataPartPtr & part, const KeyCondition & key_condition, const Settings & settings) const
1162{
1163 MarkRanges res;
1164
1165 size_t marks_count = part->index_granularity.getMarksCount();
1166 const auto & index = part->index;
1167 if (marks_count == 0)
1168 return res;
1169
1170 bool has_final_mark = part->index_granularity.hasFinalMark();
1171
1172 /// If index is not used.
1173 if (key_condition.alwaysUnknownOrTrue())
1174 {
1175 if (has_final_mark)
1176 res.push_back(MarkRange(0, marks_count - 1));
1177 else
1178 res.push_back(MarkRange(0, marks_count));
1179 }
1180 else
1181 {
1182 size_t used_key_size = key_condition.getMaxKeyColumn() + 1;
1183 size_t min_marks_for_seek = roundRowsOrBytesToMarks(
1184 settings.merge_tree_min_rows_for_seek,
1185 settings.merge_tree_min_bytes_for_seek,
1186 part->index_granularity_info.fixed_index_granularity,
1187 part->index_granularity_info.index_granularity_bytes);
1188
1189 /** There will always be disjoint suspicious segments on the stack, the leftmost one at the top (back).
1190 * At each step, take the left segment and check if it fits.
1191 * If fits, split it into smaller ones and put them on the stack. If not, discard it.
1192 * If the segment is already of one mark length, add it to response and discard it.
1193 */
1194 std::vector<MarkRange> ranges_stack{ {0, marks_count} };
1195
1196 /// NOTE Creating temporary Field objects to pass to KeyCondition.
1197 Row index_left(used_key_size);
1198 Row index_right(used_key_size);
1199
1200 while (!ranges_stack.empty())
1201 {
1202 MarkRange range = ranges_stack.back();
1203 ranges_stack.pop_back();
1204
1205 bool may_be_true;
1206 if (range.end == marks_count && !has_final_mark)
1207 {
1208 for (size_t i = 0; i < used_key_size; ++i)
1209 index[i]->get(range.begin, index_left[i]);
1210
1211 may_be_true = key_condition.mayBeTrueAfter(
1212 used_key_size, index_left.data(), data.primary_key_data_types);
1213 }
1214 else
1215 {
1216 if (has_final_mark && range.end == marks_count)
1217 range.end -= 1; /// Remove final empty mark. It's useful only for primary key condition.
1218
1219 for (size_t i = 0; i < used_key_size; ++i)
1220 {
1221 index[i]->get(range.begin, index_left[i]);
1222 index[i]->get(range.end, index_right[i]);
1223 }
1224
1225 may_be_true = key_condition.mayBeTrueInRange(
1226 used_key_size, index_left.data(), index_right.data(), data.primary_key_data_types);
1227 }
1228
1229 if (!may_be_true)
1230 continue;
1231
1232 if (range.end == range.begin + 1)
1233 {
1234 /// We saw a useful gap between neighboring marks. Either add it to the last range, or start a new range.
1235 if (res.empty() || range.begin - res.back().end > min_marks_for_seek)
1236 res.push_back(range);
1237 else
1238 res.back().end = range.end;
1239 }
1240 else
1241 {
1242 /// Break the segment and put the result on the stack from right to left.
1243 size_t step = (range.end - range.begin - 1) / settings.merge_tree_coarse_index_granularity + 1;
1244 size_t end;
1245
1246 for (end = range.end; end > range.begin + step; end -= step)
1247 ranges_stack.push_back(MarkRange(end - step, end));
1248
1249 ranges_stack.push_back(MarkRange(range.begin, end));
1250 }
1251 }
1252 }
1253
1254 return res;
1255}
1256
1257MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
1258 MergeTreeIndexPtr index,
1259 MergeTreeIndexConditionPtr condition,
1260 MergeTreeData::DataPartPtr part,
1261 const MarkRanges & ranges,
1262 const Settings & settings) const
1263{
1264 if (!Poco::File(part->getFullPath() + index->getFileName() + ".idx").exists())
1265 {
1266 LOG_DEBUG(log, "File for index " << backQuote(index->name) << " does not exist. Skipping it.");
1267 return ranges;
1268 }
1269
1270 const size_t min_marks_for_seek = roundRowsOrBytesToMarks(
1271 settings.merge_tree_min_rows_for_seek,
1272 settings.merge_tree_min_bytes_for_seek,
1273 part->index_granularity_info.index_granularity_bytes,
1274 part->index_granularity_info.fixed_index_granularity);
1275
1276 size_t granules_dropped = 0;
1277
1278 size_t marks_count = part->getMarksCount();
1279 size_t final_mark = part->index_granularity.hasFinalMark();
1280 size_t index_marks_count = (marks_count - final_mark + index->granularity - 1) / index->granularity;
1281
1282 MergeTreeIndexReader reader(
1283 index, part,
1284 index_marks_count,
1285 ranges);
1286
1287 MarkRanges res;
1288
1289 /// Some granules can cover two or more ranges,
1290 /// this variable is stored to avoid reading the same granule twice.
1291 MergeTreeIndexGranulePtr granule = nullptr;
1292 size_t last_index_mark = 0;
1293 for (const auto & range : ranges)
1294 {
1295 MarkRange index_range(
1296 range.begin / index->granularity,
1297 (range.end + index->granularity - 1) / index->granularity);
1298
1299 if (last_index_mark != index_range.begin || !granule)
1300 reader.seek(index_range.begin);
1301
1302 for (size_t index_mark = index_range.begin; index_mark < index_range.end; ++index_mark)
1303 {
1304 if (index_mark != index_range.begin || !granule || last_index_mark != index_range.begin)
1305 granule = reader.read();
1306
1307 MarkRange data_range(
1308 std::max(range.begin, index_mark * index->granularity),
1309 std::min(range.end, (index_mark + 1) * index->granularity));
1310
1311 if (!condition->mayBeTrueOnGranule(granule))
1312 {
1313 ++granules_dropped;
1314 continue;
1315 }
1316
1317 if (res.empty() || res.back().end - data_range.begin > min_marks_for_seek)
1318 res.push_back(data_range);
1319 else
1320 res.back().end = data_range.end;
1321 }
1322
1323 last_index_mark = index_range.end - 1;
1324 }
1325
1326 LOG_DEBUG(log, "Index " << backQuote(index->name) << " has dropped " << granules_dropped << " granules.");
1327
1328 return res;
1329}
1330
1331
1332}
1333