1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "parquet/arrow/writer.h"
19
20#include <algorithm>
21#include <string>
22#include <utility>
23#include <vector>
24
25#include "arrow/api.h"
26#include "arrow/compute/api.h"
27#include "arrow/util/bit-util.h"
28#include "arrow/visitor_inline.h"
29
30#include "arrow/util/logging.h"
31#include "parquet/arrow/schema.h"
32
33using arrow::Array;
34using arrow::BinaryArray;
35using arrow::BooleanArray;
36using arrow::ChunkedArray;
37using arrow::Decimal128Array;
38using arrow::Field;
39using arrow::FixedSizeBinaryArray;
40using arrow::Int16Array;
41using arrow::Int16Builder;
42using arrow::ListArray;
43using arrow::MemoryPool;
44using arrow::NumericArray;
45using arrow::PrimitiveArray;
46using arrow::ResizableBuffer;
47using arrow::Status;
48using arrow::Table;
49using arrow::TimeUnit;
50
51using arrow::compute::Cast;
52using arrow::compute::CastOptions;
53using arrow::compute::FunctionContext;
54
55using parquet::ParquetFileWriter;
56using parquet::ParquetVersion;
57using parquet::schema::GroupNode;
58
59namespace parquet {
60namespace arrow {
61
62namespace BitUtil = ::arrow::BitUtil;
63
64std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties() {
65 static std::shared_ptr<ArrowWriterProperties> default_writer_properties =
66 ArrowWriterProperties::Builder().build();
67 return default_writer_properties;
68}
69
70namespace {
71
// Computes Parquet definition and repetition levels for a (possibly nested)
// Arrow array.  The builder first walks DOWN the array hierarchy, collecting
// per-level null bitmaps, null counts and list offsets, then walks the
// collected levels to emit one (def, rep) pair per leaf slot.
class LevelBuilder {
 public:
  explicit LevelBuilder(MemoryPool* pool)
      : def_levels_(::arrow::int16(), pool), rep_levels_(::arrow::int16(), pool) {}

  // Dispatches to the matching Visit() overload for the array's dynamic type.
  Status VisitInline(const Array& array);

  // Leaf case: any flat (non-nested) array ends the downward walk; its data
  // is remembered as the values array to be written.
  template <typename T>
  typename std::enable_if<std::is_base_of<::arrow::FlatArray, T>::value, Status>::type
  Visit(const T& array) {
    array_offsets_.push_back(static_cast<int32_t>(array.offset()));
    valid_bitmaps_.push_back(array.null_bitmap_data());
    null_counts_.push_back(array.null_count());
    values_array_ = std::make_shared<T>(array.data());
    return Status::OK();
  }

  // List case: record this level's bitmap and offsets, narrow the window of
  // child values actually referenced by this slice, then recurse into the
  // child values array.
  Status Visit(const ListArray& array) {
    array_offsets_.push_back(static_cast<int32_t>(array.offset()));
    valid_bitmaps_.push_back(array.null_bitmap_data());
    null_counts_.push_back(array.null_count());
    offsets_.push_back(array.raw_value_offsets());

    // Map the current [min, max) window through this level's offsets so that
    // only the referenced range of leaf values is written.
    min_offset_idx_ = array.value_offset(min_offset_idx_);
    max_offset_idx_ = array.value_offset(max_offset_idx_);

    return VisitInline(*array.values());
  }

#define NOT_IMPLEMENTED_VISIT(ArrowTypePrefix) \
  Status Visit(const ::arrow::ArrowTypePrefix##Array& array) { \
    return Status::NotImplemented("Level generation for " #ArrowTypePrefix \
                                  " not supported yet"); \
  }

  NOT_IMPLEMENTED_VISIT(Struct)
  NOT_IMPLEMENTED_VISIT(Union)
  NOT_IMPLEMENTED_VISIT(Dictionary)

  // Generates def/rep levels for `array` described by `field`.
  // Outputs:
  //   values_offset / num_values — window into the leaf values array,
  //   num_levels                 — number of level entries produced,
  //   def_levels_out             — null when the column is required,
  //   rep_levels_out             — null for non-nested (flat) columns,
  //   values_array               — the leaf values array to write.
  // `def_levels_scratch` is reused across calls to avoid reallocation in the
  // flat-array fast path.
  Status GenerateLevels(const Array& array, const std::shared_ptr<Field>& field,
                        int64_t* values_offset, int64_t* num_values, int64_t* num_levels,
                        const std::shared_ptr<ResizableBuffer>& def_levels_scratch,
                        std::shared_ptr<Buffer>* def_levels_out,
                        std::shared_ptr<Buffer>* rep_levels_out,
                        std::shared_ptr<Array>* values_array) {
    // Work downwards to extract bitmaps and offsets
    min_offset_idx_ = 0;
    max_offset_idx_ = array.length();
    RETURN_NOT_OK(VisitInline(array));
    *num_values = max_offset_idx_ - min_offset_idx_;
    *values_offset = min_offset_idx_;
    *values_array = values_array_;

    // Walk downwards to extract nullability
    std::shared_ptr<Field> current_field = field;
    nullable_.push_back(current_field->nullable());
    while (current_field->type()->num_children() > 0) {
      if (current_field->type()->num_children() > 1) {
        return Status::NotImplemented(
            "Fields with more than one child are not supported.");
      } else {
        current_field = current_field->type()->child(0);
      }
      nullable_.push_back(current_field->nullable());
    }

    // Generate the levels.
    if (nullable_.size() == 1) {
      // We have a PrimitiveArray: no repetition, def level is 1 for set
      // entries and 0 for nulls.
      *rep_levels_out = nullptr;
      if (nullable_[0]) {
        RETURN_NOT_OK(
            def_levels_scratch->Resize(array.length() * sizeof(int16_t), false));
        auto def_levels_ptr =
            reinterpret_cast<int16_t*>(def_levels_scratch->mutable_data());
        if (array.null_count() == 0) {
          // All values defined.
          std::fill(def_levels_ptr, def_levels_ptr + array.length(), 1);
        } else if (array.null_count() == array.length()) {
          // All values null.
          std::fill(def_levels_ptr, def_levels_ptr + array.length(), 0);
        } else {
          // Mixed: read the validity bitmap bit by bit.
          ::arrow::internal::BitmapReader valid_bits_reader(
              array.null_bitmap_data(), array.offset(), array.length());
          for (int i = 0; i < array.length(); i++) {
            def_levels_ptr[i] = valid_bits_reader.IsSet() ? 1 : 0;
            valid_bits_reader.Next();
          }
        }

        *def_levels_out = def_levels_scratch;
      } else {
        // Required column: no definition levels needed.
        *def_levels_out = nullptr;
      }
      *num_levels = array.length();
    } else {
      // Nested (list) column: recursively emit levels per top-level entry.
      RETURN_NOT_OK(rep_levels_.Append(0));
      RETURN_NOT_OK(HandleListEntries(0, 0, 0, array.length()));

      std::shared_ptr<Array> def_levels_array;
      std::shared_ptr<Array> rep_levels_array;

      RETURN_NOT_OK(def_levels_.Finish(&def_levels_array));
      RETURN_NOT_OK(rep_levels_.Finish(&rep_levels_array));

      *def_levels_out = static_cast<PrimitiveArray*>(def_levels_array.get())->values();
      *rep_levels_out = static_cast<PrimitiveArray*>(rep_levels_array.get())->values();
      *num_levels = rep_levels_array->length();
    }

    return Status::OK();
  }

  // Emits levels for one list slot at `index` on nesting level `rep_level`:
  // a null slot terminates with the current def level, a set slot descends
  // with an incremented def level when this level is nullable.
  Status HandleList(int16_t def_level, int16_t rep_level, int64_t index) {
    if (nullable_[rep_level]) {
      if (null_counts_[rep_level] == 0 ||
          BitUtil::GetBit(valid_bitmaps_[rep_level], index + array_offsets_[rep_level])) {
        return HandleNonNullList(static_cast<int16_t>(def_level + 1), rep_level, index);
      } else {
        return def_levels_.Append(def_level);
      }
    } else {
      return HandleNonNullList(def_level, rep_level, index);
    }
  }

  // Emits levels for the child range of a non-null list slot, recursing
  // further down for nested lists or handling leaf entries directly.
  Status HandleNonNullList(int16_t def_level, int16_t rep_level, int64_t index) {
    const int32_t inner_offset = offsets_[rep_level][index];
    const int32_t inner_length = offsets_[rep_level][index + 1] - inner_offset;
    const int64_t recursion_level = rep_level + 1;
    if (inner_length == 0) {
      // Empty list: defined up to this level only.
      return def_levels_.Append(def_level);
    }
    if (recursion_level < static_cast<int64_t>(offsets_.size())) {
      return HandleListEntries(static_cast<int16_t>(def_level + 1),
                               static_cast<int16_t>(rep_level + 1), inner_offset,
                               inner_length);
    } else {
      // We have reached the leaf: primitive list, handle remaining nullables
      const bool nullable_level = nullable_[recursion_level];
      const int64_t level_null_count = null_counts_[recursion_level];
      const uint8_t* level_valid_bitmap = valid_bitmaps_[recursion_level];

      for (int64_t i = 0; i < inner_length; i++) {
        if (i > 0) {
          // Repeated entry within the same list.
          RETURN_NOT_OK(rep_levels_.Append(static_cast<int16_t>(rep_level + 1)));
        }
        if (level_null_count && level_valid_bitmap == nullptr) {
          // Special case: this is a null array (all elements are null)
          RETURN_NOT_OK(def_levels_.Append(static_cast<int16_t>(def_level + 1)));
        } else if (nullable_level &&
                   ((level_null_count == 0) ||
                    BitUtil::GetBit(
                        level_valid_bitmap,
                        inner_offset + i + array_offsets_[recursion_level]))) {
          // Non-null element in a null level
          RETURN_NOT_OK(def_levels_.Append(static_cast<int16_t>(def_level + 2)));
        } else {
          // This can be produced in two cases:
          // * elements are nullable and this one is null (i.e. max_def_level = def_level
          // + 2)
          // * elements are non-nullable (i.e. max_def_level = def_level + 1)
          RETURN_NOT_OK(def_levels_.Append(static_cast<int16_t>(def_level + 1)));
        }
      }
      return Status::OK();
    }
  }

  // Iterates the `length` list slots starting at `offset`; every slot after
  // the first repeats at `rep_level`.
  Status HandleListEntries(int16_t def_level, int16_t rep_level, int64_t offset,
                           int64_t length) {
    for (int64_t i = 0; i < length; i++) {
      if (i > 0) {
        RETURN_NOT_OK(rep_levels_.Append(rep_level));
      }
      RETURN_NOT_OK(HandleList(def_level, rep_level, offset + i));
    }
    return Status::OK();
  }

 private:
  Int16Builder def_levels_;
  Int16Builder rep_levels_;

  // Per-nesting-level state, index 0 == outermost level.
  std::vector<int64_t> null_counts_;
  std::vector<const uint8_t*> valid_bitmaps_;
  std::vector<const int32_t*> offsets_;
  std::vector<int32_t> array_offsets_;
  std::vector<bool> nullable_;

  // Window into the leaf values referenced by the visited slice.
  int64_t min_offset_idx_;
  int64_t max_offset_idx_;
  std::shared_ptr<Array> values_array_;
};
264
// Out-of-line so it can be declared before the Visit() overloads it
// dispatches to via Arrow's inline visitor machinery.
Status LevelBuilder::VisitInline(const Array& array) {
  return VisitArrayInline(array, this);
}
268
269struct ColumnWriterContext {
270 ColumnWriterContext(MemoryPool* memory_pool, ArrowWriterProperties* properties)
271 : memory_pool(memory_pool), properties(properties) {
272 this->data_buffer = AllocateBuffer(memory_pool);
273 this->def_levels_buffer = AllocateBuffer(memory_pool);
274 }
275
276 template <typename T>
277 Status GetScratchData(const int64_t num_values, T** out) {
278 RETURN_NOT_OK(this->data_buffer->Resize(num_values * sizeof(T), false));
279 *out = reinterpret_cast<T*>(this->data_buffer->mutable_data());
280 return Status::OK();
281 }
282
283 MemoryPool* memory_pool;
284 ArrowWriterProperties* properties;
285
286 // Buffer used for storing the data of an array converted to the physical type
287 // as expected by parquet-cpp.
288 std::shared_ptr<ResizableBuffer> data_buffer;
289
290 // We use the shared ownership of this buffer
291 std::shared_ptr<ResizableBuffer> def_levels_buffer;
292};
293
294Status GetLeafType(const ::arrow::DataType& type, ::arrow::Type::type* leaf_type) {
295 if (type.id() == ::arrow::Type::LIST || type.id() == ::arrow::Type::STRUCT) {
296 if (type.num_children() != 1) {
297 return Status::Invalid("Nested column branch had multiple children");
298 }
299 return GetLeafType(*type.child(0)->type(), leaf_type);
300 } else {
301 *leaf_type = type.id();
302 return Status::OK();
303 }
304}
305
// Writes a single Arrow column (array or chunked array) through a Parquet
// ColumnWriter, converting values to the Parquet physical type and
// generating definition/repetition levels as needed.
class ArrowColumnWriter {
 public:
  // Does not take ownership of `ctx` or `column_writer`; both must outlive
  // this object.
  ArrowColumnWriter(ColumnWriterContext* ctx, ColumnWriter* column_writer,
                    const std::shared_ptr<Field>& field)
      : ctx_(ctx), writer_(column_writer), field_(field) {}

  Status Write(const Array& data);

  // Writes `size` logical values starting at logical `offset` of the chunked
  // array, slicing across chunk boundaries as necessary.
  Status Write(const ChunkedArray& data, int64_t offset, const int64_t size) {
    if (data.length() == 0) {
      return Status::OK();
    }

    // Advance chunk_index/chunk_offset until the chunk containing `offset`
    // is found.  absolute_position tracks the logical position of the start
    // of the current chunk.
    int64_t absolute_position = 0;
    int chunk_index = 0;
    int64_t chunk_offset = 0;
    while (chunk_index < data.num_chunks() && absolute_position < offset) {
      const int64_t chunk_length = data.chunk(chunk_index)->length();
      if (absolute_position + chunk_length > offset) {
        // Relative offset into the chunk to reach the desired start offset for
        // writing
        chunk_offset = offset - absolute_position;
        break;
      } else {
        ++chunk_index;
        absolute_position += chunk_length;
      }
    }

    if (absolute_position >= data.length()) {
      return Status::Invalid("Cannot write data at offset past end of chunked array");
    }

    // Write chunk by chunk until `size` values have been emitted.
    int64_t values_written = 0;
    while (values_written < size) {
      const Array& chunk = *data.chunk(chunk_index);
      const int64_t available_values = chunk.length() - chunk_offset;
      const int64_t chunk_write_size = std::min(size - values_written, available_values);

      // The chunk offset here will be 0 except for possibly the first chunk
      // because of the advancing logic above
      std::shared_ptr<Array> array_to_write = chunk.Slice(chunk_offset, chunk_write_size);
      RETURN_NOT_OK(Write(*array_to_write));

      // Chunk exhausted: move on to the next one.
      if (chunk_write_size == available_values) {
        chunk_offset = 0;
        ++chunk_index;
      }
      values_written += chunk_write_size;
    }

    return Status::OK();
  }

  // Finalizes the underlying Parquet column writer, translating thrown
  // ParquetExceptions into a Status.
  Status Close() {
    PARQUET_CATCH_NOT_OK(writer_->Close());
    return Status::OK();
  }

 private:
  // Converts and writes a batch of values of a concrete Arrow type through a
  // concrete Parquet physical type.  Specializations below handle types that
  // need custom conversion (boolean, binary, decimal, ...).
  template <typename ParquetType, typename ArrowType>
  Status TypedWriteBatch(const Array& data, int64_t num_levels, const int16_t* def_levels,
                         const int16_t* rep_levels);

  Status WriteTimestamps(const Array& data, int64_t num_levels, const int16_t* def_levels,
                         const int16_t* rep_levels);

  Status WriteTimestampsCoerce(const bool truncated_timestamps_allowed, const Array& data,
                               int64_t num_levels, const int16_t* def_levels,
                               const int16_t* rep_levels);

  // Writes a dense batch (no nulls among the values).
  template <typename ParquetType, typename ArrowType>
  Status WriteNonNullableBatch(const ArrowType& type, int64_t num_values,
                               int64_t num_levels, const int16_t* def_levels,
                               const int16_t* rep_levels,
                               const typename ArrowType::c_type* values);

  // Writes a spaced batch where nulls are indicated by `valid_bits`.
  template <typename ParquetType, typename ArrowType>
  Status WriteNullableBatch(const ArrowType& type, int64_t num_values, int64_t num_levels,
                            const int16_t* def_levels, const int16_t* rep_levels,
                            const uint8_t* valid_bits, int64_t valid_bits_offset,
                            const typename ArrowType::c_type* values);

  // Thin typed wrapper over TypedColumnWriter<ParquetType>::WriteBatch.
  template <typename ParquetType>
  Status WriteBatch(int64_t num_levels, const int16_t* def_levels,
                    const int16_t* rep_levels,
                    const typename ParquetType::c_type* values) {
    auto typed_writer =
        ::arrow::internal::checked_cast<TypedColumnWriter<ParquetType>*>(writer_);
    // WriteBatch was called with type mismatching the writer_'s type. This
    // could be a schema conversion problem.
    DCHECK(typed_writer);
    PARQUET_CATCH_NOT_OK(
        typed_writer->WriteBatch(num_levels, def_levels, rep_levels, values));
    return Status::OK();
  }

  // Thin typed wrapper over TypedColumnWriter<ParquetType>::WriteBatchSpaced.
  template <typename ParquetType>
  Status WriteBatchSpaced(int64_t num_levels, const int16_t* def_levels,
                          const int16_t* rep_levels, const uint8_t* valid_bits,
                          int64_t valid_bits_offset,
                          const typename ParquetType::c_type* values) {
    auto typed_writer =
        ::arrow::internal::checked_cast<TypedColumnWriter<ParquetType>*>(writer_);
    // WriteBatchSpaced was called with type mismatching the writer_'s type. This
    // could be a schema conversion problem.
    DCHECK(typed_writer);
    PARQUET_CATCH_NOT_OK(typed_writer->WriteBatchSpaced(
        num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, values));
    return Status::OK();
  }

  ColumnWriterContext* ctx_;
  ColumnWriter* writer_;
  std::shared_ptr<Field> field_;
};
422
// Generic numeric batch writer: extracts the raw value pointer from the
// primitive array and routes to the dense or spaced path depending on
// nullability.
template <typename ParquetType, typename ArrowType>
Status ArrowColumnWriter::TypedWriteBatch(const Array& array, int64_t num_levels,
                                          const int16_t* def_levels,
                                          const int16_t* rep_levels) {
  using ArrowCType = typename ArrowType::c_type;

  const auto& data = static_cast<const PrimitiveArray&>(array);
  const ArrowCType* values = nullptr;
  // The values buffer may be null if the array is empty (ARROW-2744)
  if (data.values() != nullptr) {
    values = reinterpret_cast<const ArrowCType*>(data.values()->data()) + data.offset();
  } else {
    DCHECK_EQ(data.length(), 0);
  }

  if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
    // no nulls, just dump the data
    RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(
        static_cast<const ArrowType&>(*array.type()), array.length(), num_levels,
        def_levels, rep_levels, values)));
  } else {
    // Nulls present: write spaced using the validity bitmap.
    const uint8_t* valid_bits = data.null_bitmap_data();
    RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(
        static_cast<const ArrowType&>(*array.type()), data.length(), num_levels,
        def_levels, rep_levels, valid_bits, data.offset(), values)));
  }
  return Status::OK();
}
451
452template <typename ParquetType, typename ArrowType>
453Status ArrowColumnWriter::WriteNonNullableBatch(
454 const ArrowType& type, int64_t num_values, int64_t num_levels,
455 const int16_t* def_levels, const int16_t* rep_levels,
456 const typename ArrowType::c_type* values) {
457 using ParquetCType = typename ParquetType::c_type;
458 ParquetCType* buffer;
459 RETURN_NOT_OK(ctx_->GetScratchData<ParquetCType>(num_values, &buffer));
460
461 std::copy(values, values + num_values, buffer);
462
463 return WriteBatch<ParquetType>(num_levels, def_levels, rep_levels, buffer);
464}
465
466template <>
467Status ArrowColumnWriter::WriteNonNullableBatch<Int32Type, ::arrow::Date64Type>(
468 const ::arrow::Date64Type& type, int64_t num_values, int64_t num_levels,
469 const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) {
470 int32_t* buffer;
471 RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_levels, &buffer));
472
473 for (int i = 0; i < num_values; i++) {
474 buffer[i] = static_cast<int32_t>(values[i] / 86400000);
475 }
476
477 return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, buffer);
478}
479
480template <>
481Status ArrowColumnWriter::WriteNonNullableBatch<Int32Type, ::arrow::Time32Type>(
482 const ::arrow::Time32Type& type, int64_t num_values, int64_t num_levels,
483 const int16_t* def_levels, const int16_t* rep_levels, const int32_t* values) {
484 int32_t* buffer;
485 RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_levels, &buffer));
486 if (type.unit() == TimeUnit::SECOND) {
487 for (int i = 0; i < num_values; i++) {
488 buffer[i] = values[i] * 1000;
489 }
490 } else {
491 std::copy(values, values + num_values, buffer);
492 }
493 return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, buffer);
494}
495
// Fast path for Arrow types whose c_type is identical to the Parquet
// physical type: no staging/conversion, write the values pointer directly.
#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
  template <> \
  Status ArrowColumnWriter::WriteNonNullableBatch<ParquetType, ArrowType>( \
      const ArrowType& type, int64_t num_values, int64_t num_levels, \
      const int16_t* def_levels, const int16_t* rep_levels, const CType* buffer) { \
    return WriteBatch<ParquetType>(num_levels, def_levels, rep_levels, buffer); \
  }

NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
NONNULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
510
511template <typename ParquetType, typename ArrowType>
512Status ArrowColumnWriter::WriteNullableBatch(
513 const ArrowType& type, int64_t num_values, int64_t num_levels,
514 const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
515 int64_t valid_bits_offset, const typename ArrowType::c_type* values) {
516 using ParquetCType = typename ParquetType::c_type;
517
518 ParquetCType* buffer;
519 RETURN_NOT_OK(ctx_->GetScratchData<ParquetCType>(num_values, &buffer));
520 for (int i = 0; i < num_values; i++) {
521 buffer[i] = static_cast<ParquetCType>(values[i]);
522 }
523
524 return WriteBatchSpaced<ParquetType>(num_levels, def_levels, rep_levels, valid_bits,
525 valid_bits_offset, buffer);
526}
527
528template <>
529Status ArrowColumnWriter::WriteNullableBatch<Int32Type, ::arrow::Date64Type>(
530 const ::arrow::Date64Type& type, int64_t num_values, int64_t num_levels,
531 const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
532 int64_t valid_bits_offset, const int64_t* values) {
533 int32_t* buffer;
534 RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_values, &buffer));
535
536 for (int i = 0; i < num_values; i++) {
537 // Convert from milliseconds into days since the epoch
538 buffer[i] = static_cast<int32_t>(values[i] / 86400000);
539 }
540
541 return WriteBatchSpaced<Int32Type>(num_levels, def_levels, rep_levels, valid_bits,
542 valid_bits_offset, buffer);
543}
544
545template <>
546Status ArrowColumnWriter::WriteNullableBatch<Int32Type, ::arrow::Time32Type>(
547 const ::arrow::Time32Type& type, int64_t num_values, int64_t num_levels,
548 const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
549 int64_t valid_bits_offset, const int32_t* values) {
550 int32_t* buffer;
551 RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_values, &buffer));
552
553 if (type.unit() == TimeUnit::SECOND) {
554 for (int i = 0; i < num_values; i++) {
555 buffer[i] = values[i] * 1000;
556 }
557 } else {
558 for (int i = 0; i < num_values; i++) {
559 buffer[i] = values[i];
560 }
561 }
562 return WriteBatchSpaced<Int32Type>(num_levels, def_levels, rep_levels, valid_bits,
563 valid_bits_offset, buffer);
564}
565
// Spaced-write fast path for Arrow types whose c_type matches the Parquet
// physical type exactly: write the values pointer directly without staging.
#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
  template <> \
  Status ArrowColumnWriter::WriteNullableBatch<ParquetType, ArrowType>( \
      const ArrowType& type, int64_t num_values, int64_t num_levels, \
      const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, \
      int64_t valid_bits_offset, const CType* values) { \
    return WriteBatchSpaced<ParquetType>(num_levels, def_levels, rep_levels, valid_bits, \
                                         valid_bits_offset, values); \
  }

NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
// Timestamps stored as INT64 need no conversion either (coercion/Int96 cases
// are handled by dedicated specializations elsewhere).
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
584
// Helper macro: applies a unit-specific conversion function to every input
// value.  `arrow_values`, `num_values` and `output` are taken from the
// enclosing function's scope.
#define CONV_CASE_LOOP(ConversionFunction) \
  for (int64_t i = 0; i < num_values; i++) \
    ConversionFunction(arrow_values[i], &output[i]);

// Converts an array of Arrow timestamps (epoch offsets in `unit_type`) to
// Parquet's deprecated Int96 (Impala-style) physical representation.
static void ConvertArrowTimestampToParquetInt96(const int64_t* arrow_values,
                                                int64_t num_values,
                                                ::arrow::TimeUnit ::type unit_type,
                                                Int96* output) {
  switch (unit_type) {
    case TimeUnit::NANO:
      CONV_CASE_LOOP(internal::NanosecondsToImpalaTimestamp);
      break;
    case TimeUnit::MICRO:
      CONV_CASE_LOOP(internal::MicrosecondsToImpalaTimestamp);
      break;
    case TimeUnit::MILLI:
      CONV_CASE_LOOP(internal::MillisecondsToImpalaTimestamp);
      break;
    case TimeUnit::SECOND:
      CONV_CASE_LOOP(internal::SecondsToImpalaTimestamp);
      break;
  }
}

