#include <Storages/MergeTree/MergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h>


namespace DB
{


MergeTreeThreadSelectBlockInputProcessor::MergeTreeThreadSelectBlockInputProcessor(
    const size_t thread_,
    const MergeTreeReadPoolPtr & pool_,
    const size_t min_marks_to_read_,
    const UInt64 max_block_size_rows_,
    size_t preferred_block_size_bytes_,
    size_t preferred_max_column_in_block_size_bytes_,
    const MergeTreeData & storage_,
    const bool use_uncompressed_cache_,
    const PrewhereInfoPtr & prewhere_info_,
    const Settings & settings,
    const Names & virt_column_names_)
    :
    MergeTreeBaseSelectProcessor{pool_->getHeader(), storage_, prewhere_info_, max_block_size_rows_,
        preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
        settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size,
        use_uncompressed_cache_, true, virt_column_names_},
    thread{thread_},
    pool{pool_}
{
    /// Round min_marks_to_read up to the nearest multiple of the block size, expressed in marks.
    /// If the granularity is adaptive, marks do not contain a fixed number of rows, so this rounding does not apply.
    /// Maybe it will make sense to add a setting `max_block_size_bytes`.
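    /// E.g. with a fixed index_granularity of 8192, max_block_size_rows of 65536 and min_marks_to_read_ of 3:
    /// 3 * 8192 = 24576 rows are rounded up to 65536 rows, i.e. 65536 / 8192 = 8 marks.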
    if (max_block_size_rows && !storage.canUseAdaptiveGranularity())
    {
        size_t fixed_index_granularity = storage.getSettings()->index_granularity;
        min_marks_to_read = (min_marks_to_read_ * fixed_index_granularity + max_block_size_rows - 1)
            / max_block_size_rows * max_block_size_rows / fixed_index_granularity;
    }
    else
        min_marks_to_read = min_marks_to_read_;

    ordered_names = getPort().getHeader().getNames();
}

/// Requests a read task from MergeTreeReadPool and signals whether it got one
bool MergeTreeThreadSelectBlockInputProcessor::getNewTask()
{
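    /// Ask the pool for the next range of marks to read. An empty task means there is no more work for this thread.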
    task = pool->getTask(min_marks_to_read, thread, ordered_names);

    if (!task)
    {
        /** Close the files (before destroying the object).
          * When many sources are created but only a few of them are read simultaneously,
          * the buffers don't waste memory.
          */
        reader.reset();
        pre_reader.reset();
        return false;
    }

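    /// The part's full path is passed to the readers below and also identifies the part for the reader-reuse check.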
    const std::string path = task->data_part->getFullPath();

    /// Allows the pool to reduce the number of threads in case of too slow reads.
    auto profile_callback = [this](ReadBufferFromFileBase::ProfileInfo info_) { pool->profileFeedback(info_); };

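    /// Readers are created lazily on the first task of this thread; subsequent tasks reuse them when possible.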
    if (!reader)
    {
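        /// All mark ranges of this part that are still to be read, not just the current task's range.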
        auto rest_mark_ranges = pool->getRestMarks(*task->data_part, task->mark_ranges[0]);

        if (use_uncompressed_cache)
            owned_uncompressed_cache = storage.global_context.getUncompressedCache();
        owned_mark_cache = storage.global_context.getMarkCache();

        reader = std::make_unique<MergeTreeReader>(
            path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
            storage, rest_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size, MergeTreeReader::ValueSizeMap{}, profile_callback);

        if (prewhere_info)
            pre_reader = std::make_unique<MergeTreeReader>(
                path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
                storage, rest_mark_ranges, min_bytes_to_use_direct_io,
                max_read_buffer_size, MergeTreeReader::ValueSizeMap{}, profile_callback);
    }
    else
    {
        /// Otherwise the readers can be reused; they will be seeked to the required mark in any case.
        if (path != last_readed_part_path)
        {
            auto rest_mark_ranges = pool->getRestMarks(*task->data_part, task->mark_ranges[0]);
            /// retain avg_value_size_hints
            reader = std::make_unique<MergeTreeReader>(
                path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
                storage, rest_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size,
                reader->getAvgValueSizeHints(), profile_callback);

            if (prewhere_info)
                pre_reader = std::make_unique<MergeTreeReader>(
                    path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
                    storage, rest_mark_ranges, min_bytes_to_use_direct_io,
                    max_read_buffer_size, pre_reader->getAvgValueSizeHints(), profile_callback);
        }
    }
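    /// Remember which part the readers were created for, so that the next task can decide whether to reuse them.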
    last_readed_part_path = path;

    return true;
}


MergeTreeThreadSelectBlockInputProcessor::~MergeTreeThreadSelectBlockInputProcessor() = default;

}