#pragma once

#include <cstddef>
#include <map>
#include <memory>
#include <vector>

#include <Core/Block.h>
#include <common/logger_useful.h>
#include <Interpreters/ExpressionActions.h>
#include <Storages/MergeTree/MarkRange.h>

| 7 | namespace DB |
| 8 | { |
| 9 | |
| 10 | template <typename T> |
| 11 | class ColumnVector; |
| 12 | using ColumnUInt8 = ColumnVector<UInt8>; |
| 13 | |
| 14 | class MergeTreeReader; |
| 15 | class MergeTreeIndexGranularity; |
| 16 | struct PrewhereInfo; |
| 17 | using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>; |
| 18 | |
| 19 | /// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part. |
| 20 | /// Stores reading state, which can be inside granule. Can skip rows in current granule and start reading from next mark. |
| 21 | /// Used generally for reading number of rows less than index granularity to decrease cache misses for fat blocks. |
| 22 | class MergeTreeRangeReader |
| 23 | { |
| 24 | public: |
| 25 | MergeTreeRangeReader( |
| 26 | MergeTreeReader * merge_tree_reader_, |
| 27 | MergeTreeRangeReader * prev_reader_, |
| 28 | const PrewhereInfoPtr & prewhere_, |
| 29 | bool last_reader_in_chain_); |
| 30 | |
| 31 | MergeTreeRangeReader() = default; |
| 32 | |
| 33 | bool isReadingFinished() const; |
| 34 | |
| 35 | size_t numReadRowsInCurrentGranule() const; |
| 36 | size_t numPendingRowsInCurrentGranule() const; |
| 37 | size_t numRowsInCurrentGranule() const; |
| 38 | size_t currentMark() const; |
| 39 | |
| 40 | bool isCurrentRangeFinished() const; |
| 41 | bool isInitialized() const { return is_initialized; } |
| 42 | |
| 43 | class DelayedStream |
| 44 | { |
| 45 | public: |
| 46 | DelayedStream() = default; |
| 47 | DelayedStream(size_t from_mark, MergeTreeReader * merge_tree_reader); |
| 48 | |
| 49 | /// Read @num_rows rows from @from_mark starting from @offset row |
| 50 | /// Returns the number of rows added to block. |
| 51 | /// NOTE: have to return number of rows because block has broken invariant: |
| 52 | /// some columns may have different size (for example, default columns may be zero size). |
| 53 | size_t read(Columns & columns, size_t from_mark, size_t offset, size_t num_rows); |
| 54 | |
| 55 | /// Skip extra rows to current_offset and perform actual reading |
| 56 | size_t finalize(Columns & columns); |
| 57 | |
| 58 | bool isFinished() const { return is_finished; } |
| 59 | |
| 60 | private: |
| 61 | size_t current_mark = 0; |
| 62 | /// Offset from current mark in rows |
| 63 | size_t current_offset = 0; |
| 64 | /// Num of rows we have to read |
| 65 | size_t num_delayed_rows = 0; |
| 66 | |
| 67 | /// Actual reader of data from disk |
| 68 | MergeTreeReader * merge_tree_reader = nullptr; |
| 69 | const MergeTreeIndexGranularity * index_granularity = nullptr; |
| 70 | bool continue_reading = false; |
| 71 | bool is_finished = true; |
| 72 | |
| 73 | /// Current position from the begging of file in rows |
| 74 | size_t position() const; |
| 75 | size_t readRows(Columns & columns, size_t num_rows); |
| 76 | }; |
| 77 | |
| 78 | /// Very thin wrapper for DelayedStream |
| 79 | /// Check bounds of read ranges and make steps between marks |
| 80 | class Stream |
| 81 | { |
| 82 | public: |
| 83 | Stream() = default; |
| 84 | Stream(size_t from_mark, size_t to_mark, MergeTreeReader * merge_tree_reader); |
| 85 | |
| 86 | /// Returns the number of rows added to block. |
| 87 | size_t read(Columns & columns, size_t num_rows, bool skip_remaining_rows_in_current_granule); |
| 88 | size_t finalize(Columns & columns); |
| 89 | void skip(size_t num_rows); |
| 90 | |
| 91 | void finish() { current_mark = last_mark; } |
| 92 | bool isFinished() const { return current_mark >= last_mark; } |
| 93 | |
| 94 | size_t numReadRowsInCurrentGranule() const { return offset_after_current_mark; } |
| 95 | size_t numPendingRowsInCurrentGranule() const |
| 96 | { |
| 97 | return current_mark_index_granularity - numReadRowsInCurrentGranule(); |
| 98 | } |
| 99 | size_t numPendingGranules() const { return last_mark - current_mark; } |
| 100 | size_t numPendingRows() const; |
| 101 | size_t currentMark() const { return current_mark; } |
| 102 | |
| 103 | size_t current_mark = 0; |
| 104 | /// Invariant: offset_after_current_mark + skipped_rows_after_offset < index_granularity |
| 105 | size_t offset_after_current_mark = 0; |
| 106 | |
| 107 | size_t last_mark = 0; |
| 108 | |
| 109 | MergeTreeReader * merge_tree_reader = nullptr; |
| 110 | const MergeTreeIndexGranularity * index_granularity = nullptr; |
| 111 | |
| 112 | size_t current_mark_index_granularity = 0; |
| 113 | |
| 114 | DelayedStream stream; |
| 115 | |
| 116 | void checkNotFinished() const; |
| 117 | void checkEnoughSpaceInCurrentGranule(size_t num_rows) const; |
| 118 | size_t readRows(Columns & columns, size_t num_rows); |
| 119 | void toNextMark(); |
| 120 | }; |
| 121 | |
| 122 | /// Statistics after next reading step. |
| 123 | class ReadResult |
| 124 | { |
| 125 | public: |
| 126 | using NumRows = std::vector<size_t>; |
| 127 | |
| 128 | struct RangeInfo |
| 129 | { |
| 130 | size_t num_granules_read_before_start; |
| 131 | MarkRange range; |
| 132 | }; |
| 133 | |
| 134 | using RangesInfo = std::vector<RangeInfo>; |
| 135 | |
| 136 | const RangesInfo & startedRanges() const { return started_ranges; } |
| 137 | const NumRows & rowsPerGranule() const { return rows_per_granule; } |
| 138 | |
| 139 | /// The number of rows were read at LAST iteration in chain. <= num_added_rows + num_filtered_rows. |
| 140 | size_t totalRowsPerGranule() const { return total_rows_per_granule; } |
| 141 | /// The number of rows were added to block as a result of reading chain. |
| 142 | size_t numReadRows() const { return num_read_rows; } |
| 143 | size_t numRowsToSkipInLastGranule() const { return num_rows_to_skip_in_last_granule; } |
| 144 | /// The number of bytes read from disk. |
| 145 | size_t numBytesRead() const { return num_bytes_read; } |
| 146 | /// Filter you need to apply to newly-read columns in order to add them to block. |
| 147 | const ColumnUInt8 * getFilterOriginal() const { return filter_original; } |
| 148 | const ColumnUInt8 * getFilter() const { return filter; } |
| 149 | ColumnPtr & getFilterHolder() { return filter_holder; } |
| 150 | |
| 151 | void addGranule(size_t num_rows_); |
| 152 | void adjustLastGranule(); |
| 153 | void addRows(size_t rows) { num_read_rows += rows; } |
| 154 | void addRange(const MarkRange & range) { started_ranges.push_back({rows_per_granule.size(), range}); } |
| 155 | |
| 156 | /// Set filter or replace old one. Filter must have more zeroes than previous. |
| 157 | void setFilter(const ColumnPtr & new_filter); |
| 158 | /// For each granule calculate the number of filtered rows at the end. Remove them and update filter. |
| 159 | void optimize(); |
| 160 | /// Remove all rows from granules. |
| 161 | void clear(); |
| 162 | |
| 163 | void clearFilter() { filter = nullptr; } |
| 164 | void setFilterConstTrue(); |
| 165 | void setFilterConstFalse(); |
| 166 | |
| 167 | void addNumBytesRead(size_t count) { num_bytes_read += count; } |
| 168 | |
| 169 | void shrink(Columns & old_columns); |
| 170 | |
| 171 | size_t countBytesInResultFilter(const IColumn::Filter & filter); |
| 172 | |
| 173 | Columns columns; |
| 174 | size_t num_rows = 0; |
| 175 | bool need_filter = false; |
| 176 | |
| 177 | Block block_before_prewhere; |
| 178 | |
| 179 | private: |
| 180 | RangesInfo started_ranges; |
| 181 | /// The number of rows read from each granule. |
| 182 | /// Granule here is not number of rows between two marks |
| 183 | /// It's amount of rows per single reading act |
| 184 | NumRows rows_per_granule; |
| 185 | NumRows rows_per_granule_original; |
| 186 | /// Sum(rows_per_granule) |
| 187 | size_t total_rows_per_granule = 0; |
| 188 | /// The number of rows was read at first step. May be zero if no read columns present in part. |
| 189 | size_t num_read_rows = 0; |
| 190 | /// The number of rows was removed from last granule after clear or optimize. |
| 191 | size_t num_rows_to_skip_in_last_granule = 0; |
| 192 | /// Without any filtration. |
| 193 | size_t num_bytes_read = 0; |
| 194 | /// nullptr if prev reader hasn't prewhere_actions. Otherwise filter.size() >= total_rows_per_granule. |
| 195 | ColumnPtr filter_holder; |
| 196 | ColumnPtr filter_holder_original; |
| 197 | const ColumnUInt8 * filter = nullptr; |
| 198 | const ColumnUInt8 * filter_original = nullptr; |
| 199 | |
| 200 | void collapseZeroTails(const IColumn::Filter & filter, IColumn::Filter & new_filter); |
| 201 | size_t countZeroTails(const IColumn::Filter & filter, NumRows & zero_tails) const; |
| 202 | static size_t numZerosInTail(const UInt8 * begin, const UInt8 * end); |
| 203 | |
| 204 | std::map<const IColumn::Filter *, size_t> filter_bytes_map; |
| 205 | }; |
| 206 | |
| 207 | ReadResult read(size_t max_rows, MarkRanges & ranges); |
| 208 | |
| 209 | const Block & getSampleBlock() const { return sample_block; } |
| 210 | |
| 211 | private: |
| 212 | |
| 213 | ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges); |
| 214 | Columns continueReadingChain(ReadResult & result, size_t & num_rows); |
| 215 | void executePrewhereActionsAndFilterColumns(ReadResult & result); |
| 216 | void filterColumns(Columns & columns, const IColumn::Filter & filter) const; |
| 217 | |
| 218 | MergeTreeReader * merge_tree_reader = nullptr; |
| 219 | const MergeTreeIndexGranularity * index_granularity = nullptr; |
| 220 | MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader firstly. |
| 221 | PrewhereInfoPtr prewhere; |
| 222 | |
| 223 | Stream stream; |
| 224 | |
| 225 | Block sample_block; |
| 226 | Block sample_block_before_prewhere; |
| 227 | |
| 228 | bool last_reader_in_chain = false; |
| 229 | bool is_initialized = false; |
| 230 | }; |
| 231 | |
| 232 | } |
| 233 | |