1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "parquet/arrow/writer.h" |
19 | |
20 | #include <algorithm> |
21 | #include <string> |
22 | #include <utility> |
23 | #include <vector> |
24 | |
25 | #include "arrow/api.h" |
26 | #include "arrow/compute/api.h" |
27 | #include "arrow/util/bit-util.h" |
28 | #include "arrow/visitor_inline.h" |
29 | |
30 | #include "arrow/util/logging.h" |
31 | #include "parquet/arrow/schema.h" |
32 | |
33 | using arrow::Array; |
34 | using arrow::BinaryArray; |
35 | using arrow::BooleanArray; |
36 | using arrow::ChunkedArray; |
37 | using arrow::Decimal128Array; |
38 | using arrow::Field; |
39 | using arrow::FixedSizeBinaryArray; |
40 | using arrow::Int16Array; |
41 | using arrow::Int16Builder; |
42 | using arrow::ListArray; |
43 | using arrow::MemoryPool; |
44 | using arrow::NumericArray; |
45 | using arrow::PrimitiveArray; |
46 | using arrow::ResizableBuffer; |
47 | using arrow::Status; |
48 | using arrow::Table; |
49 | using arrow::TimeUnit; |
50 | |
51 | using arrow::compute::Cast; |
52 | using arrow::compute::CastOptions; |
53 | using arrow::compute::FunctionContext; |
54 | |
55 | using parquet::ParquetFileWriter; |
56 | using parquet::ParquetVersion; |
57 | using parquet::schema::GroupNode; |
58 | |
59 | namespace parquet { |
60 | namespace arrow { |
61 | |
62 | namespace BitUtil = ::arrow::BitUtil; |
63 | |
64 | std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties() { |
65 | static std::shared_ptr<ArrowWriterProperties> default_writer_properties = |
66 | ArrowWriterProperties::Builder().build(); |
67 | return default_writer_properties; |
68 | } |
69 | |
70 | namespace { |
71 | |
// Computes Parquet definition and repetition levels for a (possibly nested)
// Arrow array.
//
// The builder first walks from the root array down to the leaf (values)
// array, collecting each nesting level's validity bitmap and list offsets,
// and then replays that information to emit one def/rep level entry per
// leaf-level slot.
class LevelBuilder {
 public:
  explicit LevelBuilder(MemoryPool* pool)
      : def_levels_(::arrow::int16(), pool), rep_levels_(::arrow::int16(), pool) {}

  // Dispatches to the matching Visit() overload below; defined out-of-line
  // after the class so VisitArrayInline sees the complete type.
  Status VisitInline(const Array& array);

  // Leaf case: any flat (non-nested) array terminates the downward walk.
  // Record its offset/validity info and keep it as the values array to write.
  template <typename T>
  typename std::enable_if<std::is_base_of<::arrow::FlatArray, T>::value, Status>::type
  Visit(const T& array) {
    array_offsets_.push_back(static_cast<int32_t>(array.offset()));
    valid_bitmaps_.push_back(array.null_bitmap_data());
    null_counts_.push_back(array.null_count());
    values_array_ = std::make_shared<T>(array.data());
    return Status::OK();
  }

  // List case: record this nesting level's validity and offsets, translate
  // the current window of reachable leaf values through this list's offsets,
  // then recurse into the child values array.
  Status Visit(const ListArray& array) {
    array_offsets_.push_back(static_cast<int32_t>(array.offset()));
    valid_bitmaps_.push_back(array.null_bitmap_data());
    null_counts_.push_back(array.null_count());
    offsets_.push_back(array.raw_value_offsets());

    // Map [min_offset_idx_, max_offset_idx_) from this level's index space
    // into the child's index space.
    min_offset_idx_ = array.value_offset(min_offset_idx_);
    max_offset_idx_ = array.value_offset(max_offset_idx_);

    return VisitInline(*array.values());
  }

// Nested types for which level generation has not been implemented yet.
#define NOT_IMPLEMENTED_VISIT(ArrowTypePrefix)                             \
  Status Visit(const ::arrow::ArrowTypePrefix##Array& array) {             \
    return Status::NotImplemented("Level generation for " #ArrowTypePrefix \
                                  " not supported yet");                   \
  }

  NOT_IMPLEMENTED_VISIT(Struct)
  NOT_IMPLEMENTED_VISIT(Union)
  NOT_IMPLEMENTED_VISIT(Dictionary)

  // Produces the def/rep levels for `array`, whose schema is described by
  // `field`.
  //
  // Outputs:
  //  * values_offset / num_values delimit the slice of the leaf values
  //    array that must be written,
  //  * num_levels is the number of def/rep entries produced,
  //  * def_levels_out / rep_levels_out receive the level buffers (either
  //    may be set to nullptr when not needed, e.g. flat and/or required
  //    columns),
  //  * values_array receives the leaf array found by the downward walk.
  // def_levels_scratch is a caller-provided reusable buffer used by the
  // flat-column fast path.
  Status GenerateLevels(const Array& array, const std::shared_ptr<Field>& field,
                        int64_t* values_offset, int64_t* num_values, int64_t* num_levels,
                        const std::shared_ptr<ResizableBuffer>& def_levels_scratch,
                        std::shared_ptr<Buffer>* def_levels_out,
                        std::shared_ptr<Buffer>* rep_levels_out,
                        std::shared_ptr<Array>* values_array) {
    // Work downwards to extract bitmaps and offsets
    min_offset_idx_ = 0;
    max_offset_idx_ = array.length();
    RETURN_NOT_OK(VisitInline(array));
    *num_values = max_offset_idx_ - min_offset_idx_;
    *values_offset = min_offset_idx_;
    *values_array = values_array_;

    // Walk downwards to extract nullability
    std::shared_ptr<Field> current_field = field;
    nullable_.push_back(current_field->nullable());
    while (current_field->type()->num_children() > 0) {
      if (current_field->type()->num_children() > 1) {
        return Status::NotImplemented(
            "Fields with more than one child are not supported.");
      } else {
        current_field = current_field->type()->child(0);
      }
      nullable_.push_back(current_field->nullable());
    }

    // Generate the levels.
    if (nullable_.size() == 1) {
      // We have a PrimitiveArray
      *rep_levels_out = nullptr;
      if (nullable_[0]) {
        // Optional flat column: the definition level is just the validity
        // bit (1 = present, 0 = null).
        RETURN_NOT_OK(
            def_levels_scratch->Resize(array.length() * sizeof(int16_t), false));
        auto def_levels_ptr =
            reinterpret_cast<int16_t*>(def_levels_scratch->mutable_data());
        if (array.null_count() == 0) {
          std::fill(def_levels_ptr, def_levels_ptr + array.length(), 1);
        } else if (array.null_count() == array.length()) {
          std::fill(def_levels_ptr, def_levels_ptr + array.length(), 0);
        } else {
          ::arrow::internal::BitmapReader valid_bits_reader(
              array.null_bitmap_data(), array.offset(), array.length());
          for (int i = 0; i < array.length(); i++) {
            def_levels_ptr[i] = valid_bits_reader.IsSet() ? 1 : 0;
            valid_bits_reader.Next();
          }
        }

        *def_levels_out = def_levels_scratch;
      } else {
        // Required flat column: no definition levels needed.
        *def_levels_out = nullptr;
      }
      *num_levels = array.length();
    } else {
      // Nested column: recursively emit one def/rep pair per leaf entry.
      RETURN_NOT_OK(rep_levels_.Append(0));
      RETURN_NOT_OK(HandleListEntries(0, 0, 0, array.length()));

      std::shared_ptr<Array> def_levels_array;
      std::shared_ptr<Array> rep_levels_array;

      RETURN_NOT_OK(def_levels_.Finish(&def_levels_array));
      RETURN_NOT_OK(rep_levels_.Finish(&rep_levels_array));

      *def_levels_out = static_cast<PrimitiveArray*>(def_levels_array.get())->values();
      *rep_levels_out = static_cast<PrimitiveArray*>(rep_levels_array.get())->values();
      *num_levels = rep_levels_array->length();
    }

    return Status::OK();
  }

  // Emits levels for one list slot at `index` on nesting level `rep_level`.
  // A null slot contributes a single def_level entry; a present slot
  // descends with def_level + 1 (if this level is nullable).
  Status HandleList(int16_t def_level, int16_t rep_level, int64_t index) {
    if (nullable_[rep_level]) {
      if (null_counts_[rep_level] == 0 ||
          BitUtil::GetBit(valid_bitmaps_[rep_level], index + array_offsets_[rep_level])) {
        return HandleNonNullList(static_cast<int16_t>(def_level + 1), rep_level, index);
      } else {
        return def_levels_.Append(def_level);
      }
    } else {
      return HandleNonNullList(def_level, rep_level, index);
    }
  }

  // Emits levels for a present (non-null) list slot: either recurse into a
  // deeper list level, or — at the innermost level — emit one def/rep pair
  // per primitive element.
  Status HandleNonNullList(int16_t def_level, int16_t rep_level, int64_t index) {
    const int32_t inner_offset = offsets_[rep_level][index];
    const int32_t inner_length = offsets_[rep_level][index + 1] - inner_offset;
    const int64_t recursion_level = rep_level + 1;
    if (inner_length == 0) {
      // Empty list: a single entry at the current definition level.
      return def_levels_.Append(def_level);
    }
    if (recursion_level < static_cast<int64_t>(offsets_.size())) {
      return HandleListEntries(static_cast<int16_t>(def_level + 1),
                               static_cast<int16_t>(rep_level + 1), inner_offset,
                               inner_length);
    } else {
      // We have reached the leaf: primitive list, handle remaining nullables
      const bool nullable_level = nullable_[recursion_level];
      const int64_t level_null_count = null_counts_[recursion_level];
      const uint8_t* level_valid_bitmap = valid_bitmaps_[recursion_level];

      for (int64_t i = 0; i < inner_length; i++) {
        if (i > 0) {
          // Repetition level marks a new element within this innermost list.
          RETURN_NOT_OK(rep_levels_.Append(static_cast<int16_t>(rep_level + 1)));
        }
        if (level_null_count && level_valid_bitmap == nullptr) {
          // Special case: this is a null array (all elements are null)
          RETURN_NOT_OK(def_levels_.Append(static_cast<int16_t>(def_level + 1)));
        } else if (nullable_level &&
                   ((level_null_count == 0) ||
                    BitUtil::GetBit(
                        level_valid_bitmap,
                        inner_offset + i + array_offsets_[recursion_level]))) {
          // Non-null element in a null level
          RETURN_NOT_OK(def_levels_.Append(static_cast<int16_t>(def_level + 2)));
        } else {
          // This can be produced in two case:
          // * elements are nullable and this one is null (i.e. max_def_level =
          // def_level
          // + 2)
          // * elements are non-nullable (i.e. max_def_level = def_level + 1)
          RETURN_NOT_OK(def_levels_.Append(static_cast<int16_t>(def_level + 1)));
        }
      }
      return Status::OK();
    }
  }

  // Walks `length` consecutive list slots starting at `offset`, appending a
  // repetition level before every slot but the first.
  Status HandleListEntries(int16_t def_level, int16_t rep_level, int64_t offset,
                           int64_t length) {
    for (int64_t i = 0; i < length; i++) {
      if (i > 0) {
        RETURN_NOT_OK(rep_levels_.Append(rep_level));
      }
      RETURN_NOT_OK(HandleList(def_level, rep_level, offset + i));
    }
    return Status::OK();
  }

 private:
  Int16Builder def_levels_;
  Int16Builder rep_levels_;

  // One entry per nesting level, filled by the Visit() walk (outermost
  // level first).
  std::vector<int64_t> null_counts_;
  std::vector<const uint8_t*> valid_bitmaps_;
  std::vector<const int32_t*> offsets_;
  std::vector<int32_t> array_offsets_;
  std::vector<bool> nullable_;

  // Window of leaf values reachable from the root array, in leaf index space.
  int64_t min_offset_idx_;
  int64_t max_offset_idx_;
  // The flat leaf array discovered by the downward walk.
  std::shared_ptr<Array> values_array_;
};
264 | |
// Out-of-line definition: VisitArrayInline needs the complete LevelBuilder
// type with all Visit() overloads declared.
Status LevelBuilder::VisitInline(const Array& array) {
  return VisitArrayInline(array, this);
}
268 | |
269 | struct ColumnWriterContext { |
270 | ColumnWriterContext(MemoryPool* memory_pool, ArrowWriterProperties* properties) |
271 | : memory_pool(memory_pool), properties(properties) { |
272 | this->data_buffer = AllocateBuffer(memory_pool); |
273 | this->def_levels_buffer = AllocateBuffer(memory_pool); |
274 | } |
275 | |
276 | template <typename T> |
277 | Status GetScratchData(const int64_t num_values, T** out) { |
278 | RETURN_NOT_OK(this->data_buffer->Resize(num_values * sizeof(T), false)); |
279 | *out = reinterpret_cast<T*>(this->data_buffer->mutable_data()); |
280 | return Status::OK(); |
281 | } |
282 | |
283 | MemoryPool* memory_pool; |
284 | ArrowWriterProperties* properties; |
285 | |
286 | // Buffer used for storing the data of an array converted to the physical type |
287 | // as expected by parquet-cpp. |
288 | std::shared_ptr<ResizableBuffer> data_buffer; |
289 | |
290 | // We use the shared ownership of this buffer |
291 | std::shared_ptr<ResizableBuffer> def_levels_buffer; |
292 | }; |
293 | |
294 | Status GetLeafType(const ::arrow::DataType& type, ::arrow::Type::type* leaf_type) { |
295 | if (type.id() == ::arrow::Type::LIST || type.id() == ::arrow::Type::STRUCT) { |
296 | if (type.num_children() != 1) { |
297 | return Status::Invalid("Nested column branch had multiple children" ); |
298 | } |
299 | return GetLeafType(*type.child(0)->type(), leaf_type); |
300 | } else { |
301 | *leaf_type = type.id(); |
302 | return Status::OK(); |
303 | } |
304 | } |
305 | |
// Writes one Arrow column (a single array or a chunked array) into a single
// Parquet column via the wrapped parquet::ColumnWriter.
class ArrowColumnWriter {
 public:
  // ctx and column_writer are borrowed, not owned; they must outlive this
  // object.
  ArrowColumnWriter(ColumnWriterContext* ctx, ColumnWriter* column_writer,
                    const std::shared_ptr<Field>& field)
      : ctx_(ctx), writer_(column_writer), field_(field) {}

  // Writes a single in-memory array (defined out-of-line).
  Status Write(const Array& data);

  // Writes `size` values of the chunked array starting at logical offset
  // `offset`, slicing across chunk boundaries as necessary.
  Status Write(const ChunkedArray& data, int64_t offset, const int64_t size) {
    if (data.length() == 0) {
      return Status::OK();
    }

    // Advance to the chunk containing `offset`; chunk_offset ends up as the
    // position of `offset` within that chunk.
    int64_t absolute_position = 0;
    int chunk_index = 0;
    int64_t chunk_offset = 0;
    while (chunk_index < data.num_chunks() && absolute_position < offset) {
      const int64_t chunk_length = data.chunk(chunk_index)->length();
      if (absolute_position + chunk_length > offset) {
        // Relative offset into the chunk to reach the desired start offset for
        // writing
        chunk_offset = offset - absolute_position;
        break;
      } else {
        ++chunk_index;
        absolute_position += chunk_length;
      }
    }

    if (absolute_position >= data.length()) {
      return Status::Invalid("Cannot write data at offset past end of chunked array");
    }

    // Write chunk by chunk until `size` values have been written.
    int64_t values_written = 0;
    while (values_written < size) {
      const Array& chunk = *data.chunk(chunk_index);
      const int64_t available_values = chunk.length() - chunk_offset;
      const int64_t chunk_write_size = std::min(size - values_written, available_values);

      // The chunk offset here will be 0 except for possibly the first chunk
      // because of the advancing logic above
      std::shared_ptr<Array> array_to_write = chunk.Slice(chunk_offset, chunk_write_size);
      RETURN_NOT_OK(Write(*array_to_write));

      // Chunk fully consumed: move on to the next one.
      if (chunk_write_size == available_values) {
        chunk_offset = 0;
        ++chunk_index;
      }
      values_written += chunk_write_size;
    }

    return Status::OK();
  }

  // Closes the underlying Parquet column writer, converting any thrown
  // ParquetException into a Status.
  Status Close() {
    PARQUET_CATCH_NOT_OK(writer_->Close());
    return Status::OK();
  }

 private:
  // Converts the Arrow array to the Parquet physical type and writes it
  // along with the given def/rep levels.
  template <typename ParquetType, typename ArrowType>
  Status TypedWriteBatch(const Array& data, int64_t num_levels, const int16_t* def_levels,
                         const int16_t* rep_levels);

  // Timestamp entry point: dispatches between Int96 storage, unit-coerced
  // INT64 and the plain INT64 fast path (see definition below).
  Status WriteTimestamps(const Array& data, int64_t num_levels, const int16_t* def_levels,
                         const int16_t* rep_levels);

  // Writes timestamps that need a unit conversion before storage.
  Status WriteTimestampsCoerce(const bool truncated_timestamps_allowed, const Array& data,
                               int64_t num_levels, const int16_t* def_levels,
                               const int16_t* rep_levels);

  // Conversion helper for columns without nulls (or declared required):
  // `values` holds num_values densely-packed entries.
  template <typename ParquetType, typename ArrowType>
  Status WriteNonNullableBatch(const ArrowType& type, int64_t num_values,
                               int64_t num_levels, const int16_t* def_levels,
                               const int16_t* rep_levels,
                               const typename ArrowType::c_type* values);

  // Conversion helper for columns with nulls: values are written "spaced",
  // with `valid_bits` (read from `valid_bits_offset`) marking validity.
  template <typename ParquetType, typename ArrowType>
  Status WriteNullableBatch(const ArrowType& type, int64_t num_values, int64_t num_levels,
                            const int16_t* def_levels, const int16_t* rep_levels,
                            const uint8_t* valid_bits, int64_t valid_bits_offset,
                            const typename ArrowType::c_type* values);

  // Thin wrapper around TypedColumnWriter<ParquetType>::WriteBatch.
  template <typename ParquetType>
  Status WriteBatch(int64_t num_levels, const int16_t* def_levels,
                    const int16_t* rep_levels,
                    const typename ParquetType::c_type* values) {
    auto typed_writer =
        ::arrow::internal::checked_cast<TypedColumnWriter<ParquetType>*>(writer_);
    // WriteBatch was called with type mismatching the writer_'s type. This
    // could be a schema conversion problem.
    DCHECK(typed_writer);
    PARQUET_CATCH_NOT_OK(
        typed_writer->WriteBatch(num_levels, def_levels, rep_levels, values));
    return Status::OK();
  }

  // Thin wrapper around TypedColumnWriter<ParquetType>::WriteBatchSpaced,
  // which consumes a validity bitmap alongside the values.
  template <typename ParquetType>
  Status WriteBatchSpaced(int64_t num_levels, const int16_t* def_levels,
                          const int16_t* rep_levels, const uint8_t* valid_bits,
                          int64_t valid_bits_offset,
                          const typename ParquetType::c_type* values) {
    auto typed_writer =
        ::arrow::internal::checked_cast<TypedColumnWriter<ParquetType>*>(writer_);
    // WriteBatchSpaced was called with type mismatching the writer_'s type. This
    // could be a schema conversion problem.
    DCHECK(typed_writer);
    PARQUET_CATCH_NOT_OK(typed_writer->WriteBatchSpaced(
        num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, values));
    return Status::OK();
  }

  ColumnWriterContext* ctx_;  // borrowed
  ColumnWriter* writer_;      // borrowed
  std::shared_ptr<Field> field_;
};
422 | |
// Default implementation for primitive (fixed-width) arrays: locate the raw
// values (adjusted for any slice offset) and forward them to the
// non-nullable or nullable conversion helper depending on the schema and
// null count.
template <typename ParquetType, typename ArrowType>
Status ArrowColumnWriter::TypedWriteBatch(const Array& array, int64_t num_levels,
                                          const int16_t* def_levels,
                                          const int16_t* rep_levels) {
  using ArrowCType = typename ArrowType::c_type;

  const auto& data = static_cast<const PrimitiveArray&>(array);
  const ArrowCType* values = nullptr;
  // The values buffer may be null if the array is empty (ARROW-2744)
  if (data.values() != nullptr) {
    values = reinterpret_cast<const ArrowCType*>(data.values()->data()) + data.offset();
  } else {
    DCHECK_EQ(data.length(), 0);
  }

  if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
    // no nulls, just dump the data
    RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(
        static_cast<const ArrowType&>(*array.type()), array.length(), num_levels,
        def_levels, rep_levels, values)));
  } else {
    const uint8_t* valid_bits = data.null_bitmap_data();
    RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(
        static_cast<const ArrowType&>(*array.type()), data.length(), num_levels,
        def_levels, rep_levels, valid_bits, data.offset(), values)));
  }
  return Status::OK();
}
451 | |
452 | template <typename ParquetType, typename ArrowType> |
453 | Status ArrowColumnWriter::WriteNonNullableBatch( |
454 | const ArrowType& type, int64_t num_values, int64_t num_levels, |
455 | const int16_t* def_levels, const int16_t* rep_levels, |
456 | const typename ArrowType::c_type* values) { |
457 | using ParquetCType = typename ParquetType::c_type; |
458 | ParquetCType* buffer; |
459 | RETURN_NOT_OK(ctx_->GetScratchData<ParquetCType>(num_values, &buffer)); |
460 | |
461 | std::copy(values, values + num_values, buffer); |
462 | |
463 | return WriteBatch<ParquetType>(num_levels, def_levels, rep_levels, buffer); |
464 | } |
465 | |
466 | template <> |
467 | Status ArrowColumnWriter::WriteNonNullableBatch<Int32Type, ::arrow::Date64Type>( |
468 | const ::arrow::Date64Type& type, int64_t num_values, int64_t num_levels, |
469 | const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) { |
470 | int32_t* buffer; |
471 | RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_levels, &buffer)); |
472 | |
473 | for (int i = 0; i < num_values; i++) { |
474 | buffer[i] = static_cast<int32_t>(values[i] / 86400000); |
475 | } |
476 | |
477 | return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, buffer); |
478 | } |
479 | |
480 | template <> |
481 | Status ArrowColumnWriter::WriteNonNullableBatch<Int32Type, ::arrow::Time32Type>( |
482 | const ::arrow::Time32Type& type, int64_t num_values, int64_t num_levels, |
483 | const int16_t* def_levels, const int16_t* rep_levels, const int32_t* values) { |
484 | int32_t* buffer; |
485 | RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_levels, &buffer)); |
486 | if (type.unit() == TimeUnit::SECOND) { |
487 | for (int i = 0; i < num_values; i++) { |
488 | buffer[i] = values[i] * 1000; |
489 | } |
490 | } else { |
491 | std::copy(values, values + num_values, buffer); |
492 | } |
493 | return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, buffer); |
494 | } |
495 | |
// Fast path for Arrow types whose c_type already matches the Parquet
// physical type: the values pointer can be handed to WriteBatch directly,
// skipping the scratch-buffer conversion of the generic template.
#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                 \
  template <>                                                                      \
  Status ArrowColumnWriter::WriteNonNullableBatch<ParquetType, ArrowType>(         \
      const ArrowType& type, int64_t num_values, int64_t num_levels,               \
      const int16_t* def_levels, const int16_t* rep_levels, const CType* buffer) { \
    return WriteBatch<ParquetType>(num_levels, def_levels, rep_levels, buffer);    \
  }

NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
NONNULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
510 | |
511 | template <typename ParquetType, typename ArrowType> |
512 | Status ArrowColumnWriter::WriteNullableBatch( |
513 | const ArrowType& type, int64_t num_values, int64_t num_levels, |
514 | const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, |
515 | int64_t valid_bits_offset, const typename ArrowType::c_type* values) { |
516 | using ParquetCType = typename ParquetType::c_type; |
517 | |
518 | ParquetCType* buffer; |
519 | RETURN_NOT_OK(ctx_->GetScratchData<ParquetCType>(num_values, &buffer)); |
520 | for (int i = 0; i < num_values; i++) { |
521 | buffer[i] = static_cast<ParquetCType>(values[i]); |
522 | } |
523 | |
524 | return WriteBatchSpaced<ParquetType>(num_levels, def_levels, rep_levels, valid_bits, |
525 | valid_bits_offset, buffer); |
526 | } |
527 | |
528 | template <> |
529 | Status ArrowColumnWriter::WriteNullableBatch<Int32Type, ::arrow::Date64Type>( |
530 | const ::arrow::Date64Type& type, int64_t num_values, int64_t num_levels, |
531 | const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, |
532 | int64_t valid_bits_offset, const int64_t* values) { |
533 | int32_t* buffer; |
534 | RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_values, &buffer)); |
535 | |
536 | for (int i = 0; i < num_values; i++) { |
537 | // Convert from milliseconds into days since the epoch |
538 | buffer[i] = static_cast<int32_t>(values[i] / 86400000); |
539 | } |
540 | |
541 | return WriteBatchSpaced<Int32Type>(num_levels, def_levels, rep_levels, valid_bits, |
542 | valid_bits_offset, buffer); |
543 | } |
544 | |
545 | template <> |
546 | Status ArrowColumnWriter::WriteNullableBatch<Int32Type, ::arrow::Time32Type>( |
547 | const ::arrow::Time32Type& type, int64_t num_values, int64_t num_levels, |
548 | const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, |
549 | int64_t valid_bits_offset, const int32_t* values) { |
550 | int32_t* buffer; |
551 | RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_values, &buffer)); |
552 | |
553 | if (type.unit() == TimeUnit::SECOND) { |
554 | for (int i = 0; i < num_values; i++) { |
555 | buffer[i] = values[i] * 1000; |
556 | } |
557 | } else { |
558 | for (int i = 0; i < num_values; i++) { |
559 | buffer[i] = values[i]; |
560 | } |
561 | } |
562 | return WriteBatchSpaced<Int32Type>(num_levels, def_levels, rep_levels, valid_bits, |
563 | valid_bits_offset, buffer); |
564 | } |
565 | |
// Fast path for nullable columns whose Arrow c_type already matches the
// Parquet physical type: the values can be passed straight to
// WriteBatchSpaced without conversion.
#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                          \
  template <>                                                                            \
  Status ArrowColumnWriter::WriteNullableBatch<ParquetType, ArrowType>(                  \
      const ArrowType& type, int64_t num_values, int64_t num_levels,                     \
      const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,   \
      int64_t valid_bits_offset, const CType* values) {                                  \
    return WriteBatchSpaced<ParquetType>(num_levels, def_levels, rep_levels, valid_bits, \
                                         valid_bits_offset, values);                     \
  }

NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
// Timestamps stored as INT64 need no conversion either; Int96 and coerced
// timestamps go through the explicit specializations below instead.
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
584 | |
585 | #define CONV_CASE_LOOP(ConversionFunction) \ |
586 | for (int64_t i = 0; i < num_values; i++) \ |
587 | ConversionFunction(arrow_values[i], &output[i]); |
588 | |
589 | static void ConvertArrowTimestampToParquetInt96(const int64_t* arrow_values, |
590 | int64_t num_values, |
591 | ::arrow::TimeUnit ::type unit_type, |
592 | Int96* output) { |
593 | switch (unit_type) { |
594 | case TimeUnit::NANO: |
595 | CONV_CASE_LOOP(internal::NanosecondsToImpalaTimestamp); |
596 | break; |
597 | case TimeUnit::MICRO: |
598 | CONV_CASE_LOOP(internal::MicrosecondsToImpalaTimestamp); |
599 | break; |
600 | case TimeUnit::MILLI: |
601 | CONV_CASE_LOOP(internal::MillisecondsToImpalaTimestamp); |
602 | break; |
603 | case TimeUnit::SECOND: |
604 | CONV_CASE_LOOP(internal::SecondsToImpalaTimestamp); |
605 | break; |
606 | } |
607 | } |
608 | |
609 | #undef CONV_CASE_LOOP |
610 | |
// Deprecated INT96 storage, nullable path: convert the timestamps into
// Int96 scratch space, then write them spaced.
template <>
Status ArrowColumnWriter::WriteNullableBatch<Int96Type, ::arrow::TimestampType>(
    const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
    int64_t valid_bits_offset, const int64_t* values) {
  Int96* buffer = nullptr;
  RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));

  ConvertArrowTimestampToParquetInt96(values, num_values, type.unit(), buffer);

  return WriteBatchSpaced<Int96Type>(num_levels, def_levels, rep_levels, valid_bits,
                                     valid_bits_offset, buffer);
}
624 | |
// Deprecated INT96 storage, non-nullable path: convert the timestamps into
// Int96 scratch space, then write them densely.
template <>
Status ArrowColumnWriter::WriteNonNullableBatch<Int96Type, ::arrow::TimestampType>(
    const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
    const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) {
  Int96* buffer = nullptr;
  RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));

  ConvertArrowTimestampToParquetInt96(values, num_values, type.unit(), buffer);

  return WriteBatch<Int96Type>(num_levels, def_levels, rep_levels, buffer);
}
636 | |
// Chooses the storage strategy for a timestamp column:
//  * Int96 when the user opted into deprecated INT96 storage,
//  * unit-coerced INT64 when a cast is required (nanoseconds source, or an
//    explicitly requested coerce unit differing from the source unit),
//  * plain INT64 fast path otherwise.
Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_levels,
                                          const int16_t* def_levels,
                                          const int16_t* rep_levels) {
  const auto& type = static_cast<const ::arrow::TimestampType&>(*values.type());

  const bool is_nanosecond = type.unit() == TimeUnit::NANO;

  if (ctx_->properties->support_deprecated_int96_timestamps()) {
    // The user explicitly required to use Int96 storage.
    return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(values, num_levels,
                                                              def_levels, rep_levels);
  } else if (is_nanosecond ||
             (ctx_->properties->coerce_timestamps_enabled() &&
              (type.unit() != ctx_->properties->coerce_timestamps_unit()))) {
    // Casting is required. This covers several cases
    // * Nanoseconds -> cast to microseconds (until ARROW-3729 is resolved)
    // * coerce_timestamps_enabled_, cast all timestamps to requested unit
    return WriteTimestampsCoerce(ctx_->properties->truncated_timestamps_allowed(), values,
                                 num_levels, def_levels, rep_levels);
  } else {
    // No casting of timestamps is required, take the fast path
    return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(values, num_levels,
                                                              def_levels, rep_levels);
  }
}
662 | |
// Writes timestamps whose unit differs from the target storage unit,
// multiplying or dividing each value as appropriate.  Downscaling fails
// with Status::Invalid if it would discard sub-unit precision on a non-null
// value, unless truncated_timestamps_allowed is set.
Status ArrowColumnWriter::WriteTimestampsCoerce(const bool truncated_timestamps_allowed,
                                                const Array& array, int64_t num_levels,
                                                const int16_t* def_levels,
                                                const int16_t* rep_levels) {
  int64_t* buffer;
  RETURN_NOT_OK(ctx_->GetScratchData<int64_t>(num_levels, &buffer));

  const auto& data = static_cast<const ::arrow::TimestampArray&>(array);

  auto values = data.raw_values();
  const auto& type = static_cast<const ::arrow::TimestampType&>(*array.type());

  // When no coerce unit was requested, default to microseconds (this path
  // is then reached only for the nanoseconds case, see WriteTimestamps).
  TimeUnit::type target_unit = ctx_->properties->coerce_timestamps_enabled()
                                   ? ctx_->properties->coerce_timestamps_unit()
                                   : TimeUnit::MICRO;
  auto target_type = ::arrow::timestamp(target_unit);

  // Downscale, optionally rejecting values that would lose precision.
  auto DivideBy = [&](const int64_t factor) {
    for (int64_t i = 0; i < array.length(); i++) {
      if (!truncated_timestamps_allowed && !data.IsNull(i) && (values[i] % factor != 0)) {
        return Status::Invalid("Casting from ", type.ToString(), " to ",
                               target_type->ToString(), " would lose data: ", values[i]);
      }
      buffer[i] = values[i] / factor;
    }
    return Status::OK();
  };

  // Upscale: cannot lose precision.
  auto MultiplyBy = [&](const int64_t factor) {
    for (int64_t i = 0; i < array.length(); i++) {
      buffer[i] = values[i] * factor;
    }
    return Status::OK();
  };

  // Pick the conversion factor from (source unit, target unit).
  if (type.unit() == TimeUnit::NANO) {
    if (target_unit == TimeUnit::MICRO) {
      RETURN_NOT_OK(DivideBy(1000));
    } else {
      DCHECK_EQ(TimeUnit::MILLI, target_unit);
      RETURN_NOT_OK(DivideBy(1000000));
    }
  } else if (type.unit() == TimeUnit::SECOND) {
    RETURN_NOT_OK(MultiplyBy(target_unit == TimeUnit::MICRO ? 1000000 : 1000));
  } else if (type.unit() == TimeUnit::MILLI) {
    DCHECK_EQ(TimeUnit::MICRO, target_unit);
    RETURN_NOT_OK(MultiplyBy(1000));
  } else {
    // Source unit is microseconds; the only remaining target is milliseconds.
    DCHECK_EQ(TimeUnit::MILLI, target_unit);
    RETURN_NOT_OK(DivideBy(1000));
  }

  // The converted values now match target_type; dispatch on nullability as
  // in TypedWriteBatch.
  if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
    // no nulls, just dump the data
    RETURN_NOT_OK((WriteNonNullableBatch<Int64Type, ::arrow::TimestampType>(
        static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
        num_levels, def_levels, rep_levels, buffer)));
  } else {
    const uint8_t* valid_bits = data.null_bitmap_data();
    RETURN_NOT_OK((WriteNullableBatch<Int64Type, ::arrow::TimestampType>(
        static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
        num_levels, def_levels, rep_levels, valid_bits, data.offset(), buffer)));
  }
  return Status::OK();
}
728 | |
// This specialization seems quite similar but it significantly differs in two points:
// * the slice offset is applied as late as possible, at bit-access time, because we
//   have sub-byte access
// * Arrow data is stored bitwise, thus we cannot use std::copy to transform from
//   ArrowType::c_type to ParquetType::c_type
733 | |
734 | template <> |
735 | Status ArrowColumnWriter::TypedWriteBatch<BooleanType, ::arrow::BooleanType>( |
736 | const Array& array, int64_t num_levels, const int16_t* def_levels, |
737 | const int16_t* rep_levels) { |
738 | bool* buffer; |
739 | RETURN_NOT_OK(ctx_->GetScratchData<bool>(array.length(), &buffer)); |
740 | |
741 | const auto& data = static_cast<const BooleanArray&>(array); |
742 | const uint8_t* values = nullptr; |
743 | // The values buffer may be null if the array is empty (ARROW-2744) |
744 | if (data.values() != nullptr) { |
745 | values = reinterpret_cast<const uint8_t*>(data.values()->data()); |
746 | } else { |
747 | DCHECK_EQ(data.length(), 0); |
748 | } |
749 | |
750 | int buffer_idx = 0; |
751 | int64_t offset = array.offset(); |
752 | for (int i = 0; i < data.length(); i++) { |
753 | if (!data.IsNull(i)) { |
754 | buffer[buffer_idx++] = BitUtil::GetBit(values, offset + i); |
755 | } |
756 | } |
757 | |
758 | return WriteBatch<BooleanType>(num_levels, def_levels, rep_levels, buffer); |
759 | } |
760 | |
// Specialization for Arrow's NullType: a null column carries no values at
// all, so only the definition/repetition levels are written and the values
// pointer is nullptr.
template <>
Status ArrowColumnWriter::TypedWriteBatch<Int32Type, ::arrow::NullType>(
    const Array& array, int64_t num_levels, const int16_t* def_levels,
    const int16_t* rep_levels) {
  return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, nullptr);
}
767 | |
768 | template <> |
769 | Status ArrowColumnWriter::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>( |
770 | const Array& array, int64_t num_levels, const int16_t* def_levels, |
771 | const int16_t* rep_levels) { |
772 | ByteArray* buffer; |
773 | RETURN_NOT_OK(ctx_->GetScratchData<ByteArray>(num_levels, &buffer)); |
774 | |
775 | const auto& data = static_cast<const BinaryArray&>(array); |
776 | |
777 | // In the case of an array consisting of only empty strings or all null, |
778 | // data.data() points already to a nullptr, thus data.data()->data() will |
779 | // segfault. |
780 | const uint8_t* values = nullptr; |
781 | if (data.value_data()) { |
782 | values = reinterpret_cast<const uint8_t*>(data.value_data()->data()); |
783 | DCHECK(values != nullptr); |
784 | } |
785 | |
786 | // Slice offset is accounted for in raw_value_offsets |
787 | const int32_t* value_offset = data.raw_value_offsets(); |
788 | |
789 | if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) { |
790 | // no nulls, just dump the data |
791 | for (int64_t i = 0; i < data.length(); i++) { |
792 | buffer[i] = |
793 | ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]); |
794 | } |
795 | } else { |
796 | int buffer_idx = 0; |
797 | for (int64_t i = 0; i < data.length(); i++) { |
798 | if (!data.IsNull(i)) { |
799 | buffer[buffer_idx++] = |
800 | ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]); |
801 | } |
802 | } |
803 | } |
804 | |
805 | return WriteBatch<ByteArrayType>(num_levels, def_levels, rep_levels, buffer); |
806 | } |
807 | |
808 | template <> |
809 | Status ArrowColumnWriter::TypedWriteBatch<FLBAType, ::arrow::FixedSizeBinaryType>( |
810 | const Array& array, int64_t num_levels, const int16_t* def_levels, |
811 | const int16_t* rep_levels) { |
812 | const auto& data = static_cast<const FixedSizeBinaryArray&>(array); |
813 | const int64_t length = data.length(); |
814 | |
815 | FLBA* buffer; |
816 | RETURN_NOT_OK(ctx_->GetScratchData<FLBA>(num_levels, &buffer)); |
817 | |
818 | if (writer_->descr()->schema_node()->is_required() || data.null_count() == 0) { |
819 | // no nulls, just dump the data |
820 | // todo(advancedxy): use a writeBatch to avoid this step |
821 | for (int64_t i = 0; i < length; i++) { |
822 | buffer[i] = FixedLenByteArray(data.GetValue(i)); |
823 | } |
824 | } else { |
825 | int buffer_idx = 0; |
826 | for (int64_t i = 0; i < length; i++) { |
827 | if (!data.IsNull(i)) { |
828 | buffer[buffer_idx++] = FixedLenByteArray(data.GetValue(i)); |
829 | } |
830 | } |
831 | } |
832 | |
833 | return WriteBatch<FLBAType>(num_levels, def_levels, rep_levels, buffer); |
834 | } |
835 | |
836 | template <> |
837 | Status ArrowColumnWriter::TypedWriteBatch<FLBAType, ::arrow::Decimal128Type>( |
838 | const Array& array, int64_t num_levels, const int16_t* def_levels, |
839 | const int16_t* rep_levels) { |
840 | const auto& data = static_cast<const Decimal128Array&>(array); |
841 | const int64_t length = data.length(); |
842 | |
843 | FLBA* buffer; |
844 | RETURN_NOT_OK(ctx_->GetScratchData<FLBA>(num_levels, &buffer)); |
845 | |
846 | const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*data.type()); |
847 | const int32_t offset = |
848 | decimal_type.byte_width() - DecimalSize(decimal_type.precision()); |
849 | |
850 | const bool does_not_have_nulls = |
851 | writer_->descr()->schema_node()->is_required() || data.null_count() == 0; |
852 | |
853 | const auto valid_value_count = static_cast<size_t>(length - data.null_count()) * 2; |
854 | std::vector<uint64_t> big_endian_values(valid_value_count); |
855 | |
856 | // TODO(phillipc): Look into whether our compilers will perform loop unswitching so we |
857 | // don't have to keep writing two loops to handle the case where we know there are no |
858 | // nulls |
859 | if (does_not_have_nulls) { |
860 | // no nulls, just dump the data |
861 | // todo(advancedxy): use a writeBatch to avoid this step |
862 | for (int64_t i = 0, j = 0; i < length; ++i, j += 2) { |
863 | auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i)); |
864 | big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]); |
865 | big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]); |
866 | buffer[i] = FixedLenByteArray( |
867 | reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset); |
868 | } |
869 | } else { |
870 | for (int64_t i = 0, buffer_idx = 0, j = 0; i < length; ++i) { |
871 | if (!data.IsNull(i)) { |
872 | auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i)); |
873 | big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]); |
874 | big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]); |
875 | buffer[buffer_idx++] = FixedLenByteArray( |
876 | reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset); |
877 | j += 2; |
878 | } |
879 | } |
880 | } |
881 | |
882 | return WriteBatch<FLBAType>(num_levels, def_levels, rep_levels, buffer); |
883 | } |
884 | |
// Dispatches a single (possibly nested) array to the TypedWriteBatch
// specialization matching its leaf value type, after flattening it into leaf
// values plus Dremel-style definition/repetition levels.
Status ArrowColumnWriter::Write(const Array& data) {
  if (data.length() == 0) {
    // Write nothing when length is 0
    return Status::OK();
  }

  // Determine the innermost (leaf) value type, e.g. the element type of a
  // (possibly nested) list.
  ::arrow::Type::type values_type;
  RETURN_NOT_OK(GetLeafType(*data.type(), &values_type));

  std::shared_ptr<Array> _values_array;
  int64_t values_offset = 0;
  int64_t num_levels = 0;
  int64_t num_values = 0;
  LevelBuilder level_builder(ctx_->memory_pool);

  // Generate def/rep levels and locate the slice of leaf values they describe.
  std::shared_ptr<Buffer> def_levels_buffer, rep_levels_buffer;
  RETURN_NOT_OK(level_builder.GenerateLevels(
      data, field_, &values_offset, &num_values, &num_levels, ctx_->def_levels_buffer,
      &def_levels_buffer, &rep_levels_buffer, &_values_array));
  // Either levels buffer may be absent (e.g. flat required columns).
  const int16_t* def_levels = nullptr;
  if (def_levels_buffer) {
    def_levels = reinterpret_cast<const int16_t*>(def_levels_buffer->data());
  }
  const int16_t* rep_levels = nullptr;
  if (rep_levels_buffer) {
    rep_levels = reinterpret_cast<const int16_t*>(rep_levels_buffer->data());
  }
  std::shared_ptr<Array> values_array = _values_array->Slice(values_offset, num_values);

// Expands into a switch case forwarding to the TypedWriteBatch specialization
// for the given (Arrow type, Parquet physical type) pair.
#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType) \
  case ::arrow::Type::ArrowEnum: \
    return TypedWriteBatch<ParquetType, ::arrow::ArrowType>(*values_array, num_levels, \
                                                            def_levels, rep_levels);

  switch (values_type) {
    case ::arrow::Type::UINT32: {
      if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) {
        // Parquet 1.0 reader cannot read the UINT_32 logical type. Thus we need
        // to use the larger Int64Type to store them lossless.
        return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(*values_array, num_levels,
                                                               def_levels, rep_levels);
      } else {
        return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(*values_array, num_levels,
                                                               def_levels, rep_levels);
      }
    }
    WRITE_BATCH_CASE(NA, NullType, Int32Type)
    case ::arrow::Type::TIMESTAMP:
      // Timestamps may require unit coercion, handled by a dedicated path.
      return WriteTimestamps(*values_array, num_levels, def_levels, rep_levels);
    WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)
    WRITE_BATCH_CASE(INT8, Int8Type, Int32Type)
    WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type)
    WRITE_BATCH_CASE(INT16, Int16Type, Int32Type)
    WRITE_BATCH_CASE(UINT16, UInt16Type, Int32Type)
    WRITE_BATCH_CASE(INT32, Int32Type, Int32Type)
    WRITE_BATCH_CASE(INT64, Int64Type, Int64Type)
    WRITE_BATCH_CASE(UINT64, UInt64Type, Int64Type)
    WRITE_BATCH_CASE(FLOAT, FloatType, FloatType)
    WRITE_BATCH_CASE(DOUBLE, DoubleType, DoubleType)
    WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType)
    WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType)
    WRITE_BATCH_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
    WRITE_BATCH_CASE(DECIMAL, Decimal128Type, FLBAType)
    WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type)
    WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
    WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type)
    WRITE_BATCH_CASE(TIME64, Time64Type, Int64Type)
    default:
      break;
  }
  return Status::NotImplemented("Data type not supported as list value: " ,
                                values_array->type()->ToString());
}
958 | |
959 | } // namespace |
960 | |
961 | // ---------------------------------------------------------------------- |
962 | // FileWriter implementation |
963 | |
// Private implementation (pimpl) for FileWriter.  Owns the underlying
// ParquetFileWriter, tracks the currently open row group, and holds the
// scratch context shared by the per-column Arrow writers.
class FileWriter::Impl {
 public:
  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
       const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
      : writer_(std::move(writer)),
        row_group_writer_(nullptr),
        column_write_context_(pool, arrow_properties.get()),
        arrow_properties_(arrow_properties),
        closed_(false) {}

