1#pragma once
2
3#include <optional>
4#include <Core/NamesAndTypes.h>
5#include <Storages/MergeTree/RangesInDataPart.h>
6#include <Storages/MergeTree/MergeTreeRangeReader.h>
7
8
9namespace DB
10{
11
class MergeTreeData;

/// A batch of reading work (defined below) and the owning handle used to pass it around.
struct MergeTreeReadTask;
using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;

/// Predictor of how many rows fit into a byte quota (defined below) and its owning handle.
struct MergeTreeBlockSizePredictor;
using MergeTreeBlockSizePredictorPtr = std::unique_ptr<MergeTreeBlockSizePredictor>;
18
19
20/** If some of the requested columns are not in the part,
21 * then find out which columns may need to be read further,
22 * so that you can calculate the DEFAULT expression for these columns.
23 * Adds them to the `columns`.
24 */
25NameSet injectRequiredColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & part, Names & columns);
26
27
28/// A batch of work for MergeTreeThreadSelectBlockInputStream
29struct MergeTreeReadTask
30{
31 /// data part which should be read while performing this task
32 MergeTreeData::DataPartPtr data_part;
33 /** Ranges to read from `data_part`.
34 * Specified in reverse order for MergeTreeThreadSelectBlockInputStream's convenience of calling .pop_back(). */
35 MarkRanges mark_ranges;
36 /// for virtual `part_index` virtual column
37 size_t part_index_in_query;
38 /// ordered list of column names used in this query, allows returning blocks with consistent ordering
39 const Names & ordered_names;
40 /// used to determine whether column should be filtered during PREWHERE or WHERE
41 const NameSet & column_name_set;
42 /// column names to read during WHERE
43 const NamesAndTypesList & columns;
44 /// column names to read during PREWHERE
45 const NamesAndTypesList & pre_columns;
46 /// should PREWHERE column be returned to requesting side?
47 const bool remove_prewhere_column;
48 /// resulting block may require reordering in accordance with `ordered_names`
49 const bool should_reorder;
50 /// Used to satistfy preferred_block_size_bytes limitation
51 MergeTreeBlockSizePredictorPtr size_predictor;
52 /// Used to save current range processing status
53 MergeTreeRangeReader range_reader;
54 MergeTreeRangeReader pre_range_reader;
55
56 bool isFinished() const { return mark_ranges.empty() && range_reader.isCurrentRangeFinished(); }
57
58 MergeTreeReadTask(
59 const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, const size_t part_index_in_query_,
60 const Names & ordered_names_, const NameSet & column_name_set_, const NamesAndTypesList & columns_,
61 const NamesAndTypesList & pre_columns_, const bool remove_prewhere_column_, const bool should_reorder_,
62 MergeTreeBlockSizePredictorPtr && size_predictor_);
63
64 virtual ~MergeTreeReadTask();
65};
66
67struct MergeTreeReadTaskColumns
68{
69 /// column names to read during WHERE
70 NamesAndTypesList columns;
71 /// column names to read during PREWHERE
72 NamesAndTypesList pre_columns;
73 /// resulting block may require reordering in accordance with `ordered_names`
74 bool should_reorder;
75};
76
77MergeTreeReadTaskColumns getReadTaskColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & data_part,
78 const Names & required_columns, const PrewhereInfoPtr & prewhere_info, bool check_columns);
79
80struct MergeTreeBlockSizePredictor
81{
82 MergeTreeBlockSizePredictor(const MergeTreeData::DataPartPtr & data_part_, const Names & columns, const Block & sample_block);
83
84 /// Reset some values for correct statistics calculating
85 void startBlock();
86
87 /// Updates statistic for more accurate prediction
88 void update(const Block & sample_block, const Columns & columns, size_t num_rows, double decay = DECAY());
89
90 /// Return current block size (after update())
91 inline size_t getBlockSize() const
92 {
93 return block_size_bytes;
94 }
95
96
97 /// Predicts what number of rows should be read to exhaust byte quota per column
98 inline size_t estimateNumRowsForMaxSizeColumn(size_t bytes_quota) const
99 {
100 double max_size_per_row = std::max<double>(std::max<size_t>(max_size_per_row_fixed, 1), max_size_per_row_dynamic);
101 return (bytes_quota > block_size_rows * max_size_per_row)
102 ? static_cast<size_t>(bytes_quota / max_size_per_row) - block_size_rows
103 : 0;
104 }
105
106 /// Predicts what number of rows should be read to exhaust byte quota per block
107 inline size_t estimateNumRows(size_t bytes_quota) const
108 {
109 return (bytes_quota > block_size_bytes)
110 ? static_cast<size_t>((bytes_quota - block_size_bytes) / std::max<size_t>(1, bytes_per_row_current))
111 : 0;
112 }
113
114 inline void updateFilteredRowsRation(size_t rows_was_read, size_t rows_was_filtered, double decay = DECAY())
115 {
116 double alpha = std::pow(1. - decay, rows_was_read);
117 double current_ration = rows_was_filtered / std::max(1.0, static_cast<double>(rows_was_read));
118 filtered_rows_ratio = current_ration < filtered_rows_ratio
119 ? current_ration
120 : alpha * filtered_rows_ratio + (1.0 - alpha) * current_ration;
121 }
122
123 /// Aggressiveness of bytes_per_row updates. See update() implementation.
124 /// After n=NUM_UPDATES_TO_TARGET_WEIGHT updates v_{n} = (1 - TARGET_WEIGHT) * v_{0} + TARGET_WEIGHT * v_{target}
125 static constexpr double TARGET_WEIGHT = 0.5;
126 static constexpr size_t NUM_UPDATES_TO_TARGET_WEIGHT = 8192;
127 static double DECAY() { return 1. - std::pow(TARGET_WEIGHT, 1. / NUM_UPDATES_TO_TARGET_WEIGHT); }
128
129protected:
130
131 MergeTreeData::DataPartPtr data_part;
132
133 struct ColumnInfo
134 {
135 String name;
136 double bytes_per_row_global = 0;
137 double bytes_per_row = 0;
138 size_t size_bytes = 0;
139 };
140
141 std::vector<ColumnInfo> dynamic_columns_infos;
142 size_t fixed_columns_bytes_per_row = 0;
143
144 size_t max_size_per_row_fixed = 0;
145 double max_size_per_row_dynamic = 0;
146
147 size_t number_of_rows_in_part;
148
149 bool is_initialized_in_update = false;
150
151 void initialize(const Block & sample_block, const Columns & columns, const Names & names, bool from_update = false);
152
153public:
154
155 size_t block_size_bytes = 0;
156 size_t block_size_rows = 0;
157
158 /// Total statistics
159 double bytes_per_row_current = 0;
160 double bytes_per_row_global = 0;
161
162 double filtered_rows_ratio = 0;
163};
164
165}
166