#include <Storages/MergeTree/MergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h>


namespace DB
{


MergeTreeThreadSelectBlockInputProcessor::MergeTreeThreadSelectBlockInputProcessor(
    const size_t thread_,
    const MergeTreeReadPoolPtr & pool_,
    const size_t min_marks_to_read_,
    const UInt64 max_block_size_rows_,
    size_t preferred_block_size_bytes_,
    size_t preferred_max_column_in_block_size_bytes_,
    const MergeTreeData & storage_,
    const bool use_uncompressed_cache_,
    const PrewhereInfoPtr & prewhere_info_,
    const Settings & settings,
    const Names & virt_column_names_)
    :
    MergeTreeBaseSelectProcessor{pool_->getHeader(), storage_, prewhere_info_, max_block_size_rows_,
        preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
        settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size,
        use_uncompressed_cache_, true, virt_column_names_},
    thread{thread_},
    pool{pool_}
{
    /// Round min_marks_to_read up to the nearest multiple of the block size, expressed in marks.
    /// If the index granularity is adaptive, this rounding does not make sense.
    /// Maybe it will make sense to add a setting `max_block_size_bytes`.
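    /// Illustration (example values, not taken from any particular configuration): with index_granularity = 8192,
    /// max_block_size_rows = 65536 and min_marks_to_read_ = 3, 3 * 8192 = 24576 rows are rounded up to 65536 rows,
    /// which is 65536 / 8192 = 8 marks.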
    if (max_block_size_rows && !storage.canUseAdaptiveGranularity())
    {
        size_t fixed_index_granularity = storage.getSettings()->index_granularity;
        min_marks_to_read = (min_marks_to_read_ * fixed_index_granularity + max_block_size_rows - 1)
            / max_block_size_rows * max_block_size_rows / fixed_index_granularity;
    }
    else
        min_marks_to_read = min_marks_to_read_;

    ordered_names = getPort().getHeader().getNames();
}

/// Requests a read task from MergeTreeReadPool and signals whether it got one.
bool MergeTreeThreadSelectBlockInputProcessor::getNewTask()
{
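    /// Ask the pool for the next batch of marks assigned to this thread; an empty task means there is nothing left to read.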
    task = pool->getTask(min_marks_to_read, thread, ordered_names);

    if (!task)
    {
        /** Close the files (before destroying the object).
          * When many sources are created but only a few of them are read at the same time,
          * the buffers do not waste memory.
          */
        reader.reset();
        pre_reader.reset();
        return false;
    }

    const std::string path = task->data_part->getFullPath();

    /// Allows the pool to reduce the number of threads if reads are too slow.
    auto profile_callback = [this](ReadBufferFromFileBase::ProfileInfo info_) { pool->profileFeedback(info_); };

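    /// No reader yet: this is the first task of this processor, so create the readers and acquire the caches.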
    if (!reader)
    {
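        /// Mark ranges of this part that are still left to read, starting from the first range of this task;
        /// they are passed to the reader as its full set of ranges.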
        auto rest_mark_ranges = pool->getRestMarks(*task->data_part, task->mark_ranges[0]);

        if (use_uncompressed_cache)
            owned_uncompressed_cache = storage.global_context.getUncompressedCache();
        owned_mark_cache = storage.global_context.getMarkCache();

        reader = std::make_unique<MergeTreeReader>(
            path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
            storage, rest_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size, MergeTreeReader::ValueSizeMap{}, profile_callback);

        if (prewhere_info)
            pre_reader = std::make_unique<MergeTreeReader>(
                path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
                storage, rest_mark_ranges, min_bytes_to_use_direct_io,
                max_read_buffer_size, MergeTreeReader::ValueSizeMap{}, profile_callback);
    }
    else
    {
        /// Otherwise the readers can be reused; in any case they will be seeked to the required mark.
        if (path != last_readed_part_path)
        {
            auto rest_mark_ranges = pool->getRestMarks(*task->data_part, task->mark_ranges[0]);
            /// retain avg_value_size_hints
            reader = std::make_unique<MergeTreeReader>(
                path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
                storage, rest_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size,
                reader->getAvgValueSizeHints(), profile_callback);

            if (prewhere_info)
                pre_reader = std::make_unique<MergeTreeReader>(
                    path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
                    storage, rest_mark_ranges, min_bytes_to_use_direct_io,
                    max_read_buffer_size, pre_reader->getAvgValueSizeHints(), profile_callback);
        }
    }
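    /// Remember which part the readers are positioned on, so the next task can decide whether to reuse them.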
    last_readed_part_path = path;

    return true;
}


MergeTreeThreadSelectBlockInputProcessor::~MergeTreeThreadSelectBlockInputProcessor() = default;

}