1 | #include <Storages/MergeTree/MergeTreeSequentialBlockInputStream.h> |
2 | #include <Storages/MergeTree/MergeTreeBlockReadUtils.h> |
3 | |
4 | namespace DB |
5 | { |
6 | namespace ErrorCodes |
7 | { |
8 | extern const int MEMORY_LIMIT_EXCEEDED; |
9 | } |
10 | |
11 | MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream( |
12 | const MergeTreeData & storage_, |
13 | const MergeTreeData::DataPartPtr & data_part_, |
14 | Names columns_to_read_, |
15 | bool read_with_direct_io_, |
16 | bool take_column_types_from_storage, |
17 | bool quiet) |
18 | : storage(storage_) |
19 | , data_part(data_part_) |
20 | , part_columns_lock(data_part->columns_lock) |
21 | , columns_to_read(columns_to_read_) |
22 | , read_with_direct_io(read_with_direct_io_) |
23 | , mark_cache(storage.global_context.getMarkCache()) |
24 | { |
25 | if (!quiet) |
26 | { |
27 | std::stringstream message; |
28 | message << "Reading " << data_part->getMarksCount() << " marks from part " << data_part->name |
29 | << ", total " << data_part->rows_count |
30 | << " rows starting from the beginning of the part" ; |
31 | |
32 | LOG_TRACE(log, message.rdbuf()); |
33 | } |
34 | |
35 | addTotalRowsApprox(data_part->rows_count); |
36 | |
37 | header = storage.getSampleBlockForColumns(columns_to_read); |
38 | fixHeader(header); |
39 | |
40 | /// Add columns because we don't want to read empty blocks |
41 | injectRequiredColumns(storage, data_part, columns_to_read); |
42 | NamesAndTypesList columns_for_reader; |
43 | if (take_column_types_from_storage) |
44 | { |
45 | const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical(); |
46 | columns_for_reader = physical_columns.addTypes(columns_to_read); |
47 | } |
48 | else |
49 | { |
50 | /// take columns from data_part |
51 | columns_for_reader = data_part->columns.addTypes(columns_to_read); |
52 | } |
53 | |
54 | reader = std::make_unique<MergeTreeReader>( |
55 | data_part->getFullPath(), data_part, columns_for_reader, /* uncompressed_cache = */ nullptr, |
56 | mark_cache.get(), /* save_marks_in_cache = */ false, storage, |
57 | MarkRanges{MarkRange(0, data_part->getMarksCount())}, |
58 | /* bytes to use AIO (this is hack) */ |
59 | read_with_direct_io ? 1UL : std::numeric_limits<size_t>::max(), |
60 | DBMS_DEFAULT_BUFFER_SIZE); |
61 | } |
62 | |
63 | |
64 | void MergeTreeSequentialBlockInputStream::(Block & ) const |
65 | { |
66 | /// Types may be different during ALTER (when this stream is used to perform an ALTER). |
67 | for (const auto & name_type : data_part->columns) |
68 | { |
69 | if (header_block.has(name_type.name)) |
70 | { |
71 | auto & elem = header_block.getByName(name_type.name); |
72 | if (!elem.type->equals(*name_type.type)) |
73 | { |
74 | elem.type = name_type.type; |
75 | elem.column = elem.type->createColumn(); |
76 | } |
77 | } |
78 | } |
79 | } |
80 | |
81 | Block MergeTreeSequentialBlockInputStream::() const |
82 | { |
83 | return header; |
84 | } |
85 | |
86 | Block MergeTreeSequentialBlockInputStream::readImpl() |
87 | try |
88 | { |
89 | Block res; |
90 | if (!isCancelled() && current_row < data_part->rows_count) |
91 | { |
92 | size_t rows_to_read = data_part->index_granularity.getMarkRows(current_mark); |
93 | bool continue_reading = (current_mark != 0); |
94 | |
95 | auto & sample = reader->getColumns(); |
96 | Columns columns(sample.size()); |
97 | size_t rows_readed = reader->readRows(current_mark, continue_reading, rows_to_read, columns); |
98 | |
99 | if (rows_readed) |
100 | { |
101 | current_row += rows_readed; |
102 | current_mark += (rows_to_read == rows_readed); |
103 | |
104 | bool should_evaluate_missing_defaults = false; |
105 | reader->fillMissingColumns(columns, should_evaluate_missing_defaults, rows_readed); |
106 | |
107 | if (should_evaluate_missing_defaults) |
108 | reader->evaluateMissingDefaults({}, columns); |
109 | |
110 | res = header.cloneEmpty(); |
111 | |
112 | /// Reorder columns and fill result block. |
113 | size_t num_columns = sample.size(); |
114 | auto it = sample.begin(); |
115 | for (size_t i = 0; i < num_columns; ++i) |
116 | { |
117 | if (res.has(it->name)) |
118 | res.getByName(it->name).column = std::move(columns[i]); |
119 | |
120 | ++it; |
121 | } |
122 | |
123 | res.checkNumberOfRows(); |
124 | } |
125 | } |
126 | else |
127 | { |
128 | finish(); |
129 | } |
130 | |
131 | return res; |
132 | } |
133 | catch (...) |
134 | { |
135 | /// Suspicion of the broken part. A part is added to the queue for verification. |
136 | if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED) |
137 | storage.reportBrokenPart(data_part->name); |
138 | throw; |
139 | } |
140 | |
141 | |
142 | void MergeTreeSequentialBlockInputStream::finish() |
143 | { |
144 | /** Close the files (before destroying the object). |
145 | * When many sources are created, but simultaneously reading only a few of them, |
146 | * buffers don't waste memory. |
147 | */ |
148 | reader.reset(); |
149 | part_columns_lock.unlock(); |
150 | data_part.reset(); |
151 | } |
152 | |
153 | |
/// Defaulted out of line, presumably so that members such as the MergeTreeReader owned by `reader`
/// are destroyed where their type is complete (confirm against the header). If finish() was never
/// reached, whatever the stream still holds is released here by the member destructors.
MergeTreeSequentialBlockInputStream::~MergeTreeSequentialBlockInputStream() = default;
155 | |
156 | } |
157 | |