#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnConst.h>
#include <IO/WriteHelpers.h>
#include <unordered_set>
#include <cmath>


namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

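/** If some of the requested columns are not physically present in the part but can be calculated
  * from their DEFAULT or MATERIALIZED expressions, append the columns those expressions depend on
  * to `columns`. Returns the set of column names injected this way.
  */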
NameSet injectRequiredColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & part, Names & columns)
{
    NameSet required_columns{std::begin(columns), std::end(columns)};
    NameSet injected_columns;

    auto all_column_files_missing = true;

    for (size_t i = 0; i < columns.size(); ++i)
    {
        const auto & column_name = columns[i];

        /// column has files and hence does not require evaluation
        if (part->hasColumnFiles(column_name, *storage.getColumn(column_name).type))
        {
            all_column_files_missing = false;
            continue;
        }

        const auto column_default = storage.getColumns().getDefault(column_name);
        if (!column_default)
            continue;

        /// collect identifiers required for evaluation
        IdentifierNameSet identifiers;
        column_default->expression->collectIdentifierNames(identifiers);

        for (const auto & identifier : identifiers)
        {
            if (storage.hasColumn(identifier))
            {
                /// ensure each column is added only once
                if (required_columns.count(identifier) == 0)
                {
                    columns.emplace_back(identifier);
                    required_columns.emplace(identifier);
                    injected_columns.emplace(identifier);
                }
            }
        }
    }

    /** Add a column of the minimum size.
      * Used in the case when no column is needed or its files are missing, but you still need to know
      * at least the number of rows. The column is appended to `columns`.
      */
    if (all_column_files_missing)
    {
        const auto minimum_size_column_name = part->getColumnNameWithMinumumCompressedSize();
        columns.push_back(minimum_size_column_name);
        /// correctly report added column
        injected_columns.insert(columns.back());
    }

    return injected_columns;
}


MergeTreeReadTask::MergeTreeReadTask(
    const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, const size_t part_index_in_query_,
    const Names & ordered_names_, const NameSet & column_name_set_, const NamesAndTypesList & columns_,
    const NamesAndTypesList & pre_columns_, const bool remove_prewhere_column_, const bool should_reorder_,
    MergeTreeBlockSizePredictorPtr && size_predictor_)
    : data_part{data_part_}, mark_ranges{mark_ranges_}, part_index_in_query{part_index_in_query_},
    ordered_names{ordered_names_}, column_name_set{column_name_set_}, columns{columns_}, pre_columns{pre_columns_},
    remove_prewhere_column{remove_prewhere_column_}, should_reorder{should_reorder_}, size_predictor{std::move(size_predictor_)}
{}

MergeTreeReadTask::~MergeTreeReadTask() = default;


MergeTreeBlockSizePredictor::MergeTreeBlockSizePredictor(
    const MergeTreeData::DataPartPtr & data_part_, const Names & columns, const Block & sample_block)
    : data_part(data_part_)
{
    number_of_rows_in_part = data_part->rows_count;
    /// Initialize with the sample block until the first update() call.
    initialize(sample_block, {}, columns);
}

void MergeTreeBlockSizePredictor::initialize(const Block & sample_block, const Columns & columns, const Names & names, bool from_update)
{
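    /// Split the sample's columns into two groups: columns with fixed-size values contribute a constant
    /// number of bytes per row, while the remaining (dynamic) columns get a per-column estimate that is
    /// refined later by update().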
    fixed_columns_bytes_per_row = 0;
    dynamic_columns_infos.clear();

    std::unordered_set<String> names_set;
    if (!from_update)
        names_set.insert(names.begin(), names.end());

    size_t num_columns = sample_block.columns();
    for (size_t pos = 0; pos < num_columns; ++pos)
    {
        const auto & column_with_type_and_name = sample_block.getByPosition(pos);
        const String & column_name = column_with_type_and_name.name;
        const ColumnPtr & column_data = from_update ? columns[pos]
                                                    : column_with_type_and_name.column;

        if (!from_update && !names_set.count(column_name))
            continue;

        /// At least PREWHERE filter column might be const.
        if (typeid_cast<const ColumnConst *>(column_data.get()))
            continue;

        if (column_data->valuesHaveFixedSize())
        {
            size_t size_of_value = column_data->sizeOfValueIfFixed();
            fixed_columns_bytes_per_row += size_of_value;
            max_size_per_row_fixed = std::max<size_t>(max_size_per_row_fixed, size_of_value);
        }
        else
        {
            ColumnInfo info;
            info.name = column_name;
            /// If the column isn't fixed-size, estimate bytes per row from the part's uncompressed size;
            /// if there is no checksum for it, fall back to the in-memory byte size of the passed column.
            ColumnSize column_size = data_part->getColumnSize(
                column_name, *column_with_type_and_name.type);

            info.bytes_per_row_global = column_size.data_uncompressed
                ? column_size.data_uncompressed / number_of_rows_in_part
                : column_data->byteSize() / std::max<size_t>(1, column_data->size());

            dynamic_columns_infos.emplace_back(info);
        }
    }

    bytes_per_row_global = fixed_columns_bytes_per_row;
    for (auto & info : dynamic_columns_infos)
    {
        info.bytes_per_row = info.bytes_per_row_global;
        bytes_per_row_global += info.bytes_per_row_global;

        max_size_per_row_dynamic = std::max<double>(max_size_per_row_dynamic, info.bytes_per_row);
    }
    bytes_per_row_current = bytes_per_row_global;
}

void MergeTreeBlockSizePredictor::startBlock()
{
    block_size_bytes = 0;
    block_size_rows = 0;
    for (auto & info : dynamic_columns_infos)
        info.size_bytes = 0;
}


/// TODO: add last_read_row_in_part parameter to take into account gaps between adjacent ranges
void MergeTreeBlockSizePredictor::update(const Block & sample_block, const Columns & columns, size_t num_rows, double decay)
{
    if (columns.size() != sample_block.columns())
        throw Exception("Inconsistent number of columns passed to MergeTreeBlockSizePredictor. "
            "Have " + toString(sample_block.columns()) + " columns in sample block "
            "and " + toString(columns.size()) + " columns in list", ErrorCodes::LOGICAL_ERROR);

    if (!is_initialized_in_update)
    {
        /// Reinitialize with the read block to update the estimation for DEFAULT and MATERIALIZED columns without data.
        initialize(sample_block, columns, {}, true);
        is_initialized_in_update = true;
    }

    if (num_rows < block_size_rows)
    {
        throw Exception("Updated block has fewer rows (" + toString(num_rows) + ") than previous one (" + toString(block_size_rows) + ")",
            ErrorCodes::LOGICAL_ERROR);
    }

    size_t diff_rows = num_rows - block_size_rows;
    block_size_bytes = num_rows * fixed_columns_bytes_per_row;
    bytes_per_row_current = fixed_columns_bytes_per_row;
    block_size_rows = num_rows;

    /// Make a recursive update for each read row: v_{i+1} = (1 - decay) * v_{i} + decay * v_{target}
    /// Use the sum of a geometric sequence to apply the update for multiple rows at once:
    ///   v_{n} = (1 - decay)^n * v_{0} + (1 - (1 - decay)^n) * v_{target}
    /// NOTE: DEFAULT and MATERIALIZED columns without data have an inaccurate estimation of v_{target}.
    double alpha = std::pow(1. - decay, diff_rows);
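    /// Illustrative numbers: with decay = 0.01 and diff_rows = 100, alpha = 0.99^100 ~= 0.366,
    /// so roughly 63% of the weight shifts to the bytes-per-row observed in the newly read rows.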

    max_size_per_row_dynamic = 0;
    for (auto & info : dynamic_columns_infos)
    {
        size_t new_size = columns[sample_block.getPositionByName(info.name)]->byteSize();
        size_t diff_size = new_size - info.size_bytes;

        double local_bytes_per_row = static_cast<double>(diff_size) / diff_rows;
        info.bytes_per_row = alpha * info.bytes_per_row + (1. - alpha) * local_bytes_per_row;

        info.size_bytes = new_size;
        block_size_bytes += new_size;
        bytes_per_row_current += info.bytes_per_row;

        max_size_per_row_dynamic = std::max<double>(max_size_per_row_dynamic, info.bytes_per_row);
    }
}


MergeTreeReadTaskColumns getReadTaskColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & data_part,
    const Names & required_columns, const PrewhereInfoPtr & prewhere_info, bool check_columns)
{
    Names column_names = required_columns;
    Names pre_column_names;

    /// inject columns required for defaults evaluation
    bool should_reorder = !injectRequiredColumns(storage, data_part, column_names).empty();

    if (prewhere_info)
    {
        if (prewhere_info->alias_actions)
            pre_column_names = prewhere_info->alias_actions->getRequiredColumns();
        else
            pre_column_names = prewhere_info->prewhere_actions->getRequiredColumns();

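        /// PREWHERE actions may require no physical columns at all (e.g. a constant condition);
        /// still read at least one column so that the number of rows is known.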
        if (pre_column_names.empty())
            pre_column_names.push_back(column_names[0]);

        const auto injected_pre_columns = injectRequiredColumns(storage, data_part, pre_column_names);
        if (!injected_pre_columns.empty())
            should_reorder = true;

        const NameSet pre_name_set(pre_column_names.begin(), pre_column_names.end());

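        /// Columns already read at the PREWHERE step are excluded from the main read step.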
        Names post_column_names;
        for (const auto & name : column_names)
            if (!pre_name_set.count(name))
                post_column_names.push_back(name);

        column_names = post_column_names;
    }

    MergeTreeReadTaskColumns result;

    if (check_columns)
    {
        /// Under owned_data_part->columns_lock we check that all requested columns are of the same type as in the table.
        /// This may not be true in case of ALTER MODIFY.
        if (!pre_column_names.empty())
            storage.check(data_part->columns, pre_column_names);
        if (!column_names.empty())
            storage.check(data_part->columns, column_names);

        const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical();
        result.pre_columns = physical_columns.addTypes(pre_column_names);
        result.columns = physical_columns.addTypes(column_names);
    }
    else
    {
        result.pre_columns = data_part->columns.addTypes(pre_column_names);
        result.columns = data_part->columns.addTypes(column_names);
    }

    result.should_reorder = should_reorder;

    return result;
}

}