1#include "ParquetBlockOutputFormat.h"
2
3#if USE_PARQUET
4
5// TODO: clean includes
6#include <Columns/ColumnDecimal.h>
7#include <Columns/ColumnFixedString.h>
8#include <Columns/ColumnNullable.h>
9#include <Columns/ColumnString.h>
10#include <Columns/ColumnVector.h>
11#include <Columns/ColumnsNumber.h>
12#include <Common/assert_cast.h>
13#include <Core/ColumnWithTypeAndName.h>
14#include <Core/callOnTypeIndex.h>
15#include <DataTypes/DataTypeDateTime.h>
16#include <DataTypes/DataTypeNullable.h>
17#include <DataTypes/DataTypesDecimal.h>
18#include <DataStreams/SquashingBlockOutputStream.h>
19#include <Formats/FormatFactory.h>
20#include <IO/WriteHelpers.h>
21#include <arrow/api.h>
22#include <arrow/io/api.h>
23#include <arrow/util/decimal.h>
24#include <arrow/util/memory.h>
25#include <parquet/arrow/writer.h>
26#include <parquet/exception.h>
27#include <parquet/deprecated_io.h>
28
29
30namespace DB
31{
namespace ErrorCodes
{
    /// Used in this file when Arrow/Parquet returns a non-OK status.
    extern const int UNKNOWN_EXCEPTION;

    /// Used in this file when a column type has no Parquet mapping.
    extern const int UNKNOWN_TYPE;
}
37
38ParquetBlockOutputFormat::ParquetBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
39 : IOutputFormat(header_, out_), format_settings{format_settings_}
40{
41}
42
43static void checkStatus(arrow::Status & status, const std::string & column_name)
44{
45 if (!status.ok())
46 throw Exception{"Error with a parquet column \"" + column_name + "\": " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
47}
48
49template <typename NumericType, typename ArrowBuilderType>
50static void fillArrowArrayWithNumericColumnData(
51 ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array, const PaddedPODArray<UInt8> * null_bytemap)
52{
53 const PaddedPODArray<NumericType> & internal_data = assert_cast<const ColumnVector<NumericType> &>(*write_column).getData();
54 ArrowBuilderType builder;
55 arrow::Status status;
56
57 const UInt8 * arrow_null_bytemap_raw_ptr = nullptr;
58 PaddedPODArray<UInt8> arrow_null_bytemap;
59 if (null_bytemap)
60 {
61 /// Invert values since Arrow interprets 1 as a non-null value, while CH as a null
62 arrow_null_bytemap.reserve(null_bytemap->size());
63 for (size_t i = 0, size = null_bytemap->size(); i < size; ++i)
64 arrow_null_bytemap.emplace_back(1 ^ (*null_bytemap)[i]);
65
66 arrow_null_bytemap_raw_ptr = arrow_null_bytemap.data();
67 }
68
69 status = builder.AppendValues(internal_data.data(), internal_data.size(), arrow_null_bytemap_raw_ptr);
70 checkStatus(status, write_column->getName());
71
72 status = builder.Finish(&arrow_array);
73 checkStatus(status, write_column->getName());
74}
75
76template <typename ColumnType>
77static void fillArrowArrayWithStringColumnData(
78 ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array, const PaddedPODArray<UInt8> * null_bytemap)
79{
80 const auto & internal_column = assert_cast<const ColumnType &>(*write_column);
81 arrow::StringBuilder builder;
82 arrow::Status status;
83
84 for (size_t string_i = 0, size = internal_column.size(); string_i < size; ++string_i)
85 {
86 if (null_bytemap && (*null_bytemap)[string_i])
87 {
88 status = builder.AppendNull();
89 }
90 else
91 {
92 StringRef string_ref = internal_column.getDataAt(string_i);
93 status = builder.Append(string_ref.data, string_ref.size);
94 }
95
96 checkStatus(status, write_column->getName());
97 }
98
99 status = builder.Finish(&arrow_array);
100 checkStatus(status, write_column->getName());
101}
102
103static void fillArrowArrayWithDateColumnData(
104 ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array, const PaddedPODArray<UInt8> * null_bytemap)
105{
106 const PaddedPODArray<UInt16> & internal_data = assert_cast<const ColumnVector<UInt16> &>(*write_column).getData();
107 //arrow::Date32Builder date_builder;
108 arrow::UInt16Builder builder;
109 arrow::Status status;
110
111 for (size_t value_i = 0, size = internal_data.size(); value_i < size; ++value_i)
112 {
113 if (null_bytemap && (*null_bytemap)[value_i])
114 status = builder.AppendNull();
115 else
116 /// Implicitly converts UInt16 to Int32
117 status = builder.Append(internal_data[value_i]);
118 checkStatus(status, write_column->getName());
119 }
120
121 status = builder.Finish(&arrow_array);
122 checkStatus(status, write_column->getName());
123}
124
125static void fillArrowArrayWithDateTimeColumnData(
126 ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array, const PaddedPODArray<UInt8> * null_bytemap)
127{
128 auto & internal_data = assert_cast<const ColumnVector<UInt32> &>(*write_column).getData();
129 //arrow::Date64Builder builder;
130 arrow::UInt32Builder builder;
131 arrow::Status status;
132
133 for (size_t value_i = 0, size = internal_data.size(); value_i < size; ++value_i)
134 {
135 if (null_bytemap && (*null_bytemap)[value_i])
136 status = builder.AppendNull();
137 else
138 /// Implicitly converts UInt16 to Int32
139 //status = date_builder.Append(static_cast<int64_t>(internal_data[value_i]) * 1000); // now ms. TODO check other units
140 status = builder.Append(internal_data[value_i]);
141
142 checkStatus(status, write_column->getName());
143 }
144
145 status = builder.Finish(&arrow_array);
146 checkStatus(status, write_column->getName());
147}
148
149template <typename DataType>
150static void fillArrowArrayWithDecimalColumnData(
151 ColumnPtr write_column,
152 std::shared_ptr<arrow::Array> & arrow_array,
153 const PaddedPODArray<UInt8> * null_bytemap,
154 const DataType * decimal_type)
155{
156 const auto & column = static_cast<const typename DataType::ColumnType &>(*write_column);
157 arrow::DecimalBuilder builder(arrow::decimal(decimal_type->getPrecision(), decimal_type->getScale()));
158 arrow::Status status;
159
160 for (size_t value_i = 0, size = column.size(); value_i < size; ++value_i)
161 {
162 if (null_bytemap && (*null_bytemap)[value_i])
163 status = builder.AppendNull();
164 else
165 status = builder.Append(
166 arrow::Decimal128(reinterpret_cast<const uint8_t *>(&column.getElement(value_i).value))); // TODO: try copy column
167
168 checkStatus(status, write_column->getName());
169 }
170 status = builder.Finish(&arrow_array);
171 checkStatus(status, write_column->getName());
172
173/* TODO column copy
174 const auto & internal_data = static_cast<const typename DataType::ColumnType &>(*write_column).getData();
175 //ArrowBuilderType numeric_builder;
176 arrow::DecimalBuilder builder(arrow::decimal(decimal_type->getPrecision(), decimal_type->getScale()));
177 arrow::Status status;
178
179 const uint8_t * arrow_null_bytemap_raw_ptr = nullptr;
180 PaddedPODArray<UInt8> arrow_null_bytemap;
181 if (null_bytemap)
182 {
183 /// Invert values since Arrow interprets 1 as a non-null value, while CH as a null
184 arrow_null_bytemap.reserve(null_bytemap->size());
185 for (size_t i = 0, size = null_bytemap->size(); i < size; ++i)
186 arrow_null_bytemap.emplace_back(1 ^ (*null_bytemap)[i]);
187
188 arrow_null_bytemap_raw_ptr = arrow_null_bytemap.data();
189 }
190
191 status = builder.AppendValues(reinterpret_cast<const uint8_t*>(internal_data.data()), internal_data.size(), arrow_null_bytemap_raw_ptr);
192 checkStatus(status, write_column->getName());
193
194 status = builder.Finish(&arrow_array);
195 checkStatus(status, write_column->getName());
196*/
197}
198
/// X-macro over the numeric column types handled by fillArrowArrayWithNumericColumnData.
/// Expands APPLY(type, arrow_builder) once per pair, in the order listed, with nothing between
/// the expansions.
#define FOR_INTERNAL_NUMERIC_TYPES(APPLY) \
    APPLY(UInt8,   arrow::UInt8Builder) \
    APPLY(Int8,    arrow::Int8Builder) \
    APPLY(UInt16,  arrow::UInt16Builder) \
    APPLY(Int16,   arrow::Int16Builder) \
    APPLY(UInt32,  arrow::UInt32Builder) \
    APPLY(Int32,   arrow::Int32Builder) \
    APPLY(UInt64,  arrow::UInt64Builder) \
    APPLY(Int64,   arrow::Int64Builder) \
    APPLY(Float32, arrow::FloatBuilder) \
    APPLY(Float64, arrow::DoubleBuilder)
210
211const std::unordered_map<String, std::shared_ptr<arrow::DataType>> internal_type_to_arrow_type = {
212 {"UInt8", arrow::uint8()},
213 {"Int8", arrow::int8()},
214 {"UInt16", arrow::uint16()},
215 {"Int16", arrow::int16()},
216 {"UInt32", arrow::uint32()},
217 {"Int32", arrow::int32()},
218 {"UInt64", arrow::uint64()},
219 {"Int64", arrow::int64()},
220 {"Float32", arrow::float32()},
221 {"Float64", arrow::float64()},
222
223 //{"Date", arrow::date64()},
224 //{"Date", arrow::date32()},
225 {"Date", arrow::uint16()}, // CHECK
226 //{"DateTime", arrow::date64()}, // BUG! saves as date32
227 {"DateTime", arrow::uint32()},
228
229 // TODO: ClickHouse can actually store non-utf8 strings!
230 {"String", arrow::utf8()},
231 {"FixedString", arrow::utf8()},
232};
233
234static const PaddedPODArray<UInt8> * extractNullBytemapPtr(ColumnPtr column)
235{
236 ColumnPtr null_column = assert_cast<const ColumnNullable &>(*column).getNullMapColumnPtr();
237 const PaddedPODArray<UInt8> & null_bytemap = assert_cast<const ColumnVector<UInt8> &>(*null_column).getData();
238 return &null_bytemap;
239}
240
241
242class OstreamOutputStream : public arrow::io::OutputStream
243{
244public:
245 explicit OstreamOutputStream(WriteBuffer & ostr_) : ostr(ostr_) { is_open = true; }
246 ~OstreamOutputStream() override {}
247
248 // FileInterface
249 ::arrow::Status Close() override
250 {
251 is_open = false;
252 return ::arrow::Status::OK();
253 }
254
255 ::arrow::Status Tell(int64_t* position) const override
256 {
257 *position = total_length;
258 return ::arrow::Status::OK();
259 }
260
261 bool closed() const override { return !is_open; }
262
263 // Writable
264 ::arrow::Status Write(const void* data, int64_t length) override
265 {
266 ostr.write(reinterpret_cast<const char *>(data), length);
267 total_length += length;
268 return ::arrow::Status::OK();
269 }
270
271private:
272 WriteBuffer & ostr;
273 int64_t total_length = 0;
274 bool is_open = false;
275
276 PARQUET_DISALLOW_COPY_AND_ASSIGN(OstreamOutputStream);
277};
278
279
280void ParquetBlockOutputFormat::consume(Chunk chunk)
281{
282 auto & header = getPort(PortKind::Main).getHeader();
283 const size_t columns_num = chunk.getNumColumns();
284
285 /// For arrow::Schema and arrow::Table creation
286 std::vector<std::shared_ptr<arrow::Field>> arrow_fields;
287 std::vector<std::shared_ptr<arrow::Array>> arrow_arrays;
288 arrow_fields.reserve(columns_num);
289 arrow_arrays.reserve(columns_num);
290
291 for (size_t column_i = 0; column_i < columns_num; ++column_i)
292 {
293 // TODO: constructed every iteration
294 ColumnWithTypeAndName column = header.safeGetByPosition(column_i);
295 column.column = chunk.getColumns()[column_i];
296
297 const bool is_column_nullable = column.type->isNullable();
298 const auto & column_nested_type
299 = is_column_nullable ? static_cast<const DataTypeNullable *>(column.type.get())->getNestedType() : column.type;
300 const std::string column_nested_type_name = column_nested_type->getFamilyName();
301
302 if (isDecimal(column_nested_type))
303 {
304 const auto add_decimal_field = [&](const auto & types) -> bool {
305 using Types = std::decay_t<decltype(types)>;
306 using ToDataType = typename Types::LeftType;
307
308 if constexpr (
309 std::is_same_v<
310 ToDataType,
311 DataTypeDecimal<
312 Decimal32>> || std::is_same_v<ToDataType, DataTypeDecimal<Decimal64>> || std::is_same_v<ToDataType, DataTypeDecimal<Decimal128>>)
313 {
314 const auto & decimal_type = static_cast<const ToDataType *>(column_nested_type.get());
315 arrow_fields.emplace_back(std::make_shared<arrow::Field>(
316 column.name, arrow::decimal(decimal_type->getPrecision(), decimal_type->getScale()), is_column_nullable));
317 }
318
319 return false;
320 };
321 callOnIndexAndDataType<void>(column_nested_type->getTypeId(), add_decimal_field);
322 }
323 else
324 {
325 if (internal_type_to_arrow_type.find(column_nested_type_name) == internal_type_to_arrow_type.end())
326 {
327 throw Exception{"The type \"" + column_nested_type_name + "\" of a column \"" + column.name
328 + "\""
329 " is not supported for conversion into a Parquet data format",
330 ErrorCodes::UNKNOWN_TYPE};
331 }
332
333 arrow_fields.emplace_back(std::make_shared<arrow::Field>(column.name, internal_type_to_arrow_type.at(column_nested_type_name), is_column_nullable));
334 }
335
336 std::shared_ptr<arrow::Array> arrow_array;
337
338 ColumnPtr nested_column
339 = is_column_nullable ? assert_cast<const ColumnNullable &>(*column.column).getNestedColumnPtr() : column.column;
340 const PaddedPODArray<UInt8> * null_bytemap = is_column_nullable ? extractNullBytemapPtr(column.column) : nullptr;
341
342 if ("String" == column_nested_type_name)
343 {
344 fillArrowArrayWithStringColumnData<ColumnString>(nested_column, arrow_array, null_bytemap);
345 }
346 else if ("FixedString" == column_nested_type_name)
347 {
348 fillArrowArrayWithStringColumnData<ColumnFixedString>(nested_column, arrow_array, null_bytemap);
349 }
350 else if ("Date" == column_nested_type_name)
351 {
352 fillArrowArrayWithDateColumnData(nested_column, arrow_array, null_bytemap);
353 }
354 else if ("DateTime" == column_nested_type_name)
355 {
356 fillArrowArrayWithDateTimeColumnData(nested_column, arrow_array, null_bytemap);
357 }
358
359 else if (isDecimal(column_nested_type))
360 {
361 auto fill_decimal = [&](const auto & types) -> bool
362 {
363 using Types = std::decay_t<decltype(types)>;
364 using ToDataType = typename Types::LeftType;
365 if constexpr (
366 std::is_same_v<
367 ToDataType,
368 DataTypeDecimal<
369 Decimal32>> || std::is_same_v<ToDataType, DataTypeDecimal<Decimal64>> || std::is_same_v<ToDataType, DataTypeDecimal<Decimal128>>)
370 {
371 const auto & decimal_type = static_cast<const ToDataType *>(column_nested_type.get());
372 fillArrowArrayWithDecimalColumnData(nested_column, arrow_array, null_bytemap, decimal_type);
373 }
374 return false;
375 };
376 callOnIndexAndDataType<void>(column_nested_type->getTypeId(), fill_decimal);
377 }
378#define DISPATCH(CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE) \
379 else if (#CPP_NUMERIC_TYPE == column_nested_type_name) \
380 { \
381 fillArrowArrayWithNumericColumnData<CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE>(nested_column, arrow_array, null_bytemap); \
382 }
383
384 FOR_INTERNAL_NUMERIC_TYPES(DISPATCH)
385#undef DISPATCH
386 else
387 {
388 throw Exception{"Internal type \"" + column_nested_type_name + "\" of a column \"" + column.name
389 + "\""
390 " is not supported for conversion into a Parquet data format",
391 ErrorCodes::UNKNOWN_TYPE};
392 }
393
394
395 arrow_arrays.emplace_back(std::move(arrow_array));
396 }
397
398 std::shared_ptr<arrow::Schema> arrow_schema = std::make_shared<arrow::Schema>(std::move(arrow_fields));
399
400 std::shared_ptr<arrow::Table> arrow_table = arrow::Table::Make(arrow_schema, arrow_arrays);
401
402 auto sink = std::make_shared<OstreamOutputStream>(out);
403
404 if (!file_writer)
405 {
406
407 parquet::WriterProperties::Builder builder;
408#if USE_SNAPPY
409 builder.compression(parquet::Compression::SNAPPY);
410#endif
411 auto props = builder.build();
412 auto status = parquet::arrow::FileWriter::Open(
413 *arrow_table->schema(),
414 arrow::default_memory_pool(),
415 sink,
416 props, /*parquet::default_writer_properties(),*/
417 &file_writer);
418 if (!status.ok())
419 throw Exception{"Error while opening a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
420 }
421
422 // TODO: calculate row_group_size depending on a number of rows and table size
423 auto status = file_writer->WriteTable(*arrow_table, format_settings.parquet.row_group_size);
424
425 if (!status.ok())
426 throw Exception{"Error while writing a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
427}
428
429void ParquetBlockOutputFormat::finalize()
430{
431 if (file_writer)
432 {
433 auto status = file_writer->Close();
434 if (!status.ok())
435 throw Exception{"Error while closing a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
436 }
437}
438
439
440void registerOutputFormatProcessorParquet(FormatFactory & factory)
441{
442 factory.registerOutputFormatProcessor(
443 "Parquet",
444 [](WriteBuffer & buf,
445 const Block & sample,
446 FormatFactory::WriteCallback,
447 const FormatSettings & format_settings)
448 {
449 auto impl = std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings);
450 /// TODO
451 // auto res = std::make_shared<SquashingBlockOutputStream>(impl, impl->getHeader(), format_settings.parquet.row_group_size, 0);
452 // res->disableFlush();
453 return impl;
454 });
455}
456
457}
458
459
460#else
461
namespace DB
{

class FormatFactory;

/// Stub for builds where USE_PARQUET is not set: registers nothing.
void registerOutputFormatProcessorParquet(FormatFactory & /*factory*/) {}

}
469
470
471#endif
472