1#include <Storages/MergeTree/MergeTreeSequentialBlockInputStream.h>
2#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
3
4namespace DB
5{
namespace ErrorCodes
{
    /// Defined elsewhere. readImpl() checks for this code so that a read failing only
    /// because the memory limit was hit does not get the part reported as broken.
    extern const int MEMORY_LIMIT_EXCEEDED;
}
10
11MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
12 const MergeTreeData & storage_,
13 const MergeTreeData::DataPartPtr & data_part_,
14 Names columns_to_read_,
15 bool read_with_direct_io_,
16 bool take_column_types_from_storage,
17 bool quiet)
18 : storage(storage_)
19 , data_part(data_part_)
20 , part_columns_lock(data_part->columns_lock)
21 , columns_to_read(columns_to_read_)
22 , read_with_direct_io(read_with_direct_io_)
23 , mark_cache(storage.global_context.getMarkCache())
24{
25 if (!quiet)
26 {
27 std::stringstream message;
28 message << "Reading " << data_part->getMarksCount() << " marks from part " << data_part->name
29 << ", total " << data_part->rows_count
30 << " rows starting from the beginning of the part";
31
32 LOG_TRACE(log, message.rdbuf());
33 }
34
35 addTotalRowsApprox(data_part->rows_count);
36
37 header = storage.getSampleBlockForColumns(columns_to_read);
38 fixHeader(header);
39
40 /// Add columns because we don't want to read empty blocks
41 injectRequiredColumns(storage, data_part, columns_to_read);
42 NamesAndTypesList columns_for_reader;
43 if (take_column_types_from_storage)
44 {
45 const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical();
46 columns_for_reader = physical_columns.addTypes(columns_to_read);
47 }
48 else
49 {
50 /// take columns from data_part
51 columns_for_reader = data_part->columns.addTypes(columns_to_read);
52 }
53
54 reader = std::make_unique<MergeTreeReader>(
55 data_part->getFullPath(), data_part, columns_for_reader, /* uncompressed_cache = */ nullptr,
56 mark_cache.get(), /* save_marks_in_cache = */ false, storage,
57 MarkRanges{MarkRange(0, data_part->getMarksCount())},
58 /* bytes to use AIO (this is hack) */
59 read_with_direct_io ? 1UL : std::numeric_limits<size_t>::max(),
60 DBMS_DEFAULT_BUFFER_SIZE);
61}
62
63
64void MergeTreeSequentialBlockInputStream::fixHeader(Block & header_block) const
65{
66 /// Types may be different during ALTER (when this stream is used to perform an ALTER).
67 for (const auto & name_type : data_part->columns)
68 {
69 if (header_block.has(name_type.name))
70 {
71 auto & elem = header_block.getByName(name_type.name);
72 if (!elem.type->equals(*name_type.type))
73 {
74 elem.type = name_type.type;
75 elem.column = elem.type->createColumn();
76 }
77 }
78 }
79}
80
81Block MergeTreeSequentialBlockInputStream::getHeader() const
82{
83 return header;
84}
85
86Block MergeTreeSequentialBlockInputStream::readImpl()
87try
88{
89 Block res;
90 if (!isCancelled() && current_row < data_part->rows_count)
91 {
92 size_t rows_to_read = data_part->index_granularity.getMarkRows(current_mark);
93 bool continue_reading = (current_mark != 0);
94
95 auto & sample = reader->getColumns();
96 Columns columns(sample.size());
97 size_t rows_readed = reader->readRows(current_mark, continue_reading, rows_to_read, columns);
98
99 if (rows_readed)
100 {
101 current_row += rows_readed;
102 current_mark += (rows_to_read == rows_readed);
103
104 bool should_evaluate_missing_defaults = false;
105 reader->fillMissingColumns(columns, should_evaluate_missing_defaults, rows_readed);
106
107 if (should_evaluate_missing_defaults)
108 reader->evaluateMissingDefaults({}, columns);
109
110 res = header.cloneEmpty();
111
112 /// Reorder columns and fill result block.
113 size_t num_columns = sample.size();
114 auto it = sample.begin();
115 for (size_t i = 0; i < num_columns; ++i)
116 {
117 if (res.has(it->name))
118 res.getByName(it->name).column = std::move(columns[i]);
119
120 ++it;
121 }
122
123 res.checkNumberOfRows();
124 }
125 }
126 else
127 {
128 finish();
129 }
130
131 return res;
132}
133catch (...)
134{
135 /// Suspicion of the broken part. A part is added to the queue for verification.
136 if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
137 storage.reportBrokenPart(data_part->name);
138 throw;
139}
140
141
142void MergeTreeSequentialBlockInputStream::finish()
143{
144 /** Close the files (before destroying the object).
145 * When many sources are created, but simultaneously reading only a few of them,
146 * buffers don't waste memory.
147 */
148 reader.reset();
149 part_columns_lock.unlock();
150 data_part.reset();
151}
152
153
/// Defaulted out of line. NOTE(review): presumably so that members whose types are only
/// forward-declared in the header (e.g. the unique_ptr-owned reader) are destroyed here - confirm.
MergeTreeSequentialBlockInputStream::~MergeTreeSequentialBlockInputStream() = default;
155
156}
157