1#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
2#include <Storages/MergeTree/MergeTreeRangeReader.h>
3#include <Storages/MergeTree/MergeTreeReader.h>
4#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
5#include <Columns/FilterDescription.h>
6#include <Common/typeid_cast.h>
7#include <DataTypes/DataTypeNothing.h>
8
9
10namespace DB
11{
12
namespace ErrorCodes
{
    /// Error codes thrown from this file; the definitions live elsewhere (hence `extern`).
    /// Only codes that are actually used here are declared.
    extern const int LOGICAL_ERROR;
}
18
19
20MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
21 Block header,
22 const MergeTreeData & storage_,
23 const PrewhereInfoPtr & prewhere_info_,
24 UInt64 max_block_size_rows_,
25 UInt64 preferred_block_size_bytes_,
26 UInt64 preferred_max_column_in_block_size_bytes_,
27 UInt64 min_bytes_to_use_direct_io_,
28 UInt64 max_read_buffer_size_,
29 bool use_uncompressed_cache_,
30 bool save_marks_in_cache_,
31 const Names & virt_column_names_)
32:
33 SourceWithProgress(getHeader(std::move(header), prewhere_info_, virt_column_names_)),
34 storage(storage_),
35 prewhere_info(prewhere_info_),
36 max_block_size_rows(max_block_size_rows_),
37 preferred_block_size_bytes(preferred_block_size_bytes_),
38 preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes_),
39 min_bytes_to_use_direct_io(min_bytes_to_use_direct_io_),
40 max_read_buffer_size(max_read_buffer_size_),
41 use_uncompressed_cache(use_uncompressed_cache_),
42 save_marks_in_cache(save_marks_in_cache_),
43 virt_column_names(virt_column_names_)
44{
45 header_without_virtual_columns = getPort().getHeader();
46
47 for (auto it = virt_column_names.rbegin(); it != virt_column_names.rend(); ++it)
48 if (header_without_virtual_columns.has(*it))
49 header_without_virtual_columns.erase(*it);
50}
51
52
53Chunk MergeTreeBaseSelectProcessor::generate()
54{
55 while (!isCancelled())
56 {
57 if ((!task || task->isFinished()) && !getNewTask())
58 return {};
59
60 auto res = readFromPart();
61
62 if (res.hasRows())
63 {
64 injectVirtualColumns(res, task.get(), virt_column_names);
65 return res;
66 }
67 }
68
69 return {};
70}
71
72
73void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & current_task)
74{
75 if (prewhere_info)
76 {
77 if (reader->getColumns().empty())
78 {
79 current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, true);
80 }
81 else
82 {
83 MergeTreeRangeReader * pre_reader_ptr = nullptr;
84 if (pre_reader != nullptr)
85 {
86 current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, false);
87 pre_reader_ptr = &current_task.pre_range_reader;
88 }
89
90 current_task.range_reader = MergeTreeRangeReader(reader.get(), pre_reader_ptr, nullptr, true);
91 }
92 }
93 else
94 {
95 current_task.range_reader = MergeTreeRangeReader(reader.get(), nullptr, nullptr, true);
96 }
97}
98
99
/// Reads the next portion of rows for the current task through its range reader.
///
/// How many rows to request:
///  - based on max_block_size_rows;
///  - when the task has a size predictor, also on preferred_block_size_bytes and
///    preferred_max_column_in_block_size_bytes.
///
/// After reading, this function:
///  - reports progress;
///  - feeds the size predictor;
///  - returns the read columns reordered to match header_without_virtual_columns.
///    Virtual columns are appended later, by generate().
///
/// Returns an empty Chunk when no rows survived filtering; generate() then calls
/// readFromPart() again.
/// Throws LOGICAL_ERROR if the range reader returns a number of columns different from its
/// sample block.
Chunk MergeTreeBaseSelectProcessor::readFromPartImpl()
{
    if (task->size_predictor)
        task->size_predictor->startBlock();

    /// Snapshot the limits into locals so the estimation lambda below can capture them by value.
    const UInt64 current_max_block_size_rows = max_block_size_rows;
    const UInt64 current_preferred_block_size_bytes = preferred_block_size_bytes;
    const UInt64 current_preferred_max_column_in_block_size_bytes = preferred_max_column_in_block_size_bytes;
    const MergeTreeIndexGranularity & index_granularity = task->data_part->index_granularity;
    /// Lower bound for the "fraction of rows kept" used as a divisor below;
    /// it prevents division by zero when (almost) every row is filtered out.
    const double min_filtration_ratio = 0.00001;

    /// Estimates how many rows to request from current_reader for current_task.
    /// Without a size predictor this is simply max_block_size_rows. Otherwise the estimate comes
    /// from the predictor and may be 0; the caller clamps the result to at least one row.
    auto estimateNumRows = [current_preferred_block_size_bytes, current_max_block_size_rows,
        &index_granularity, current_preferred_max_column_in_block_size_bytes, min_filtration_ratio](
        MergeTreeReadTask & current_task, MergeTreeRangeReader & current_reader)
    {
        if (!current_task.size_predictor)
            return static_cast<size_t>(current_max_block_size_rows);

        /// Calculates number of rows will be read using preferred_block_size_bytes.
        /// Can't be less than avg_index_granularity.
        size_t rows_to_read = current_task.size_predictor->estimateNumRows(current_preferred_block_size_bytes);
        if (!rows_to_read)
            return rows_to_read;
        auto total_row_in_current_granule = current_reader.numRowsInCurrentGranule();
        rows_to_read = std::max(total_row_in_current_granule, rows_to_read);

        if (current_preferred_max_column_in_block_size_bytes)
        {
            /// Calculates number of rows will be read using preferred_max_column_in_block_size_bytes.
            auto rows_to_read_for_max_size_column
                = current_task.size_predictor->estimateNumRowsForMaxSizeColumn(current_preferred_max_column_in_block_size_bytes);
            /// Scale the estimate up by the fraction of rows expected to survive filtering,
            /// so that the block is still close to the size limit after filtration.
            double filtration_ratio = std::max(min_filtration_ratio, 1.0 - current_task.size_predictor->filtered_rows_ratio);
            auto rows_to_read_for_max_size_column_with_filtration
                = static_cast<size_t>(rows_to_read_for_max_size_column / filtration_ratio);

            /// If preferred_max_column_in_block_size_bytes is used, number of rows to read can be less than current_index_granularity.
            rows_to_read = std::min(rows_to_read, rows_to_read_for_max_size_column_with_filtration);
        }

        auto unread_rows_in_current_granule = current_reader.numPendingRowsInCurrentGranule();
        if (unread_rows_in_current_granule >= rows_to_read)
            return rows_to_read;

        /// The request spills past the current granule.
        /// Let the index granularity translate it into a row count, starting from the current
        /// mark and the offset already read inside it. The result is presumably aligned to mark
        /// boundaries — confirm against MergeTreeIndexGranularity::countMarksForRows.
        return index_granularity.countMarksForRows(current_reader.currentMark(), rows_to_read, current_reader.numReadRowsInCurrentGranule());
    };

    /// Never request more than max_block_size_rows, and always at least one row.
    UInt64 recommended_rows = estimateNumRows(*task, task->range_reader);
    UInt64 rows_to_read = std::max(UInt64(1), std::min(current_max_block_size_rows, recommended_rows));

    auto read_result = task->range_reader.read(rows_to_read, task->mark_ranges);

    /// All rows were filtered. Repeat.
    if (read_result.num_rows == 0)
        read_result.columns.clear();

    /// read_result.columns are positioned as in the range reader's sample block
    /// (the reorder loop below relies on this).
    auto & sample_block = task->range_reader.getSampleBlock();
    if (read_result.num_rows != 0 && sample_block.columns() != read_result.columns.size())
        throw Exception("Inconsistent number of columns got from MergeTreeRangeReader. "
                        "Have " + toString(sample_block.columns()) + " in sample block "
                        "and " + toString(read_result.columns.size()) + " columns in list", ErrorCodes::LOGICAL_ERROR);

    /// TODO: check columns have the same types as in header.

    /// Rows read by the range reader but dropped by filtering.
    UInt64 num_filtered_rows = read_result.numReadRows() - read_result.num_rows;

    /// Progress counts every row the range reader read, including rows filtered out afterwards.
    progress({ read_result.numReadRows(), read_result.numBytesRead() });

    if (task->size_predictor)
    {
        task->size_predictor->updateFilteredRowsRation(read_result.numReadRows(), num_filtered_rows);

        if (!read_result.columns.empty())
            task->size_predictor->update(sample_block, read_result.columns, read_result.num_rows);
    }

    if (read_result.num_rows == 0)
        return {};

    Columns ordered_columns;
    ordered_columns.reserve(header_without_virtual_columns.columns());

    /// Reorder columns. TODO: maybe skip for default case.
    for (size_t ps = 0; ps < header_without_virtual_columns.columns(); ++ps)
    {
        auto pos_in_sample_block = sample_block.getPositionByName(header_without_virtual_columns.getByPosition(ps).name);
        ordered_columns.emplace_back(std::move(read_result.columns[pos_in_sample_block]));
    }

    return Chunk(std::move(ordered_columns), read_result.num_rows);
}
190
191
192Chunk MergeTreeBaseSelectProcessor::readFromPart()
193{
194 if (!task->range_reader.isInitialized())
195 initializeRangeReaders(*task);
196
197 return readFromPartImpl();
198}
199
200
201namespace
202{
203 /// Simple interfaces to insert virtual columns.
204 struct VirtualColumnsInserter
205 {
206 virtual ~VirtualColumnsInserter() = default;
207
208 virtual void insertStringColumn(const ColumnPtr & column, const String & name) = 0;
209 virtual void insertUInt64Column(const ColumnPtr & column, const String & name) = 0;
210 };
211}
212
213static void injectVirtualColumnsImpl(size_t rows, VirtualColumnsInserter & inserter,
214 MergeTreeReadTask * task, const Names & virtual_columns)
215{
216 /// add virtual columns
217 /// Except _sample_factor, which is added from the outside.
218 if (!virtual_columns.empty())
219 {
220 if (unlikely(rows && !task))
221 throw Exception("Cannot insert virtual columns to non-empty chunk without specified task.",
222 ErrorCodes::LOGICAL_ERROR);
223
224 for (const auto & virtual_column_name : virtual_columns)
225 {
226 if (virtual_column_name == "_part")
227 {
228 ColumnPtr column;
229 if (rows)
230 column = DataTypeString().createColumnConst(rows, task->data_part->name)->convertToFullColumnIfConst();
231 else
232 column = DataTypeString().createColumn();
233
234 inserter.insertStringColumn(column, virtual_column_name);
235 }
236 else if (virtual_column_name == "_part_index")
237 {
238 ColumnPtr column;
239 if (rows)
240 column = DataTypeUInt64().createColumnConst(rows, task->part_index_in_query)->convertToFullColumnIfConst();
241 else
242 column = DataTypeUInt64().createColumn();
243
244 inserter.insertUInt64Column(column, virtual_column_name);
245 }
246 else if (virtual_column_name == "_partition_id")
247 {
248 ColumnPtr column;
249 if (rows)
250 column = DataTypeString().createColumnConst(rows, task->data_part->info.partition_id)->convertToFullColumnIfConst();
251 else
252 column = DataTypeString().createColumn();
253
254 inserter.insertStringColumn(column, virtual_column_name);
255 }
256 }
257 }
258}
259
260namespace
261{
262 struct VirtualColumnsInserterIntoBlock : public VirtualColumnsInserter
263 {
264 explicit VirtualColumnsInserterIntoBlock(Block & block_) : block(block_) {}
265
266 void insertStringColumn(const ColumnPtr & column, const String & name) final
267 {
268 block.insert({column, std::make_shared<DataTypeString>(), name});
269 }
270
271 void insertUInt64Column(const ColumnPtr & column, const String & name) final
272 {
273 block.insert({column, std::make_shared<DataTypeUInt64>(), name});
274 }
275
276 Block & block;
277 };
278
279 struct VirtualColumnsInserterIntoColumns : public VirtualColumnsInserter
280 {
281 explicit VirtualColumnsInserterIntoColumns(Columns & columns_) : columns(columns_) {}
282
283 void insertStringColumn(const ColumnPtr & column, const String &) final
284 {
285 columns.push_back(column);
286 }
287
288 void insertUInt64Column(const ColumnPtr & column, const String &) final
289 {
290 columns.push_back(column);
291 }
292
293 Columns & columns;
294 };
295}
296
297void MergeTreeBaseSelectProcessor::injectVirtualColumns(Block & block, MergeTreeReadTask * task, const Names & virtual_columns)
298{
299 VirtualColumnsInserterIntoBlock inserter { block };
300 injectVirtualColumnsImpl(block.rows(), inserter, task, virtual_columns);
301}
302
303void MergeTreeBaseSelectProcessor::injectVirtualColumns(Chunk & chunk, MergeTreeReadTask * task, const Names & virtual_columns)
304{
305 UInt64 num_rows = chunk.getNumRows();
306 auto columns = chunk.detachColumns();
307
308 VirtualColumnsInserterIntoColumns inserter { columns };
309 injectVirtualColumnsImpl(num_rows, inserter, task, virtual_columns);
310
311 chunk.setColumns(columns, num_rows);
312}
313
314void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info)
315{
316 if (prewhere_info)
317 {
318 if (prewhere_info->alias_actions)
319 prewhere_info->alias_actions->execute(block);
320
321 prewhere_info->prewhere_actions->execute(block);
322 if (prewhere_info->remove_prewhere_column)
323 block.erase(prewhere_info->prewhere_column_name);
324 else
325 {
326 auto & ctn = block.getByName(prewhere_info->prewhere_column_name);
327 ctn.type = std::make_shared<DataTypeUInt8>();
328 ctn.column = ctn.type->createColumnConst(block.rows(), 1u)->convertToFullColumnIfConst();
329 }
330
331 if (!block)
332 block.insert({nullptr, std::make_shared<DataTypeNothing>(), "_nothing"});
333 }
334}
335
336Block MergeTreeBaseSelectProcessor::getHeader(
337 Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns)
338{
339 executePrewhereActions(block, prewhere_info);
340 injectVirtualColumns(block, nullptr, virtual_columns);
341 return block;
342}
343
344
/// Defaulted destructor, defined out of line in this file rather than in the class declaration.
/// NOTE(review): presumably so that member types only forward-declared in the header are
/// complete at the point of destruction — confirm against the header.
MergeTreeBaseSelectProcessor::~MergeTreeBaseSelectProcessor() = default;
346
347}
348