1#include <Storages/MergeTree/MergeTreeReader.h>
2#include <Columns/FilterDescription.h>
3#include <Columns/ColumnsCommon.h>
4#include <ext/range.h>
5#include <DataTypes/DataTypeNothing.h>
6
7#ifdef __SSE2__
8#include <emmintrin.h>
9#endif
10
11namespace DB
12{
13
14MergeTreeRangeReader::DelayedStream::DelayedStream(
15 size_t from_mark, MergeTreeReader * merge_tree_reader_)
16 : current_mark(from_mark), current_offset(0), num_delayed_rows(0)
17 , merge_tree_reader(merge_tree_reader_)
18 , index_granularity(&(merge_tree_reader->data_part->index_granularity))
19 , continue_reading(false), is_finished(false)
20{
21}
22
23size_t MergeTreeRangeReader::DelayedStream::position() const
24{
25 size_t num_rows_before_current_mark = index_granularity->getMarkStartingRow(current_mark);
26 return num_rows_before_current_mark + current_offset + num_delayed_rows;
27}
28
29size_t MergeTreeRangeReader::DelayedStream::readRows(Columns & columns, size_t num_rows)
30{
31 if (num_rows)
32 {
33 size_t rows_read = merge_tree_reader->readRows(current_mark, continue_reading, num_rows, columns);
34 continue_reading = true;
35
36 /// Zero rows_read maybe either because reading has finished
37 /// or because there is no columns we can read in current part (for example, all columns are default).
38 /// In the last case we can't finish reading, but it's also ok for the first case
39 /// because we can finish reading by calculation the number of pending rows.
40 if (0 < rows_read && rows_read < num_rows)
41 is_finished = true;
42
43 return rows_read;
44 }
45
46 return 0;
47}
48
49size_t MergeTreeRangeReader::DelayedStream::read(Columns & columns, size_t from_mark, size_t offset, size_t num_rows)
50{
51 size_t num_rows_before_from_mark = index_granularity->getMarkStartingRow(from_mark);
52 /// We already stand accurately in required position,
53 /// so because stream is lazy, we don't read anything
54 /// and only increment amount delayed_rows
55 if (position() == num_rows_before_from_mark + offset)
56 {
57 num_delayed_rows += num_rows;
58 return 0;
59 }
60 else
61 {
62 size_t read_rows = finalize(columns);
63
64 continue_reading = false;
65 current_mark = from_mark;
66 current_offset = offset;
67 num_delayed_rows = num_rows;
68
69 return read_rows;
70 }
71}
72
73size_t MergeTreeRangeReader::DelayedStream::finalize(Columns & columns)
74{
75 /// We need to skip some rows before reading
76 if (current_offset && !continue_reading)
77 {
78 for (size_t mark_num : ext::range(current_mark, index_granularity->getMarksCount()))
79 {
80 size_t mark_index_granularity = index_granularity->getMarkRows(mark_num);
81 if (current_offset >= mark_index_granularity)
82 {
83 current_offset -= mark_index_granularity;
84 current_mark++;
85 }
86 else
87 break;
88
89 }
90
91 /// Skip some rows from begin of granule.
92 /// We don't know size of rows in compressed granule,
93 /// so have to read them and throw out.
94 if (current_offset)
95 {
96 Columns tmp_columns;
97 tmp_columns.resize(columns.size());
98 readRows(tmp_columns, current_offset);
99 }
100 }
101
102 size_t rows_to_read = num_delayed_rows;
103 current_offset += num_delayed_rows;
104 num_delayed_rows = 0;
105
106 return readRows(columns, rows_to_read);
107}
108
109
110MergeTreeRangeReader::Stream::Stream(
111 size_t from_mark, size_t to_mark, MergeTreeReader * merge_tree_reader_)
112 : current_mark(from_mark), offset_after_current_mark(0)
113 , last_mark(to_mark)
114 , merge_tree_reader(merge_tree_reader_)
115 , index_granularity(&(merge_tree_reader->data_part->index_granularity))
116 , current_mark_index_granularity(index_granularity->getMarkRows(from_mark))
117 , stream(from_mark, merge_tree_reader)
118{
119 size_t marks_count = index_granularity->getMarksCount();
120 if (from_mark >= marks_count)
121 throw Exception("Trying create stream to read from mark №"+ toString(current_mark) + " but total marks count is "
122 + toString(marks_count), ErrorCodes::LOGICAL_ERROR);
123
124 if (last_mark > marks_count)
125 throw Exception("Trying create stream to read to mark №"+ toString(current_mark) + " but total marks count is "
126 + toString(marks_count), ErrorCodes::LOGICAL_ERROR);
127}
128
129void MergeTreeRangeReader::Stream::checkNotFinished() const
130{
131 if (isFinished())
132 throw Exception("Cannot read out of marks range.", ErrorCodes::LOGICAL_ERROR);
133}
134
135void MergeTreeRangeReader::Stream::checkEnoughSpaceInCurrentGranule(size_t num_rows) const
136{
137 if (num_rows + offset_after_current_mark > current_mark_index_granularity)
138 throw Exception("Cannot read from granule more than index_granularity.", ErrorCodes::LOGICAL_ERROR);
139}
140
141size_t MergeTreeRangeReader::Stream::readRows(Columns & columns, size_t num_rows)
142{
143 size_t rows_read = stream.read(columns, current_mark, offset_after_current_mark, num_rows);
144
145 if (stream.isFinished())
146 finish();
147
148 return rows_read;
149}
150
151void MergeTreeRangeReader::Stream::toNextMark()
152{
153 ++current_mark;
154
155 size_t total_marks_count = index_granularity->getMarksCount();
156 if (current_mark < total_marks_count)
157 current_mark_index_granularity = index_granularity->getMarkRows(current_mark);
158 else if (current_mark == total_marks_count)
159 current_mark_index_granularity = 0; /// HACK?
160 else
161 throw Exception("Trying to read from mark " + toString(current_mark) + ", but total marks count " + toString(total_marks_count), ErrorCodes::LOGICAL_ERROR);
162
163 offset_after_current_mark = 0;
164}
165
166size_t MergeTreeRangeReader::Stream::read(Columns & columns, size_t num_rows, bool skip_remaining_rows_in_current_granule)
167{
168 checkEnoughSpaceInCurrentGranule(num_rows);
169
170 if (num_rows)
171 {
172 checkNotFinished();
173
174 size_t read_rows = readRows(columns, num_rows);
175
176 offset_after_current_mark += num_rows;
177
178 /// Start new granule; skipped_rows_after_offset is already zero.
179 if (offset_after_current_mark == current_mark_index_granularity || skip_remaining_rows_in_current_granule)
180 toNextMark();
181
182 return read_rows;
183 }
184 else
185 {
186 /// Nothing to read.
187 if (skip_remaining_rows_in_current_granule)
188 {
189 /// Skip the rest of the rows in granule and start new one.
190 checkNotFinished();
191 toNextMark();
192 }
193
194 return 0;
195 }
196}
197
198void MergeTreeRangeReader::Stream::skip(size_t num_rows)
199{
200 if (num_rows)
201 {
202 checkNotFinished();
203 checkEnoughSpaceInCurrentGranule(num_rows);
204
205 offset_after_current_mark += num_rows;
206
207 if (offset_after_current_mark == current_mark_index_granularity)
208 {
209 /// Start new granule; skipped_rows_after_offset is already zero.
210 toNextMark();
211 }
212 }
213}
214
215size_t MergeTreeRangeReader::Stream::finalize(Columns & columns)
216{
217 size_t read_rows = stream.finalize(columns);
218
219 if (stream.isFinished())
220 finish();
221
222 return read_rows;
223}
224
225
226void MergeTreeRangeReader::ReadResult::addGranule(size_t num_rows_)
227{
228 rows_per_granule.push_back(num_rows_);
229 total_rows_per_granule += num_rows_;
230}
231
232void MergeTreeRangeReader::ReadResult::adjustLastGranule()
233{
234 size_t num_rows_to_subtract = total_rows_per_granule - num_read_rows;
235
236 if (rows_per_granule.empty())
237 throw Exception("Can't adjust last granule because no granules were added.", ErrorCodes::LOGICAL_ERROR);
238
239 if (num_rows_to_subtract > rows_per_granule.back())
240 throw Exception("Can't adjust last granule because it has " + toString(rows_per_granule.back())
241 + " rows, but try to subtract " + toString(num_rows_to_subtract) + " rows.",
242 ErrorCodes::LOGICAL_ERROR);
243
244 rows_per_granule.back() -= num_rows_to_subtract;
245 total_rows_per_granule -= num_rows_to_subtract;
246}
247
248void MergeTreeRangeReader::ReadResult::clear()
249{
250 /// Need to save information about the number of granules.
251 num_rows_to_skip_in_last_granule += rows_per_granule.back();
252 rows_per_granule.assign(rows_per_granule.size(), 0);
253 total_rows_per_granule = 0;
254 filter_holder = nullptr;
255 filter = nullptr;
256}
257
258void MergeTreeRangeReader::ReadResult::shrink(Columns & old_columns)
259{
260 for (size_t i = 0; i < old_columns.size(); ++i)
261 {
262 if (!old_columns[i])
263 continue;
264 auto new_column = old_columns[i]->cloneEmpty();
265 new_column->reserve(total_rows_per_granule);
266 for (size_t j = 0, pos = 0; j < rows_per_granule_original.size(); pos += rows_per_granule_original[i], ++j)
267 {
268 if (rows_per_granule[j])
269 new_column->insertRangeFrom(*old_columns[i], pos, rows_per_granule[j]);
270 }
271 old_columns[i] = std::move(new_column);
272 }
273}
274
275void MergeTreeRangeReader::ReadResult::setFilterConstTrue()
276{
277 clearFilter();
278 filter_holder = DataTypeUInt8().createColumnConst(num_rows, 1u);
279}
280
281void MergeTreeRangeReader::ReadResult::setFilterConstFalse()
282{
283 clearFilter();
284 columns.clear();
285 num_rows = 0;
286}
287
288void MergeTreeRangeReader::ReadResult::optimize()
289{
290 if (total_rows_per_granule == 0 || filter == nullptr)
291 return;
292
293 NumRows zero_tails;
294 auto total_zero_rows_in_tails = countZeroTails(filter->getData(), zero_tails);
295
296 if (total_zero_rows_in_tails == filter->size())
297 {
298 clear();
299 return;
300 }
301 else if (total_zero_rows_in_tails == 0 && countBytesInResultFilter(filter->getData()) == filter->size())
302 {
303 setFilterConstTrue();
304 return;
305 }
306 /// Just a guess. If only a few rows may be skipped, it's better not to skip at all.
307 else if (2 * total_zero_rows_in_tails > filter->size())
308 {
309 for (auto i : ext::range(0, rows_per_granule.size()))
310 {
311 rows_per_granule_original.push_back(rows_per_granule[i]);
312 rows_per_granule[i] -= zero_tails[i];
313 }
314 num_rows_to_skip_in_last_granule += rows_per_granule_original.back() - rows_per_granule.back();
315
316 /// Check if const 1 after shrink
317 if (countBytesInResultFilter(filter->getData()) + total_zero_rows_in_tails == total_rows_per_granule)
318 {
319 total_rows_per_granule = total_rows_per_granule - total_zero_rows_in_tails;
320 num_rows = total_rows_per_granule;
321 setFilterConstTrue();
322 shrink(columns); /// shrink acts as filtering in such case
323 }
324 else
325 {
326 auto new_filter = ColumnUInt8::create(filter->size() - total_zero_rows_in_tails);
327 IColumn::Filter & new_data = new_filter->getData();
328
329 collapseZeroTails(filter->getData(), new_data);
330 total_rows_per_granule = new_filter->size();
331 num_rows = total_rows_per_granule;
332 filter_original = filter;
333 filter_holder_original = std::move(filter_holder);
334 filter = new_filter.get();
335 filter_holder = std::move(new_filter);
336 }
337 need_filter = true;
338 }
339 /// Another guess, if it's worth filtering at PREWHERE
340 else if (countBytesInResultFilter(filter->getData()) < 0.6 * filter->size())
341 need_filter = true;
342}
343
344size_t MergeTreeRangeReader::ReadResult::countZeroTails(const IColumn::Filter & filter_vec, NumRows & zero_tails) const
345{
346 zero_tails.resize(0);
347 zero_tails.reserve(rows_per_granule.size());
348
349 auto filter_data = filter_vec.data();
350
351 size_t total_zero_rows_in_tails = 0;
352
353 for (auto rows_to_read : rows_per_granule)
354 {
355 /// Count the number of zeros at the end of filter for rows were read from current granule.
356 zero_tails.push_back(numZerosInTail(filter_data, filter_data + rows_to_read));
357 total_zero_rows_in_tails += zero_tails.back();
358 filter_data += rows_to_read;
359 }
360
361 return total_zero_rows_in_tails;
362}
363
364void MergeTreeRangeReader::ReadResult::collapseZeroTails(const IColumn::Filter & filter_vec, IColumn::Filter & new_filter_vec)
365{
366 auto filter_data = filter_vec.data();
367 auto new_filter_data = new_filter_vec.data();
368
369 for (auto i : ext::range(0, rows_per_granule.size()))
370 {
371 memcpySmallAllowReadWriteOverflow15(new_filter_data, filter_data, rows_per_granule[i]);
372 filter_data += rows_per_granule_original[i];
373 new_filter_data += rows_per_granule[i];
374 }
375
376 new_filter_vec.resize(new_filter_data - new_filter_vec.data());
377}
378
379size_t MergeTreeRangeReader::ReadResult::numZerosInTail(const UInt8 * begin, const UInt8 * end)
380{
381 size_t count = 0;
382
383#if defined(__SSE2__) && defined(__POPCNT__)
384 const __m128i zero16 = _mm_setzero_si128();
385 while (end - begin >= 64)
386 {
387 end -= 64;
388 auto pos = end;
389 UInt64 val =
390 static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(
391 _mm_loadu_si128(reinterpret_cast<const __m128i *>(pos)),
392 zero16)))
393 | (static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(
394 _mm_loadu_si128(reinterpret_cast<const __m128i *>(pos + 16)),
395 zero16))) << 16u)
396 | (static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(
397 _mm_loadu_si128(reinterpret_cast<const __m128i *>(pos + 32)),
398 zero16))) << 32u)
399 | (static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(
400 _mm_loadu_si128(reinterpret_cast<const __m128i *>(pos + 48)),
401 zero16))) << 48u);
402 if (val == 0)
403 count += 64;
404 else
405 {
406 count += __builtin_clzll(val);
407 return count;
408 }
409 }
410#endif
411
412 while (end > begin && *(--end) == 0)
413 {
414 ++count;
415 }
416 return count;
417}
418
419void MergeTreeRangeReader::ReadResult::setFilter(const ColumnPtr & new_filter)
420{
421 if (!new_filter && filter)
422 throw Exception("Can't replace existing filter with empty.", ErrorCodes::LOGICAL_ERROR);
423
424 if (filter)
425 {
426 size_t new_size = new_filter->size();
427
428 if (new_size != total_rows_per_granule)
429 throw Exception("Can't set filter because it's size is " + toString(new_size) + " but "
430 + toString(total_rows_per_granule) + " rows was read.", ErrorCodes::LOGICAL_ERROR);
431 }
432
433 ConstantFilterDescription const_description(*new_filter);
434 if (const_description.always_false)
435 clear();
436 else if (!const_description.always_true)
437 {
438 FilterDescription filter_description(*new_filter);
439 filter_holder = filter_description.data_holder ? filter_description.data_holder : new_filter;
440 filter = typeid_cast<const ColumnUInt8 *>(filter_holder.get());
441 if (!filter)
442 throw Exception("setFilter function expected ColumnUInt8.", ErrorCodes::LOGICAL_ERROR);
443 }
444}
445
446
447size_t MergeTreeRangeReader::ReadResult::countBytesInResultFilter(const IColumn::Filter & filter_)
448{
449 auto it = filter_bytes_map.find(&filter_);
450 if (it == filter_bytes_map.end())
451 {
452 auto bytes = countBytesInFilter(filter_);
453 filter_bytes_map[&filter_] = bytes;
454 return bytes;
455 }
456 else
457 return it->second;
458}
459
460MergeTreeRangeReader::MergeTreeRangeReader(
461 MergeTreeReader * merge_tree_reader_,
462 MergeTreeRangeReader * prev_reader_,
463 const PrewhereInfoPtr & prewhere_,
464 bool last_reader_in_chain_)
465 : merge_tree_reader(merge_tree_reader_)
466 , index_granularity(&(merge_tree_reader->data_part->index_granularity)), prev_reader(prev_reader_)
467 , prewhere(prewhere_), last_reader_in_chain(last_reader_in_chain_), is_initialized(true)
468{
469 if (prev_reader)
470 sample_block = prev_reader->getSampleBlock();
471
472 for (auto & name_and_type : merge_tree_reader->getColumns())
473 sample_block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name});
474
475 if (prewhere)
476 {
477 if (prewhere->alias_actions)
478 prewhere->alias_actions->execute(sample_block, true);
479
480 sample_block_before_prewhere = sample_block;
481 if (prewhere->prewhere_actions)
482 prewhere->prewhere_actions->execute(sample_block, true);
483
484 if (prewhere->remove_prewhere_column)
485 sample_block.erase(prewhere->prewhere_column_name);
486 }
487}
488
489bool MergeTreeRangeReader::isReadingFinished() const
490{
491 return prev_reader ? prev_reader->isReadingFinished() : stream.isFinished();
492}
493
494size_t MergeTreeRangeReader::numReadRowsInCurrentGranule() const
495{
496 return prev_reader ? prev_reader->numReadRowsInCurrentGranule() : stream.numReadRowsInCurrentGranule();
497}
498
499size_t MergeTreeRangeReader::numPendingRowsInCurrentGranule() const
500{
501 if (prev_reader)
502 return prev_reader->numPendingRowsInCurrentGranule();
503
504 auto pending_rows = stream.numPendingRowsInCurrentGranule();
505
506 if (pending_rows)
507 return pending_rows;
508
509 return numRowsInCurrentGranule();
510}
511
512
513size_t MergeTreeRangeReader::numRowsInCurrentGranule() const
514{
515 /// If pending_rows is zero, than stream is not initialized.
516 if (stream.current_mark_index_granularity)
517 return stream.current_mark_index_granularity;
518
519 /// We haven't read anything, return first
520 size_t first_mark = merge_tree_reader->getFirstMarkToRead();
521 return index_granularity->getMarkRows(first_mark);
522}
523
524size_t MergeTreeRangeReader::currentMark() const
525{
526 return stream.currentMark();
527}
528
529size_t MergeTreeRangeReader::Stream::numPendingRows() const
530{
531 size_t rows_between_marks = index_granularity->getRowsCountInRange(current_mark, last_mark);
532 return rows_between_marks - offset_after_current_mark;
533}
534
535bool MergeTreeRangeReader::isCurrentRangeFinished() const
536{
537 return prev_reader ? prev_reader->isCurrentRangeFinished() : stream.isFinished();
538}
539
540MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, MarkRanges & ranges)
541{
542 if (max_rows == 0)
543 throw Exception("Expected at least 1 row to read, got 0.", ErrorCodes::LOGICAL_ERROR);
544
545 ReadResult read_result;
546
547 if (prev_reader)
548 {
549 read_result = prev_reader->read(max_rows, ranges);
550
551 size_t num_read_rows;
552 Columns columns = continueReadingChain(read_result, num_read_rows);
553
554 /// Nothing to do. Return empty result.
555 if (read_result.num_rows == 0)
556 return read_result;
557
558 bool has_columns = false;
559 for (auto & column : columns)
560 {
561 if (column)
562 has_columns = true;
563 }
564
565 size_t total_bytes = 0;
566 for (auto & column : columns)
567 {
568 if (column)
569 total_bytes += column->byteSize();
570 }
571
572 read_result.addNumBytesRead(total_bytes);
573
574 bool should_evaluate_missing_defaults = false;
575
576 if (has_columns)
577 {
578 /// num_read_rows >= read_result.num_rows
579 /// We must filter block before adding columns to read_result.block
580
581 /// Fill missing columns before filtering because some arrays from Nested may have empty data.
582 merge_tree_reader->fillMissingColumns(columns, should_evaluate_missing_defaults, num_read_rows);
583
584 if (read_result.getFilter())
585 filterColumns(columns, read_result.getFilter()->getData());
586 }
587 else
588 {
589 size_t num_rows = read_result.num_rows;
590
591 /// If block is empty, we still may need to add missing columns.
592 /// In that case use number of rows in result block and don't filter block.
593 if (num_rows)
594 merge_tree_reader->fillMissingColumns(columns, should_evaluate_missing_defaults, num_rows);
595 }
596
597 if (!columns.empty() && should_evaluate_missing_defaults)
598 {
599 auto block = prev_reader->sample_block.cloneWithColumns(read_result.columns);
600 auto block_before_prewhere = read_result.block_before_prewhere;
601 for (auto & ctn : block)
602 {
603 if (block_before_prewhere.has(ctn.name))
604 block_before_prewhere.erase(ctn.name);
605 }
606
607 if (block_before_prewhere)
608 {
609 if (read_result.need_filter)
610 {
611 auto old_columns = block_before_prewhere.getColumns();
612 filterColumns(old_columns, read_result.getFilter()->getData());
613 block_before_prewhere.setColumns(std::move(old_columns));
614 }
615
616 for (auto && ctn : block_before_prewhere)
617 block.insert(std::move(ctn));
618 }
619
620 merge_tree_reader->evaluateMissingDefaults(block, columns);
621 }
622
623 read_result.columns.reserve(read_result.columns.size() + columns.size());
624 for (auto & column : columns)
625 read_result.columns.emplace_back(std::move(column));
626 }
627 else
628 {
629 read_result = startReadingChain(max_rows, ranges);
630 read_result.num_rows = read_result.numReadRows();
631
632 if (read_result.num_rows)
633 {
634 bool should_evaluate_missing_defaults;
635 merge_tree_reader->fillMissingColumns(read_result.columns, should_evaluate_missing_defaults,
636 read_result.num_rows);
637
638 if (should_evaluate_missing_defaults)
639 merge_tree_reader->evaluateMissingDefaults({}, read_result.columns);
640 }
641 else
642 read_result.columns.clear();
643
644 size_t total_bytes = 0;
645 for (auto & column : read_result.columns)
646 total_bytes += column->byteSize();
647
648 read_result.addNumBytesRead(total_bytes);
649 }
650
651 if (read_result.num_rows == 0)
652 return read_result;
653
654 executePrewhereActionsAndFilterColumns(read_result);
655
656 return read_result;
657}
658
659void MergeTreeRangeReader::filterColumns(Columns & columns, const IColumn::Filter & filter) const
660{
661 for (auto & column : columns)
662 {
663 if (column)
664 {
665 column = column->filter(filter, -1);
666
667 if (column->empty())
668 {
669 columns.clear();
670 return;
671 }
672 }
673 }
674}
675
676MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t max_rows, MarkRanges & ranges)
677{
678 ReadResult result;
679 result.columns.resize(merge_tree_reader->getColumns().size());
680
681 /// Stream is lazy. result.num_added_rows is the number of rows added to block which is not equal to
682 /// result.num_rows_read until call to stream.finalize(). Also result.num_added_rows may be less than
683 /// result.num_rows_read if the last granule in range also the last in part (so we have to adjust last granule).
684 {
685 size_t space_left = max_rows;
686 while (space_left && (!stream.isFinished() || !ranges.empty()))
687 {
688 if (stream.isFinished())
689 {
690 result.addRows(stream.finalize(result.columns));
691 stream = Stream(ranges.back().begin, ranges.back().end, merge_tree_reader);
692 result.addRange(ranges.back());
693 ranges.pop_back();
694 }
695
696 auto rows_to_read = std::min(space_left, stream.numPendingRowsInCurrentGranule());
697 bool last = rows_to_read == space_left;
698 result.addRows(stream.read(result.columns, rows_to_read, !last));
699 result.addGranule(rows_to_read);
700 space_left -= rows_to_read;
701 }
702 }
703
704 result.addRows(stream.finalize(result.columns));
705
706 /// Last granule may be incomplete.
707 result.adjustLastGranule();
708
709 return result;
710}
711
712Columns MergeTreeRangeReader::continueReadingChain(ReadResult & result, size_t & num_rows)
713{
714 Columns columns;
715 num_rows = 0;
716
717 if (result.rowsPerGranule().empty())
718 {
719 /// If zero rows were read on prev step, than there is no more rows to read.
720 /// Last granule may have less rows than index_granularity, so finish reading manually.
721 stream.finish();
722 return columns;
723 }
724
725 columns.resize(merge_tree_reader->numColumnsInResult());
726
727 auto & rows_per_granule = result.rowsPerGranule();
728 auto & started_ranges = result.startedRanges();
729
730 size_t next_range_to_start = 0;
731
732 auto size = rows_per_granule.size();
733 for (auto i : ext::range(0, size))
734 {
735 if (next_range_to_start < started_ranges.size()
736 && i == started_ranges[next_range_to_start].num_granules_read_before_start)
737 {
738 num_rows += stream.finalize(columns);
739 auto & range = started_ranges[next_range_to_start].range;
740 ++next_range_to_start;
741 stream = Stream(range.begin, range.end, merge_tree_reader);
742 }
743
744 bool last = i + 1 == size;
745 num_rows += stream.read(columns, rows_per_granule[i], !last);
746 }
747
748 stream.skip(result.numRowsToSkipInLastGranule());
749 num_rows += stream.finalize(columns);
750
751 /// added_rows may be zero if all columns were read in prewhere and it's ok.
752 if (num_rows && num_rows != result.totalRowsPerGranule())
753 throw Exception("RangeReader read " + toString(num_rows) + " rows, but "
754 + toString(result.totalRowsPerGranule()) + " expected.", ErrorCodes::LOGICAL_ERROR);
755
756 return columns;
757}
758
759void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & result)
760{
761 if (!prewhere)
762 return;
763
764 auto & header = merge_tree_reader->getColumns();
765 size_t num_columns = header.size();
766
767 if (result.columns.size() != num_columns)
768 throw Exception("Invalid number of columns passed to MergeTreeRangeReader. "
769 "Expected " + toString(num_columns) + ", "
770 "got " + toString(result.columns.size()), ErrorCodes::LOGICAL_ERROR);
771
772 ColumnPtr filter;
773 size_t prewhere_column_pos;
774
775 {
776 /// Restore block from columns list.
777 Block block;
778 size_t pos = 0;
779
780 if (prev_reader)
781 {
782 for (auto & col : prev_reader->getSampleBlock())
783 {
784 block.insert({result.columns[pos], col.type, col.name});
785 ++pos;
786 }
787 }
788
789 for (auto name_and_type = header.begin(); pos < num_columns; ++pos, ++name_and_type)
790 block.insert({result.columns[pos], name_and_type->type, name_and_type->name});
791
792 if (prewhere && prewhere->alias_actions)
793 prewhere->alias_actions->execute(block);
794
795 /// Columns might be projected out. We need to store them here so that default columns can be evaluated later.
796 result.block_before_prewhere = block;
797 prewhere->prewhere_actions->execute(block);
798
799 prewhere_column_pos = block.getPositionByName(prewhere->prewhere_column_name);
800
801 result.columns.clear();
802 result.columns.reserve(block.columns());
803 for (auto & col : block)
804 result.columns.emplace_back(std::move(col.column));
805
806 filter.swap(result.columns[prewhere_column_pos]);
807 }
808
809 if (result.getFilter())
810 {
811 /// TODO: implement for prewhere chain.
812 /// In order to do it we need combine filter and result.filter, where filter filters only '1' in result.filter.
813 throw Exception("MergeTreeRangeReader chain with several prewhere actions in not implemented.",
814 ErrorCodes::LOGICAL_ERROR);
815 }
816
817 result.setFilter(filter);
818
819 /// If there is a WHERE, we filter in there, and only optimize IO and shrink columns here
820 if (!last_reader_in_chain)
821 result.optimize();
822
823 /// If we read nothing or filter gets optimized to nothing
824 if (result.totalRowsPerGranule() == 0)
825 result.setFilterConstFalse();
826 /// If we need to filter in PREWHERE
827 else if (prewhere->need_filter || result.need_filter)
828 {
829 /// If there is a filter and without optimized
830 if (result.getFilter() && last_reader_in_chain)
831 {
832 auto result_filter = result.getFilter();
833 /// optimize is not called, need to check const 1 and const 0
834 size_t bytes_in_filter = result.countBytesInResultFilter(result_filter->getData());
835 if (bytes_in_filter == 0)
836 result.setFilterConstFalse();
837 else if (bytes_in_filter == result.num_rows)
838 result.setFilterConstTrue();
839 }
840
841 /// If there is still a filter, do the filtering now
842 if (result.getFilter())
843 {
844 /// filter might be shrinked while columns not
845 auto result_filter = result.getFilterOriginal() ? result.getFilterOriginal() : result.getFilter();
846 filterColumns(result.columns, result_filter->getData());
847 result.need_filter = true;
848
849 bool has_column = false;
850 for (auto & column : result.columns)
851 {
852 if (column)
853 {
854 has_column = true;
855 result.num_rows = column->size();
856 break;
857 }
858 }
859
860 /// There is only one filter column. Record the actual number
861 if (!has_column)
862 result.num_rows = result.countBytesInResultFilter(result_filter->getData());
863 }
864
865 /// Check if the PREWHERE column is needed
866 if (result.columns.size())
867 {
868 if (prewhere->remove_prewhere_column)
869 result.columns.erase(result.columns.begin() + prewhere_column_pos);
870 else
871 result.columns[prewhere_column_pos] = DataTypeUInt8().createColumnConst(result.num_rows, 1u)->convertToFullColumnIfConst();
872 }
873 }
874 /// Filter in WHERE instead
875 else
876 {
877 result.columns[prewhere_column_pos] = result.getFilterHolder()->convertToFullColumnIfConst();
878 result.clearFilter(); // Acting as a flag to not filter in PREWHERE
879 }
880}
881
882}
883