1#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
2#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
3#include <Storages/MergeTree/MergeTreeReader.h>
4
5
6namespace DB
7{
8
9namespace ErrorCodes
10{
11 extern const int MEMORY_LIMIT_EXCEEDED;
12}
13
14static Block replaceTypes(Block && header, const MergeTreeData::DataPartPtr & data_part)
15{
16 /// Types may be different during ALTER (when this stream is used to perform an ALTER).
17 /// NOTE: We may use similar code to implement non blocking ALTERs.
18 for (const auto & name_type : data_part->columns)
19 {
20 if (header.has(name_type.name))
21 {
22 auto & elem = header.getByName(name_type.name);
23 if (!elem.type->equals(*name_type.type))
24 {
25 elem.type = name_type.type;
26 elem.column = elem.type->createColumn();
27 }
28 }
29 }
30
31 return std::move(header);
32}
33
34MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
35 const MergeTreeData & storage_,
36 const MergeTreeData::DataPartPtr & owned_data_part_,
37 UInt64 max_block_size_rows_,
38 size_t preferred_block_size_bytes_,
39 size_t preferred_max_column_in_block_size_bytes_,
40 Names required_columns_,
41 MarkRanges mark_ranges_,
42 bool use_uncompressed_cache_,
43 const PrewhereInfoPtr & prewhere_info_,
44 bool check_columns,
45 size_t min_bytes_to_use_direct_io_,
46 size_t max_read_buffer_size_,
47 bool save_marks_in_cache_,
48 const Names & virt_column_names_,
49 size_t part_index_in_query_,
50 bool quiet)
51 :
52 MergeTreeBaseSelectProcessor{
53 replaceTypes(storage_.getSampleBlockForColumns(required_columns_), owned_data_part_),
54 storage_, prewhere_info_, max_block_size_rows_,
55 preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, min_bytes_to_use_direct_io_,
56 max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names_},
57 required_columns{std::move(required_columns_)},
58 data_part{owned_data_part_},
59 part_columns_lock(data_part->columns_lock),
60 all_mark_ranges(std::move(mark_ranges_)),
61 part_index_in_query(part_index_in_query_),
62 path(data_part->getFullPath())
63{
64 /// Let's estimate total number of rows for progress bar.
65 for (const auto & range : all_mark_ranges)
66 total_marks_count += range.end - range.begin;
67
68 size_t total_rows = data_part->index_granularity.getTotalRows();
69
70 if (!quiet)
71 LOG_TRACE(log, "Reading " << all_mark_ranges.size() << " ranges in reverse order from part " << data_part->name
72 << ", approx. " << total_rows
73 << (all_mark_ranges.size() > 1
74 ? ", up to " + toString(data_part->index_granularity.getRowsCountInRanges(all_mark_ranges))
75 : "")
76 << " rows starting from " << data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));
77
78 addTotalRowsApprox(total_rows);
79
80 ordered_names = header_without_virtual_columns.getNames();
81
82 task_columns = getReadTaskColumns(storage, data_part, required_columns, prewhere_info, check_columns);
83
84 /// will be used to distinguish between PREWHERE and WHERE columns when applying filter
85 const auto & column_names = task_columns.columns.getNames();
86 column_name_set = NameSet{column_names.begin(), column_names.end()};
87
88 if (use_uncompressed_cache)
89 owned_uncompressed_cache = storage.global_context.getUncompressedCache();
90
91 owned_mark_cache = storage.global_context.getMarkCache();
92
93 reader = std::make_unique<MergeTreeReader>(
94 path, data_part, task_columns.columns, owned_uncompressed_cache.get(),
95 owned_mark_cache.get(), save_marks_in_cache, storage,
96 all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
97
98 if (prewhere_info)
99 pre_reader = std::make_unique<MergeTreeReader>(
100 path, data_part, task_columns.pre_columns, owned_uncompressed_cache.get(),
101 owned_mark_cache.get(), save_marks_in_cache, storage,
102 all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
103}
104
105bool MergeTreeReverseSelectProcessor::getNewTask()
106try
107{
108 if ((chunks.empty() && all_mark_ranges.empty()) || total_marks_count == 0)
109 {
110 finish();
111 return false;
112 }
113
114 /// We have some blocks to return in buffer.
115 /// Return true to continue reading, but actually don't create a task.
116 if (all_mark_ranges.empty())
117 return true;
118
119 /// Read ranges from right to left.
120 MarkRanges mark_ranges_for_task = { all_mark_ranges.back() };
121 all_mark_ranges.pop_back();
122
123 auto size_predictor = (preferred_block_size_bytes == 0)
124 ? nullptr
125 : std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, data_part->storage.getSampleBlock());
126
127 task = std::make_unique<MergeTreeReadTask>(
128 data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set,
129 task_columns.columns, task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
130 task_columns.should_reorder, std::move(size_predictor));
131
132 return true;
133}
134catch (...)
135{
136 /// Suspicion of the broken part. A part is added to the queue for verification.
137 if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
138 storage.reportBrokenPart(data_part->name);
139 throw;
140}
141
142Chunk MergeTreeReverseSelectProcessor::readFromPart()
143{
144 Chunk res;
145
146 if (!chunks.empty())
147 {
148 res = std::move(chunks.back());
149 chunks.pop_back();
150 return res;
151 }
152
153 if (!task->range_reader.isInitialized())
154 initializeRangeReaders(*task);
155
156 while (!task->isFinished())
157 {
158 Chunk chunk = readFromPartImpl();
159 chunks.push_back(std::move(chunk));
160 }
161
162 if (chunks.empty())
163 return {};
164
165 res = std::move(chunks.back());
166 chunks.pop_back();
167
168 return res;
169}
170
171void MergeTreeReverseSelectProcessor::finish()
172{
173 /** Close the files (before destroying the object).
174 * When many sources are created, but simultaneously reading only a few of them,
175 * buffers don't waste memory.
176 */
177 reader.reset();
178 pre_reader.reset();
179 part_columns_lock.unlock();
180 data_part.reset();
181}
182
/// Defaulted destructor defined out of line.
/// NOTE(review): presumably kept in the .cpp so that members such as the MergeTreeReader
/// unique_ptrs are destroyed where MergeTreeReader is a complete type (its header is
/// included at the top of this file) — confirm against the class declaration.
MergeTreeReverseSelectProcessor::~MergeTreeReverseSelectProcessor() = default;
184
185}
186