1#include "config_formats.h"
2#include "ArrowColumnToCHColumn.h"
3
4#if USE_ORC || USE_PARQUET
5#include <DataTypes/DataTypeFactory.h>
6#include <DataTypes/DataTypeNullable.h>
7#include <DataTypes/DataTypeString.h>
8#include <DataTypes/DataTypesDecimal.h>
9#include <DataTypes/DataTypesNumber.h>
10#include <common/DateLUTImpl.h>
11#include <Core/Types.h>
12#include <Core/Block.h>
13#include <Columns/ColumnString.h>
14#include <Columns/ColumnNullable.h>
15#include <Interpreters/castColumn.h>
16#include <algorithm>
17
18
19namespace DB
20{
/// Error codes thrown by the Arrow -> ClickHouse conversion code in this file.
/// They are only declared here (extern); their definitions live elsewhere in the project.
namespace ErrorCodes
{
    extern const int CANNOT_CONVERT_TYPE;
    extern const int CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN;
    extern const int CANNOT_READ_ALL_DATA;
    extern const int EMPTY_DATA_PASSED;
    extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
    extern const int THERE_IS_NO_COLUMN;
    extern const int UNKNOWN_TYPE;
    extern const int VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE;
}
32
33 static const std::initializer_list<std::pair<arrow::Type::type, const char *>> arrow_type_to_internal_type =
34 {
35 {arrow::Type::UINT8, "UInt8"},
36 {arrow::Type::INT8, "Int8"},
37 {arrow::Type::UINT16, "UInt16"},
38 {arrow::Type::INT16, "Int16"},
39 {arrow::Type::UINT32, "UInt32"},
40 {arrow::Type::INT32, "Int32"},
41 {arrow::Type::UINT64, "UInt64"},
42 {arrow::Type::INT64, "Int64"},
43 {arrow::Type::HALF_FLOAT, "Float32"},
44 {arrow::Type::FLOAT, "Float32"},
45 {arrow::Type::DOUBLE, "Float64"},
46
47 {arrow::Type::BOOL, "UInt8"},
48 {arrow::Type::DATE32, "Date"},
49 {arrow::Type::DATE64, "DateTime"},
50 {arrow::Type::TIMESTAMP, "DateTime"},
51
52 {arrow::Type::STRING, "String"},
53 {arrow::Type::BINARY, "String"},
54
55 // TODO: add other types that are convertable to internal ones:
56 // 0. ENUM?
57 // 1. UUID -> String
58 // 2. JSON -> String
59 // Full list of types: contrib/arrow/cpp/src/arrow/type.h
60 };
61
62/// Inserts numeric data right into internal column data to reduce an overhead
63 template <typename NumericType, typename VectorType = ColumnVector<NumericType>>
64 static void fillColumnWithNumericData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, MutableColumnPtr & internal_column)
65 {
66 auto & column_data = static_cast<VectorType &>(*internal_column).getData();
67 column_data.reserve(arrow_column->length());
68
69 for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
70 {
71 std::shared_ptr<arrow::Array> chunk = arrow_column->chunk(chunk_i);
72 /// buffers[0] is a null bitmap and buffers[1] are actual values
73 std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
74
75 const auto * raw_data = reinterpret_cast<const NumericType *>(buffer->data());
76 column_data.insert_assume_reserved(raw_data, raw_data + chunk->length());
77 }
78 }
79
80/// Inserts chars and offsets right into internal column data to reduce an overhead.
81/// Internal offsets are shifted by one to the right in comparison with Arrow ones. So the last offset should map to the end of all chars.
82/// Also internal strings are null terminated.
83 static void fillColumnWithStringData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, MutableColumnPtr & internal_column)
84 {
85 PaddedPODArray<UInt8> & column_chars_t = assert_cast<ColumnString &>(*internal_column).getChars();
86 PaddedPODArray<UInt64> & column_offsets = assert_cast<ColumnString &>(*internal_column).getOffsets();
87
88 size_t chars_t_size = 0;
89 for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
90 {
91 arrow::BinaryArray & chunk = static_cast<arrow::BinaryArray &>(*(arrow_column->chunk(chunk_i)));
92 const size_t chunk_length = chunk.length();
93
94 chars_t_size += chunk.value_offset(chunk_length - 1) + chunk.value_length(chunk_length - 1);
95 chars_t_size += chunk_length; /// additional space for null bytes
96 }
97
98 column_chars_t.reserve(chars_t_size);
99 column_offsets.reserve(arrow_column->length());
100
101 for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
102 {
103 arrow::BinaryArray & chunk = static_cast<arrow::BinaryArray &>(*(arrow_column->chunk(chunk_i)));
104 std::shared_ptr<arrow::Buffer> buffer = chunk.value_data();
105 const size_t chunk_length = chunk.length();
106
107 for (size_t offset_i = 0; offset_i != chunk_length; ++offset_i)
108 {
109 if (!chunk.IsNull(offset_i) && buffer)
110 {
111 const UInt8 * raw_data = buffer->data() + chunk.value_offset(offset_i);
112 column_chars_t.insert_assume_reserved(raw_data, raw_data + chunk.value_length(offset_i));
113 }
114 column_chars_t.emplace_back('\0');
115
116 column_offsets.emplace_back(column_chars_t.size());
117 }
118 }
119 }
120
121 static void fillColumnWithBooleanData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, MutableColumnPtr & internal_column)
122 {
123 auto & column_data = assert_cast<ColumnVector<UInt8> &>(*internal_column).getData();
124 column_data.reserve(arrow_column->length());
125
126 for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
127 {
128 arrow::BooleanArray & chunk = static_cast<arrow::BooleanArray &>(*(arrow_column->chunk(chunk_i)));
129 /// buffers[0] is a null bitmap and buffers[1] are actual values
130 std::shared_ptr<arrow::Buffer> buffer = chunk.data()->buffers[1];
131
132 for (size_t bool_i = 0; bool_i != static_cast<size_t>(chunk.length()); ++bool_i)
133 column_data.emplace_back(chunk.Value(bool_i));
134 }
135 }
136
137/// Arrow stores Parquet::DATE in Int32, while ClickHouse stores Date in UInt16. Therefore, it should be checked before saving
138 static void fillColumnWithDate32Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, MutableColumnPtr & internal_column)
139 {
140 PaddedPODArray<UInt16> & column_data = assert_cast<ColumnVector<UInt16> &>(*internal_column).getData();
141 column_data.reserve(arrow_column->length());
142
143 for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
144 {
145 arrow::Date32Array & chunk = static_cast<arrow::Date32Array &>(*(arrow_column->chunk(chunk_i)));
146
147 for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
148 {
149 UInt32 days_num = static_cast<UInt32>(chunk.Value(value_i));
150 if (days_num > DATE_LUT_MAX_DAY_NUM)
151 {
152 // TODO: will it rollback correctly?
153 throw Exception{"Input value " + std::to_string(days_num) + " of a column \"" + internal_column->getName()
154 + "\" is greater than "
155 "max allowed Date value, which is "
156 + std::to_string(DATE_LUT_MAX_DAY_NUM),
157 ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE};
158 }
159
160 column_data.emplace_back(days_num);
161 }
162 }
163 }
164
165/// Arrow stores Parquet::DATETIME in Int64, while ClickHouse stores DateTime in UInt32. Therefore, it should be checked before saving
166 static void fillColumnWithDate64Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, MutableColumnPtr & internal_column)
167 {
168 auto & column_data = assert_cast<ColumnVector<UInt32> &>(*internal_column).getData();
169 column_data.reserve(arrow_column->length());
170
171 for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
172 {
173 auto & chunk = static_cast<arrow::Date64Array &>(*(arrow_column->chunk(chunk_i)));
174 for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
175 {
176 auto timestamp = static_cast<UInt32>(chunk.Value(value_i) / 1000); // Always? in ms
177 column_data.emplace_back(timestamp);
178 }
179 }
180 }
181
182 static void fillColumnWithTimestampData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, MutableColumnPtr & internal_column)
183 {
184 auto & column_data = assert_cast<ColumnVector<UInt32> &>(*internal_column).getData();
185 column_data.reserve(arrow_column->length());
186
187 for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
188 {
189 auto & chunk = static_cast<arrow::TimestampArray &>(*(arrow_column->chunk(chunk_i)));
190 const auto & type = static_cast<const ::arrow::TimestampType &>(*chunk.type());
191
192 UInt32 divide = 1;
193 const auto unit = type.unit();
194 switch (unit)
195 {
196 case arrow::TimeUnit::SECOND:
197 divide = 1;
198 break;
199 case arrow::TimeUnit::MILLI:
200 divide = 1000;
201 break;
202 case arrow::TimeUnit::MICRO:
203 divide = 1000000;
204 break;
205 case arrow::TimeUnit::NANO:
206 divide = 1000000000;
207 break;
208 }
209
210 for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
211 {
212 auto timestamp = static_cast<UInt32>(chunk.Value(value_i) / divide); // ms! TODO: check other 's' 'ns' ...
213 column_data.emplace_back(timestamp);
214 }
215 }
216 }
217
218 static void fillColumnWithDecimalData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, MutableColumnPtr & internal_column)
219 {
220 auto & column = assert_cast<ColumnDecimal<Decimal128> &>(*internal_column);
221 auto & column_data = column.getData();
222 column_data.reserve(arrow_column->length());
223
224 for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
225 {
226 auto & chunk = static_cast<arrow::DecimalArray &>(*(arrow_column->chunk(chunk_i)));
227 for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
228 {
229 column_data.emplace_back(chunk.IsNull(value_i) ? Decimal128(0) : *reinterpret_cast<const Decimal128 *>(chunk.Value(value_i))); // TODO: copy column
230 }
231 }
232 }
233
234/// Creates a null bytemap from arrow's null bitmap
235 static void fillByteMapFromArrowColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column, MutableColumnPtr & bytemap)
236 {
237 PaddedPODArray<UInt8> & bytemap_data = assert_cast<ColumnVector<UInt8> &>(*bytemap).getData();
238 bytemap_data.reserve(arrow_column->length());
239
240 for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->num_chunks()); ++chunk_i)
241 {
242 std::shared_ptr<arrow::Array> chunk = arrow_column->chunk(chunk_i);
243
244 for (size_t value_i = 0; value_i != static_cast<size_t>(chunk->length()); ++value_i)
245 bytemap_data.emplace_back(chunk->IsNull(value_i));
246 }
247 }
248
249 void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk &res, std::shared_ptr<arrow::Table> &table,
250 arrow::Status &read_status, const Block &header,
251 int &row_group_current, std::string format_name)
252 {
253 Columns columns_list;
254 UInt64 num_rows = 0;
255
256 columns_list.reserve(header.rows());
257
258 using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::ChunkedArray>>;
259 if (!read_status.ok())
260 throw Exception{"Error while reading " + format_name + " data: " + read_status.ToString(),
261 ErrorCodes::CANNOT_READ_ALL_DATA};
262
263 if (0 == table->num_rows())
264 throw Exception{"Empty table in input data", ErrorCodes::EMPTY_DATA_PASSED};
265
266 if (header.columns() > static_cast<size_t>(table->num_columns()))
267 // TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable?
268 throw Exception{"Number of columns is less than the table has", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH};
269
270 ++row_group_current;
271
272 NameToColumnPtr name_to_column_ptr;
273 for (const auto& column_name : table->ColumnNames())
274 {
275 std::shared_ptr<arrow::ChunkedArray> arrow_column = table->GetColumnByName(column_name);
276 name_to_column_ptr[column_name] = arrow_column;
277 }
278
279 for (size_t column_i = 0, columns = header.columns(); column_i < columns; ++column_i)
280 {
281 ColumnWithTypeAndName header_column = header.getByPosition(column_i);
282
283 if (name_to_column_ptr.find(header_column.name) == name_to_column_ptr.end())
284 // TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable?
285 throw Exception{"Column \"" + header_column.name + "\" is not presented in input data",
286 ErrorCodes::THERE_IS_NO_COLUMN};
287
288 std::shared_ptr<arrow::ChunkedArray> arrow_column = name_to_column_ptr[header_column.name];
289 arrow::Type::type arrow_type = arrow_column->type()->id();
290
291 // TODO: check if a column is const?
292 if (!header_column.type->isNullable() && arrow_column->null_count())
293 {
294 throw Exception{"Can not insert NULL data into non-nullable column \"" + header_column.name + "\"",
295 ErrorCodes::CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN};
296 }
297
298 const bool target_column_is_nullable = header_column.type->isNullable() || arrow_column->null_count();
299
300 DataTypePtr internal_nested_type;
301
302 if (arrow_type == arrow::Type::DECIMAL)
303 {
304 const auto decimal_type = static_cast<arrow::DecimalType *>(arrow_column->type().get());
305 internal_nested_type = std::make_shared<DataTypeDecimal<Decimal128>>(decimal_type->precision(),
306 decimal_type->scale());
307 }
308 else if (auto internal_type_it = std::find_if(arrow_type_to_internal_type.begin(), arrow_type_to_internal_type.end(),
309 [=](auto && elem) { return elem.first == arrow_type; });
310 internal_type_it != arrow_type_to_internal_type.end())
311 {
312 internal_nested_type = DataTypeFactory::instance().get(internal_type_it->second);
313 }
314 else
315 {
316 throw Exception{"The type \"" + arrow_column->type()->name() + "\" of an input column \"" + header_column.name
317 + "\" is not supported for conversion from a " + format_name + " data format",
318 ErrorCodes::CANNOT_CONVERT_TYPE};
319 }
320
321 const DataTypePtr internal_type = target_column_is_nullable ? makeNullable(internal_nested_type)
322 : internal_nested_type;
323 const std::string internal_nested_type_name = internal_nested_type->getName();
324
325 const DataTypePtr column_nested_type = header_column.type->isNullable()
326 ? static_cast<const DataTypeNullable *>(header_column.type.get())->getNestedType()
327 : header_column.type;
328
329 const DataTypePtr column_type = header_column.type;
330
331 const std::string column_nested_type_name = column_nested_type->getName();
332
333 ColumnWithTypeAndName column;
334 column.name = header_column.name;
335 column.type = internal_type;
336
337 /// Data
338 MutableColumnPtr read_column = internal_nested_type->createColumn();
339 switch (arrow_type)
340 {
341 case arrow::Type::STRING:
342 case arrow::Type::BINARY:
343 //case arrow::Type::FIXED_SIZE_BINARY:
344 fillColumnWithStringData(arrow_column, read_column);
345 break;
346 case arrow::Type::BOOL:
347 fillColumnWithBooleanData(arrow_column, read_column);
348 break;
349 case arrow::Type::DATE32:
350 fillColumnWithDate32Data(arrow_column, read_column);
351 break;
352 case arrow::Type::DATE64:
353 fillColumnWithDate64Data(arrow_column, read_column);
354 break;
355 case arrow::Type::TIMESTAMP:
356 fillColumnWithTimestampData(arrow_column, read_column);
357 break;
358 case arrow::Type::DECIMAL:
359 //fillColumnWithNumericData<Decimal128, ColumnDecimal<Decimal128>>(arrow_column, read_column); // Have problems with trash values under NULL, but faster
360 fillColumnWithDecimalData(arrow_column, read_column /*, internal_nested_type*/);
361
362 break;
363# define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
364 case ARROW_NUMERIC_TYPE: \
365 fillColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, read_column); \
366 break;
367
368 FOR_ARROW_NUMERIC_TYPES(DISPATCH)
369# undef DISPATCH
370 // TODO: support TIMESTAMP_MICROS and TIMESTAMP_MILLIS with truncated micro- and milliseconds?
371 // TODO: read JSON as a string?
372 // TODO: read UUID as a string?
373 default:
374 throw Exception
375 {
376 "Unsupported " + format_name + " type \"" + arrow_column->type()->name() + "\" of an input column \""
377 + header_column.name + "\"",
378 ErrorCodes::UNKNOWN_TYPE
379 };
380 }
381
382
383 if (column.type->isNullable())
384 {
385 MutableColumnPtr null_bytemap = DataTypeUInt8().createColumn();
386 fillByteMapFromArrowColumn(arrow_column, null_bytemap);
387 column.column = ColumnNullable::create(std::move(read_column), std::move(null_bytemap));
388 }
389 else
390 column.column = std::move(read_column);
391
392 column.column = castColumn(column, column_type);
393 column.type = column_type;
394 num_rows = column.column->size();
395 columns_list.push_back(std::move(column.column));
396 }
397
398 res.setColumns(columns_list, num_rows);
399 }
400}
401#endif
402