  // Closes the currently open row group (if any) and appends a fresh one.
  // NOTE(review): chunk_size is unused here; the row group size is determined
  // by how much data the caller subsequently writes.
  Status NewRowGroup(int64_t chunk_size) {
    if (row_group_writer_ != nullptr) {
      PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
    }
    PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup());
    return Status::OK();
  }

  // Flushes and closes the open row group and the file writer.
  // Safe to call more than once.
  Status Close() {
    if (!closed_) {
      // Make idempotent
      closed_ = true;
      if (row_group_writer_ != nullptr) {
        PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
      }
      PARQUET_CATCH_NOT_OK(writer_->Close());
    }
    return Status::OK();
  }

  // Writes a single Array as the next column chunk of the current row group.
  Status WriteColumnChunk(const Array& data) {
    // A bit awkward here since cannot instantiate ChunkedArray from const Array&
    ::arrow::ArrayVector chunks = {::arrow::MakeArray(data.data())};
    auto chunked_array = std::make_shared<::arrow::ChunkedArray>(chunks);
    return WriteColumnChunk(chunked_array, 0, data.length());
  }

  // Writes `size` rows starting at `offset` from `data` as the next column
  // chunk of the current row group.
  Status WriteColumnChunk(const std::shared_ptr<ChunkedArray>& data, int64_t offset,
                          const int64_t size) {
    // DictionaryArrays are not yet handled with a fast path. To still support
    // writing them as a workaround, we convert them back to their non-dictionary
    // representation.
    if (data->type()->id() == ::arrow::Type::DICTIONARY) {
      const ::arrow::DictionaryType& dict_type =
          static_cast<const ::arrow::DictionaryType&>(*data->type());

      // TODO(ARROW-1648): Remove this special handling once we require an Arrow
      // version that has this fixed.
      if (dict_type.dictionary()->type()->id() == ::arrow::Type::NA) {
        auto null_array = std::make_shared<::arrow::NullArray>(data->length());
        return WriteColumnChunk(*null_array);
      }

      // Cast the dictionary-encoded data back to its plain value type and
      // recurse into the fast path.
      FunctionContext ctx(this->memory_pool());
      ::arrow::compute::Datum cast_input(data);
      ::arrow::compute::Datum cast_output;
      RETURN_NOT_OK(Cast(&ctx, cast_input, dict_type.dictionary()->type(), CastOptions(),
                         &cast_output));
      return WriteColumnChunk(cast_output.chunked_array(), offset, size);
    }

    ColumnWriter* column_writer;
    PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());

    // TODO(wesm): This trick to construct a schema for one Parquet root node
    // will not work for arbitrary nested data
    int current_column_idx = row_group_writer_->current_column();
    std::shared_ptr<::arrow::Schema> arrow_schema;
    RETURN_NOT_OK(FromParquetSchema(writer_->schema(), {current_column_idx - 1},
                                    writer_->key_value_metadata(), &arrow_schema));

    ArrowColumnWriter arrow_writer(&column_write_context_, column_writer,
                                   arrow_schema->field(0));

    RETURN_NOT_OK(arrow_writer.Write(*data, offset, size));
    return arrow_writer.Close();
  }

  const WriterProperties& properties() const { return *writer_->properties(); }

  ::arrow::MemoryPool* memory_pool() const { return column_write_context_.memory_pool; }

  virtual ~Impl() {}

 private:
  friend class FileWriter;

  std::unique_ptr<ParquetFileWriter> writer_;
  // Currently open row group; nullptr until NewRowGroup() is first called.
  RowGroupWriter* row_group_writer_;
  ColumnWriterContext column_write_context_;
  std::shared_ptr<ArrowWriterProperties> arrow_properties_;
  bool closed_;
};
1057 | |
// Public FileWriter API: thin forwarders into the pimpl.

