#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReader.h>
#include <algorithm>


namespace DB
{

namespace ErrorCodes
{
    extern const int MEMORY_LIMIT_EXCEEDED;
}

static Block replaceTypes(Block && header, const MergeTreeData::DataPartPtr & data_part)
{
    /// Types may be different during ALTER (when this stream is used to perform an ALTER).
    /// NOTE: We could use similar code to implement non-blocking ALTERs.
    for (const auto & name_type : data_part->columns)
    {
        if (header.has(name_type.name))
        {
            auto & elem = header.getByName(name_type.name);
            if (!elem.type->equals(*name_type.type))
            {
                elem.type = name_type.type;
                elem.column = elem.type->createColumn();
            }
        }
    }

    return std::move(header);
}

MergeTreeSelectProcessor::MergeTreeSelectProcessor(
    const MergeTreeData & storage_,
    const MergeTreeData::DataPartPtr & owned_data_part_,
    UInt64 max_block_size_rows_,
    size_t preferred_block_size_bytes_,
    size_t preferred_max_column_in_block_size_bytes_,
    Names required_columns_,
    MarkRanges mark_ranges_,
    bool use_uncompressed_cache_,
    const PrewhereInfoPtr & prewhere_info_,
    bool check_columns_,
    size_t min_bytes_to_use_direct_io_,
    size_t max_read_buffer_size_,
    bool save_marks_in_cache_,
    const Names & virt_column_names_,
    size_t part_index_in_query_,
    bool quiet)
    :
    MergeTreeBaseSelectProcessor{
        replaceTypes(storage_.getSampleBlockForColumns(required_columns_), owned_data_part_),
        storage_, prewhere_info_, max_block_size_rows_,
        preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, min_bytes_to_use_direct_io_,
        max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names_},
    required_columns{std::move(required_columns_)},
    data_part{owned_data_part_},
    part_columns_lock(data_part->columns_lock),
    all_mark_ranges(std::move(mark_ranges_)),
    part_index_in_query(part_index_in_query_),
    check_columns(check_columns_),
    path(data_part->getFullPath())
{
    /// Estimate the total number of rows for the progress bar.
    for (const auto & range : all_mark_ranges)
        total_marks_count += range.end - range.begin;

    size_t total_rows = data_part->index_granularity.getRowsCountInRanges(all_mark_ranges);

    if (!quiet)
        LOG_TRACE(log, "Reading " << all_mark_ranges.size() << " ranges from part " << data_part->name
            << ", approx. " << total_rows
            << " rows starting from " << data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));

    addTotalRowsApprox(total_rows);
    ordered_names = header_without_virtual_columns.getNames();
}


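/// This processor reads from a single data part, so it produces at most one read task,
/// covering all of the part's mark ranges.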
bool MergeTreeSelectProcessor::getNewTask()
try
{
    /// Produce no more than one task.
    if (!is_first_task || total_marks_count == 0)
    {
        finish();
        return false;
    }
    is_first_task = false;

    task_columns = getReadTaskColumns(storage, data_part, required_columns, prewhere_info, check_columns);

    /** @note The `reverse` calls in the if and else branches of MergeTreeDataSelectExecutor could simply be swapped,
      * which would make this reverse unnecessary. */
    MarkRanges remaining_mark_ranges = all_mark_ranges;
    std::reverse(remaining_mark_ranges.begin(), remaining_mark_ranges.end());

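    /// The size predictor adapts the number of rows read per block towards preferred_block_size_bytes;
    /// it is not needed when that limit is disabled (set to 0).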
    auto size_predictor = (preferred_block_size_bytes == 0)
        ? nullptr
        : std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, data_part->storage.getSampleBlock());

    /// Will be used to distinguish between PREWHERE and WHERE columns when applying the filter.
    const auto & column_names = task_columns.columns.getNames();
    column_name_set = NameSet{column_names.begin(), column_names.end()};

    task = std::make_unique<MergeTreeReadTask>(
        data_part, remaining_mark_ranges, part_index_in_query, ordered_names, column_name_set, task_columns.columns,
        task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
        task_columns.should_reorder, std::move(size_predictor));

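    /// Create the readers lazily, on the first (and only) task. If a PREWHERE expression is present,
    /// a separate reader is used for the PREWHERE columns.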
    if (!reader)
    {
        if (use_uncompressed_cache)
            owned_uncompressed_cache = storage.global_context.getUncompressedCache();

        owned_mark_cache = storage.global_context.getMarkCache();

        reader = std::make_unique<MergeTreeReader>(
            path, data_part, task_columns.columns, owned_uncompressed_cache.get(),
            owned_mark_cache.get(), save_marks_in_cache, storage,
            all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);

        if (prewhere_info)
            pre_reader = std::make_unique<MergeTreeReader>(
                path, data_part, task_columns.pre_columns, owned_uncompressed_cache.get(),
                owned_mark_cache.get(), save_marks_in_cache, storage,
                all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
    }

    return true;
}
catch (...)
{
    /// The part is probably broken; add it to the queue for verification.
    if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
        storage.reportBrokenPart(data_part->name);
    throw;
}


void MergeTreeSelectProcessor::finish()
{
    /** Close the files (before destroying the object).
      * When many sources are created but only a few of them are read at the same time,
      * the buffers do not waste memory.
      */
    reader.reset();
    pre_reader.reset();
    part_columns_lock.unlock();
    data_part.reset();
}


MergeTreeSelectProcessor::~MergeTreeSelectProcessor() = default;


}