| 1 | #pragma once |
| 2 | |
| 3 | #include <optional> |
| 4 | #include <Core/NamesAndTypes.h> |
| 5 | #include <Storages/MergeTree/RangesInDataPart.h> |
| 6 | #include <Storages/MergeTree/MergeTreeRangeReader.h> |
| 7 | |
| 8 | |
| 9 | namespace DB |
| 10 | { |
| 11 | |
/// Only named here; the declaration itself comes from elsewhere.
class MergeTreeData;

/// Read task (defined in this header) and the owning pointer to it.
struct MergeTreeReadTask;
using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;

/// Block size predictor (defined in this header) and the owning pointer to it.
struct MergeTreeBlockSizePredictor;
using MergeTreeBlockSizePredictorPtr = std::unique_ptr<MergeTreeBlockSizePredictor>;
| 18 | |
| 19 | |
| 20 | /** If some of the requested columns are not in the part, |
| 21 | * then find out which columns may need to be read further, |
| 22 | * so that you can calculate the DEFAULT expression for these columns. |
| 23 | * Adds them to the `columns`. |
| 24 | */ |
| 25 | NameSet injectRequiredColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & part, Names & columns); |
| 26 | |
| 27 | |
| 28 | /// A batch of work for MergeTreeThreadSelectBlockInputStream |
| 29 | struct MergeTreeReadTask |
| 30 | { |
| 31 | /// data part which should be read while performing this task |
| 32 | MergeTreeData::DataPartPtr data_part; |
| 33 | /** Ranges to read from `data_part`. |
| 34 | * Specified in reverse order for MergeTreeThreadSelectBlockInputStream's convenience of calling .pop_back(). */ |
| 35 | MarkRanges mark_ranges; |
| 36 | /// for virtual `part_index` virtual column |
| 37 | size_t part_index_in_query; |
| 38 | /// ordered list of column names used in this query, allows returning blocks with consistent ordering |
| 39 | const Names & ordered_names; |
| 40 | /// used to determine whether column should be filtered during PREWHERE or WHERE |
| 41 | const NameSet & column_name_set; |
| 42 | /// column names to read during WHERE |
| 43 | const NamesAndTypesList & columns; |
| 44 | /// column names to read during PREWHERE |
| 45 | const NamesAndTypesList & pre_columns; |
| 46 | /// should PREWHERE column be returned to requesting side? |
| 47 | const bool remove_prewhere_column; |
| 48 | /// resulting block may require reordering in accordance with `ordered_names` |
| 49 | const bool should_reorder; |
| 50 | /// Used to satistfy preferred_block_size_bytes limitation |
| 51 | MergeTreeBlockSizePredictorPtr size_predictor; |
| 52 | /// Used to save current range processing status |
| 53 | MergeTreeRangeReader range_reader; |
| 54 | MergeTreeRangeReader pre_range_reader; |
| 55 | |
| 56 | bool isFinished() const { return mark_ranges.empty() && range_reader.isCurrentRangeFinished(); } |
| 57 | |
| 58 | MergeTreeReadTask( |
| 59 | const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, const size_t part_index_in_query_, |
| 60 | const Names & ordered_names_, const NameSet & column_name_set_, const NamesAndTypesList & columns_, |
| 61 | const NamesAndTypesList & pre_columns_, const bool remove_prewhere_column_, const bool should_reorder_, |
| 62 | MergeTreeBlockSizePredictorPtr && size_predictor_); |
| 63 | |
| 64 | virtual ~MergeTreeReadTask(); |
| 65 | }; |
| 66 | |
| 67 | struct MergeTreeReadTaskColumns |
| 68 | { |
| 69 | /// column names to read during WHERE |
| 70 | NamesAndTypesList columns; |
| 71 | /// column names to read during PREWHERE |
| 72 | NamesAndTypesList pre_columns; |
| 73 | /// resulting block may require reordering in accordance with `ordered_names` |
| 74 | bool should_reorder; |
| 75 | }; |
| 76 | |
| 77 | MergeTreeReadTaskColumns getReadTaskColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & data_part, |
| 78 | const Names & required_columns, const PrewhereInfoPtr & prewhere_info, bool check_columns); |
| 79 | |
| 80 | struct MergeTreeBlockSizePredictor |
| 81 | { |
| 82 | MergeTreeBlockSizePredictor(const MergeTreeData::DataPartPtr & data_part_, const Names & columns, const Block & sample_block); |
| 83 | |
| 84 | /// Reset some values for correct statistics calculating |
| 85 | void startBlock(); |
| 86 | |
| 87 | /// Updates statistic for more accurate prediction |
| 88 | void update(const Block & sample_block, const Columns & columns, size_t num_rows, double decay = DECAY()); |
| 89 | |
| 90 | /// Return current block size (after update()) |
| 91 | inline size_t getBlockSize() const |
| 92 | { |
| 93 | return block_size_bytes; |
| 94 | } |
| 95 | |
| 96 | |
| 97 | /// Predicts what number of rows should be read to exhaust byte quota per column |
| 98 | inline size_t estimateNumRowsForMaxSizeColumn(size_t bytes_quota) const |
| 99 | { |
| 100 | double max_size_per_row = std::max<double>(std::max<size_t>(max_size_per_row_fixed, 1), max_size_per_row_dynamic); |
| 101 | return (bytes_quota > block_size_rows * max_size_per_row) |
| 102 | ? static_cast<size_t>(bytes_quota / max_size_per_row) - block_size_rows |
| 103 | : 0; |
| 104 | } |
| 105 | |
| 106 | /// Predicts what number of rows should be read to exhaust byte quota per block |
| 107 | inline size_t estimateNumRows(size_t bytes_quota) const |
| 108 | { |
| 109 | return (bytes_quota > block_size_bytes) |
| 110 | ? static_cast<size_t>((bytes_quota - block_size_bytes) / std::max<size_t>(1, bytes_per_row_current)) |
| 111 | : 0; |
| 112 | } |
| 113 | |
| 114 | inline void updateFilteredRowsRation(size_t rows_was_read, size_t rows_was_filtered, double decay = DECAY()) |
| 115 | { |
| 116 | double alpha = std::pow(1. - decay, rows_was_read); |
| 117 | double current_ration = rows_was_filtered / std::max(1.0, static_cast<double>(rows_was_read)); |
| 118 | filtered_rows_ratio = current_ration < filtered_rows_ratio |
| 119 | ? current_ration |
| 120 | : alpha * filtered_rows_ratio + (1.0 - alpha) * current_ration; |
| 121 | } |
| 122 | |
| 123 | /// Aggressiveness of bytes_per_row updates. See update() implementation. |
| 124 | /// After n=NUM_UPDATES_TO_TARGET_WEIGHT updates v_{n} = (1 - TARGET_WEIGHT) * v_{0} + TARGET_WEIGHT * v_{target} |
| 125 | static constexpr double TARGET_WEIGHT = 0.5; |
| 126 | static constexpr size_t NUM_UPDATES_TO_TARGET_WEIGHT = 8192; |
| 127 | static double DECAY() { return 1. - std::pow(TARGET_WEIGHT, 1. / NUM_UPDATES_TO_TARGET_WEIGHT); } |
| 128 | |
| 129 | protected: |
| 130 | |
| 131 | MergeTreeData::DataPartPtr data_part; |
| 132 | |
| 133 | struct ColumnInfo |
| 134 | { |
| 135 | String name; |
| 136 | double bytes_per_row_global = 0; |
| 137 | double bytes_per_row = 0; |
| 138 | size_t size_bytes = 0; |
| 139 | }; |
| 140 | |
| 141 | std::vector<ColumnInfo> dynamic_columns_infos; |
| 142 | size_t fixed_columns_bytes_per_row = 0; |
| 143 | |
| 144 | size_t max_size_per_row_fixed = 0; |
| 145 | double max_size_per_row_dynamic = 0; |
| 146 | |
| 147 | size_t number_of_rows_in_part; |
| 148 | |
| 149 | bool is_initialized_in_update = false; |
| 150 | |
| 151 | void initialize(const Block & sample_block, const Columns & columns, const Names & names, bool from_update = false); |
| 152 | |
| 153 | public: |
| 154 | |
| 155 | size_t block_size_bytes = 0; |
| 156 | size_t block_size_rows = 0; |
| 157 | |
| 158 | /// Total statistics |
| 159 | double bytes_per_row_current = 0; |
| 160 | double bytes_per_row_global = 0; |
| 161 | |
| 162 | double filtered_rows_ratio = 0; |
| 163 | }; |
| 164 | |
| 165 | } |
| 166 | |