1#include <Core/Defines.h>
2
3#include <IO/ReadHelpers.h>
4#include <IO/VarInt.h>
5#include <Compression/CompressedReadBufferFromFile.h>
6
7#include <DataTypes/DataTypeFactory.h>
8#include <Common/typeid_cast.h>
9#include <ext/range.h>
10
11#include <DataStreams/NativeBlockInputStream.h>
12#include <DataTypes/DataTypeLowCardinality.h>
13
14
15namespace DB
16{
17
/// Error codes used by this translation unit. They are only declared here (`extern`);
/// the definitions live elsewhere in the project.
namespace ErrorCodes
{
    extern const int CANNOT_READ_ALL_DATA;
    extern const int INCORRECT_INDEX;
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
}
25
26
27NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_)
28 : istr(istr_), server_revision(server_revision_)
29{
30}
31
32NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_)
33 : istr(istr_), header(header_), server_revision(server_revision_)
34{
35}
36
37NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_,
38 IndexForNativeFormat::Blocks::const_iterator index_block_it_,
39 IndexForNativeFormat::Blocks::const_iterator index_block_end_)
40 : istr(istr_), server_revision(server_revision_),
41 use_index(true), index_block_it(index_block_it_), index_block_end(index_block_end_)
42{
43 istr_concrete = typeid_cast<CompressedReadBufferFromFile *>(&istr);
44 if (!istr_concrete)
45 throw Exception("When need to use index for NativeBlockInputStream, istr must be CompressedReadBufferFromFile.", ErrorCodes::LOGICAL_ERROR);
46
47 if (index_block_it == index_block_end)
48 return;
49
50 index_column_it = index_block_it->columns.begin();
51
52 /// Initialize header from the index.
53 for (const auto & column : index_block_it->columns)
54 {
55 auto type = DataTypeFactory::instance().get(column.type);
56 header.insert(ColumnWithTypeAndName{ type, column.name });
57 }
58}
59
60// also resets few vars from IBlockInputStream (I didn't want to propagate resetParser upthere)
61void NativeBlockInputStream::resetParser()
62{
63 istr_concrete = nullptr;
64 use_index = false;
65
66#ifndef NDEBUG
67 read_prefix_is_called = false;
68 read_suffix_is_called = false;
69#endif
70
71 is_cancelled.store(false);
72 is_killed.store(false);
73}
74
75void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint)
76{
77 IDataType::DeserializeBinaryBulkSettings settings;
78 settings.getter = [&](IDataType::SubstreamPath) -> ReadBuffer * { return &istr; };
79 settings.avg_value_size_hint = avg_value_size_hint;
80 settings.position_independent_encoding = false;
81
82 IDataType::DeserializeBinaryBulkStatePtr state;
83 type.deserializeBinaryBulkStatePrefix(settings, state);
84 type.deserializeBinaryBulkWithMultipleStreams(column, rows, settings, state);
85
86 if (column.size() != rows)
87 throw Exception("Cannot read all data in NativeBlockInputStream. Rows read: " + toString(column.size()) + ". Rows expected: " + toString(rows) + ".",
88 ErrorCodes::CANNOT_READ_ALL_DATA);
89}
90
91
92Block NativeBlockInputStream::getHeader() const
93{
94 return header;
95}
96
97
98Block NativeBlockInputStream::readImpl()
99{
100 Block res;
101
102 const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
103
104 if (use_index && index_block_it == index_block_end)
105 return res;
106
107 if (istr.eof())
108 {
109 if (use_index)
110 throw Exception("Input doesn't contain all data for index.", ErrorCodes::CANNOT_READ_ALL_DATA);
111
112 return res;
113 }
114
115 /// Additional information about the block.
116 if (server_revision > 0)
117 res.info.read(istr);
118
119 /// Dimensions
120 size_t columns = 0;
121 size_t rows = 0;
122
123 if (!use_index)
124 {
125 readVarUInt(columns, istr);
126 readVarUInt(rows, istr);
127 }
128 else
129 {
130 columns = index_block_it->num_columns;
131 rows = index_block_it->num_rows;
132 }
133
134 for (size_t i = 0; i < columns; ++i)
135 {
136 if (use_index)
137 {
138 /// If the current position is what is required, the real seek does not occur.
139 istr_concrete->seek(index_column_it->location.offset_in_compressed_file, index_column_it->location.offset_in_decompressed_block);
140 }
141
142 ColumnWithTypeAndName column;
143
144 /// Name
145 readBinary(column.name, istr);
146
147 /// Type
148 String type_name;
149 readBinary(type_name, istr);
150 column.type = data_type_factory.get(type_name);
151
152 if (use_index)
153 {
154 /// Index allows to do more checks.
155 if (index_column_it->name != column.name)
156 throw Exception("Index points to column with wrong name: corrupted index or data", ErrorCodes::INCORRECT_INDEX);
157 if (index_column_it->type != type_name)
158 throw Exception("Index points to column with wrong type: corrupted index or data", ErrorCodes::INCORRECT_INDEX);
159 }
160
161 /// Data
162 MutableColumnPtr read_column = column.type->createColumn();
163
164 double avg_value_size_hint = avg_value_size_hints.empty() ? 0 : avg_value_size_hints[i];
165 if (rows) /// If no rows, nothing to read.
166 readData(*column.type, *read_column, istr, rows, avg_value_size_hint);
167
168 column.column = std::move(read_column);
169
170 if (header)
171 {
172 /// Support insert from old clients without low cardinality type.
173 auto & header_column = header.getByName(column.name);
174 if (!header_column.type->equals(*column.type))
175 {
176 column.column = recursiveTypeConversion(column.column, column.type, header.getByPosition(i).type);
177 column.type = header.getByPosition(i).type;
178 }
179 }
180
181 res.insert(std::move(column));
182
183 if (use_index)
184 ++index_column_it;
185 }
186
187 if (use_index)
188 {
189 if (index_column_it != index_block_it->columns.end())
190 throw Exception("Inconsistent index: not all columns were read", ErrorCodes::INCORRECT_INDEX);
191
192 ++index_block_it;
193 if (index_block_it != index_block_end)
194 index_column_it = index_block_it->columns.begin();
195 }
196
197 if (rows && header)
198 {
199 /// Allow to skip columns. Fill them with default values.
200 Block tmp_res;
201
202 for (auto & col : header)
203 {
204 if (res.has(col.name))
205 tmp_res.insert(res.getByName(col.name));
206 else
207 tmp_res.insert({col.type->createColumn()->cloneResized(rows), col.type, col.name});
208 }
209
210 res.swap(tmp_res);
211 }
212
213 return res;
214}
215
216void NativeBlockInputStream::updateAvgValueSizeHints(const Block & block)
217{
218 auto rows = block.rows();
219 if (rows < 10)
220 return;
221
222 avg_value_size_hints.resize_fill(block.columns(), 0);
223
224 for (auto idx : ext::range(0, block.columns()))
225 {
226 auto & avg_value_size_hint = avg_value_size_hints[idx];
227 IDataType::updateAvgValueSizeHint(*block.getByPosition(idx).column, avg_value_size_hint);
228 }
229}
230
231void IndexForNativeFormat::read(ReadBuffer & istr, const NameSet & required_columns)
232{
233 while (!istr.eof())
234 {
235 blocks.emplace_back();
236 IndexOfBlockForNativeFormat & block = blocks.back();
237
238 readVarUInt(block.num_columns, istr);
239 readVarUInt(block.num_rows, istr);
240
241 if (block.num_columns < required_columns.size())
242 throw Exception("Index contain less than required columns", ErrorCodes::INCORRECT_INDEX);
243
244 for (size_t i = 0; i < block.num_columns; ++i)
245 {
246 IndexOfOneColumnForNativeFormat column_index;
247
248 readBinary(column_index.name, istr);
249 readBinary(column_index.type, istr);
250 readBinary(column_index.location.offset_in_compressed_file, istr);
251 readBinary(column_index.location.offset_in_decompressed_block, istr);
252
253 if (required_columns.count(column_index.name))
254 block.columns.push_back(std::move(column_index));
255 }
256
257 if (block.columns.size() < required_columns.size())
258 throw Exception("Index contain less than required columns", ErrorCodes::INCORRECT_INDEX);
259 if (block.columns.size() > required_columns.size())
260 throw Exception("Index contain duplicate columns", ErrorCodes::INCORRECT_INDEX);
261
262 block.num_columns = block.columns.size();
263 }
264}
265
266}
267