#undef CONV_CASE_LOOP
610
// Spaced timestamp -> INT96 path (deprecated storage, opt-in via writer
// properties): convert to Impala-style Int96 and write spaced.
template <>
Status ArrowColumnWriter::WriteNullableBatch<Int96Type, ::arrow::TimestampType>(
    const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
    int64_t valid_bits_offset, const int64_t* values) {
  Int96* buffer = nullptr;
  RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));

  ConvertArrowTimestampToParquetInt96(values, num_values, type.unit(), buffer);

  return WriteBatchSpaced<Int96Type>(num_levels, def_levels, rep_levels, valid_bits,
                                     valid_bits_offset, buffer);
}
624
// Dense timestamp -> INT96 path: same conversion as the spaced variant
// above, written as a plain batch.
template <>
Status ArrowColumnWriter::WriteNonNullableBatch<Int96Type, ::arrow::TimestampType>(
    const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
    const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) {
  Int96* buffer = nullptr;
  RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));

  ConvertArrowTimestampToParquetInt96(values, num_values, type.unit(), buffer);

  return WriteBatch<Int96Type>(num_levels, def_levels, rep_levels, buffer);
}
636
// Routes a timestamp column to one of three paths depending on the writer
// properties and the column's unit: deprecated Int96 storage, unit-coercing
// INT64 storage, or the direct INT64 fast path.
Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_levels,
                                          const int16_t* def_levels,
                                          const int16_t* rep_levels) {
  const auto& type = static_cast<const ::arrow::TimestampType&>(*values.type());

  const bool is_nanosecond = type.unit() == TimeUnit::NANO;

  if (ctx_->properties->support_deprecated_int96_timestamps()) {
    // The user explicitly required to use Int96 storage.
    return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(values, num_levels,
                                                              def_levels, rep_levels);
  } else if (is_nanosecond ||
             (ctx_->properties->coerce_timestamps_enabled() &&
              (type.unit() != ctx_->properties->coerce_timestamps_unit()))) {
    // Casting is required. This covers several cases
    // * Nanoseconds -> cast to microseconds (until ARROW-3729 is resolved)
    // * coerce_timestamps_enabled_, cast all timestamps to requested unit
    return WriteTimestampsCoerce(ctx_->properties->truncated_timestamps_allowed(), values,
                                 num_levels, def_levels, rep_levels);
  } else {
    // No casting of timestamps is required, take the fast path
    return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(values, num_levels,
                                                              def_levels, rep_levels);
  }
}
662
// Rescales timestamp values into the target unit (the coercion unit if
// coercion is enabled, else microseconds for the nanosecond fallback) and
// writes them as INT64.  Downscaling fails with Status::Invalid on precision
// loss unless `truncated_timestamps_allowed` is set.
//
// NOTE(review): the unit-pair coverage relies on the caller only invoking
// this when units differ (or the source is NANO); coercion targets other
// than MILLI/MICRO (e.g. NANO) are only guarded by DCHECKs — confirm the
// properties builder cannot request them.
Status ArrowColumnWriter::WriteTimestampsCoerce(const bool truncated_timestamps_allowed,
                                                const Array& array, int64_t num_levels,
                                                const int16_t* def_levels,
                                                const int16_t* rep_levels) {
  // Scratch sized to num_levels (>= array.length()) holds the rescaled values.
  int64_t* buffer;
  RETURN_NOT_OK(ctx_->GetScratchData<int64_t>(num_levels, &buffer));

  const auto& data = static_cast<const ::arrow::TimestampArray&>(array);

  auto values = data.raw_values();
  const auto& type = static_cast<const ::arrow::TimestampType&>(*array.type());

  TimeUnit::type target_unit = ctx_->properties->coerce_timestamps_enabled()
                                   ? ctx_->properties->coerce_timestamps_unit()
                                   : TimeUnit::MICRO;
  auto target_type = ::arrow::timestamp(target_unit);

  // Downscale; rejects values that would lose sub-target-unit precision
  // unless truncation was explicitly allowed.  Null slots are exempt from
  // the precision check but still rescaled (their values are ignored).
  auto DivideBy = [&](const int64_t factor) {
    for (int64_t i = 0; i < array.length(); i++) {
      if (!truncated_timestamps_allowed && !data.IsNull(i) && (values[i] % factor != 0)) {
        return Status::Invalid("Casting from ", type.ToString(), " to ",
                               target_type->ToString(), " would lose data: ", values[i]);
      }
      buffer[i] = values[i] / factor;
    }
    return Status::OK();
  };

  // Upscale; always lossless (modulo int64 overflow for extreme values).
  auto MultiplyBy = [&](const int64_t factor) {
    for (int64_t i = 0; i < array.length(); i++) {
      buffer[i] = values[i] * factor;
    }
    return Status::OK();
  };

  if (type.unit() == TimeUnit::NANO) {
    if (target_unit == TimeUnit::MICRO) {
      RETURN_NOT_OK(DivideBy(1000));
    } else {
      DCHECK_EQ(TimeUnit::MILLI, target_unit);
      RETURN_NOT_OK(DivideBy(1000000));
    }
  } else if (type.unit() == TimeUnit::SECOND) {
    RETURN_NOT_OK(MultiplyBy(target_unit == TimeUnit::MICRO ? 1000000 : 1000));
  } else if (type.unit() == TimeUnit::MILLI) {
    DCHECK_EQ(TimeUnit::MICRO, target_unit);
    RETURN_NOT_OK(MultiplyBy(1000));
  } else {
    // Source unit is MICRO; the only remaining downscale target is MILLI.
    DCHECK_EQ(TimeUnit::MILLI, target_unit);
    RETURN_NOT_OK(DivideBy(1000));
  }

  if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
    // no nulls, just dump the data
    RETURN_NOT_OK((WriteNonNullableBatch<Int64Type, ::arrow::TimestampType>(
        static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
        num_levels, def_levels, rep_levels, buffer)));
  } else {
    const uint8_t* valid_bits = data.null_bitmap_data();
    RETURN_NOT_OK((WriteNullableBatch<Int64Type, ::arrow::TimestampType>(
        static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
        num_levels, def_levels, rep_levels, valid_bits, data.offset(), buffer)));
  }
  return Status::OK();
}
728
// This specialization differs from the generic TypedWriteBatch in two ways:
// * the slice offset is applied as late as possible, because the values are
//   bit-packed and therefore only addressable at sub-byte granularity;
// * the data is stored bitwise, so std::copy cannot be used to convert from
//   ArrowType::c_type to ParquetType::c_type.
733
734template <>
735Status ArrowColumnWriter::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
736 const Array& array, int64_t num_levels, const int16_t* def_levels,
737 const int16_t* rep_levels) {
738 bool* buffer;
739 RETURN_NOT_OK(ctx_->GetScratchData<bool>(array.length(), &buffer));
740
741 const auto& data = static_cast<const BooleanArray&>(array);
742 const uint8_t* values = nullptr;
743 // The values buffer may be null if the array is empty (ARROW-2744)
744 if (data.values() != nullptr) {
745 values = reinterpret_cast<const uint8_t*>(data.values()->data());
746 } else {
747 DCHECK_EQ(data.length(), 0);
748 }
749
750 int buffer_idx = 0;
751 int64_t offset = array.offset();
752 for (int i = 0; i < data.length(); i++) {
753 if (!data.IsNull(i)) {
754 buffer[buffer_idx++] = BitUtil::GetBit(values, offset + i);
755 }
756 }
757
758 return WriteBatch<BooleanType>(num_levels, def_levels, rep_levels, buffer);
759}
760
// Null arrays carry no values: only the definition levels (all marking
// nulls) are written; the values pointer is never dereferenced.
template <>
Status ArrowColumnWriter::TypedWriteBatch<Int32Type, ::arrow::NullType>(
    const Array& array, int64_t num_levels, const int16_t* def_levels,
    const int16_t* rep_levels) {
  return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, nullptr);
}
767
768template <>
769Status ArrowColumnWriter::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
770 const Array& array, int64_t num_levels, const int16_t* def_levels,
771 const int16_t* rep_levels) {
772 ByteArray* buffer;
773 RETURN_NOT_OK(ctx_->GetScratchData<ByteArray>(num_levels, &buffer));
774
775 const auto& data = static_cast<const BinaryArray&>(array);
776
777 // In the case of an array consisting of only empty strings or all null,
778 // data.data() points already to a nullptr, thus data.data()->data() will
779 // segfault.
780 const uint8_t* values = nullptr;
781 if (data.value_data()) {
782 values = reinterpret_cast<const uint8_t*>(data.value_data()->data());
783 DCHECK(values != nullptr);
784 }
785
786 // Slice offset is accounted for in raw_value_offsets
787 const int32_t* value_offset = data.raw_value_offsets();
788
789 if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
790 // no nulls, just dump the data
791 for (int64_t i = 0; i < data.length(); i++) {
792 buffer[i] =
793 ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]);
794 }
795 } else {
796 int buffer_idx = 0;
797 for (int64_t i = 0; i < data.length(); i++) {
798 if (!data.IsNull(i)) {
799 buffer[buffer_idx++] =
800 ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]);
801 }
802 }
803 }
804
805 return WriteBatch<ByteArrayType>(num_levels, def_levels, rep_levels, buffer);
806}
807
808template <>
809Status ArrowColumnWriter::TypedWriteBatch<FLBAType, ::arrow::FixedSizeBinaryType>(
810 const Array& array, int64_t num_levels, const int16_t* def_levels,
811 const int16_t* rep_levels) {
812 const auto& data = static_cast<const FixedSizeBinaryArray&>(array);
813 const int64_t length = data.length();
814
815 FLBA* buffer;
816 RETURN_NOT_OK(ctx_->GetScratchData<FLBA>(num_levels, &buffer));
817
818 if (writer_->descr()->schema_node()->is_required() || data.null_count() == 0) {
819 // no nulls, just dump the data
820 // todo(advancedxy): use a writeBatch to avoid this step
821 for (int64_t i = 0; i < length; i++) {
822 buffer[i] = FixedLenByteArray(data.GetValue(i));
823 }
824 } else {
825 int buffer_idx = 0;
826 for (int64_t i = 0; i < length; i++) {
827 if (!data.IsNull(i)) {
828 buffer[buffer_idx++] = FixedLenByteArray(data.GetValue(i));
829 }
830 }
831 }
832
833 return WriteBatch<FLBAType>(num_levels, def_levels, rep_levels, buffer);
834}
835
template <>
Status ArrowColumnWriter::TypedWriteBatch<FLBAType, ::arrow::Decimal128Type>(
    const Array& array, int64_t num_levels, const int16_t* def_levels,
    const int16_t* rep_levels) {
  // Writes an Arrow Decimal128 column as Parquet FIXED_LEN_BYTE_ARRAY values.
  // Each 128-bit little-endian decimal is byte-swapped into big-endian word
  // order and truncated to the minimal byte width implied by its precision.
  const auto& data = static_cast<const Decimal128Array&>(array);
  const int64_t length = data.length();

  FLBA* buffer;
  RETURN_NOT_OK(ctx_->GetScratchData<FLBA>(num_levels, &buffer));

  const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*data.type());
  // DecimalSize(precision) gives the minimal byte count; skip that many
  // leading bytes of the 16-byte big-endian representation.
  const int32_t offset =
      decimal_type.byte_width() - DecimalSize(decimal_type.precision());

  const bool does_not_have_nulls =
      writer_->descr()->schema_node()->is_required() || data.null_count() == 0;

  // Scratch storage: two uint64_t words per valid decimal. The FLBA entries in
  // `buffer` point into this vector, so it must outlive the WriteBatch call
  // below (it does — WriteBatch is invoked before this function returns).
  const auto valid_value_count = static_cast<size_t>(length - data.null_count()) * 2;
  std::vector<uint64_t> big_endian_values(valid_value_count);

  // TODO(phillipc): Look into whether our compilers will perform loop unswitching so we
  // don't have to keep writing two loops to handle the case where we know there are no
  // nulls
  if (does_not_have_nulls) {
    // no nulls, just dump the data
    // todo(advancedxy): use a writeBatch to avoid this step
    for (int64_t i = 0, j = 0; i < length; ++i, j += 2) {
      auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
      // Swap word order (high word first) and byte order within each word to
      // produce a big-endian 128-bit value.
      big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
      big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
      buffer[i] = FixedLenByteArray(
          reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
    }
  } else {
    // Nulls present: `buffer_idx` and `j` advance only on valid slots, so the
    // output buffer is densely packed with valid values.
    for (int64_t i = 0, buffer_idx = 0, j = 0; i < length; ++i) {
      if (!data.IsNull(i)) {
        auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
        big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
        big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
        buffer[buffer_idx++] = FixedLenByteArray(
            reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
        j += 2;
      }
    }
  }

  return WriteBatch<FLBAType>(num_levels, def_levels, rep_levels, buffer);
}
884
// Writes one (possibly nested) Arrow array to the underlying Parquet column.
// The array is first decomposed into definition/repetition levels plus a flat
// leaf-value slice, then dispatched by leaf type to the matching
// TypedWriteBatch specialization.
Status ArrowColumnWriter::Write(const Array& data) {
  if (data.length() == 0) {
    // Write nothing when length is 0
    return Status::OK();
  }

  ::arrow::Type::type values_type;
  RETURN_NOT_OK(GetLeafType(*data.type(), &values_type));

  std::shared_ptr<Array> _values_array;
  int64_t values_offset = 0;
  int64_t num_levels = 0;
  int64_t num_values = 0;
  LevelBuilder level_builder(ctx_->memory_pool);

  // GenerateLevels flattens the array into level buffers and reports which
  // slice of the leaf array carries this column's values.
  std::shared_ptr<Buffer> def_levels_buffer, rep_levels_buffer;
  RETURN_NOT_OK(level_builder.GenerateLevels(
      data, field_, &values_offset, &num_values, &num_levels, ctx_->def_levels_buffer,
      &def_levels_buffer, &rep_levels_buffer, &_values_array));
  // Either buffer may be absent (flat/required columns); nullptr is passed on.
  const int16_t* def_levels = nullptr;
  if (def_levels_buffer) {
    def_levels = reinterpret_cast<const int16_t*>(def_levels_buffer->data());
  }
  const int16_t* rep_levels = nullptr;
  if (rep_levels_buffer) {
    rep_levels = reinterpret_cast<const int16_t*>(rep_levels_buffer->data());
  }
  std::shared_ptr<Array> values_array = _values_array->Slice(values_offset, num_values);

// Expands to one switch case mapping an Arrow leaf type to the corresponding
// TypedWriteBatch<ParquetType, ArrowType> instantiation.
#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType) \
  case ::arrow::Type::ArrowEnum: \
    return TypedWriteBatch<ParquetType, ::arrow::ArrowType>(*values_array, num_levels, \
                                                            def_levels, rep_levels);

  switch (values_type) {
    case ::arrow::Type::UINT32: {
      if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) {
        // Parquet 1.0 reader cannot read the UINT_32 logical type. Thus we need
        // to use the larger Int64Type to store them lossless.
        return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(*values_array, num_levels,
                                                               def_levels, rep_levels);
      } else {
        return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(*values_array, num_levels,
                                                               def_levels, rep_levels);
      }
    }
    WRITE_BATCH_CASE(NA, NullType, Int32Type)
    // Timestamps get unit-specific handling in WriteTimestamps.
    case ::arrow::Type::TIMESTAMP:
      return WriteTimestamps(*values_array, num_levels, def_levels, rep_levels);
    WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)
    WRITE_BATCH_CASE(INT8, Int8Type, Int32Type)
    WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type)
    WRITE_BATCH_CASE(INT16, Int16Type, Int32Type)
    WRITE_BATCH_CASE(UINT16, UInt16Type, Int32Type)
    WRITE_BATCH_CASE(INT32, Int32Type, Int32Type)
    WRITE_BATCH_CASE(INT64, Int64Type, Int64Type)
    WRITE_BATCH_CASE(UINT64, UInt64Type, Int64Type)
    WRITE_BATCH_CASE(FLOAT, FloatType, FloatType)
    WRITE_BATCH_CASE(DOUBLE, DoubleType, DoubleType)
    // Both BINARY and STRING use the BinaryType write path (ByteArray values).
    WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType)
    WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType)
    WRITE_BATCH_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
    WRITE_BATCH_CASE(DECIMAL, Decimal128Type, FLBAType)
    WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type)
    // NOTE(review): DATE64 maps to INT32 physical storage; presumably the
    // typed batch writer converts milliseconds to days — confirm there.
    WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
    WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type)
    WRITE_BATCH_CASE(TIME64, Time64Type, Int64Type)
    default:
      break;
  }
  // NOTE(review): the message says "list value" but this path is reached for
  // any unsupported leaf type, nested or not.
  return Status::NotImplemented("Data type not supported as list value: ",
                                values_array->type()->ToString());
}
958
959} // namespace
960
961// ----------------------------------------------------------------------
962// FileWriter implementation
963
// Pimpl backing FileWriter: owns the low-level ParquetFileWriter plus the
// currently open row group, and translates Arrow arrays into Parquet column
// chunks.
class FileWriter::Impl {
 public:
  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
       const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
      : writer_(std::move(writer)),
        row_group_writer_(nullptr),
        column_write_context_(pool, arrow_properties.get()),
        arrow_properties_(arrow_properties),
        closed_(false) {}