// Closes the current row group (if any) and starts a new one.
Status FileWriter::NewRowGroup(int64_t chunk_size) {
  return impl_->NewRowGroup(chunk_size);
}

// Writes `data` as the next column chunk of the current row group.
Status FileWriter::WriteColumnChunk(const ::arrow::Array& data) {
  return impl_->WriteColumnChunk(data);
}

// Writes `size` rows starting at `offset` of `data` as the next column chunk.
Status FileWriter::WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data,
                                    const int64_t offset, const int64_t size) {
  return impl_->WriteColumnChunk(data, offset, size);
}

// Convenience overload: writes the entire chunked array.
Status FileWriter::WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data) {
  return WriteColumnChunk(data, 0, data->length());
}

// Finalizes the file (idempotent; see Impl::Close).
Status FileWriter::Close() { return impl_->Close(); }

MemoryPool* FileWriter::memory_pool() const { return impl_->memory_pool(); }

FileWriter::~FileWriter() {}

FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
                       const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
    : impl_(new FileWriter::Impl(pool, std::move(writer), arrow_properties)) {}
1084 | |
// Convenience overload: opens a writer using the default Arrow writer
// properties.
Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
                        const std::shared_ptr<OutputStream>& sink,
                        const std::shared_ptr<WriterProperties>& properties,
                        std::unique_ptr<FileWriter>* writer) {
  return Open(schema, pool, sink, properties, default_arrow_writer_properties(), writer);
}
1091 | |
1092 | Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool, |
1093 | const std::shared_ptr<OutputStream>& sink, |
1094 | const std::shared_ptr<WriterProperties>& properties, |
1095 | const std::shared_ptr<ArrowWriterProperties>& arrow_properties, |
1096 | std::unique_ptr<FileWriter>* writer) { |
1097 | std::shared_ptr<SchemaDescriptor> parquet_schema; |
1098 | RETURN_NOT_OK( |
1099 | ToParquetSchema(&schema, *properties, *arrow_properties, &parquet_schema)); |
1100 | |
1101 | auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root()); |
1102 | |
1103 | std::unique_ptr<ParquetFileWriter> base_writer = |
1104 | ParquetFileWriter::Open(sink, schema_node, properties, schema.metadata()); |
1105 | |
1106 | writer->reset(new FileWriter(pool, std::move(base_writer), arrow_properties)); |
1107 | return Status::OK(); |
1108 | } |
1109 | |
// Overload for Arrow I/O streams: adapts the stream and delegates to the
// Parquet OutputStream overload above.
Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
                        const std::shared_ptr<::arrow::io::OutputStream>& sink,
                        const std::shared_ptr<WriterProperties>& properties,
                        std::unique_ptr<FileWriter>* writer) {
  auto wrapper = std::make_shared<ArrowOutputStream>(sink);
  return Open(schema, pool, wrapper, properties, writer);
}

