1 | #pragma once |
2 | |
#include <algorithm>
#include <cmath>
#include <memory>
#include <optional>
#include <vector>
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/MergeTreeRangeReader.h>
7 | |
8 | |
9 | namespace DB |
10 | { |
11 | |
/// Forward declarations: only the names are needed in this header's signatures.
class MergeTreeData;
struct MergeTreeBlockSizePredictor;
struct MergeTreeReadTask;

/// Exclusive-ownership handles for the structs defined below.
using MergeTreeBlockSizePredictorPtr = std::unique_ptr<MergeTreeBlockSizePredictor>;
using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;
18 | |
19 | |
20 | /** If some of the requested columns are not in the part, |
21 | * then find out which columns may need to be read further, |
22 | * so that you can calculate the DEFAULT expression for these columns. |
23 | * Adds them to the `columns`. |
24 | */ |
25 | NameSet injectRequiredColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & part, Names & columns); |
26 | |
27 | |
28 | /// A batch of work for MergeTreeThreadSelectBlockInputStream |
29 | struct MergeTreeReadTask |
30 | { |
31 | /// data part which should be read while performing this task |
32 | MergeTreeData::DataPartPtr data_part; |
33 | /** Ranges to read from `data_part`. |
34 | * Specified in reverse order for MergeTreeThreadSelectBlockInputStream's convenience of calling .pop_back(). */ |
35 | MarkRanges mark_ranges; |
36 | /// for virtual `part_index` virtual column |
37 | size_t part_index_in_query; |
38 | /// ordered list of column names used in this query, allows returning blocks with consistent ordering |
39 | const Names & ordered_names; |
40 | /// used to determine whether column should be filtered during PREWHERE or WHERE |
41 | const NameSet & column_name_set; |
42 | /// column names to read during WHERE |
43 | const NamesAndTypesList & columns; |
44 | /// column names to read during PREWHERE |
45 | const NamesAndTypesList & pre_columns; |
46 | /// should PREWHERE column be returned to requesting side? |
47 | const bool remove_prewhere_column; |
48 | /// resulting block may require reordering in accordance with `ordered_names` |
49 | const bool should_reorder; |
50 | /// Used to satistfy preferred_block_size_bytes limitation |
51 | MergeTreeBlockSizePredictorPtr size_predictor; |
52 | /// Used to save current range processing status |
53 | MergeTreeRangeReader range_reader; |
54 | MergeTreeRangeReader pre_range_reader; |
55 | |
56 | bool isFinished() const { return mark_ranges.empty() && range_reader.isCurrentRangeFinished(); } |
57 | |
58 | MergeTreeReadTask( |
59 | const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, const size_t part_index_in_query_, |
60 | const Names & ordered_names_, const NameSet & column_name_set_, const NamesAndTypesList & columns_, |
61 | const NamesAndTypesList & pre_columns_, const bool remove_prewhere_column_, const bool should_reorder_, |
62 | MergeTreeBlockSizePredictorPtr && size_predictor_); |
63 | |
64 | virtual ~MergeTreeReadTask(); |
65 | }; |
66 | |
67 | struct MergeTreeReadTaskColumns |
68 | { |
69 | /// column names to read during WHERE |
70 | NamesAndTypesList columns; |
71 | /// column names to read during PREWHERE |
72 | NamesAndTypesList pre_columns; |
73 | /// resulting block may require reordering in accordance with `ordered_names` |
74 | bool should_reorder; |
75 | }; |
76 | |
77 | MergeTreeReadTaskColumns getReadTaskColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & data_part, |
78 | const Names & required_columns, const PrewhereInfoPtr & prewhere_info, bool check_columns); |
79 | |
80 | struct MergeTreeBlockSizePredictor |
81 | { |
82 | MergeTreeBlockSizePredictor(const MergeTreeData::DataPartPtr & data_part_, const Names & columns, const Block & sample_block); |
83 | |
84 | /// Reset some values for correct statistics calculating |
85 | void startBlock(); |
86 | |
87 | /// Updates statistic for more accurate prediction |
88 | void update(const Block & sample_block, const Columns & columns, size_t num_rows, double decay = DECAY()); |
89 | |
90 | /// Return current block size (after update()) |
91 | inline size_t getBlockSize() const |
92 | { |
93 | return block_size_bytes; |
94 | } |
95 | |
96 | |
97 | /// Predicts what number of rows should be read to exhaust byte quota per column |
98 | inline size_t estimateNumRowsForMaxSizeColumn(size_t bytes_quota) const |
99 | { |
100 | double max_size_per_row = std::max<double>(std::max<size_t>(max_size_per_row_fixed, 1), max_size_per_row_dynamic); |
101 | return (bytes_quota > block_size_rows * max_size_per_row) |
102 | ? static_cast<size_t>(bytes_quota / max_size_per_row) - block_size_rows |
103 | : 0; |
104 | } |
105 | |
106 | /// Predicts what number of rows should be read to exhaust byte quota per block |
107 | inline size_t estimateNumRows(size_t bytes_quota) const |
108 | { |
109 | return (bytes_quota > block_size_bytes) |
110 | ? static_cast<size_t>((bytes_quota - block_size_bytes) / std::max<size_t>(1, bytes_per_row_current)) |
111 | : 0; |
112 | } |
113 | |
114 | inline void updateFilteredRowsRation(size_t rows_was_read, size_t rows_was_filtered, double decay = DECAY()) |
115 | { |
116 | double alpha = std::pow(1. - decay, rows_was_read); |
117 | double current_ration = rows_was_filtered / std::max(1.0, static_cast<double>(rows_was_read)); |
118 | filtered_rows_ratio = current_ration < filtered_rows_ratio |
119 | ? current_ration |
120 | : alpha * filtered_rows_ratio + (1.0 - alpha) * current_ration; |
121 | } |
122 | |
123 | /// Aggressiveness of bytes_per_row updates. See update() implementation. |
124 | /// After n=NUM_UPDATES_TO_TARGET_WEIGHT updates v_{n} = (1 - TARGET_WEIGHT) * v_{0} + TARGET_WEIGHT * v_{target} |
125 | static constexpr double TARGET_WEIGHT = 0.5; |
126 | static constexpr size_t NUM_UPDATES_TO_TARGET_WEIGHT = 8192; |
127 | static double DECAY() { return 1. - std::pow(TARGET_WEIGHT, 1. / NUM_UPDATES_TO_TARGET_WEIGHT); } |
128 | |
129 | protected: |
130 | |
131 | MergeTreeData::DataPartPtr data_part; |
132 | |
133 | struct ColumnInfo |
134 | { |
135 | String name; |
136 | double bytes_per_row_global = 0; |
137 | double bytes_per_row = 0; |
138 | size_t size_bytes = 0; |
139 | }; |
140 | |
141 | std::vector<ColumnInfo> dynamic_columns_infos; |
142 | size_t fixed_columns_bytes_per_row = 0; |
143 | |
144 | size_t max_size_per_row_fixed = 0; |
145 | double max_size_per_row_dynamic = 0; |
146 | |
147 | size_t number_of_rows_in_part; |
148 | |
149 | bool is_initialized_in_update = false; |
150 | |
151 | void initialize(const Block & sample_block, const Columns & columns, const Names & names, bool from_update = false); |
152 | |
153 | public: |
154 | |
155 | size_t block_size_bytes = 0; |
156 | size_t block_size_rows = 0; |
157 | |
158 | /// Total statistics |
159 | double bytes_per_row_current = 0; |
160 | double bytes_per_row_global = 0; |
161 | |
162 | double filtered_rows_ratio = 0; |
163 | }; |
164 | |
165 | } |
166 | |