  // Closes the currently open row group (if any) and appends a fresh one.
  // NOTE(review): chunk_size is currently unused.
  Status NewRowGroup(int64_t chunk_size) {
    if (row_group_writer_ != nullptr) {
      PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
    }
    PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup());
    return Status::OK();
  }

  // Flushes the open row group and closes the file. Safe to call repeatedly.
  Status Close() {
    if (!closed_) {
      // Make idempotent
      closed_ = true;
      if (row_group_writer_ != nullptr) {
        PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
      }
      PARQUET_CATCH_NOT_OK(writer_->Close());
    }
    return Status::OK();
  }

  // Writes a whole array as the next column chunk of the current row group.
  Status WriteColumnChunk(const Array& data) {
    // A bit awkward here since cannot instantiate ChunkedArray from const Array&
    ::arrow::ArrayVector chunks = {::arrow::MakeArray(data.data())};
    auto chunked_array = std::make_shared<::arrow::ChunkedArray>(chunks);
    return WriteColumnChunk(chunked_array, 0, data.length());
  }

  // Writes rows [offset, offset + size) of `data` as the next column chunk of
  // the current row group.
  Status WriteColumnChunk(const std::shared_ptr<ChunkedArray>& data, int64_t offset,
                          const int64_t size) {
    // DictionaryArrays are not yet handled with a fast path. To still support
    // writing them as a workaround, we convert them back to their non-dictionary
    // representation.
    if (data->type()->id() == ::arrow::Type::DICTIONARY) {
      const ::arrow::DictionaryType& dict_type =
          static_cast<const ::arrow::DictionaryType&>(*data->type());

      // TODO(ARROW-1648): Remove this special handling once we require an Arrow
      // version that has this fixed.
      if (dict_type.dictionary()->type()->id() == ::arrow::Type::NA) {
        auto null_array = std::make_shared<::arrow::NullArray>(data->length());
        return WriteColumnChunk(*null_array);
      }

      // Cast the dictionary-encoded data to its value type, then recurse.
      FunctionContext ctx(this->memory_pool());
      ::arrow::compute::Datum cast_input(data);
      ::arrow::compute::Datum cast_output;
      RETURN_NOT_OK(Cast(&ctx, cast_input, dict_type.dictionary()->type(), CastOptions(),
                         &cast_output));
      return WriteColumnChunk(cast_output.chunked_array(), offset, size);
    }

    ColumnWriter* column_writer;
    PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());

    // TODO(wesm): This trick to construct a schema for one Parquet root node
    // will not work for arbitrary nested data
    int current_column_idx = row_group_writer_->current_column();
    std::shared_ptr<::arrow::Schema> arrow_schema;
    RETURN_NOT_OK(FromParquetSchema(writer_->schema(), {current_column_idx - 1},
                                    writer_->key_value_metadata(), &arrow_schema));

    ArrowColumnWriter arrow_writer(&column_write_context_, column_writer,
                                   arrow_schema->field(0));

    RETURN_NOT_OK(arrow_writer.Write(*data, offset, size));
    return arrow_writer.Close();
  }

  const WriterProperties& properties() const { return *writer_->properties(); }

  ::arrow::MemoryPool* memory_pool() const { return column_write_context_.memory_pool; }

  virtual ~Impl() {}

 private:
  friend class FileWriter;

  std::unique_ptr<ParquetFileWriter> writer_;
  // Non-owning; points at the row group currently accepting column chunks.
  RowGroupWriter* row_group_writer_;
  ColumnWriterContext column_write_context_;
  std::shared_ptr<ArrowWriterProperties> arrow_properties_;
  bool closed_;
};
1057
// Starts a new row group, closing any group currently open.
// NOTE(review): chunk_size is forwarded to Impl::NewRowGroup, which ignores it.
Status FileWriter::NewRowGroup(int64_t chunk_size) {
  return impl_->NewRowGroup(chunk_size);
}
1061
// Writes `data` as the next column chunk of the currently open row group.
Status FileWriter::WriteColumnChunk(const ::arrow::Array& data) {
  return impl_->WriteColumnChunk(data);
}
1065
// Writes rows [offset, offset + size) of the chunked column as the next
// column chunk of the currently open row group.
Status FileWriter::WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data,
                                    const int64_t offset, const int64_t size) {
  return impl_->WriteColumnChunk(data, offset, size);
}
1070
// Convenience overload: writes the entire chunked column as one column chunk.
Status FileWriter::WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data) {
  return WriteColumnChunk(data, 0, data->length());
}
1074
1075Status FileWriter::Close() { return impl_->Close(); }
1076
1077MemoryPool* FileWriter::memory_pool() const { return impl_->memory_pool(); }
1078
1079FileWriter::~FileWriter() {}
1080
// Constructs the Arrow-facing writer around an already-open ParquetFileWriter.
FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
                       const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
    : impl_(new FileWriter::Impl(pool, std::move(writer), arrow_properties)) {}
