1 | #include <Core/Defines.h> |
2 | |
3 | #include <IO/ReadHelpers.h> |
4 | #include <IO/VarInt.h> |
5 | #include <Compression/CompressedReadBufferFromFile.h> |
6 | |
7 | #include <DataTypes/DataTypeFactory.h> |
8 | #include <Common/typeid_cast.h> |
9 | #include <ext/range.h> |
10 | |
11 | #include <DataStreams/NativeBlockInputStream.h> |
12 | #include <DataTypes/DataTypeLowCardinality.h> |
13 | |
14 | |
15 | namespace DB |
16 | { |
17 | |
/// Error codes used when throwing from this file (declared extern here, defined elsewhere).
/// NOTE(review): NOT_IMPLEMENTED does not appear to be referenced in this file.
namespace ErrorCodes
{
    extern const int CANNOT_READ_ALL_DATA;
    extern const int INCORRECT_INDEX;
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
}
25 | |
26 | |
27 | NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_) |
28 | : istr(istr_), server_revision(server_revision_) |
29 | { |
30 | } |
31 | |
32 | NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, const Block & , UInt64 server_revision_) |
33 | : istr(istr_), header(header_), server_revision(server_revision_) |
34 | { |
35 | } |
36 | |
37 | NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_, |
38 | IndexForNativeFormat::Blocks::const_iterator index_block_it_, |
39 | IndexForNativeFormat::Blocks::const_iterator index_block_end_) |
40 | : istr(istr_), server_revision(server_revision_), |
41 | use_index(true), index_block_it(index_block_it_), index_block_end(index_block_end_) |
42 | { |
43 | istr_concrete = typeid_cast<CompressedReadBufferFromFile *>(&istr); |
44 | if (!istr_concrete) |
45 | throw Exception("When need to use index for NativeBlockInputStream, istr must be CompressedReadBufferFromFile." , ErrorCodes::LOGICAL_ERROR); |
46 | |
47 | if (index_block_it == index_block_end) |
48 | return; |
49 | |
50 | index_column_it = index_block_it->columns.begin(); |
51 | |
52 | /// Initialize header from the index. |
53 | for (const auto & column : index_block_it->columns) |
54 | { |
55 | auto type = DataTypeFactory::instance().get(column.type); |
56 | header.insert(ColumnWithTypeAndName{ type, column.name }); |
57 | } |
58 | } |
59 | |
60 | // also resets few vars from IBlockInputStream (I didn't want to propagate resetParser upthere) |
61 | void NativeBlockInputStream::resetParser() |
62 | { |
63 | istr_concrete = nullptr; |
64 | use_index = false; |
65 | |
66 | #ifndef NDEBUG |
67 | read_prefix_is_called = false; |
68 | read_suffix_is_called = false; |
69 | #endif |
70 | |
71 | is_cancelled.store(false); |
72 | is_killed.store(false); |
73 | } |
74 | |
75 | void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint) |
76 | { |
77 | IDataType::DeserializeBinaryBulkSettings settings; |
78 | settings.getter = [&](IDataType::SubstreamPath) -> ReadBuffer * { return &istr; }; |
79 | settings.avg_value_size_hint = avg_value_size_hint; |
80 | settings.position_independent_encoding = false; |
81 | |
82 | IDataType::DeserializeBinaryBulkStatePtr state; |
83 | type.deserializeBinaryBulkStatePrefix(settings, state); |
84 | type.deserializeBinaryBulkWithMultipleStreams(column, rows, settings, state); |
85 | |
86 | if (column.size() != rows) |
87 | throw Exception("Cannot read all data in NativeBlockInputStream. Rows read: " + toString(column.size()) + ". Rows expected: " + toString(rows) + "." , |
88 | ErrorCodes::CANNOT_READ_ALL_DATA); |
89 | } |
90 | |
91 | |
92 | Block NativeBlockInputStream::() const |
93 | { |
94 | return header; |
95 | } |
96 | |
97 | |
98 | Block NativeBlockInputStream::readImpl() |
99 | { |
100 | Block res; |
101 | |
102 | const DataTypeFactory & data_type_factory = DataTypeFactory::instance(); |
103 | |
104 | if (use_index && index_block_it == index_block_end) |
105 | return res; |
106 | |
107 | if (istr.eof()) |
108 | { |
109 | if (use_index) |
110 | throw Exception("Input doesn't contain all data for index." , ErrorCodes::CANNOT_READ_ALL_DATA); |
111 | |
112 | return res; |
113 | } |
114 | |
115 | /// Additional information about the block. |
116 | if (server_revision > 0) |
117 | res.info.read(istr); |
118 | |
119 | /// Dimensions |
120 | size_t columns = 0; |
121 | size_t rows = 0; |
122 | |
123 | if (!use_index) |
124 | { |
125 | readVarUInt(columns, istr); |
126 | readVarUInt(rows, istr); |
127 | } |
128 | else |
129 | { |
130 | columns = index_block_it->num_columns; |
131 | rows = index_block_it->num_rows; |
132 | } |
133 | |
134 | for (size_t i = 0; i < columns; ++i) |
135 | { |
136 | if (use_index) |
137 | { |
138 | /// If the current position is what is required, the real seek does not occur. |
139 | istr_concrete->seek(index_column_it->location.offset_in_compressed_file, index_column_it->location.offset_in_decompressed_block); |
140 | } |
141 | |
142 | ColumnWithTypeAndName column; |
143 | |
144 | /// Name |
145 | readBinary(column.name, istr); |
146 | |
147 | /// Type |
148 | String type_name; |
149 | readBinary(type_name, istr); |
150 | column.type = data_type_factory.get(type_name); |
151 | |
152 | if (use_index) |
153 | { |
154 | /// Index allows to do more checks. |
155 | if (index_column_it->name != column.name) |
156 | throw Exception("Index points to column with wrong name: corrupted index or data" , ErrorCodes::INCORRECT_INDEX); |
157 | if (index_column_it->type != type_name) |
158 | throw Exception("Index points to column with wrong type: corrupted index or data" , ErrorCodes::INCORRECT_INDEX); |
159 | } |
160 | |
161 | /// Data |
162 | MutableColumnPtr read_column = column.type->createColumn(); |
163 | |
164 | double avg_value_size_hint = avg_value_size_hints.empty() ? 0 : avg_value_size_hints[i]; |
165 | if (rows) /// If no rows, nothing to read. |
166 | readData(*column.type, *read_column, istr, rows, avg_value_size_hint); |
167 | |
168 | column.column = std::move(read_column); |
169 | |
170 | if (header) |
171 | { |
172 | /// Support insert from old clients without low cardinality type. |
173 | auto & header_column = header.getByName(column.name); |
174 | if (!header_column.type->equals(*column.type)) |
175 | { |
176 | column.column = recursiveTypeConversion(column.column, column.type, header.getByPosition(i).type); |
177 | column.type = header.getByPosition(i).type; |
178 | } |
179 | } |
180 | |
181 | res.insert(std::move(column)); |
182 | |
183 | if (use_index) |
184 | ++index_column_it; |
185 | } |
186 | |
187 | if (use_index) |
188 | { |
189 | if (index_column_it != index_block_it->columns.end()) |
190 | throw Exception("Inconsistent index: not all columns were read" , ErrorCodes::INCORRECT_INDEX); |
191 | |
192 | ++index_block_it; |
193 | if (index_block_it != index_block_end) |
194 | index_column_it = index_block_it->columns.begin(); |
195 | } |
196 | |
197 | if (rows && header) |
198 | { |
199 | /// Allow to skip columns. Fill them with default values. |
200 | Block tmp_res; |
201 | |
202 | for (auto & col : header) |
203 | { |
204 | if (res.has(col.name)) |
205 | tmp_res.insert(res.getByName(col.name)); |
206 | else |
207 | tmp_res.insert({col.type->createColumn()->cloneResized(rows), col.type, col.name}); |
208 | } |
209 | |
210 | res.swap(tmp_res); |
211 | } |
212 | |
213 | return res; |
214 | } |
215 | |
216 | void NativeBlockInputStream::updateAvgValueSizeHints(const Block & block) |
217 | { |
218 | auto rows = block.rows(); |
219 | if (rows < 10) |
220 | return; |
221 | |
222 | avg_value_size_hints.resize_fill(block.columns(), 0); |
223 | |
224 | for (auto idx : ext::range(0, block.columns())) |
225 | { |
226 | auto & avg_value_size_hint = avg_value_size_hints[idx]; |
227 | IDataType::updateAvgValueSizeHint(*block.getByPosition(idx).column, avg_value_size_hint); |
228 | } |
229 | } |
230 | |
231 | void IndexForNativeFormat::read(ReadBuffer & istr, const NameSet & required_columns) |
232 | { |
233 | while (!istr.eof()) |
234 | { |
235 | blocks.emplace_back(); |
236 | IndexOfBlockForNativeFormat & block = blocks.back(); |
237 | |
238 | readVarUInt(block.num_columns, istr); |
239 | readVarUInt(block.num_rows, istr); |
240 | |
241 | if (block.num_columns < required_columns.size()) |
242 | throw Exception("Index contain less than required columns" , ErrorCodes::INCORRECT_INDEX); |
243 | |
244 | for (size_t i = 0; i < block.num_columns; ++i) |
245 | { |
246 | IndexOfOneColumnForNativeFormat column_index; |
247 | |
248 | readBinary(column_index.name, istr); |
249 | readBinary(column_index.type, istr); |
250 | readBinary(column_index.location.offset_in_compressed_file, istr); |
251 | readBinary(column_index.location.offset_in_decompressed_block, istr); |
252 | |
253 | if (required_columns.count(column_index.name)) |
254 | block.columns.push_back(std::move(column_index)); |
255 | } |
256 | |
257 | if (block.columns.size() < required_columns.size()) |
258 | throw Exception("Index contain less than required columns" , ErrorCodes::INCORRECT_INDEX); |
259 | if (block.columns.size() > required_columns.size()) |
260 | throw Exception("Index contain duplicate columns" , ErrorCodes::INCORRECT_INDEX); |
261 | |
262 | block.num_columns = block.columns.size(); |
263 | } |
264 | } |
265 | |
266 | } |
267 | |