#pragma once
#include <Core/Block.h>
#include <common/logger_useful.h>
#include <Interpreters/ExpressionActions.h>
#include <Storages/MergeTree/MarkRange.h>

namespace DB
{

template <typename T>
class ColumnVector;
using ColumnUInt8 = ColumnVector<UInt8>;

class MergeTreeReader;
class MergeTreeIndexGranularity;
struct PrewhereInfo;
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;

/// MergeTreeReader iterator which allows sequential reading of an arbitrary number of rows between pairs of marks in the same part.
/// Stores the reading state, which can be in the middle of a granule. Can skip rows in the current granule and start reading from the next mark.
/// Used generally for reading a number of rows smaller than the index granularity, to decrease cache misses for fat blocks.
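///
/// A hedged usage sketch (illustrative only; the reader objects, `prewhere_info`, `max_block_size_rows`
/// and `mark_ranges` below are assumed caller-side values, not part of this header). Range readers form
/// a chain: each PREWHERE step owns its own reader, and the last reader in the chain produces the final
/// filtered columns.
///
///     MergeTreeRangeReader pre_reader(prewhere_column_reader, nullptr, prewhere_info, false);
///     MergeTreeRangeReader main_reader(rest_columns_reader, &pre_reader, prewhere_info, true);
///     auto read_result = main_reader.read(max_block_size_rows, mark_ranges);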
class MergeTreeRangeReader
{
public:
    MergeTreeRangeReader(
        MergeTreeReader * merge_tree_reader_,
        MergeTreeRangeReader * prev_reader_,
        const PrewhereInfoPtr & prewhere_,
        bool last_reader_in_chain_);

    MergeTreeRangeReader() = default;

    bool isReadingFinished() const;

    size_t numReadRowsInCurrentGranule() const;
    size_t numPendingRowsInCurrentGranule() const;
    size_t numRowsInCurrentGranule() const;
    size_t currentMark() const;

    bool isCurrentRangeFinished() const;
    bool isInitialized() const { return is_initialized; }

    class DelayedStream
    {
    public:
        DelayedStream() = default;
        DelayedStream(size_t from_mark, MergeTreeReader * merge_tree_reader);

        /// Read @num_rows rows from @from_mark, starting from the @offset row.
        /// Returns the number of rows added to the block.
        /// NOTE: we have to return the number of rows because the block has a broken invariant:
        /// some columns may have a different size (for example, default columns may have zero size).
        size_t read(Columns & columns, size_t from_mark, size_t offset, size_t num_rows);

        /// Skip extra rows up to current_offset and perform the actual reading.
        size_t finalize(Columns & columns);

        bool isFinished() const { return is_finished; }

    private:
        size_t current_mark = 0;
        /// Offset from the current mark, in rows.
        size_t current_offset = 0;
        /// Number of rows we have to read.
        size_t num_delayed_rows = 0;

        /// Actual reader of data from disk.
        MergeTreeReader * merge_tree_reader = nullptr;
        const MergeTreeIndexGranularity * index_granularity = nullptr;
        bool continue_reading = false;
        bool is_finished = true;

        /// Current position from the beginning of the file, in rows.
        size_t position() const;
        size_t readRows(Columns & columns, size_t num_rows);
    };
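
    /// A minimal sketch of the intended call pattern (assumed, not taken from real callers):
    /// read() only records the request; adjacent requests are coalesced, and the actual disk
    /// read happens on finalize() or when a non-adjacent request arrives.
    ///
    ///     DelayedStream delayed(from_mark, reader);       /// `from_mark` and `reader` are caller-provided
    ///     size_t rows = 0;
    ///     rows += delayed.read(columns, from_mark, 0, 8); /// postponed
    ///     rows += delayed.read(columns, from_mark, 8, 8); /// adjacent: merged with the pending request
    ///     rows += delayed.finalize(columns);              /// performs the actual read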

    /// Very thin wrapper for DelayedStream.
    /// Checks the bounds of read ranges and steps between marks.
    class Stream
    {
    public:
        Stream() = default;
        Stream(size_t from_mark, size_t to_mark, MergeTreeReader * merge_tree_reader);

        /// Returns the number of rows added to the block.
        size_t read(Columns & columns, size_t num_rows, bool skip_remaining_rows_in_current_granule);
        size_t finalize(Columns & columns);
        void skip(size_t num_rows);

        void finish() { current_mark = last_mark; }
        bool isFinished() const { return current_mark >= last_mark; }

        size_t numReadRowsInCurrentGranule() const { return offset_after_current_mark; }
        size_t numPendingRowsInCurrentGranule() const
        {
            return current_mark_index_granularity - numReadRowsInCurrentGranule();
        }
        size_t numPendingGranules() const { return last_mark - current_mark; }
        size_t numPendingRows() const;
        size_t currentMark() const { return current_mark; }

        size_t current_mark = 0;
        /// Invariant: offset_after_current_mark + skipped_rows_after_offset < index_granularity
        size_t offset_after_current_mark = 0;

        size_t last_mark = 0;

        MergeTreeReader * merge_tree_reader = nullptr;
        const MergeTreeIndexGranularity * index_granularity = nullptr;

        size_t current_mark_index_granularity = 0;

        DelayedStream stream;

        void checkNotFinished() const;
        void checkEnoughSpaceInCurrentGranule(size_t num_rows) const;
        size_t readRows(Columns & columns, size_t num_rows);
        void toNextMark();
    };
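
    /// Illustrative sketch of how Stream is meant to be driven (hedged; `range`, `reader`,
    /// `columns` and `max_rows` are assumed caller-side values):
    ///
    ///     Stream stream(range.begin, range.end, reader);
    ///     while (!stream.isFinished())
    ///     {
    ///         size_t rows_to_read = std::min(max_rows, stream.numPendingRowsInCurrentGranule());
    ///         stream.read(columns, rows_to_read, /* skip_remaining_rows_in_current_granule = */ false);
    ///     }
    ///     size_t rows_read = stream.finalize(columns);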

    /// Statistics after next reading step.
    class ReadResult
    {
    public:
        using NumRows = std::vector<size_t>;

        struct RangeInfo
        {
            size_t num_granules_read_before_start;
            MarkRange range;
        };

        using RangesInfo = std::vector<RangeInfo>;

        const RangesInfo & startedRanges() const { return started_ranges; }
        const NumRows & rowsPerGranule() const { return rows_per_granule; }

        /// The number of rows read at the LAST iteration in the chain. <= num_added_rows + num_filtered_rows.
        size_t totalRowsPerGranule() const { return total_rows_per_granule; }
        /// The number of rows added to the block as a result of the reading chain.
        size_t numReadRows() const { return num_read_rows; }
        size_t numRowsToSkipInLastGranule() const { return num_rows_to_skip_in_last_granule; }
        /// The number of bytes read from disk.
        size_t numBytesRead() const { return num_bytes_read; }
        /// The filter you need to apply to newly-read columns in order to add them to the block.
        const ColumnUInt8 * getFilterOriginal() const { return filter_original; }
        const ColumnUInt8 * getFilter() const { return filter; }
        ColumnPtr & getFilterHolder() { return filter_holder; }

        void addGranule(size_t num_rows_);
        void adjustLastGranule();
        void addRows(size_t rows) { num_read_rows += rows; }
        void addRange(const MarkRange & range) { started_ranges.push_back({rows_per_granule.size(), range}); }

        /// Set the filter or replace the old one. The new filter must have more zeroes than the previous one.
        void setFilter(const ColumnPtr & new_filter);
        /// For each granule, calculate the number of filtered rows at the end. Remove them and update the filter.
        void optimize();
        /// Remove all rows from granules.
        void clear();

        void clearFilter() { filter = nullptr; }
        void setFilterConstTrue();
        void setFilterConstFalse();

        void addNumBytesRead(size_t count) { num_bytes_read += count; }

        void shrink(Columns & old_columns);

        size_t countBytesInResultFilter(const IColumn::Filter & filter);

        Columns columns;
        size_t num_rows = 0;
        bool need_filter = false;

        Block block_before_prewhere;

    private:
        RangesInfo started_ranges;
        /// The number of rows read from each granule.
        /// A granule here is not the number of rows between two marks;
        /// it's the number of rows processed in a single reading act.
        NumRows rows_per_granule;
        NumRows rows_per_granule_original;
        /// Sum(rows_per_granule)
        size_t total_rows_per_granule = 0;
        /// The number of rows read at the first step. May be zero if no columns to read are present in the part.
        size_t num_read_rows = 0;
        /// The number of rows removed from the last granule after clear() or optimize().
        size_t num_rows_to_skip_in_last_granule = 0;
        /// Without any filtration.
        size_t num_bytes_read = 0;
        /// nullptr if the previous reader has no prewhere_actions. Otherwise filter.size() >= total_rows_per_granule.
        ColumnPtr filter_holder;
        ColumnPtr filter_holder_original;
        const ColumnUInt8 * filter = nullptr;
        const ColumnUInt8 * filter_original = nullptr;

        void collapseZeroTails(const IColumn::Filter & filter, IColumn::Filter & new_filter);
        size_t countZeroTails(const IColumn::Filter & filter, NumRows & zero_tails) const;
        static size_t numZerosInTail(const UInt8 * begin, const UInt8 * end);

        std::map<const IColumn::Filter *, size_t> filter_bytes_map;
    };
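
    /// Hedged example of consuming a ReadResult from the last reader of the chain (names such as
    /// `range_reader`, `max_rows` and `ranges` are assumed; IColumn::filter() is used here only to
    /// illustrate how getFilter() relates to the result columns):
    ///
    ///     ReadResult read_result = range_reader.read(max_rows, ranges);
    ///     if (const auto * filter = read_result.getFilter())
    ///         for (auto & column : read_result.columns)
    ///             if (column)
    ///                 column = column->filter(filter->getData(), -1);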

    ReadResult read(size_t max_rows, MarkRanges & ranges);

    const Block & getSampleBlock() const { return sample_block; }

private:

    ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges);
    Columns continueReadingChain(ReadResult & result, size_t & num_rows);
    void executePrewhereActionsAndFilterColumns(ReadResult & result);
    void filterColumns(Columns & columns, const IColumn::Filter & filter) const;

    MergeTreeReader * merge_tree_reader = nullptr;
    const MergeTreeIndexGranularity * index_granularity = nullptr;
    MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader first.
    PrewhereInfoPtr prewhere;

    Stream stream;

    Block sample_block;
    Block sample_block_before_prewhere;

    bool last_reader_in_chain = false;
    bool is_initialized = false;
};

}