1 | #include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h> |
2 | #include <Storages/MergeTree/MergeTreeRangeReader.h> |
3 | #include <Storages/MergeTree/MergeTreeReader.h> |
4 | #include <Storages/MergeTree/MergeTreeBlockReadUtils.h> |
5 | #include <Columns/FilterDescription.h> |
6 | #include <Common/typeid_cast.h> |
7 | #include <DataTypes/DataTypeNothing.h> |
8 | |
9 | |
10 | namespace DB |
11 | { |
12 | |
namespace ErrorCodes
{
    /// Only LOGICAL_ERROR is thrown in this file (inconsistent range-reader output,
    /// virtual columns requested for a non-empty chunk without a task).
    extern const int LOGICAL_ERROR;
}
18 | |
19 | |
20 | MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor( |
21 | Block , |
22 | const MergeTreeData & storage_, |
23 | const PrewhereInfoPtr & prewhere_info_, |
24 | UInt64 max_block_size_rows_, |
25 | UInt64 preferred_block_size_bytes_, |
26 | UInt64 preferred_max_column_in_block_size_bytes_, |
27 | UInt64 min_bytes_to_use_direct_io_, |
28 | UInt64 max_read_buffer_size_, |
29 | bool use_uncompressed_cache_, |
30 | bool save_marks_in_cache_, |
31 | const Names & virt_column_names_) |
32 | : |
33 | SourceWithProgress(getHeader(std::move(header), prewhere_info_, virt_column_names_)), |
34 | storage(storage_), |
35 | prewhere_info(prewhere_info_), |
36 | max_block_size_rows(max_block_size_rows_), |
37 | preferred_block_size_bytes(preferred_block_size_bytes_), |
38 | preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes_), |
39 | min_bytes_to_use_direct_io(min_bytes_to_use_direct_io_), |
40 | max_read_buffer_size(max_read_buffer_size_), |
41 | use_uncompressed_cache(use_uncompressed_cache_), |
42 | save_marks_in_cache(save_marks_in_cache_), |
43 | virt_column_names(virt_column_names_) |
44 | { |
45 | header_without_virtual_columns = getPort().getHeader(); |
46 | |
47 | for (auto it = virt_column_names.rbegin(); it != virt_column_names.rend(); ++it) |
48 | if (header_without_virtual_columns.has(*it)) |
49 | header_without_virtual_columns.erase(*it); |
50 | } |
51 | |
52 | |
53 | Chunk MergeTreeBaseSelectProcessor::generate() |
54 | { |
55 | while (!isCancelled()) |
56 | { |
57 | if ((!task || task->isFinished()) && !getNewTask()) |
58 | return {}; |
59 | |
60 | auto res = readFromPart(); |
61 | |
62 | if (res.hasRows()) |
63 | { |
64 | injectVirtualColumns(res, task.get(), virt_column_names); |
65 | return res; |
66 | } |
67 | } |
68 | |
69 | return {}; |
70 | } |
71 | |
72 | |
73 | void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & current_task) |
74 | { |
75 | if (prewhere_info) |
76 | { |
77 | if (reader->getColumns().empty()) |
78 | { |
79 | current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, true); |
80 | } |
81 | else |
82 | { |
83 | MergeTreeRangeReader * pre_reader_ptr = nullptr; |
84 | if (pre_reader != nullptr) |
85 | { |
86 | current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, false); |
87 | pre_reader_ptr = ¤t_task.pre_range_reader; |
88 | } |
89 | |
90 | current_task.range_reader = MergeTreeRangeReader(reader.get(), pre_reader_ptr, nullptr, true); |
91 | } |
92 | } |
93 | else |
94 | { |
95 | current_task.range_reader = MergeTreeRangeReader(reader.get(), nullptr, nullptr, true); |
96 | } |
97 | } |
98 | |
99 | |
/// Reads the next portion of rows from the current task's range reader.
/// Returns a Chunk whose columns follow the order of header_without_virtual_columns;
/// virtual columns are not added here.
/// Progress is reported for every read. When the task has a size predictor, it is updated with
/// the filtering ratio and the data read.
/// Returns an empty Chunk when every row read in this step was filtered out; generate() then retries.
/// Throws LOGICAL_ERROR when the range reader returns a column count that differs from its sample block.
Chunk MergeTreeBaseSelectProcessor::readFromPartImpl()
{
    if (task->size_predictor)
        task->size_predictor->startBlock();

    const UInt64 current_max_block_size_rows = max_block_size_rows;
    const UInt64 current_preferred_block_size_bytes = preferred_block_size_bytes;
    const UInt64 current_preferred_max_column_in_block_size_bytes = preferred_max_column_in_block_size_bytes;
    const MergeTreeIndexGranularity & index_granularity = task->data_part->index_granularity;
    /// Lower bound of the "surviving rows" ratio below, so the division by it stays finite.
    const double min_filtration_ratio = 0.00001;

    /// Estimates how many rows to request from `current_reader` in this step.
    /// - Without a size predictor: max_block_size_rows.
    /// - Otherwise: start from the predictor's estimate for preferred_block_size_bytes
    ///   (0 is returned as-is) and raise it to at least the rows of the current granule.
    /// - If preferred_max_column_in_block_size_bytes is set, cap the value by the per-column
    ///   estimate, scaled up by the fraction of rows that survive filtering.
    /// - If the value does not fit into the unread rows of the current granule, return the
    ///   result of index_granularity.countMarksForRows instead.
    auto estimateNumRows = [current_preferred_block_size_bytes, current_max_block_size_rows,
        &index_granularity, current_preferred_max_column_in_block_size_bytes, min_filtration_ratio](
        MergeTreeReadTask & current_task, MergeTreeRangeReader & current_reader)
    {
        if (!current_task.size_predictor)
            return static_cast<size_t>(current_max_block_size_rows);

        /// Calculates number of rows will be read using preferred_block_size_bytes.
        /// Can't be less than avg_index_granularity.
        size_t rows_to_read = current_task.size_predictor->estimateNumRows(current_preferred_block_size_bytes);
        if (!rows_to_read)
            return rows_to_read;
        auto total_row_in_current_granule = current_reader.numRowsInCurrentGranule();
        rows_to_read = std::max(total_row_in_current_granule, rows_to_read);

        if (current_preferred_max_column_in_block_size_bytes)
        {
            /// Calculates number of rows will be read using preferred_max_column_in_block_size_bytes.
            auto rows_to_read_for_max_size_column
                = current_task.size_predictor->estimateNumRowsForMaxSizeColumn(current_preferred_max_column_in_block_size_bytes);
            /// Fraction of rows expected to survive filtering, clamped away from zero.
            double filtration_ratio = std::max(min_filtration_ratio, 1.0 - current_task.size_predictor->filtered_rows_ratio);
            auto rows_to_read_for_max_size_column_with_filtration
                = static_cast<size_t>(rows_to_read_for_max_size_column / filtration_ratio);

            /// If preferred_max_column_in_block_size_bytes is used, number of rows to read can be less than current_index_granularity.
            rows_to_read = std::min(rows_to_read, rows_to_read_for_max_size_column_with_filtration);
        }

        auto unread_rows_in_current_granule = current_reader.numPendingRowsInCurrentGranule();
        if (unread_rows_in_current_granule >= rows_to_read)
            return rows_to_read;

        return index_granularity.countMarksForRows(current_reader.currentMark(), rows_to_read, current_reader.numReadRowsInCurrentGranule());
    };

    /// Always request at least one row, and never more than max_block_size_rows.
    UInt64 recommended_rows = estimateNumRows(*task, task->range_reader);
    UInt64 rows_to_read = std::max(UInt64(1), std::min(current_max_block_size_rows, recommended_rows));

    auto read_result = task->range_reader.read(rows_to_read, task->mark_ranges);

    /// All rows were filtered. Repeat.
    if (read_result.num_rows == 0)
        read_result.columns.clear();

    auto & sample_block = task->range_reader.getSampleBlock();
    if (read_result.num_rows != 0 && sample_block.columns() != read_result.columns.size())
        throw Exception("Inconsistent number of columns got from MergeTreeRangeReader. "
                        "Have " + toString(sample_block.columns()) + " in sample block "
                        "and " + toString(read_result.columns.size()) + " columns in list", ErrorCodes::LOGICAL_ERROR);

    /// TODO: check columns have the same types as in header.

    UInt64 num_filtered_rows = read_result.numReadRows() - read_result.num_rows;

    /// Progress counts rows/bytes read, including rows removed by filtering.
    progress({ read_result.numReadRows(), read_result.numBytesRead() });

    if (task->size_predictor)
    {
        task->size_predictor->updateFilteredRowsRation(read_result.numReadRows(), num_filtered_rows);

        if (!read_result.columns.empty())
            task->size_predictor->update(sample_block, read_result.columns, read_result.num_rows);
    }

    if (read_result.num_rows == 0)
        return {};

    Columns ordered_columns;
    ordered_columns.reserve(header_without_virtual_columns.columns());

    /// Reorder columns. TODO: maybe skip for default case.
    /// read_result.columns are positioned as in sample_block; the output follows header_without_virtual_columns.
    for (size_t ps = 0; ps < header_without_virtual_columns.columns(); ++ps)
    {
        auto pos_in_sample_block = sample_block.getPositionByName(header_without_virtual_columns.getByPosition(ps).name);
        ordered_columns.emplace_back(std::move(read_result.columns[pos_in_sample_block]));
    }

    return Chunk(std::move(ordered_columns), read_result.num_rows);
}
190 | |
191 | |
192 | Chunk MergeTreeBaseSelectProcessor::readFromPart() |
193 | { |
194 | if (!task->range_reader.isInitialized()) |
195 | initializeRangeReaders(*task); |
196 | |
197 | return readFromPartImpl(); |
198 | } |
199 | |
200 | |
201 | namespace |
202 | { |
203 | /// Simple interfaces to insert virtual columns. |
204 | struct VirtualColumnsInserter |
205 | { |
206 | virtual ~VirtualColumnsInserter() = default; |
207 | |
208 | virtual void insertStringColumn(const ColumnPtr & column, const String & name) = 0; |
209 | virtual void insertUInt64Column(const ColumnPtr & column, const String & name) = 0; |
210 | }; |
211 | } |
212 | |
213 | static void injectVirtualColumnsImpl(size_t rows, VirtualColumnsInserter & inserter, |
214 | MergeTreeReadTask * task, const Names & virtual_columns) |
215 | { |
216 | /// add virtual columns |
217 | /// Except _sample_factor, which is added from the outside. |
218 | if (!virtual_columns.empty()) |
219 | { |
220 | if (unlikely(rows && !task)) |
221 | throw Exception("Cannot insert virtual columns to non-empty chunk without specified task." , |
222 | ErrorCodes::LOGICAL_ERROR); |
223 | |
224 | for (const auto & virtual_column_name : virtual_columns) |
225 | { |
226 | if (virtual_column_name == "_part" ) |
227 | { |
228 | ColumnPtr column; |
229 | if (rows) |
230 | column = DataTypeString().createColumnConst(rows, task->data_part->name)->convertToFullColumnIfConst(); |
231 | else |
232 | column = DataTypeString().createColumn(); |
233 | |
234 | inserter.insertStringColumn(column, virtual_column_name); |
235 | } |
236 | else if (virtual_column_name == "_part_index" ) |
237 | { |
238 | ColumnPtr column; |
239 | if (rows) |
240 | column = DataTypeUInt64().createColumnConst(rows, task->part_index_in_query)->convertToFullColumnIfConst(); |
241 | else |
242 | column = DataTypeUInt64().createColumn(); |
243 | |
244 | inserter.insertUInt64Column(column, virtual_column_name); |
245 | } |
246 | else if (virtual_column_name == "_partition_id" ) |
247 | { |
248 | ColumnPtr column; |
249 | if (rows) |
250 | column = DataTypeString().createColumnConst(rows, task->data_part->info.partition_id)->convertToFullColumnIfConst(); |
251 | else |
252 | column = DataTypeString().createColumn(); |
253 | |
254 | inserter.insertStringColumn(column, virtual_column_name); |
255 | } |
256 | } |
257 | } |
258 | } |
259 | |
260 | namespace |
261 | { |
262 | struct VirtualColumnsInserterIntoBlock : public VirtualColumnsInserter |
263 | { |
264 | explicit VirtualColumnsInserterIntoBlock(Block & block_) : block(block_) {} |
265 | |
266 | void insertStringColumn(const ColumnPtr & column, const String & name) final |
267 | { |
268 | block.insert({column, std::make_shared<DataTypeString>(), name}); |
269 | } |
270 | |
271 | void insertUInt64Column(const ColumnPtr & column, const String & name) final |
272 | { |
273 | block.insert({column, std::make_shared<DataTypeUInt64>(), name}); |
274 | } |
275 | |
276 | Block & block; |
277 | }; |
278 | |
279 | struct VirtualColumnsInserterIntoColumns : public VirtualColumnsInserter |
280 | { |
281 | explicit VirtualColumnsInserterIntoColumns(Columns & columns_) : columns(columns_) {} |
282 | |
283 | void insertStringColumn(const ColumnPtr & column, const String &) final |
284 | { |
285 | columns.push_back(column); |
286 | } |
287 | |
288 | void insertUInt64Column(const ColumnPtr & column, const String &) final |
289 | { |
290 | columns.push_back(column); |
291 | } |
292 | |
293 | Columns & columns; |
294 | }; |
295 | } |
296 | |
297 | void MergeTreeBaseSelectProcessor::injectVirtualColumns(Block & block, MergeTreeReadTask * task, const Names & virtual_columns) |
298 | { |
299 | VirtualColumnsInserterIntoBlock inserter { block }; |
300 | injectVirtualColumnsImpl(block.rows(), inserter, task, virtual_columns); |
301 | } |
302 | |
303 | void MergeTreeBaseSelectProcessor::injectVirtualColumns(Chunk & chunk, MergeTreeReadTask * task, const Names & virtual_columns) |
304 | { |
305 | UInt64 num_rows = chunk.getNumRows(); |
306 | auto columns = chunk.detachColumns(); |
307 | |
308 | VirtualColumnsInserterIntoColumns inserter { columns }; |
309 | injectVirtualColumnsImpl(num_rows, inserter, task, virtual_columns); |
310 | |
311 | chunk.setColumns(columns, num_rows); |
312 | } |
313 | |
314 | void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info) |
315 | { |
316 | if (prewhere_info) |
317 | { |
318 | if (prewhere_info->alias_actions) |
319 | prewhere_info->alias_actions->execute(block); |
320 | |
321 | prewhere_info->prewhere_actions->execute(block); |
322 | if (prewhere_info->remove_prewhere_column) |
323 | block.erase(prewhere_info->prewhere_column_name); |
324 | else |
325 | { |
326 | auto & ctn = block.getByName(prewhere_info->prewhere_column_name); |
327 | ctn.type = std::make_shared<DataTypeUInt8>(); |
328 | ctn.column = ctn.type->createColumnConst(block.rows(), 1u)->convertToFullColumnIfConst(); |
329 | } |
330 | |
331 | if (!block) |
332 | block.insert({nullptr, std::make_shared<DataTypeNothing>(), "_nothing" }); |
333 | } |
334 | } |
335 | |
336 | Block MergeTreeBaseSelectProcessor::( |
337 | Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns) |
338 | { |
339 | executePrewhereActions(block, prewhere_info); |
340 | injectVirtualColumns(block, nullptr, virtual_columns); |
341 | return block; |
342 | } |
343 | |
344 | |
/// Defaulted destructor, defined out of line in this translation unit.
/// NOTE(review): presumably so that member types only forward-declared in the header
/// (e.g. the readers) are complete here — confirm against the header.
MergeTreeBaseSelectProcessor::~MergeTreeBaseSelectProcessor() = default;
346 | |
347 | } |
348 | |