1084
// Opens a writer using default Arrow-specific writer properties.
Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
                        const std::shared_ptr<OutputStream>& sink,
                        const std::shared_ptr<WriterProperties>& properties,
                        std::unique_ptr<FileWriter>* writer) {
  return Open(schema, pool, sink, properties, default_arrow_writer_properties(), writer);
}
1091
1092Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
1093 const std::shared_ptr<OutputStream>& sink,
1094 const std::shared_ptr<WriterProperties>& properties,
1095 const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
1096 std::unique_ptr<FileWriter>* writer) {
1097 std::shared_ptr<SchemaDescriptor> parquet_schema;
1098 RETURN_NOT_OK(
1099 ToParquetSchema(&schema, *properties, *arrow_properties, &parquet_schema));
1100
1101 auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
1102
1103 std::unique_ptr<ParquetFileWriter> base_writer =
1104 ParquetFileWriter::Open(sink, schema_node, properties, schema.metadata());
1105
1106 writer->reset(new FileWriter(pool, std::move(base_writer), arrow_properties));
1107 return Status::OK();
1108}
1109
1110Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
1111 const std::shared_ptr<::arrow::io::OutputStream>& sink,
1112 const std::shared_ptr<WriterProperties>& properties,
1113 std::unique_ptr<FileWriter>* writer) {
1114 auto wrapper = std::make_shared<ArrowOutputStream>(sink);
1115 return Open(schema, pool, wrapper, properties, writer);
1116}
1117
1118Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
1119 const std::shared_ptr<::arrow::io::OutputStream>& sink,
1120 const std::shared_ptr<WriterProperties>& properties,
1121 const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
1122 std::unique_ptr<FileWriter>* writer) {
1123 auto wrapper = std::make_shared<ArrowOutputStream>(sink);
1124 return Open(schema, pool, wrapper, properties, arrow_properties, writer);
1125}
1126
// Serializes `file_metadata` to `sink`, converting any Parquet exception into
// a Status.
Status WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink) {
  PARQUET_CATCH_NOT_OK(::parquet::WriteFileMetaData(file_metadata, sink));
  return Status::OK();
}
1131
1132Status WriteFileMetaData(const FileMetaData& file_metadata,
1133 const std::shared_ptr<::arrow::io::OutputStream>& sink) {
1134 ArrowOutputStream wrapper(sink);
1135 return ::parquet::arrow::WriteFileMetaData(file_metadata, &wrapper);
1136}
1137
1138namespace {} // namespace
1139
1140Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
1141 if (chunk_size <= 0 && table.num_rows() > 0) {
1142 return Status::Invalid("chunk size per row_group must be greater than 0");
1143 } else if (chunk_size > impl_->properties().max_row_group_length()) {
1144 chunk_size = impl_->properties().max_row_group_length();
1145 }
1146
1147 auto WriteRowGroup = [&](int64_t offset, int64_t size) {
1148 RETURN_NOT_OK(NewRowGroup(size));
1149 for (int i = 0; i < table.num_columns(); i++) {
1150 auto chunked_data = table.column(i)->data();
1151 RETURN_NOT_OK(WriteColumnChunk(chunked_data, offset, size));
1152 }
1153 return Status::OK();
1154 };
1155
1156 if (table.num_rows() == 0) {
1157 // Append a row group with 0 rows
1158 RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close()));
1159 return Status::OK();
1160 }
1161
1162 for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
1163 int64_t offset = chunk * chunk_size;
1164 RETURN_NOT_OK_ELSE(
1165 WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)),
1166 PARQUET_IGNORE_NOT_OK(Close()));
1167 }
1168 return Status::OK();
1169}
1170
1171Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
1172 const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
1173 const std::shared_ptr<WriterProperties>& properties,
1174 const std::shared_ptr<ArrowWriterProperties>& arrow_properties) {
1175 std::unique_ptr<FileWriter> writer;
1176 RETURN_NOT_OK(FileWriter::Open(*table.schema(), pool, sink, properties,
1177 arrow_properties, &writer));
1178 RETURN_NOT_OK(writer->WriteTable(table, chunk_size));
1179 return writer->Close();
1180}
1181
1182Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
1183 const std::shared_ptr<::arrow::io::OutputStream>& sink,
1184 int64_t chunk_size, const std::shared_ptr<WriterProperties>& properties,
1185 const std::shared_ptr<ArrowWriterProperties>& arrow_properties) {
1186 auto wrapper = std::make_shared<ArrowOutputStream>(sink);
1187 return WriteTable(table, pool, wrapper, chunk_size, properties, arrow_properties);
1188}
1189
1190} // namespace arrow
1191} // namespace parquet
1192