1 | #include <Columns/ColumnArray.h> |
2 | |
3 | #include <IO/ReadHelpers.h> |
4 | #include <IO/WriteHelpers.h> |
5 | #include <IO/ReadBufferFromString.h> |
6 | #include <IO/WriteBufferFromString.h> |
7 | |
8 | #include <Formats/FormatSettings.h> |
9 | #include <Formats/ProtobufReader.h> |
10 | #include <DataTypes/DataTypesNumber.h> |
11 | #include <DataTypes/DataTypeArray.h> |
12 | #include <DataTypes/DataTypeFactory.h> |
13 | |
14 | #include <Parsers/IAST.h> |
15 | |
16 | #include <Common/typeid_cast.h> |
17 | #include <Common/assert_cast.h> |
18 | |
19 | |
20 | namespace DB |
21 | { |
22 | |
/// Error codes thrown from this file. Each one is declared here explicitly
/// (CANNOT_READ_ALL_DATA is used by deserializeBinaryBulkWithMultipleStreams)
/// instead of relying on a declaration that happens to come from an included header.
namespace ErrorCodes
{
    extern const int CANNOT_READ_ALL_DATA;
    extern const int CANNOT_READ_ARRAY_FROM_TEXT;
    extern const int LOGICAL_ERROR;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
29 | |
30 | |
31 | DataTypeArray::DataTypeArray(const DataTypePtr & nested_) |
32 | : nested{nested_} |
33 | { |
34 | } |
35 | |
36 | |
37 | void DataTypeArray::serializeBinary(const Field & field, WriteBuffer & ostr) const |
38 | { |
39 | const Array & a = get<const Array &>(field); |
40 | writeVarUInt(a.size(), ostr); |
41 | for (size_t i = 0; i < a.size(); ++i) |
42 | { |
43 | nested->serializeBinary(a[i], ostr); |
44 | } |
45 | } |
46 | |
47 | |
48 | void DataTypeArray::deserializeBinary(Field & field, ReadBuffer & istr) const |
49 | { |
50 | size_t size; |
51 | readVarUInt(size, istr); |
52 | field = Array(size); |
53 | Array & arr = get<Array &>(field); |
54 | for (size_t i = 0; i < size; ++i) |
55 | nested->deserializeBinary(arr[i], istr); |
56 | } |
57 | |
58 | |
59 | void DataTypeArray::serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const |
60 | { |
61 | const ColumnArray & column_array = assert_cast<const ColumnArray &>(column); |
62 | const ColumnArray::Offsets & offsets = column_array.getOffsets(); |
63 | |
64 | size_t offset = offsets[row_num - 1]; |
65 | size_t next_offset = offsets[row_num]; |
66 | size_t size = next_offset - offset; |
67 | |
68 | writeVarUInt(size, ostr); |
69 | |
70 | const IColumn & nested_column = column_array.getData(); |
71 | for (size_t i = offset; i < next_offset; ++i) |
72 | nested->serializeBinary(nested_column, i, ostr); |
73 | } |
74 | |
75 | |
76 | void DataTypeArray::deserializeBinary(IColumn & column, ReadBuffer & istr) const |
77 | { |
78 | ColumnArray & column_array = assert_cast<ColumnArray &>(column); |
79 | ColumnArray::Offsets & offsets = column_array.getOffsets(); |
80 | |
81 | size_t size; |
82 | readVarUInt(size, istr); |
83 | |
84 | IColumn & nested_column = column_array.getData(); |
85 | |
86 | size_t i = 0; |
87 | try |
88 | { |
89 | for (; i < size; ++i) |
90 | nested->deserializeBinary(nested_column, istr); |
91 | } |
92 | catch (...) |
93 | { |
94 | if (i) |
95 | nested_column.popBack(i); |
96 | throw; |
97 | } |
98 | |
99 | offsets.push_back(offsets.back() + size); |
100 | } |
101 | |
102 | |
103 | namespace |
104 | { |
105 | void serializeArraySizesPositionIndependent(const IColumn & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit) |
106 | { |
107 | const ColumnArray & column_array = typeid_cast<const ColumnArray &>(column); |
108 | const ColumnArray::Offsets & offset_values = column_array.getOffsets(); |
109 | size_t size = offset_values.size(); |
110 | |
111 | if (!size) |
112 | return; |
113 | |
114 | size_t end = limit && (offset + limit < size) |
115 | ? offset + limit |
116 | : size; |
117 | |
118 | ColumnArray::Offset prev_offset = offset_values[offset - 1]; |
119 | for (size_t i = offset; i < end; ++i) |
120 | { |
121 | ColumnArray::Offset current_offset = offset_values[i]; |
122 | writeIntBinary(current_offset - prev_offset, ostr); |
123 | prev_offset = current_offset; |
124 | } |
125 | } |
126 | |
127 | void deserializeArraySizesPositionIndependent(IColumn & column, ReadBuffer & istr, UInt64 limit) |
128 | { |
129 | ColumnArray & column_array = typeid_cast<ColumnArray &>(column); |
130 | ColumnArray::Offsets & offset_values = column_array.getOffsets(); |
131 | size_t initial_size = offset_values.size(); |
132 | offset_values.resize(initial_size + limit); |
133 | |
134 | size_t i = initial_size; |
135 | ColumnArray::Offset current_offset = initial_size ? offset_values[initial_size - 1] : 0; |
136 | while (i < initial_size + limit && !istr.eof()) |
137 | { |
138 | ColumnArray::Offset current_size = 0; |
139 | readIntBinary(current_size, istr); |
140 | current_offset += current_size; |
141 | offset_values[i] = current_offset; |
142 | ++i; |
143 | } |
144 | |
145 | offset_values.resize(i); |
146 | } |
147 | } |
148 | |
149 | |
150 | void DataTypeArray::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const |
151 | { |
152 | path.push_back(Substream::ArraySizes); |
153 | callback(path); |
154 | path.back() = Substream::ArrayElements; |
155 | nested->enumerateStreams(callback, path); |
156 | path.pop_back(); |
157 | } |
158 | |
159 | |
160 | void DataTypeArray::serializeBinaryBulkStatePrefix( |
161 | SerializeBinaryBulkSettings & settings, |
162 | SerializeBinaryBulkStatePtr & state) const |
163 | { |
164 | settings.path.push_back(Substream::ArrayElements); |
165 | nested->serializeBinaryBulkStatePrefix(settings, state); |
166 | settings.path.pop_back(); |
167 | } |
168 | |
169 | |
170 | void DataTypeArray::serializeBinaryBulkStateSuffix( |
171 | SerializeBinaryBulkSettings & settings, |
172 | SerializeBinaryBulkStatePtr & state) const |
173 | { |
174 | settings.path.push_back(Substream::ArrayElements); |
175 | nested->serializeBinaryBulkStateSuffix(settings, state); |
176 | settings.path.pop_back(); |
177 | } |
178 | |
179 | |
180 | void DataTypeArray::deserializeBinaryBulkStatePrefix( |
181 | DeserializeBinaryBulkSettings & settings, |
182 | DeserializeBinaryBulkStatePtr & state) const |
183 | { |
184 | settings.path.push_back(Substream::ArrayElements); |
185 | nested->deserializeBinaryBulkStatePrefix(settings, state); |
186 | settings.path.pop_back(); |
187 | } |
188 | |
189 | |
190 | void DataTypeArray::serializeBinaryBulkWithMultipleStreams( |
191 | const IColumn & column, |
192 | size_t offset, |
193 | size_t limit, |
194 | SerializeBinaryBulkSettings & settings, |
195 | SerializeBinaryBulkStatePtr & state) const |
196 | { |
197 | const ColumnArray & column_array = typeid_cast<const ColumnArray &>(column); |
198 | |
199 | /// First serialize array sizes. |
200 | settings.path.push_back(Substream::ArraySizes); |
201 | if (auto stream = settings.getter(settings.path)) |
202 | { |
203 | if (settings.position_independent_encoding) |
204 | serializeArraySizesPositionIndependent(column, *stream, offset, limit); |
205 | else |
206 | DataTypeNumber<ColumnArray::Offset>().serializeBinaryBulk(*column_array.getOffsetsPtr(), *stream, offset, limit); |
207 | } |
208 | |
209 | /// Then serialize contents of arrays. |
210 | settings.path.back() = Substream::ArrayElements; |
211 | const ColumnArray::Offsets & offset_values = column_array.getOffsets(); |
212 | |
213 | if (offset > offset_values.size()) |
214 | return; |
215 | |
216 | /** offset - from which array to write. |
217 | * limit - how many arrays should be written, or 0, if you write everything that is. |
218 | * end - up to which array the recorded piece ends. |
219 | * |
220 | * nested_offset - from which element of the innards to write. |
221 | * nested_limit - how many elements of the innards to write, or 0, if you write everything that is. |
222 | */ |
223 | |
224 | size_t end = std::min(offset + limit, offset_values.size()); |
225 | |
226 | size_t nested_offset = offset ? offset_values[offset - 1] : 0; |
227 | size_t nested_limit = limit |
228 | ? offset_values[end - 1] - nested_offset |
229 | : 0; |
230 | |
231 | if (limit == 0 || nested_limit) |
232 | nested->serializeBinaryBulkWithMultipleStreams(column_array.getData(), nested_offset, nested_limit, settings, state); |
233 | settings.path.pop_back(); |
234 | } |
235 | |
236 | |
237 | void DataTypeArray::deserializeBinaryBulkWithMultipleStreams( |
238 | IColumn & column, |
239 | size_t limit, |
240 | DeserializeBinaryBulkSettings & settings, |
241 | DeserializeBinaryBulkStatePtr & state) const |
242 | { |
243 | ColumnArray & column_array = typeid_cast<ColumnArray &>(column); |
244 | |
245 | settings.path.push_back(Substream::ArraySizes); |
246 | if (auto stream = settings.getter(settings.path)) |
247 | { |
248 | if (settings.position_independent_encoding) |
249 | deserializeArraySizesPositionIndependent(column, *stream, limit); |
250 | else |
251 | DataTypeNumber<ColumnArray::Offset>().deserializeBinaryBulk(column_array.getOffsetsColumn(), *stream, limit, 0); |
252 | } |
253 | |
254 | settings.path.back() = Substream::ArrayElements; |
255 | |
256 | ColumnArray::Offsets & offset_values = column_array.getOffsets(); |
257 | IColumn & nested_column = column_array.getData(); |
258 | |
259 | /// Number of values corresponding with `offset_values` must be read. |
260 | size_t last_offset = offset_values.back(); |
261 | if (last_offset < nested_column.size()) |
262 | throw Exception("Nested column is longer than last offset" , ErrorCodes::LOGICAL_ERROR); |
263 | size_t nested_limit = last_offset - nested_column.size(); |
264 | |
265 | /// Adjust value size hint. Divide it to the average array size. |
266 | settings.avg_value_size_hint = nested_limit ? settings.avg_value_size_hint / nested_limit * offset_values.size() : 0; |
267 | |
268 | nested->deserializeBinaryBulkWithMultipleStreams(nested_column, nested_limit, settings, state); |
269 | settings.path.pop_back(); |
270 | |
271 | /// Check consistency between offsets and elements subcolumns. |
272 | /// But if elements column is empty - it's ok for columns of Nested types that was added by ALTER. |
273 | if (!nested_column.empty() && nested_column.size() != last_offset) |
274 | throw Exception("Cannot read all array values: read just " + toString(nested_column.size()) + " of " + toString(last_offset), |
275 | ErrorCodes::CANNOT_READ_ALL_DATA); |
276 | } |
277 | |
278 | |
279 | template <typename Writer> |
280 | static void serializeTextImpl(const IColumn & column, size_t row_num, WriteBuffer & ostr, Writer && write_nested) |
281 | { |
282 | const ColumnArray & column_array = assert_cast<const ColumnArray &>(column); |
283 | const ColumnArray::Offsets & offsets = column_array.getOffsets(); |
284 | |
285 | size_t offset = offsets[row_num - 1]; |
286 | size_t next_offset = offsets[row_num]; |
287 | |
288 | const IColumn & nested_column = column_array.getData(); |
289 | |
290 | writeChar('[', ostr); |
291 | for (size_t i = offset; i < next_offset; ++i) |
292 | { |
293 | if (i != offset) |
294 | writeChar(',', ostr); |
295 | write_nested(nested_column, i); |
296 | } |
297 | writeChar(']', ostr); |
298 | } |
299 | |
300 | |
301 | template <typename Reader> |
302 | static void deserializeTextImpl(IColumn & column, ReadBuffer & istr, Reader && read_nested) |
303 | { |
304 | ColumnArray & column_array = assert_cast<ColumnArray &>(column); |
305 | ColumnArray::Offsets & offsets = column_array.getOffsets(); |
306 | |
307 | IColumn & nested_column = column_array.getData(); |
308 | |
309 | size_t size = 0; |
310 | assertChar('[', istr); |
311 | |
312 | try |
313 | { |
314 | bool first = true; |
315 | while (!istr.eof() && *istr.position() != ']') |
316 | { |
317 | if (!first) |
318 | { |
319 | if (*istr.position() == ',') |
320 | ++istr.position(); |
321 | else |
322 | throw Exception("Cannot read array from text" , ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT); |
323 | } |
324 | |
325 | first = false; |
326 | |
327 | skipWhitespaceIfAny(istr); |
328 | |
329 | if (*istr.position() == ']') |
330 | break; |
331 | |
332 | read_nested(nested_column); |
333 | ++size; |
334 | |
335 | skipWhitespaceIfAny(istr); |
336 | } |
337 | assertChar(']', istr); |
338 | } |
339 | catch (...) |
340 | { |
341 | if (size) |
342 | nested_column.popBack(size); |
343 | throw; |
344 | } |
345 | |
346 | offsets.push_back(offsets.back() + size); |
347 | } |
348 | |
349 | |
350 | void DataTypeArray::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const |
351 | { |
352 | serializeTextImpl(column, row_num, ostr, |
353 | [&](const IColumn & nested_column, size_t i) |
354 | { |
355 | nested->serializeAsTextQuoted(nested_column, i, ostr, settings); |
356 | }); |
357 | } |
358 | |
359 | |
360 | void DataTypeArray::deserializeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const |
361 | { |
362 | deserializeTextImpl(column, istr, |
363 | [&](IColumn & nested_column) |
364 | { |
365 | nested->deserializeAsTextQuoted(nested_column, istr, settings); |
366 | }); |
367 | } |
368 | |
369 | void DataTypeArray::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const |
370 | { |
371 | const ColumnArray & column_array = assert_cast<const ColumnArray &>(column); |
372 | const ColumnArray::Offsets & offsets = column_array.getOffsets(); |
373 | |
374 | size_t offset = offsets[row_num - 1]; |
375 | size_t next_offset = offsets[row_num]; |
376 | |
377 | const IColumn & nested_column = column_array.getData(); |
378 | |
379 | writeChar('[', ostr); |
380 | for (size_t i = offset; i < next_offset; ++i) |
381 | { |
382 | if (i != offset) |
383 | writeChar(',', ostr); |
384 | nested->serializeAsTextJSON(nested_column, i, ostr, settings); |
385 | } |
386 | writeChar(']', ostr); |
387 | } |
388 | |
389 | |
390 | void DataTypeArray::deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const |
391 | { |
392 | deserializeTextImpl(column, istr, [&](IColumn & nested_column) { nested->deserializeAsTextJSON(nested_column, istr, settings); }); |
393 | } |
394 | |
395 | |
396 | void DataTypeArray::serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const |
397 | { |
398 | const ColumnArray & column_array = assert_cast<const ColumnArray &>(column); |
399 | const ColumnArray::Offsets & offsets = column_array.getOffsets(); |
400 | |
401 | size_t offset = offsets[row_num - 1]; |
402 | size_t next_offset = offsets[row_num]; |
403 | |
404 | const IColumn & nested_column = column_array.getData(); |
405 | |
406 | writeCString("<array>" , ostr); |
407 | for (size_t i = offset; i < next_offset; ++i) |
408 | { |
409 | writeCString("<elem>" , ostr); |
410 | nested->serializeAsTextXML(nested_column, i, ostr, settings); |
411 | writeCString("</elem>" , ostr); |
412 | } |
413 | writeCString("</array>" , ostr); |
414 | } |
415 | |
416 | |
417 | void DataTypeArray::serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const |
418 | { |
419 | /// There is no good way to serialize an array in CSV. Therefore, we serialize it into a string, and then write the resulting string in CSV. |
420 | WriteBufferFromOwnString wb; |
421 | serializeText(column, row_num, wb, settings); |
422 | writeCSV(wb.str(), ostr); |
423 | } |
424 | |
425 | |
426 | void DataTypeArray::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const |
427 | { |
428 | String s; |
429 | readCSV(s, istr, settings.csv); |
430 | ReadBufferFromString rb(s); |
431 | deserializeText(column, rb, settings); |
432 | } |
433 | |
434 | |
435 | void DataTypeArray::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf, size_t & value_index) const |
436 | { |
437 | const ColumnArray & column_array = assert_cast<const ColumnArray &>(column); |
438 | const ColumnArray::Offsets & offsets = column_array.getOffsets(); |
439 | size_t offset = offsets[row_num - 1] + value_index; |
440 | size_t next_offset = offsets[row_num]; |
441 | const IColumn & nested_column = column_array.getData(); |
442 | size_t i; |
443 | for (i = offset; i < next_offset; ++i) |
444 | { |
445 | size_t element_stored = 0; |
446 | nested->serializeProtobuf(nested_column, i, protobuf, element_stored); |
447 | if (!element_stored) |
448 | break; |
449 | } |
450 | value_index += i - offset; |
451 | } |
452 | |
453 | |
454 | void DataTypeArray::deserializeProtobuf(IColumn & column, ProtobufReader & protobuf, bool allow_add_row, bool & row_added) const |
455 | { |
456 | row_added = false; |
457 | ColumnArray & column_array = assert_cast<ColumnArray &>(column); |
458 | IColumn & nested_column = column_array.getData(); |
459 | ColumnArray::Offsets & offsets = column_array.getOffsets(); |
460 | size_t old_size = offsets.size(); |
461 | try |
462 | { |
463 | bool nested_row_added; |
464 | do |
465 | nested->deserializeProtobuf(nested_column, protobuf, true, nested_row_added); |
466 | while (nested_row_added && protobuf.canReadMoreValues()); |
467 | if (allow_add_row) |
468 | { |
469 | offsets.emplace_back(nested_column.size()); |
470 | row_added = true; |
471 | } |
472 | else |
473 | offsets.back() = nested_column.size(); |
474 | } |
475 | catch (...) |
476 | { |
477 | offsets.resize_assume_reserved(old_size); |
478 | nested_column.popBack(nested_column.size() - offsets.back()); |
479 | throw; |
480 | } |
481 | } |
482 | |
483 | |
484 | MutableColumnPtr DataTypeArray::createColumn() const |
485 | { |
486 | return ColumnArray::create(nested->createColumn(), ColumnArray::ColumnOffsets::create()); |
487 | } |
488 | |
489 | |
490 | Field DataTypeArray::getDefault() const |
491 | { |
492 | return Array(); |
493 | } |
494 | |
495 | |
496 | bool DataTypeArray::equals(const IDataType & rhs) const |
497 | { |
498 | return typeid(rhs) == typeid(*this) && nested->equals(*static_cast<const DataTypeArray &>(rhs).nested); |
499 | } |
500 | |
501 | |
502 | size_t DataTypeArray::getNumberOfDimensions() const |
503 | { |
504 | const DataTypeArray * nested_array = typeid_cast<const DataTypeArray *>(nested.get()); |
505 | if (!nested_array) |
506 | return 1; |
507 | return 1 + nested_array->getNumberOfDimensions(); /// Every modern C++ compiler optimizes tail recursion. |
508 | } |
509 | |
510 | |
511 | static DataTypePtr create(const String & /*type_name*/, const ASTPtr & arguments) |
512 | { |
513 | if (!arguments || arguments->children.size() != 1) |
514 | throw Exception("Array data type family must have exactly one argument - type of elements" , ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
515 | |
516 | return std::make_shared<DataTypeArray>(DataTypeFactory::instance().get(arguments->children[0])); |
517 | } |
518 | |
519 | |
520 | void registerDataTypeArray(DataTypeFactory & factory) |
521 | { |
522 | factory.registerDataType("Array" , create); |
523 | } |
524 | |
525 | } |
526 | |