1 | #include <Storages/MergeTree/MergeTreeReader.h> |
2 | #include <Storages/MergeTree/MergeTreeReadPool.h> |
3 | #include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h> |
4 | |
5 | |
6 | namespace DB |
7 | { |
8 | |
9 | |
10 | MergeTreeThreadSelectBlockInputProcessor::MergeTreeThreadSelectBlockInputProcessor( |
11 | const size_t thread_, |
12 | const MergeTreeReadPoolPtr & pool_, |
13 | const size_t min_marks_to_read_, |
14 | const UInt64 max_block_size_rows_, |
15 | size_t preferred_block_size_bytes_, |
16 | size_t preferred_max_column_in_block_size_bytes_, |
17 | const MergeTreeData & storage_, |
18 | const bool use_uncompressed_cache_, |
19 | const PrewhereInfoPtr & prewhere_info_, |
20 | const Settings & settings, |
21 | const Names & virt_column_names_) |
22 | : |
23 | MergeTreeBaseSelectProcessor{pool_->getHeader(), storage_, prewhere_info_, max_block_size_rows_, |
24 | preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, |
25 | settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, |
26 | use_uncompressed_cache_, true, virt_column_names_}, |
27 | thread{thread_}, |
28 | pool{pool_} |
29 | { |
30 | /// round min_marks_to_read up to nearest multiple of block_size expressed in marks |
31 | /// If granularity is adaptive it doesn't make sense |
32 | /// Maybe it will make sence to add settings `max_block_size_bytes` |
33 | if (max_block_size_rows && !storage.canUseAdaptiveGranularity()) |
34 | { |
35 | size_t fixed_index_granularity = storage.getSettings()->index_granularity; |
36 | min_marks_to_read = (min_marks_to_read_ * fixed_index_granularity + max_block_size_rows - 1) |
37 | / max_block_size_rows * max_block_size_rows / fixed_index_granularity; |
38 | } |
39 | else |
40 | min_marks_to_read = min_marks_to_read_; |
41 | |
42 | ordered_names = getPort().getHeader().getNames(); |
43 | } |
44 | |
45 | /// Requests read task from MergeTreeReadPool and signals whether it got one |
46 | bool MergeTreeThreadSelectBlockInputProcessor::getNewTask() |
47 | { |
48 | task = pool->getTask(min_marks_to_read, thread, ordered_names); |
49 | |
50 | if (!task) |
51 | { |
52 | /** Close the files (before destroying the object). |
53 | * When many sources are created, but simultaneously reading only a few of them, |
54 | * buffers don't waste memory. |
55 | */ |
56 | reader.reset(); |
57 | pre_reader.reset(); |
58 | return false; |
59 | } |
60 | |
61 | const std::string path = task->data_part->getFullPath(); |
62 | |
63 | /// Allows pool to reduce number of threads in case of too slow reads. |
64 | auto profile_callback = [this](ReadBufferFromFileBase::ProfileInfo info_) { pool->profileFeedback(info_); }; |
65 | |
66 | if (!reader) |
67 | { |
68 | auto rest_mark_ranges = pool->getRestMarks(*task->data_part, task->mark_ranges[0]); |
69 | |
70 | if (use_uncompressed_cache) |
71 | owned_uncompressed_cache = storage.global_context.getUncompressedCache(); |
72 | owned_mark_cache = storage.global_context.getMarkCache(); |
73 | |
74 | reader = std::make_unique<MergeTreeReader>( |
75 | path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache, |
76 | storage, rest_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size, MergeTreeReader::ValueSizeMap{}, profile_callback); |
77 | |
78 | if (prewhere_info) |
79 | pre_reader = std::make_unique<MergeTreeReader>( |
80 | path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache, |
81 | storage, rest_mark_ranges, min_bytes_to_use_direct_io, |
82 | max_read_buffer_size, MergeTreeReader::ValueSizeMap{}, profile_callback); |
83 | } |
84 | else |
85 | { |
86 | /// in other case we can reuse readers, anyway they will be "seeked" to required mark |
87 | if (path != last_readed_part_path) |
88 | { |
89 | auto rest_mark_ranges = pool->getRestMarks(*task->data_part, task->mark_ranges[0]); |
90 | /// retain avg_value_size_hints |
91 | reader = std::make_unique<MergeTreeReader>( |
92 | path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache, |
93 | storage, rest_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size, |
94 | reader->getAvgValueSizeHints(), profile_callback); |
95 | |
96 | if (prewhere_info) |
97 | pre_reader = std::make_unique<MergeTreeReader>( |
98 | path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache, |
99 | storage, rest_mark_ranges, min_bytes_to_use_direct_io, |
100 | max_read_buffer_size, pre_reader->getAvgValueSizeHints(), profile_callback); |
101 | } |
102 | } |
103 | last_readed_part_path = path; |
104 | |
105 | return true; |
106 | } |
107 | |
108 | |
/// Destructor is compiler-generated but defined out of line in this translation unit.
/// NOTE(review): presumably placed here so members can be destroyed where MergeTreeReader and
/// MergeTreeReadPool are complete types — confirm against the header.
MergeTreeThreadSelectBlockInputProcessor::~MergeTreeThreadSelectBlockInputProcessor() = default;
110 | |
111 | } |
112 | |