// Same as above but with explicit Arrow writer properties.
Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
                        const std::shared_ptr<::arrow::io::OutputStream>& sink,
                        const std::shared_ptr<WriterProperties>& properties,
                        const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
                        std::unique_ptr<FileWriter>* writer) {
  auto wrapper = std::make_shared<ArrowOutputStream>(sink);
  return Open(schema, pool, wrapper, properties, arrow_properties, writer);
}
1126 | |
// Serializes the Parquet file metadata (footer) to `sink`, translating any
// thrown ParquetException into a Status.
Status WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink) {
  PARQUET_CATCH_NOT_OK(::parquet::WriteFileMetaData(file_metadata, sink));
  return Status::OK();
}

// Overload for Arrow output streams: adapts the stream and delegates.
Status WriteFileMetaData(const FileMetaData& file_metadata,
                         const std::shared_ptr<::arrow::io::OutputStream>& sink) {
  ArrowOutputStream wrapper(sink);
  return ::parquet::arrow::WriteFileMetaData(file_metadata, &wrapper);
}
1137 | |
1138 | namespace {} // namespace |
1139 | |
1140 | Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) { |
1141 | if (chunk_size <= 0 && table.num_rows() > 0) { |
1142 | return Status::Invalid("chunk size per row_group must be greater than 0" ); |
1143 | } else if (chunk_size > impl_->properties().max_row_group_length()) { |
1144 | chunk_size = impl_->properties().max_row_group_length(); |
1145 | } |
1146 | |
1147 | auto WriteRowGroup = [&](int64_t offset, int64_t size) { |
1148 | RETURN_NOT_OK(NewRowGroup(size)); |
1149 | for (int i = 0; i < table.num_columns(); i++) { |
1150 | auto chunked_data = table.column(i)->data(); |
1151 | RETURN_NOT_OK(WriteColumnChunk(chunked_data, offset, size)); |
1152 | } |
1153 | return Status::OK(); |
1154 | }; |
1155 | |
1156 | if (table.num_rows() == 0) { |
1157 | // Append a row group with 0 rows |
1158 | RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close())); |
1159 | return Status::OK(); |
1160 | } |
1161 | |
1162 | for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) { |
1163 | int64_t offset = chunk * chunk_size; |
1164 | RETURN_NOT_OK_ELSE( |
1165 | WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)), |
1166 | PARQUET_IGNORE_NOT_OK(Close())); |
1167 | } |
1168 | return Status::OK(); |
1169 | } |
1170 | |
// One-shot convenience: opens a writer on `sink`, writes the whole table in
// row groups of `chunk_size` rows, and closes the file.
Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
                  const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
                  const std::shared_ptr<WriterProperties>& properties,
                  const std::shared_ptr<ArrowWriterProperties>& arrow_properties) {
  std::unique_ptr<FileWriter> writer;
  RETURN_NOT_OK(FileWriter::Open(*table.schema(), pool, sink, properties,
                                 arrow_properties, &writer));
  RETURN_NOT_OK(writer->WriteTable(table, chunk_size));
  return writer->Close();
}

// Overload for Arrow output streams: adapts the stream and delegates.
Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
                  const std::shared_ptr<::arrow::io::OutputStream>& sink,
                  int64_t chunk_size, const std::shared_ptr<WriterProperties>& properties,
                  const std::shared_ptr<ArrowWriterProperties>& arrow_properties) {
  auto wrapper = std::make_shared<ArrowOutputStream>(sink);
  return WriteTable(table, pool, wrapper, chunk_size, properties, arrow_properties);
}
1189 | |
1190 | } // namespace arrow |
1191 | } // namespace parquet |
1192 | |