| 1 | #include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h> |
| 2 | #include <Storages/MergeTree/MergeTreeRangeReader.h> |
| 3 | #include <Storages/MergeTree/MergeTreeReader.h> |
| 4 | #include <Storages/MergeTree/MergeTreeBlockReadUtils.h> |
| 5 | #include <Columns/FilterDescription.h> |
| 6 | #include <Common/typeid_cast.h> |
| 7 | #include <DataTypes/DataTypeNothing.h> |
| 8 | |
| 9 | |
| 10 | namespace DB |
| 11 | { |
| 12 | |
/// Error codes used by this translation unit. They are only declared here;
/// the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
}
| 18 | |
| 19 | |
| 20 | MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor( |
| 21 | Block , |
| 22 | const MergeTreeData & storage_, |
| 23 | const PrewhereInfoPtr & prewhere_info_, |
| 24 | UInt64 max_block_size_rows_, |
| 25 | UInt64 preferred_block_size_bytes_, |
| 26 | UInt64 preferred_max_column_in_block_size_bytes_, |
| 27 | UInt64 min_bytes_to_use_direct_io_, |
| 28 | UInt64 max_read_buffer_size_, |
| 29 | bool use_uncompressed_cache_, |
| 30 | bool save_marks_in_cache_, |
| 31 | const Names & virt_column_names_) |
| 32 | : |
| 33 | SourceWithProgress(getHeader(std::move(header), prewhere_info_, virt_column_names_)), |
| 34 | storage(storage_), |
| 35 | prewhere_info(prewhere_info_), |
| 36 | max_block_size_rows(max_block_size_rows_), |
| 37 | preferred_block_size_bytes(preferred_block_size_bytes_), |
| 38 | preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes_), |
| 39 | min_bytes_to_use_direct_io(min_bytes_to_use_direct_io_), |
| 40 | max_read_buffer_size(max_read_buffer_size_), |
| 41 | use_uncompressed_cache(use_uncompressed_cache_), |
| 42 | save_marks_in_cache(save_marks_in_cache_), |
| 43 | virt_column_names(virt_column_names_) |
| 44 | { |
| 45 | header_without_virtual_columns = getPort().getHeader(); |
| 46 | |
| 47 | for (auto it = virt_column_names.rbegin(); it != virt_column_names.rend(); ++it) |
| 48 | if (header_without_virtual_columns.has(*it)) |
| 49 | header_without_virtual_columns.erase(*it); |
| 50 | } |
| 51 | |
| 52 | |
| 53 | Chunk MergeTreeBaseSelectProcessor::generate() |
| 54 | { |
| 55 | while (!isCancelled()) |
| 56 | { |
| 57 | if ((!task || task->isFinished()) && !getNewTask()) |
| 58 | return {}; |
| 59 | |
| 60 | auto res = readFromPart(); |
| 61 | |
| 62 | if (res.hasRows()) |
| 63 | { |
| 64 | injectVirtualColumns(res, task.get(), virt_column_names); |
| 65 | return res; |
| 66 | } |
| 67 | } |
| 68 | |
| 69 | return {}; |
| 70 | } |
| 71 | |
| 72 | |
| 73 | void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & current_task) |
| 74 | { |
| 75 | if (prewhere_info) |
| 76 | { |
| 77 | if (reader->getColumns().empty()) |
| 78 | { |
| 79 | current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, true); |
| 80 | } |
| 81 | else |
| 82 | { |
| 83 | MergeTreeRangeReader * pre_reader_ptr = nullptr; |
| 84 | if (pre_reader != nullptr) |
| 85 | { |
| 86 | current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_info, false); |
| 87 | pre_reader_ptr = ¤t_task.pre_range_reader; |
| 88 | } |
| 89 | |
| 90 | current_task.range_reader = MergeTreeRangeReader(reader.get(), pre_reader_ptr, nullptr, true); |
| 91 | } |
| 92 | } |
| 93 | else |
| 94 | { |
| 95 | current_task.range_reader = MergeTreeRangeReader(reader.get(), nullptr, nullptr, true); |
| 96 | } |
| 97 | } |
| 98 | |
| 99 | |
| 100 | Chunk MergeTreeBaseSelectProcessor::readFromPartImpl() |
| 101 | { |
| 102 | if (task->size_predictor) |
| 103 | task->size_predictor->startBlock(); |
| 104 | |
| 105 | const UInt64 current_max_block_size_rows = max_block_size_rows; |
| 106 | const UInt64 current_preferred_block_size_bytes = preferred_block_size_bytes; |
| 107 | const UInt64 current_preferred_max_column_in_block_size_bytes = preferred_max_column_in_block_size_bytes; |
| 108 | const MergeTreeIndexGranularity & index_granularity = task->data_part->index_granularity; |
| 109 | const double min_filtration_ratio = 0.00001; |
| 110 | |
| 111 | auto estimateNumRows = [current_preferred_block_size_bytes, current_max_block_size_rows, |
| 112 | &index_granularity, current_preferred_max_column_in_block_size_bytes, min_filtration_ratio]( |
| 113 | MergeTreeReadTask & current_task, MergeTreeRangeReader & current_reader) |
| 114 | { |
| 115 | if (!current_task.size_predictor) |
| 116 | return static_cast<size_t>(current_max_block_size_rows); |
| 117 | |
| 118 | /// Calculates number of rows will be read using preferred_block_size_bytes. |
| 119 | /// Can't be less than avg_index_granularity. |
| 120 | size_t rows_to_read = current_task.size_predictor->estimateNumRows(current_preferred_block_size_bytes); |
| 121 | if (!rows_to_read) |
| 122 | return rows_to_read; |
| 123 | auto total_row_in_current_granule = current_reader.numRowsInCurrentGranule(); |
| 124 | rows_to_read = std::max(total_row_in_current_granule, rows_to_read); |
| 125 | |
| 126 | if (current_preferred_max_column_in_block_size_bytes) |
| 127 | { |
| 128 | /// Calculates number of rows will be read using preferred_max_column_in_block_size_bytes. |
| 129 | auto rows_to_read_for_max_size_column |
| 130 | = current_task.size_predictor->estimateNumRowsForMaxSizeColumn(current_preferred_max_column_in_block_size_bytes); |
| 131 | double filtration_ratio = std::max(min_filtration_ratio, 1.0 - current_task.size_predictor->filtered_rows_ratio); |
| 132 | auto rows_to_read_for_max_size_column_with_filtration |
| 133 | = static_cast<size_t>(rows_to_read_for_max_size_column / filtration_ratio); |
| 134 | |
| 135 | /// If preferred_max_column_in_block_size_bytes is used, number of rows to read can be less than current_index_granularity. |
| 136 | rows_to_read = std::min(rows_to_read, rows_to_read_for_max_size_column_with_filtration); |
| 137 | } |
| 138 | |
| 139 | auto unread_rows_in_current_granule = current_reader.numPendingRowsInCurrentGranule(); |
| 140 | if (unread_rows_in_current_granule >= rows_to_read) |
| 141 | return rows_to_read; |
| 142 | |
| 143 | return index_granularity.countMarksForRows(current_reader.currentMark(), rows_to_read, current_reader.numReadRowsInCurrentGranule()); |
| 144 | }; |
| 145 | |
| 146 | UInt64 recommended_rows = estimateNumRows(*task, task->range_reader); |
| 147 | UInt64 rows_to_read = std::max(UInt64(1), std::min(current_max_block_size_rows, recommended_rows)); |
| 148 | |
| 149 | auto read_result = task->range_reader.read(rows_to_read, task->mark_ranges); |
| 150 | |
| 151 | /// All rows were filtered. Repeat. |
| 152 | if (read_result.num_rows == 0) |
| 153 | read_result.columns.clear(); |
| 154 | |
| 155 | auto & sample_block = task->range_reader.getSampleBlock(); |
| 156 | if (read_result.num_rows != 0 && sample_block.columns() != read_result.columns.size()) |
| 157 | throw Exception("Inconsistent number of columns got from MergeTreeRangeReader. " |
| 158 | "Have " + toString(sample_block.columns()) + " in sample block " |
| 159 | "and " + toString(read_result.columns.size()) + " columns in list" , ErrorCodes::LOGICAL_ERROR); |
| 160 | |
| 161 | /// TODO: check columns have the same types as in header. |
| 162 | |
| 163 | UInt64 num_filtered_rows = read_result.numReadRows() - read_result.num_rows; |
| 164 | |
| 165 | progress({ read_result.numReadRows(), read_result.numBytesRead() }); |
| 166 | |
| 167 | if (task->size_predictor) |
| 168 | { |
| 169 | task->size_predictor->updateFilteredRowsRation(read_result.numReadRows(), num_filtered_rows); |
| 170 | |
| 171 | if (!read_result.columns.empty()) |
| 172 | task->size_predictor->update(sample_block, read_result.columns, read_result.num_rows); |
| 173 | } |
| 174 | |
| 175 | if (read_result.num_rows == 0) |
| 176 | return {}; |
| 177 | |
| 178 | Columns ordered_columns; |
| 179 | ordered_columns.reserve(header_without_virtual_columns.columns()); |
| 180 | |
| 181 | /// Reorder columns. TODO: maybe skip for default case. |
| 182 | for (size_t ps = 0; ps < header_without_virtual_columns.columns(); ++ps) |
| 183 | { |
| 184 | auto pos_in_sample_block = sample_block.getPositionByName(header_without_virtual_columns.getByPosition(ps).name); |
| 185 | ordered_columns.emplace_back(std::move(read_result.columns[pos_in_sample_block])); |
| 186 | } |
| 187 | |
| 188 | return Chunk(std::move(ordered_columns), read_result.num_rows); |
| 189 | } |
| 190 | |
| 191 | |
| 192 | Chunk MergeTreeBaseSelectProcessor::readFromPart() |
| 193 | { |
| 194 | if (!task->range_reader.isInitialized()) |
| 195 | initializeRangeReaders(*task); |
| 196 | |
| 197 | return readFromPartImpl(); |
| 198 | } |
| 199 | |
| 200 | |
| 201 | namespace |
| 202 | { |
| 203 | /// Simple interfaces to insert virtual columns. |
| 204 | struct VirtualColumnsInserter |
| 205 | { |
| 206 | virtual ~VirtualColumnsInserter() = default; |
| 207 | |
| 208 | virtual void insertStringColumn(const ColumnPtr & column, const String & name) = 0; |
| 209 | virtual void insertUInt64Column(const ColumnPtr & column, const String & name) = 0; |
| 210 | }; |
| 211 | } |
| 212 | |
| 213 | static void injectVirtualColumnsImpl(size_t rows, VirtualColumnsInserter & inserter, |
| 214 | MergeTreeReadTask * task, const Names & virtual_columns) |
| 215 | { |
| 216 | /// add virtual columns |
| 217 | /// Except _sample_factor, which is added from the outside. |
| 218 | if (!virtual_columns.empty()) |
| 219 | { |
| 220 | if (unlikely(rows && !task)) |
| 221 | throw Exception("Cannot insert virtual columns to non-empty chunk without specified task." , |
| 222 | ErrorCodes::LOGICAL_ERROR); |
| 223 | |
| 224 | for (const auto & virtual_column_name : virtual_columns) |
| 225 | { |
| 226 | if (virtual_column_name == "_part" ) |
| 227 | { |
| 228 | ColumnPtr column; |
| 229 | if (rows) |
| 230 | column = DataTypeString().createColumnConst(rows, task->data_part->name)->convertToFullColumnIfConst(); |
| 231 | else |
| 232 | column = DataTypeString().createColumn(); |
| 233 | |
| 234 | inserter.insertStringColumn(column, virtual_column_name); |
| 235 | } |
| 236 | else if (virtual_column_name == "_part_index" ) |
| 237 | { |
| 238 | ColumnPtr column; |
| 239 | if (rows) |
| 240 | column = DataTypeUInt64().createColumnConst(rows, task->part_index_in_query)->convertToFullColumnIfConst(); |
| 241 | else |
| 242 | column = DataTypeUInt64().createColumn(); |
| 243 | |
| 244 | inserter.insertUInt64Column(column, virtual_column_name); |
| 245 | } |
| 246 | else if (virtual_column_name == "_partition_id" ) |
| 247 | { |
| 248 | ColumnPtr column; |
| 249 | if (rows) |
| 250 | column = DataTypeString().createColumnConst(rows, task->data_part->info.partition_id)->convertToFullColumnIfConst(); |
| 251 | else |
| 252 | column = DataTypeString().createColumn(); |
| 253 | |
| 254 | inserter.insertStringColumn(column, virtual_column_name); |
| 255 | } |
| 256 | } |
| 257 | } |
| 258 | } |
| 259 | |
| 260 | namespace |
| 261 | { |
| 262 | struct VirtualColumnsInserterIntoBlock : public VirtualColumnsInserter |
| 263 | { |
| 264 | explicit VirtualColumnsInserterIntoBlock(Block & block_) : block(block_) {} |
| 265 | |
| 266 | void insertStringColumn(const ColumnPtr & column, const String & name) final |
| 267 | { |
| 268 | block.insert({column, std::make_shared<DataTypeString>(), name}); |
| 269 | } |
| 270 | |
| 271 | void insertUInt64Column(const ColumnPtr & column, const String & name) final |
| 272 | { |
| 273 | block.insert({column, std::make_shared<DataTypeUInt64>(), name}); |
| 274 | } |
| 275 | |
| 276 | Block & block; |
| 277 | }; |
| 278 | |
| 279 | struct VirtualColumnsInserterIntoColumns : public VirtualColumnsInserter |
| 280 | { |
| 281 | explicit VirtualColumnsInserterIntoColumns(Columns & columns_) : columns(columns_) {} |
| 282 | |
| 283 | void insertStringColumn(const ColumnPtr & column, const String &) final |
| 284 | { |
| 285 | columns.push_back(column); |
| 286 | } |
| 287 | |
| 288 | void insertUInt64Column(const ColumnPtr & column, const String &) final |
| 289 | { |
| 290 | columns.push_back(column); |
| 291 | } |
| 292 | |
| 293 | Columns & columns; |
| 294 | }; |
| 295 | } |
| 296 | |
| 297 | void MergeTreeBaseSelectProcessor::injectVirtualColumns(Block & block, MergeTreeReadTask * task, const Names & virtual_columns) |
| 298 | { |
| 299 | VirtualColumnsInserterIntoBlock inserter { block }; |
| 300 | injectVirtualColumnsImpl(block.rows(), inserter, task, virtual_columns); |
| 301 | } |
| 302 | |
| 303 | void MergeTreeBaseSelectProcessor::injectVirtualColumns(Chunk & chunk, MergeTreeReadTask * task, const Names & virtual_columns) |
| 304 | { |
| 305 | UInt64 num_rows = chunk.getNumRows(); |
| 306 | auto columns = chunk.detachColumns(); |
| 307 | |
| 308 | VirtualColumnsInserterIntoColumns inserter { columns }; |
| 309 | injectVirtualColumnsImpl(num_rows, inserter, task, virtual_columns); |
| 310 | |
| 311 | chunk.setColumns(columns, num_rows); |
| 312 | } |
| 313 | |
| 314 | void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info) |
| 315 | { |
| 316 | if (prewhere_info) |
| 317 | { |
| 318 | if (prewhere_info->alias_actions) |
| 319 | prewhere_info->alias_actions->execute(block); |
| 320 | |
| 321 | prewhere_info->prewhere_actions->execute(block); |
| 322 | if (prewhere_info->remove_prewhere_column) |
| 323 | block.erase(prewhere_info->prewhere_column_name); |
| 324 | else |
| 325 | { |
| 326 | auto & ctn = block.getByName(prewhere_info->prewhere_column_name); |
| 327 | ctn.type = std::make_shared<DataTypeUInt8>(); |
| 328 | ctn.column = ctn.type->createColumnConst(block.rows(), 1u)->convertToFullColumnIfConst(); |
| 329 | } |
| 330 | |
| 331 | if (!block) |
| 332 | block.insert({nullptr, std::make_shared<DataTypeNothing>(), "_nothing" }); |
| 333 | } |
| 334 | } |
| 335 | |
| 336 | Block MergeTreeBaseSelectProcessor::( |
| 337 | Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns) |
| 338 | { |
| 339 | executePrewhereActions(block, prewhere_info); |
| 340 | injectVirtualColumns(block, nullptr, virtual_columns); |
| 341 | return block; |
| 342 | } |
| 343 | |
| 344 | |
| 345 | MergeTreeBaseSelectProcessor::~MergeTreeBaseSelectProcessor() = default; |
| 346 | |
| 347 | } |
| 348 | |