// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "parquet/column_writer.h"

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
#include <sstream>
#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/buffer_builder.h"
#include "arrow/compute/api.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_stream_utils.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/compression.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding.h"

#include "parquet/column_page.h"
#include "parquet/encoding.h"
#include "parquet/metadata.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/statistics.h"
#include "parquet/thrift_internal.h"
#include "parquet/types.h"

namespace parquet {

using arrow::Status;
using arrow::compute::Datum;
using arrow::internal::checked_cast;

using BitWriter = arrow::BitUtil::BitWriter;
using RleEncoder = arrow::util::RleEncoder;

LevelEncoder::LevelEncoder() {}
LevelEncoder::~LevelEncoder() {}

void LevelEncoder::Init(Encoding::type encoding, int16_t max_level,
                        int num_buffered_values, uint8_t* data, int data_size) {
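  // Levels range over [0, max_level], so max_level + 1 distinct values must be
  // representable: e.g. max_level = 1 needs 1 bit, max_level = 3 needs 2 bits.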
  bit_width_ = BitUtil::Log2(max_level + 1);
  encoding_ = encoding;
  switch (encoding) {
    case Encoding::RLE: {
      rle_encoder_.reset(new RleEncoder(data, data_size, bit_width_));
      break;
    }
    case Encoding::BIT_PACKED: {
      int num_bytes =
          static_cast<int>(BitUtil::BytesForBits(num_buffered_values * bit_width_));
      bit_packed_encoder_.reset(new BitWriter(data, num_bytes));
      break;
    }
    default:
      throw ParquetException("Unknown encoding type for levels.");
  }
}

int LevelEncoder::MaxBufferSize(Encoding::type encoding, int16_t max_level,
                                int num_buffered_values) {
  int bit_width = BitUtil::Log2(max_level + 1);
  int num_bytes = 0;
  switch (encoding) {
    case Encoding::RLE: {
      // TODO: Due to the way we currently check if the buffer is full enough,
      // we need to have MinBufferSize as headroom.
      num_bytes = RleEncoder::MaxBufferSize(bit_width, num_buffered_values) +
                  RleEncoder::MinBufferSize(bit_width);
      break;
    }
    case Encoding::BIT_PACKED: {
      num_bytes =
          static_cast<int>(BitUtil::BytesForBits(num_buffered_values * bit_width));
      break;
    }
    default:
      throw ParquetException("Unknown encoding type for levels.");
  }
  return num_bytes;
}

int LevelEncoder::Encode(int batch_size, const int16_t* levels) {
  int num_encoded = 0;
  if (!rle_encoder_ && !bit_packed_encoder_) {
    throw ParquetException("Level encoders are not initialized.");
  }

  if (encoding_ == Encoding::RLE) {
    for (int i = 0; i < batch_size; ++i) {
      if (!rle_encoder_->Put(*(levels + i))) {
        break;
      }
      ++num_encoded;
    }
    rle_encoder_->Flush();
    rle_length_ = rle_encoder_->len();
  } else {
    for (int i = 0; i < batch_size; ++i) {
      if (!bit_packed_encoder_->PutValue(*(levels + i), bit_width_)) {
        break;
      }
      ++num_encoded;
    }
    bit_packed_encoder_->Flush();
  }
  return num_encoded;
}

// ----------------------------------------------------------------------
// PageWriter implementation

// This subclass delimits pages appearing in a serialized stream, each preceded
// by a serialized Thrift format::PageHeader indicating the type of each page
// and the page metadata.
class SerializedPageWriter : public PageWriter {
 public:
  SerializedPageWriter(const std::shared_ptr<ArrowOutputStream>& sink,
                       Compression::type codec, int compression_level,
                       ColumnChunkMetaDataBuilder* metadata,
                       MemoryPool* pool = arrow::default_memory_pool())
      : sink_(sink),
        metadata_(metadata),
        pool_(pool),
        num_values_(0),
        dictionary_page_offset_(0),
        data_page_offset_(0),
        total_uncompressed_size_(0),
        total_compressed_size_(0) {
    compressor_ = GetCodec(codec, compression_level);
    thrift_serializer_.reset(new ThriftSerializer);
  }

  int64_t WriteDictionaryPage(const DictionaryPage& page) override {
    int64_t uncompressed_size = page.size();
    std::shared_ptr<Buffer> compressed_data = nullptr;
    if (has_compressor()) {
      auto buffer = std::static_pointer_cast<ResizableBuffer>(
          AllocateBuffer(pool_, uncompressed_size));
      Compress(*(page.buffer().get()), buffer.get());
      compressed_data = std::static_pointer_cast<Buffer>(buffer);
    } else {
      compressed_data = page.buffer();
    }

    format::DictionaryPageHeader dict_page_header;
    dict_page_header.__set_num_values(page.num_values());
    dict_page_header.__set_encoding(ToThrift(page.encoding()));
    dict_page_header.__set_is_sorted(page.is_sorted());

    format::PageHeader page_header;
    page_header.__set_type(format::PageType::DICTIONARY_PAGE);
    page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
    page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
    page_header.__set_dictionary_page_header(dict_page_header);
    // TODO(PARQUET-594) crc checksum

    int64_t start_pos = -1;
    PARQUET_THROW_NOT_OK(sink_->Tell(&start_pos));
    if (dictionary_page_offset_ == 0) {
      dictionary_page_offset_ = start_pos;
    }
    int64_t header_size = thrift_serializer_->Serialize(&page_header, sink_.get());
    PARQUET_THROW_NOT_OK(sink_->Write(compressed_data));

    total_uncompressed_size_ += uncompressed_size + header_size;
    total_compressed_size_ += compressed_data->size() + header_size;

    int64_t final_pos = -1;
    PARQUET_THROW_NOT_OK(sink_->Tell(&final_pos));
    return final_pos - start_pos;
  }

  void Close(bool has_dictionary, bool fallback) override {
    // index_page_offset = -1 since index pages are not supported
    metadata_->Finish(num_values_, dictionary_page_offset_, -1, data_page_offset_,
                      total_compressed_size_, total_uncompressed_size_, has_dictionary,
                      fallback);

    // Write metadata at end of column chunk
    metadata_->WriteTo(sink_.get());
  }

  // Compress a buffer.
  void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override {
    DCHECK(compressor_ != nullptr);

    // Compress the data
    int64_t max_compressed_size =
        compressor_->MaxCompressedLen(src_buffer.size(), src_buffer.data());

    // Pass shrink_to_fit = false: the underlying buffer only keeps growing,
    // so resizing to a smaller size does not reallocate.
    PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false));

    int64_t compressed_size;
    PARQUET_THROW_NOT_OK(
        compressor_->Compress(src_buffer.size(), src_buffer.data(), max_compressed_size,
                              dest_buffer->mutable_data(), &compressed_size));
    PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false));
  }

  int64_t WriteDataPage(const CompressedDataPage& page) override {
    int64_t uncompressed_size = page.uncompressed_size();
    std::shared_ptr<Buffer> compressed_data = page.buffer();

    format::DataPageHeader data_page_header;
    data_page_header.__set_num_values(page.num_values());
    data_page_header.__set_encoding(ToThrift(page.encoding()));
    data_page_header.__set_definition_level_encoding(
        ToThrift(page.definition_level_encoding()));
    data_page_header.__set_repetition_level_encoding(
        ToThrift(page.repetition_level_encoding()));
    data_page_header.__set_statistics(ToThrift(page.statistics()));

    format::PageHeader page_header;
    page_header.__set_type(format::PageType::DATA_PAGE);
    page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
    page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
    page_header.__set_data_page_header(data_page_header);
    // TODO(PARQUET-594) crc checksum

    int64_t start_pos = -1;
    PARQUET_THROW_NOT_OK(sink_->Tell(&start_pos));
    if (data_page_offset_ == 0) {
      data_page_offset_ = start_pos;
    }

    int64_t header_size = thrift_serializer_->Serialize(&page_header, sink_.get());
    PARQUET_THROW_NOT_OK(sink_->Write(compressed_data));

    total_uncompressed_size_ += uncompressed_size + header_size;
    total_compressed_size_ += compressed_data->size() + header_size;
    num_values_ += page.num_values();

    int64_t current_pos = -1;
    PARQUET_THROW_NOT_OK(sink_->Tell(&current_pos));
    return current_pos - start_pos;
  }

  bool has_compressor() override { return (compressor_ != nullptr); }

  int64_t num_values() { return num_values_; }

  int64_t dictionary_page_offset() { return dictionary_page_offset_; }

  int64_t data_page_offset() { return data_page_offset_; }

  int64_t total_compressed_size() { return total_compressed_size_; }

  int64_t total_uncompressed_size() { return total_uncompressed_size_; }

 private:
  std::shared_ptr<ArrowOutputStream> sink_;
  ColumnChunkMetaDataBuilder* metadata_;
  MemoryPool* pool_;
  int64_t num_values_;
  int64_t dictionary_page_offset_;
  int64_t data_page_offset_;
  int64_t total_uncompressed_size_;
  int64_t total_compressed_size_;

  std::unique_ptr<ThriftSerializer> thrift_serializer_;

  // Compression codec to use.
  std::unique_ptr<arrow::util::Codec> compressor_;
};

// This implementation of the PageWriter writes to the final sink on Close().
class BufferedPageWriter : public PageWriter {
 public:
  BufferedPageWriter(const std::shared_ptr<ArrowOutputStream>& sink,
                     Compression::type codec, int compression_level,
                     ColumnChunkMetaDataBuilder* metadata,
                     MemoryPool* pool = arrow::default_memory_pool())
      : final_sink_(sink), metadata_(metadata) {
    in_memory_sink_ = CreateOutputStream(pool);
    pager_ = std::unique_ptr<SerializedPageWriter>(new SerializedPageWriter(
        in_memory_sink_, codec, compression_level, metadata, pool));
  }

  int64_t WriteDictionaryPage(const DictionaryPage& page) override {
    return pager_->WriteDictionaryPage(page);
  }

  void Close(bool has_dictionary, bool fallback) override {
    // index_page_offset = -1 since index pages are not supported
    int64_t final_position = -1;
    PARQUET_THROW_NOT_OK(final_sink_->Tell(&final_position));
    metadata_->Finish(
        pager_->num_values(), pager_->dictionary_page_offset() + final_position, -1,
        pager_->data_page_offset() + final_position, pager_->total_compressed_size(),
        pager_->total_uncompressed_size(), has_dictionary, fallback);

    // Write metadata at end of column chunk
    metadata_->WriteTo(in_memory_sink_.get());

    // flush everything to the serialized sink
    std::shared_ptr<Buffer> buffer;
    PARQUET_THROW_NOT_OK(in_memory_sink_->Finish(&buffer));
    PARQUET_THROW_NOT_OK(final_sink_->Write(buffer));
  }

  int64_t WriteDataPage(const CompressedDataPage& page) override {
    return pager_->WriteDataPage(page);
  }

  void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override {
    pager_->Compress(src_buffer, dest_buffer);
  }

  bool has_compressor() override { return pager_->has_compressor(); }

 private:
  std::shared_ptr<ArrowOutputStream> final_sink_;
  ColumnChunkMetaDataBuilder* metadata_;
  std::shared_ptr<arrow::io::BufferOutputStream> in_memory_sink_;
  std::unique_ptr<SerializedPageWriter> pager_;
};

std::unique_ptr<PageWriter> PageWriter::Open(
    const std::shared_ptr<ArrowOutputStream>& sink, Compression::type codec,
    int compression_level, ColumnChunkMetaDataBuilder* metadata, MemoryPool* pool,
    bool buffered_row_group) {
  if (buffered_row_group) {
    return std::unique_ptr<PageWriter>(
        new BufferedPageWriter(sink, codec, compression_level, metadata, pool));
  } else {
    return std::unique_ptr<PageWriter>(
        new SerializedPageWriter(sink, codec, compression_level, metadata, pool));
  }
}

// ----------------------------------------------------------------------
// ColumnWriter

std::shared_ptr<WriterProperties> default_writer_properties() {
  static std::shared_ptr<WriterProperties> default_writer_properties =
      WriterProperties::Builder().build();
  return default_writer_properties;
}
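// The shared default instance above is used when a caller does not supply its
// own properties. A minimal, hypothetical caller-side sketch of building
// custom properties instead (following the Builder API in
// parquet/properties.h):
//
//   std::shared_ptr<WriterProperties> props =
//       WriterProperties::Builder().compression(Compression::SNAPPY)->build();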

class ColumnWriterImpl {
 public:
  ColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
                   std::unique_ptr<PageWriter> pager, const bool use_dictionary,
                   Encoding::type encoding, const WriterProperties* properties)
      : metadata_(metadata),
        descr_(metadata->descr()),
        pager_(std::move(pager)),
        has_dictionary_(use_dictionary),
        encoding_(encoding),
        properties_(properties),
        allocator_(properties->memory_pool()),
        num_buffered_values_(0),
        num_buffered_encoded_values_(0),
        rows_written_(0),
        total_bytes_written_(0),
        total_compressed_bytes_(0),
        closed_(false),
        fallback_(false),
        definition_levels_sink_(allocator_),
        repetition_levels_sink_(allocator_) {
    definition_levels_rle_ =
        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
    repetition_levels_rle_ =
        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
    uncompressed_data_ =
        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
    if (pager_->has_compressor()) {
      compressed_data_ =
          std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
    }
  }

  virtual ~ColumnWriterImpl() = default;

  int64_t Close();

 protected:
  virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;

  // Serializes Dictionary Page if enabled
  virtual void WriteDictionaryPage() = 0;

  // Plain-encoded statistics of the current page
  virtual EncodedStatistics GetPageStatistics() = 0;

  // Plain-encoded statistics of the whole chunk
  virtual EncodedStatistics GetChunkStatistics() = 0;

  // Merges page statistics into chunk statistics, then resets the values
  virtual void ResetPageStatistics() = 0;

  // Adds Data Pages to an in-memory buffer in dictionary encoding mode.
  // Serializes the Data Pages in other encoding modes.
  void AddDataPage();

  // Serializes Data Pages
  void WriteDataPage(const CompressedDataPage& page) {
    total_bytes_written_ += pager_->WriteDataPage(page);
  }

  // Write multiple definition levels
  void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
    DCHECK(!closed_);
    PARQUET_THROW_NOT_OK(
        definition_levels_sink_.Append(levels, sizeof(int16_t) * num_levels));
  }

  // Write multiple repetition levels
  void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) {
    DCHECK(!closed_);
    PARQUET_THROW_NOT_OK(
        repetition_levels_sink_.Append(levels, sizeof(int16_t) * num_levels));
  }

  // RLE encode the src_buffer into dest_buffer and return the encoded size
  int64_t RleEncodeLevels(const void* src_buffer, ResizableBuffer* dest_buffer,
                          int16_t max_level);

  // Serialize the buffered Data Pages
  void FlushBufferedDataPages();

  ColumnChunkMetaDataBuilder* metadata_;
  const ColumnDescriptor* descr_;

  std::unique_ptr<PageWriter> pager_;

  bool has_dictionary_;
  Encoding::type encoding_;
  const WriterProperties* properties_;

  LevelEncoder level_encoder_;

  MemoryPool* allocator_;

  // The total number of values stored in the data page. This is the maximum of
  // the number of encoded definition levels or encoded values. For
  // non-repeated, required columns, this is equal to the number of encoded
  // values. For repeated or optional values, there may be fewer data values
  // than levels, and this tells you how many encoded levels there are in that
  // case.
  int64_t num_buffered_values_;

  // The total number of stored values. For repeated or optional values, this
  // number may be lower than num_buffered_values_.
  int64_t num_buffered_encoded_values_;

  // Total number of rows written with this ColumnWriter
  int rows_written_;

  // Records the total number of bytes written by the serializer
  int64_t total_bytes_written_;

  // Records the current number of compressed bytes in a column
  int64_t total_compressed_bytes_;

  // Flag to check if the Writer has been closed
  bool closed_;

  // Flag to infer if dictionary encoding has fallen back to PLAIN
  bool fallback_;

  arrow::BufferBuilder definition_levels_sink_;
  arrow::BufferBuilder repetition_levels_sink_;

  std::shared_ptr<ResizableBuffer> definition_levels_rle_;
  std::shared_ptr<ResizableBuffer> repetition_levels_rle_;

  std::shared_ptr<ResizableBuffer> uncompressed_data_;
  std::shared_ptr<ResizableBuffer> compressed_data_;

  std::vector<CompressedDataPage> data_pages_;

 private:
  void InitSinks() {
    definition_levels_sink_.Rewind(0);
    repetition_levels_sink_.Rewind(0);
  }
};

// return the size of the encoded buffer
int64_t ColumnWriterImpl::RleEncodeLevels(const void* src_buffer,
                                          ResizableBuffer* dest_buffer,
                                          int16_t max_level) {
  // TODO: This only works due to some RLE specifics
  int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
                                                 static_cast<int>(num_buffered_values_)) +
                     sizeof(int32_t);

  // Pass shrink_to_fit = false: the underlying buffer only keeps growing,
  // so resizing to a smaller size does not reallocate.
  PARQUET_THROW_NOT_OK(dest_buffer->Resize(rle_size, false));

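  // The first four bytes of the destination are reserved for the length of the
  // RLE data, which v1 data page readers expect before the levels; the prefix
  // is filled in below once the actual encoded size is known.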
  level_encoder_.Init(Encoding::RLE, max_level, static_cast<int>(num_buffered_values_),
                      dest_buffer->mutable_data() + sizeof(int32_t),
                      static_cast<int>(dest_buffer->size() - sizeof(int32_t)));
  int encoded = level_encoder_.Encode(static_cast<int>(num_buffered_values_),
                                      reinterpret_cast<const int16_t*>(src_buffer));
  DCHECK_EQ(encoded, num_buffered_values_);
  reinterpret_cast<int32_t*>(dest_buffer->mutable_data())[0] = level_encoder_.len();
  int64_t encoded_size = level_encoder_.len() + sizeof(int32_t);
  return encoded_size;
}

void ColumnWriterImpl::AddDataPage() {
  int64_t definition_levels_rle_size = 0;
  int64_t repetition_levels_rle_size = 0;

  std::shared_ptr<Buffer> values = GetValuesBuffer();

  if (descr_->max_definition_level() > 0) {
    definition_levels_rle_size =
        RleEncodeLevels(definition_levels_sink_.data(), definition_levels_rle_.get(),
                        descr_->max_definition_level());
  }

  if (descr_->max_repetition_level() > 0) {
    repetition_levels_rle_size =
        RleEncodeLevels(repetition_levels_sink_.data(), repetition_levels_rle_.get(),
                        descr_->max_repetition_level());
  }

  int64_t uncompressed_size =
      definition_levels_rle_size + repetition_levels_rle_size + values->size();

  // Pass shrink_to_fit = false: the underlying buffer only keeps growing,
  // so resizing to a smaller size does not reallocate.
  PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false));

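  // A v1 data page body is laid out as: repetition levels, then definition
  // levels, then the encoded values.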
  // Concatenate data into a single buffer
  uint8_t* uncompressed_ptr = uncompressed_data_->mutable_data();
  memcpy(uncompressed_ptr, repetition_levels_rle_->data(), repetition_levels_rle_size);
  uncompressed_ptr += repetition_levels_rle_size;
  memcpy(uncompressed_ptr, definition_levels_rle_->data(), definition_levels_rle_size);
  uncompressed_ptr += definition_levels_rle_size;
  memcpy(uncompressed_ptr, values->data(), values->size());

  EncodedStatistics page_stats = GetPageStatistics();
  page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
  page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
  ResetPageStatistics();

  std::shared_ptr<Buffer> compressed_data;
  if (pager_->has_compressor()) {
    pager_->Compress(*(uncompressed_data_.get()), compressed_data_.get());
    compressed_data = compressed_data_;
  } else {
    compressed_data = uncompressed_data_;
  }

  // Write the page to OutputStream eagerly if there is no dictionary or
  // if dictionary encoding has fallen back to PLAIN
  if (has_dictionary_ && !fallback_) {  // Save pages until end of dictionary encoding
    std::shared_ptr<Buffer> compressed_data_copy;
    PARQUET_THROW_NOT_OK(compressed_data->Copy(0, compressed_data->size(), allocator_,
                                               &compressed_data_copy));
    CompressedDataPage page(compressed_data_copy,
                            static_cast<int32_t>(num_buffered_values_), encoding_,
                            Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats);
    total_compressed_bytes_ += page.size() + sizeof(format::PageHeader);
    data_pages_.push_back(std::move(page));
  } else {  // Eagerly write pages
    CompressedDataPage page(compressed_data, static_cast<int32_t>(num_buffered_values_),
                            encoding_, Encoding::RLE, Encoding::RLE, uncompressed_size,
                            page_stats);
    WriteDataPage(page);
  }

  // Re-initialize the sinks for next Page.
  InitSinks();
  num_buffered_values_ = 0;
  num_buffered_encoded_values_ = 0;
}

int64_t ColumnWriterImpl::Close() {
  if (!closed_) {
    closed_ = true;
    if (has_dictionary_ && !fallback_) {
      WriteDictionaryPage();
    }

    FlushBufferedDataPages();

    EncodedStatistics chunk_statistics = GetChunkStatistics();
    chunk_statistics.ApplyStatSizeLimits(
        properties_->max_statistics_size(descr_->path()));
    chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order());

    // Write stats only if the column has at least one row written
    if (rows_written_ > 0 && chunk_statistics.is_set()) {
      metadata_->SetStatistics(chunk_statistics);
    }
    pager_->Close(has_dictionary_, fallback_);
  }

  return total_bytes_written_;
}

void ColumnWriterImpl::FlushBufferedDataPages() {
  // Write all outstanding data to a new page
  if (num_buffered_values_ > 0) {
    AddDataPage();
  }
  for (size_t i = 0; i < data_pages_.size(); i++) {
    WriteDataPage(data_pages_[i]);
  }
  data_pages_.clear();
  total_compressed_bytes_ = 0;
}

// ----------------------------------------------------------------------
// TypedColumnWriter

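// Invokes action(offset, batch_size) over `total` values in chunks, then once
// more for any remainder. For example, total = 10 with batch_size = 4 yields
// action(0, 4), action(4, 4), action(8, 2).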
template <typename Action>
inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
  // Avoid the previous narrowing cast to int so very large totals are not
  // truncated.
  int64_t num_batches = total / batch_size;
  for (int64_t round = 0; round < num_batches; round++) {
    action(round * batch_size, batch_size);
  }
  // Write the remaining values
  if (total % batch_size > 0) {
    action(num_batches * batch_size, total % batch_size);
  }
}

bool DictionaryDirectWriteSupported(const arrow::Array& array) {
  DCHECK_EQ(array.type_id(), arrow::Type::DICTIONARY);
  const arrow::DictionaryType& dict_type =
      static_cast<const arrow::DictionaryType&>(*array.type());
  auto id = dict_type.value_type()->id();
  return id == arrow::Type::BINARY || id == arrow::Type::STRING;
}

Status ConvertDictionaryToDense(const arrow::Array& array, MemoryPool* pool,
                                std::shared_ptr<arrow::Array>* out) {
  const arrow::DictionaryType& dict_type =
      static_cast<const arrow::DictionaryType&>(*array.type());

  // TODO(ARROW-1648): Remove this special handling once we require an Arrow
  // version that has this fixed.
  if (dict_type.value_type()->id() == arrow::Type::NA) {
    *out = std::make_shared<arrow::NullArray>(array.length());
    return Status::OK();
  }

  arrow::compute::FunctionContext ctx(pool);
  Datum cast_output;
  RETURN_NOT_OK(arrow::compute::Cast(&ctx, Datum(array.data()), dict_type.value_type(),
                                     arrow::compute::CastOptions(), &cast_output));
  *out = cast_output.make_array();
  return Status::OK();
}

static inline bool IsDictionaryEncoding(Encoding::type encoding) {
  return encoding == Encoding::PLAIN_DICTIONARY;
}

template <typename DType>
class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<DType> {
 public:
  using T = typename DType::c_type;

  TypedColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
                        std::unique_ptr<PageWriter> pager, const bool use_dictionary,
                        Encoding::type encoding, const WriterProperties* properties)
      : ColumnWriterImpl(metadata, std::move(pager), use_dictionary, encoding,
                         properties) {
    current_encoder_ = MakeEncoder(DType::type_num, encoding, use_dictionary, descr_,
                                   properties->memory_pool());

    if (properties->statistics_enabled(descr_->path()) &&
        (SortOrder::UNKNOWN != descr_->sort_order())) {
      page_statistics_ = MakeStatistics<DType>(descr_, allocator_);
      chunk_statistics_ = MakeStatistics<DType>(descr_, allocator_);
    }
  }

  int64_t Close() override { return ColumnWriterImpl::Close(); }

  void WriteBatch(int64_t num_values, const int16_t* def_levels,
                  const int16_t* rep_levels, const T* values) override {
    // We check for DataPage limits only after the values have been inserted,
    // so a single large write could push the DataPage size well above the
    // limit. Chunking the batch bounds this: AddDataPage() is called at a
    // reasonable page size limit even when the user writes a very large
    // number of values at once.
    int64_t value_offset = 0;
    auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
      int64_t values_to_write =
          WriteLevels(batch_size, def_levels + offset, rep_levels + offset);
      // PARQUET-780
      if (values_to_write > 0) {
        DCHECK_NE(nullptr, values);
      }
      WriteValues(values + value_offset, values_to_write, batch_size - values_to_write);
      CommitWriteAndCheckPageLimit(batch_size, values_to_write);
      value_offset += values_to_write;

      // Dictionary size checked separately from data page size since we
      // circumvent this check when writing arrow::DictionaryArray directly
      CheckDictionarySizeLimit();
    };
    DoInBatches(num_values, properties_->write_batch_size(), WriteChunk);
  }

  void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
                        const int16_t* rep_levels, const uint8_t* valid_bits,
                        int64_t valid_bits_offset, const T* values) override {
    // Like WriteBatch, but for spaced values
    int64_t value_offset = 0;
    auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
      int64_t batch_num_values = 0;
      int64_t batch_num_spaced_values = 0;
      WriteLevelsSpaced(batch_size, def_levels + offset, rep_levels + offset,
                        &batch_num_values, &batch_num_spaced_values);
      WriteValuesSpaced(values + value_offset, batch_num_values, batch_num_spaced_values,
                        valid_bits, valid_bits_offset + value_offset);
      CommitWriteAndCheckPageLimit(batch_size, batch_num_spaced_values);
      value_offset += batch_num_spaced_values;

      // Dictionary size checked separately from data page size since we
      // circumvent this check when writing arrow::DictionaryArray directly
      CheckDictionarySizeLimit();
    };
    DoInBatches(num_values, properties_->write_batch_size(), WriteChunk);
  }

  Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                    int64_t num_levels, const arrow::Array& array,
                    ArrowWriteContext* ctx) override {
    if (array.type()->id() == arrow::Type::DICTIONARY) {
      return WriteArrowDictionary(def_levels, rep_levels, num_levels, array, ctx);
    } else {
      return WriteArrowDense(def_levels, rep_levels, num_levels, array, ctx);
    }
  }

  int64_t EstimatedBufferedValueBytes() const override {
    return current_encoder_->EstimatedDataEncodedSize();
  }

 protected:
  std::shared_ptr<Buffer> GetValuesBuffer() override {
    return current_encoder_->FlushValues();
  }

  // Internal function to handle direct writing of arrow::DictionaryArray,
  // since the standard logic concerning dictionary size limits and fallback to
  // plain encoding is circumvented
  Status WriteArrowDictionary(const int16_t* def_levels, const int16_t* rep_levels,
                              int64_t num_levels, const arrow::Array& array,
                              ArrowWriteContext* context);

  Status WriteArrowDense(const int16_t* def_levels, const int16_t* rep_levels,
                         int64_t num_levels, const arrow::Array& array,
                         ArrowWriteContext* context);

  void WriteDictionaryPage() override {
    // We have to dynamic_cast here because some compilers will not cast
    // TypedEncoder<Type> through virtual inheritance
    auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
    DCHECK(dict_encoder);
    std::shared_ptr<ResizableBuffer> buffer =
        AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
    dict_encoder->WriteDict(buffer->mutable_data());

    DictionaryPage page(buffer, dict_encoder->num_entries(),
                        properties_->dictionary_page_encoding());
    total_bytes_written_ += pager_->WriteDictionaryPage(page);
  }

  EncodedStatistics GetPageStatistics() override {
    EncodedStatistics result;
    if (page_statistics_) result = page_statistics_->Encode();
    return result;
  }

  EncodedStatistics GetChunkStatistics() override {
    EncodedStatistics result;
    if (chunk_statistics_) result = chunk_statistics_->Encode();
    return result;
  }

  void ResetPageStatistics() override {
    if (chunk_statistics_ != nullptr) {
      chunk_statistics_->Merge(*page_statistics_);
      page_statistics_->Reset();
    }
  }

  Type::type type() const override { return descr_->physical_type(); }

  const ColumnDescriptor* descr() const override { return descr_; }

  int64_t rows_written() const override { return rows_written_; }

  int64_t total_compressed_bytes() const override { return total_compressed_bytes_; }

  int64_t total_bytes_written() const override { return total_bytes_written_; }

  const WriterProperties* properties() override { return properties_; }

 private:
  using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
  using TypedStats = TypedStatistics<DType>;
  std::unique_ptr<Encoder> current_encoder_;
  std::shared_ptr<TypedStats> page_statistics_;
  std::shared_ptr<TypedStats> chunk_statistics_;

  // If writing a sequence of arrow::DictionaryArray to the writer, we keep the
  // dictionary passed to DictEncoder<T>::PutDictionary so we can check
  // subsequent array chunks to see whether materialization is required (in
  // which case we fall back to the dense write path)
  std::shared_ptr<arrow::Array> preserved_dictionary_;

  int64_t WriteLevels(int64_t num_values, const int16_t* def_levels,
                      const int16_t* rep_levels) {
    int64_t values_to_write = 0;
    // If the field is required and non-repeated, there are no definition levels
    if (descr_->max_definition_level() > 0) {
      for (int64_t i = 0; i < num_values; ++i) {
        if (def_levels[i] == descr_->max_definition_level()) {
          ++values_to_write;
        }
      }

      WriteDefinitionLevels(num_values, def_levels);
    } else {
      // Required field, write all values
      values_to_write = num_values;
    }

    // Not present for non-repeated fields
    if (descr_->max_repetition_level() > 0) {
      // A row could include more than one value
      // Count the occasions where we start a new row
      for (int64_t i = 0; i < num_values; ++i) {
        if (rep_levels[i] == 0) {
          rows_written_++;
        }
      }

      WriteRepetitionLevels(num_values, rep_levels);
    } else {
      // Each value is exactly one row
      rows_written_ += static_cast<int>(num_values);
    }
    return values_to_write;
  }

  void WriteLevelsSpaced(int64_t num_levels, const int16_t* def_levels,
                         const int16_t* rep_levels, int64_t* out_values_to_write,
                         int64_t* out_spaced_values_to_write) {
    int64_t values_to_write = 0;
    int64_t spaced_values_to_write = 0;
    // If the field is required and non-repeated, there are no definition levels
    if (descr_->max_definition_level() > 0) {
      // Minimal definition level for which spaced values are written
      int16_t min_spaced_def_level = descr_->max_definition_level();
      if (descr_->schema_node()->is_optional()) {
        min_spaced_def_level--;
      }
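      // For an optional leaf, a definition level of max - 1 marks a null at
      // this leaf; such slots still occupy a position in the spaced values, so
      // they count toward spaced_values_to_write but not values_to_write.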
      for (int64_t i = 0; i < num_levels; ++i) {
        if (def_levels[i] == descr_->max_definition_level()) {
          ++values_to_write;
        }
        if (def_levels[i] >= min_spaced_def_level) {
          ++spaced_values_to_write;
        }
      }

      WriteDefinitionLevels(num_levels, def_levels);
    } else {
      // Required field, write all values
      values_to_write = num_levels;
      spaced_values_to_write = num_levels;
    }

    // Not present for non-repeated fields
    if (descr_->max_repetition_level() > 0) {
      // A row could include more than one value
      // Count the occasions where we start a new row
      for (int64_t i = 0; i < num_levels; ++i) {
        if (rep_levels[i] == 0) {
          rows_written_++;
        }
      }

      WriteRepetitionLevels(num_levels, rep_levels);
    } else {
      // Each value is exactly one row
      rows_written_ += static_cast<int>(num_levels);
    }

    *out_values_to_write = values_to_write;
    *out_spaced_values_to_write = spaced_values_to_write;
  }

  void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values) {
    num_buffered_values_ += num_levels;
    num_buffered_encoded_values_ += num_values;

    if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
      AddDataPage();
    }
  }

  void FallbackToPlainEncoding() {
    if (IsDictionaryEncoding(current_encoder_->encoding())) {
      WriteDictionaryPage();
      // Serialize the buffered Dictionary Indices
      FlushBufferedDataPages();
      fallback_ = true;
      // Only PLAIN encoding is supported for fallback in V1
      current_encoder_ = MakeEncoder(DType::type_num, Encoding::PLAIN, false, descr_,
                                     properties_->memory_pool());
      encoding_ = Encoding::PLAIN;
    }
  }

  // Checks if the Dictionary Page size limit is reached
  // If the limit is reached, the Dictionary and Data Pages are serialized
  // The encoding is switched to PLAIN
  //
  // Only one Dictionary Page is written.
  // Fallback to PLAIN if dictionary page limit is reached.
  void CheckDictionarySizeLimit() {
    if (!has_dictionary_ || fallback_) {
      // Either not using dictionary encoding, or we have already fallen back
      // to PLAIN encoding because the size threshold was reached
      return;
    }

    // We have to dynamic_cast here because some compilers will not cast
    // TypedEncoder<Type> through virtual inheritance
    auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
    if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) {
      FallbackToPlainEncoding();
    }
  }

  void WriteValues(const T* values, int64_t num_values, int64_t num_nulls) {
    dynamic_cast<ValueEncoderType*>(current_encoder_.get())
        ->Put(values, static_cast<int>(num_values));
    if (page_statistics_ != nullptr) {
      page_statistics_->Update(values, num_values, num_nulls);
    }
  }

  void WriteValuesSpaced(const T* values, int64_t num_values, int64_t num_spaced_values,
                         const uint8_t* valid_bits, int64_t valid_bits_offset) {
    if (descr_->schema_node()->is_optional()) {
      dynamic_cast<ValueEncoderType*>(current_encoder_.get())
          ->PutSpaced(values, static_cast<int>(num_spaced_values), valid_bits,
                      valid_bits_offset);
    } else {
      dynamic_cast<ValueEncoderType*>(current_encoder_.get())
          ->Put(values, static_cast<int>(num_values));
    }
    if (page_statistics_ != nullptr) {
      const int64_t num_nulls = num_spaced_values - num_values;
      page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, num_values,
                                     num_nulls);
    }
  }
};

template <typename DType>
Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(const int16_t* def_levels,
                                                          const int16_t* rep_levels,
                                                          int64_t num_levels,
                                                          const arrow::Array& array,
                                                          ArrowWriteContext* ctx) {
  // If this is the first time writing a DictionaryArray, then there are
  // a few possible paths to take:
  //
  // - If dictionary encoding is not enabled, convert to densely
  //   encoded and call WriteArrow
  // - Dictionary encoding enabled
  //   - If this is the first time this is called, then we call
  //     PutDictionary into the encoder and then PutIndices on each
  //     chunk. We store the dictionary that was written in
  //     preserved_dictionary_ so that subsequent calls to this method
  //     can make sure the dictionary has not changed
  //   - On subsequent calls, we have to check whether the dictionary
  //     has changed. If it has, then we trigger the varying
  //     dictionary path and materialize each chunk and then call
  //     WriteArrow with that
  auto WriteDense = [&] {
    std::shared_ptr<arrow::Array> dense_array;
    RETURN_NOT_OK(
        ConvertDictionaryToDense(array, properties_->memory_pool(), &dense_array));
    return WriteArrowDense(def_levels, rep_levels, num_levels, *dense_array, ctx);
  };

  if (!IsDictionaryEncoding(current_encoder_->encoding()) ||
      !DictionaryDirectWriteSupported(array)) {
    // No longer dictionary-encoding for whatever reason, maybe we never were
    // or we decided to stop. Note that WriteArrow can be invoked multiple
    // times with both dense and dictionary-encoded versions of the same data
    // without a problem. Any dense data will be hashed to indices until the
    // dictionary page limit is reached, at which point everything (dictionary and
    // dense) will fall back to plain encoding
    return WriteDense();
  }

  auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
  const auto& data = checked_cast<const arrow::DictionaryArray&>(array);
  std::shared_ptr<arrow::Array> dictionary = data.dictionary();
  std::shared_ptr<arrow::Array> indices = data.indices();

  int64_t value_offset = 0;
  auto WriteIndicesChunk = [&](int64_t offset, int64_t batch_size) {
    int64_t batch_num_values = 0;
    int64_t batch_num_spaced_values = 0;
    WriteLevelsSpaced(batch_size, def_levels + offset, rep_levels + offset,
                      &batch_num_values, &batch_num_spaced_values);
    dict_encoder->PutIndices(*indices->Slice(value_offset, batch_num_spaced_values));
    CommitWriteAndCheckPageLimit(batch_size, batch_num_values);
    value_offset += batch_num_spaced_values;
  };

  // Handle seeing dictionary for the first time
  if (!preserved_dictionary_) {
    // It's a new dictionary. Call PutDictionary and keep track of it
    PARQUET_CATCH_NOT_OK(dict_encoder->PutDictionary(*dictionary));

    // TODO(wesm): If some dictionary values are unobserved, then the
    // statistics will be inaccurate. Do we care enough to fix it?
    if (page_statistics_ != nullptr) {
      PARQUET_CATCH_NOT_OK(page_statistics_->Update(*dictionary));
    }
    preserved_dictionary_ = dictionary;
  } else if (!dictionary->Equals(*preserved_dictionary_)) {
    // Dictionary has changed
    PARQUET_CATCH_NOT_OK(FallbackToPlainEncoding());
    return WriteDense();
  }

  PARQUET_CATCH_NOT_OK(
      DoInBatches(num_levels, properties_->write_batch_size(), WriteIndicesChunk));
  return Status::OK();
}

// ----------------------------------------------------------------------
// Direct Arrow write path

template <typename ParquetType, typename ArrowType, typename Enable = void>
struct SerializeFunctor {
  using ArrowCType = typename ArrowType::c_type;
  using ArrayType = typename arrow::TypeTraits<ArrowType>::ArrayType;
  using ParquetCType = typename ParquetType::c_type;
  Status Serialize(const ArrayType& array, ArrowWriteContext*, ParquetCType* out) {
    const ArrowCType* input = array.raw_values();
    if (array.null_count() > 0) {
      for (int i = 0; i < array.length(); i++) {
        out[i] = static_cast<ParquetCType>(input[i]);
      }
    } else {
      std::copy(input, input + array.length(), out);
    }
    return Status::OK();
  }
};

template <typename ParquetType, typename ArrowType>
inline Status SerializeData(const arrow::Array& array, ArrowWriteContext* ctx,
                            typename ParquetType::c_type* out) {
  using ArrayType = typename arrow::TypeTraits<ArrowType>::ArrayType;
  SerializeFunctor<ParquetType, ArrowType> functor;
  return functor.Serialize(checked_cast<const ArrayType&>(array), ctx, out);
}

template <typename ParquetType, typename ArrowType>
Status WriteArrowSerialize(const arrow::Array& array, int64_t num_levels,
                           const int16_t* def_levels, const int16_t* rep_levels,
                           ArrowWriteContext* ctx,
                           TypedColumnWriter<ParquetType>* writer) {
  using ParquetCType = typename ParquetType::c_type;

  ParquetCType* buffer;
  PARQUET_THROW_NOT_OK(ctx->GetScratchData<ParquetCType>(array.length(), &buffer));

  bool no_nulls =
      writer->descr()->schema_node()->is_required() || (array.null_count() == 0);

  Status s = SerializeData<ParquetType, ArrowType>(array, ctx, buffer);
  RETURN_NOT_OK(s);
  if (no_nulls) {
    PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer));
  } else {
    PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
                                                  array.null_bitmap_data(),
                                                  array.offset(), buffer));
  }
  return Status::OK();
}

template <typename ParquetType>
Status WriteArrowZeroCopy(const arrow::Array& array, int64_t num_levels,
                          const int16_t* def_levels, const int16_t* rep_levels,
                          ArrowWriteContext* ctx,
                          TypedColumnWriter<ParquetType>* writer) {
  using T = typename ParquetType::c_type;
  const auto& data = static_cast<const arrow::PrimitiveArray&>(array);
  const T* values = nullptr;
  // The values buffer may be null if the array is empty (ARROW-2744)
  if (data.values() != nullptr) {
    values = reinterpret_cast<const T*>(data.values()->data()) + data.offset();
  } else {
    DCHECK_EQ(data.length(), 0);
  }
  if (writer->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
    PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, values));
  } else {
    PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
                                                  data.null_bitmap_data(), data.offset(),
                                                  values));
  }
  return Status::OK();
}

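// Helper macros for the WriteArrowDense() switch statements below: dispatch on
// the Arrow type id either to a conversion-based write (WriteArrowSerialize)
// or to a zero-copy write of the raw values (WriteArrowZeroCopy).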
#define WRITE_SERIALIZE_CASE(ArrowEnum, ArrowType, ParquetType) \
  case arrow::Type::ArrowEnum:                                  \
    return WriteArrowSerialize<ParquetType, arrow::ArrowType>(  \
        array, num_levels, def_levels, rep_levels, ctx, this);

#define WRITE_ZERO_COPY_CASE(ArrowEnum, ArrowType, ParquetType)                       \
  case arrow::Type::ArrowEnum:                                                        \
    return WriteArrowZeroCopy<ParquetType>(array, num_levels, def_levels, rep_levels, \
                                           ctx, this);

#define ARROW_UNSUPPORTED()                                          \
  std::stringstream ss;                                              \
  ss << "Arrow type " << array.type()->ToString()                    \
     << " cannot be written to Parquet type " << descr_->ToString(); \
  return Status::Invalid(ss.str());

// ----------------------------------------------------------------------
// Write Arrow to BooleanType

template <>
struct SerializeFunctor<BooleanType, arrow::BooleanType> {
  Status Serialize(const arrow::BooleanArray& data, ArrowWriteContext*, bool* out) {
    for (int i = 0; i < data.length(); i++) {
      *out++ = data.Value(i);
    }
    return Status::OK();
  }
};

template <>
Status TypedColumnWriterImpl<BooleanType>::WriteArrowDense(const int16_t* def_levels,
                                                           const int16_t* rep_levels,
                                                           int64_t num_levels,
                                                           const arrow::Array& array,
                                                           ArrowWriteContext* ctx) {
  if (array.type_id() != arrow::Type::BOOL) {
    ARROW_UNSUPPORTED();
  }
  return WriteArrowSerialize<BooleanType, arrow::BooleanType>(
      array, num_levels, def_levels, rep_levels, ctx, this);
}

// ----------------------------------------------------------------------
// Write Arrow types to INT32

template <>
struct SerializeFunctor<Int32Type, arrow::Date64Type> {
  Status Serialize(const arrow::Date64Array& array, ArrowWriteContext*, int32_t* out) {
    const int64_t* input = array.raw_values();
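    // Date64 stores milliseconds since the Unix epoch, while the Parquet DATE
    // type stores days as INT32, so divide by 86400000 (ms per day); e.g.
    // 172800000 ms maps to day 2.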
    for (int i = 0; i < array.length(); i++) {
      *out++ = static_cast<int32_t>(*input++ / 86400000);
    }
    return Status::OK();
  }
};

template <>
struct SerializeFunctor<Int32Type, arrow::Time32Type> {
  Status Serialize(const arrow::Time32Array& array, ArrowWriteContext*, int32_t* out) {
    const int32_t* input = array.raw_values();
    const auto& type = static_cast<const arrow::Time32Type&>(*array.type());
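    // The Parquet TIME_MILLIS type expects milliseconds: second-resolution
    // input is scaled by 1000, millisecond-resolution input is copied as is.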
    if (type.unit() == arrow::TimeUnit::SECOND) {
      for (int i = 0; i < array.length(); i++) {
        out[i] = input[i] * 1000;
      }
    } else {
      std::copy(input, input + array.length(), out);
    }
    return Status::OK();
  }
};

template <>
Status TypedColumnWriterImpl<Int32Type>::WriteArrowDense(const int16_t* def_levels,
                                                         const int16_t* rep_levels,
                                                         int64_t num_levels,
                                                         const arrow::Array& array,
                                                         ArrowWriteContext* ctx) {
  switch (array.type()->id()) {
    case arrow::Type::NA: {
      PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, nullptr));
    } break;
    WRITE_SERIALIZE_CASE(INT8, Int8Type, Int32Type)
    WRITE_SERIALIZE_CASE(UINT8, UInt8Type, Int32Type)
    WRITE_SERIALIZE_CASE(INT16, Int16Type, Int32Type)
    WRITE_SERIALIZE_CASE(UINT16, UInt16Type, Int32Type)
    WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int32Type)
    WRITE_ZERO_COPY_CASE(INT32, Int32Type, Int32Type)
    WRITE_ZERO_COPY_CASE(DATE32, Date32Type, Int32Type)
    WRITE_SERIALIZE_CASE(DATE64, Date64Type, Int32Type)
    WRITE_SERIALIZE_CASE(TIME32, Time32Type, Int32Type)
    default:
      ARROW_UNSUPPORTED()
  }
  return Status::OK();
}

// ----------------------------------------------------------------------
// Write Arrow to Int64 and Int96

#define INT96_CONVERT_LOOP(ConversionFunction) \
  for (int64_t i = 0; i < array.length(); i++) ConversionFunction(input[i], &out[i]);

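// Parquet INT96 timestamps use the Impala layout: nanoseconds within the day
// in the lower 8 bytes and a Julian day number in the upper 4 bytes. The
// internal::*ToImpalaTimestamp helpers perform that split for each input unit.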
template <>
struct SerializeFunctor<Int96Type, arrow::TimestampType> {
  Status Serialize(const arrow::TimestampArray& array, ArrowWriteContext*, Int96* out) {
    const int64_t* input = array.raw_values();
    const auto& type = static_cast<const arrow::TimestampType&>(*array.type());
    switch (type.unit()) {
      case arrow::TimeUnit::NANO:
        INT96_CONVERT_LOOP(internal::NanosecondsToImpalaTimestamp);
        break;
      case arrow::TimeUnit::MICRO:
        INT96_CONVERT_LOOP(internal::MicrosecondsToImpalaTimestamp);
        break;
      case arrow::TimeUnit::MILLI:
        INT96_CONVERT_LOOP(internal::MillisecondsToImpalaTimestamp);
        break;
      case arrow::TimeUnit::SECOND:
        INT96_CONVERT_LOOP(internal::SecondsToImpalaTimestamp);
        break;
    }
    return Status::OK();
  }
};

#define COERCE_DIVIDE -1
#define COERCE_INVALID 0
#define COERCE_MULTIPLY +1

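// Conversion table indexed as [source unit][target unit] following the
// arrow::TimeUnit enum order (SECOND, MILLI, MICRO, NANO). Each entry pairs an
// operation with a scale factor, e.g. micros -> millis is {COERCE_DIVIDE, 1000}.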
static std::pair<int, int64_t> kTimestampCoercionFactors[4][4] = {
    // from seconds ...
    {{COERCE_INVALID, 0},                      // ... to seconds
     {COERCE_MULTIPLY, 1000},                  // ... to millis
     {COERCE_MULTIPLY, 1000000},               // ... to micros
     {COERCE_MULTIPLY, INT64_C(1000000000)}},  // ... to nanos
    // from millis ...
    {{COERCE_INVALID, 0},
     {COERCE_MULTIPLY, 1},
     {COERCE_MULTIPLY, 1000},
     {COERCE_MULTIPLY, 1000000}},
    // from micros ...
    {{COERCE_INVALID, 0},
     {COERCE_DIVIDE, 1000},
     {COERCE_MULTIPLY, 1},
     {COERCE_MULTIPLY, 1000}},
    // from nanos ...
    {{COERCE_INVALID, 0},
     {COERCE_DIVIDE, 1000000},
     {COERCE_DIVIDE, 1000},
     {COERCE_MULTIPLY, 1}}};

template <>
struct SerializeFunctor<Int64Type, arrow::TimestampType> {
  Status Serialize(const arrow::TimestampArray& array, ArrowWriteContext* ctx,
                   int64_t* out) {
    const auto& source_type = static_cast<const arrow::TimestampType&>(*array.type());
    auto source_unit = source_type.unit();
    const int64_t* values = array.raw_values();

    arrow::TimeUnit::type target_unit = ctx->properties->coerce_timestamps_unit();
    auto target_type = arrow::timestamp(target_unit);
    bool truncation_allowed = ctx->properties->truncated_timestamps_allowed();

    auto DivideBy = [&](const int64_t factor) {
      for (int64_t i = 0; i < array.length(); i++) {
        if (!truncation_allowed && array.IsValid(i) && (values[i] % factor != 0)) {
          return Status::Invalid("Casting from ", source_type.ToString(), " to ",
                                 target_type->ToString(),
                                 " would lose data: ", values[i]);
        }
        out[i] = values[i] / factor;
      }
      return Status::OK();
    };

    auto MultiplyBy = [&](const int64_t factor) {
      for (int64_t i = 0; i < array.length(); i++) {
        out[i] = values[i] * factor;
      }
      return Status::OK();
    };

    const auto& coercion = kTimestampCoercionFactors[static_cast<int>(source_unit)]
                                                    [static_cast<int>(target_unit)];

    // .first -> coercion operation; .second -> scale factor
    DCHECK_NE(coercion.first, COERCE_INVALID);
    return coercion.first == COERCE_DIVIDE ? DivideBy(coercion.second)
                                           : MultiplyBy(coercion.second);
  }
};

#undef COERCE_DIVIDE
#undef COERCE_INVALID
#undef COERCE_MULTIPLY

Status WriteTimestamps(const arrow::Array& values, int64_t num_levels,
                       const int16_t* def_levels, const int16_t* rep_levels,
                       ArrowWriteContext* ctx, TypedColumnWriter<Int64Type>* writer) {
  const auto& source_type = static_cast<const arrow::TimestampType&>(*values.type());

  auto WriteCoerce = [&](const ArrowWriterProperties* properties) {
    ArrowWriteContext temp_ctx = *ctx;
    temp_ctx.properties = properties;
    return WriteArrowSerialize<Int64Type, arrow::TimestampType>(
        values, num_levels, def_levels, rep_levels, &temp_ctx, writer);
  };

  if (ctx->properties->coerce_timestamps_enabled()) {
    // User explicitly requested coercion to specific unit
    if (source_type.unit() == ctx->properties->coerce_timestamps_unit()) {
      // No data conversion necessary
      return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels,
                                           ctx, writer);
    } else {
      return WriteCoerce(ctx->properties);
    }
  } else if (writer->properties()->version() == ParquetVersion::PARQUET_1_0 &&
             source_type.unit() == arrow::TimeUnit::NANO) {
    // Absent superseding user instructions, when writing Parquet version 1.0 files,
    // timestamps in nanoseconds are coerced to microseconds
    std::shared_ptr<ArrowWriterProperties> properties =
        (ArrowWriterProperties::Builder())
            .coerce_timestamps(arrow::TimeUnit::MICRO)
            ->disallow_truncated_timestamps()
            ->build();
    return WriteCoerce(properties.get());
  } else if (source_type.unit() == arrow::TimeUnit::SECOND) {
    // Absent superseding user instructions, timestamps in seconds are coerced to
    // milliseconds
    std::shared_ptr<ArrowWriterProperties> properties =
        (ArrowWriterProperties::Builder())
            .coerce_timestamps(arrow::TimeUnit::MILLI)
            ->build();
    return WriteCoerce(properties.get());
  } else {
    // No data conversion necessary
    return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels, ctx,
                                         writer);
  }
}

template <>
Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(const int16_t* def_levels,
                                                         const int16_t* rep_levels,
                                                         int64_t num_levels,
                                                         const arrow::Array& array,
                                                         ArrowWriteContext* ctx) {
  switch (array.type()->id()) {
    case arrow::Type::TIMESTAMP:
      return WriteTimestamps(array, num_levels, def_levels, rep_levels, ctx, this);
    WRITE_ZERO_COPY_CASE(INT64, Int64Type, Int64Type)
    WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int64Type)
    WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type)
    WRITE_ZERO_COPY_CASE(TIME64, Time64Type, Int64Type)
    default:
      ARROW_UNSUPPORTED();
  }
}

template <>
Status TypedColumnWriterImpl<Int96Type>::WriteArrowDense(const int16_t* def_levels,
                                                         const int16_t* rep_levels,
                                                         int64_t num_levels,
                                                         const arrow::Array& array,
                                                         ArrowWriteContext* ctx) {
  if (array.type_id() != arrow::Type::TIMESTAMP) {
    ARROW_UNSUPPORTED();
  }
  return WriteArrowSerialize<Int96Type, arrow::TimestampType>(
      array, num_levels, def_levels, rep_levels, ctx, this);
}

// ----------------------------------------------------------------------
// Floating point types

template <>
Status TypedColumnWriterImpl<FloatType>::WriteArrowDense(const int16_t* def_levels,
                                                         const int16_t* rep_levels,
                                                         int64_t num_levels,
                                                         const arrow::Array& array,
                                                         ArrowWriteContext* ctx) {
  if (array.type_id() != arrow::Type::FLOAT) {
    ARROW_UNSUPPORTED();
  }
  return WriteArrowZeroCopy<FloatType>(array, num_levels, def_levels, rep_levels, ctx,
                                       this);
}

template <>
Status TypedColumnWriterImpl<DoubleType>::WriteArrowDense(const int16_t* def_levels,
                                                          const int16_t* rep_levels,
                                                          int64_t num_levels,
                                                          const arrow::Array& array,
                                                          ArrowWriteContext* ctx) {
  if (array.type_id() != arrow::Type::DOUBLE) {
    ARROW_UNSUPPORTED();
  }
  return WriteArrowZeroCopy<DoubleType>(array, num_levels, def_levels, rep_levels, ctx,
                                        this);
}

// ----------------------------------------------------------------------
// Write Arrow to BYTE_ARRAY

template <>
Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(const int16_t* def_levels,
                                                             const int16_t* rep_levels,
                                                             int64_t num_levels,
                                                             const arrow::Array& array,
                                                             ArrowWriteContext* ctx) {
  if (array.type()->id() != arrow::Type::BINARY &&
      array.type()->id() != arrow::Type::STRING) {
    ARROW_UNSUPPORTED();
  }

  int64_t value_offset = 0;
  auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
    int64_t batch_num_values = 0;
    int64_t batch_num_spaced_values = 0;
    WriteLevelsSpaced(batch_size, def_levels + offset, rep_levels + offset,
                      &batch_num_values, &batch_num_spaced_values);
    std::shared_ptr<arrow::Array> data_slice =
        array.Slice(value_offset, batch_num_spaced_values);
    current_encoder_->Put(*data_slice);
    if (page_statistics_ != nullptr) {
      page_statistics_->Update(*data_slice);
    }
    CommitWriteAndCheckPageLimit(batch_size, batch_num_values);
    CheckDictionarySizeLimit();
    value_offset += batch_num_spaced_values;
  };

  PARQUET_CATCH_NOT_OK(
      DoInBatches(num_levels, properties_->write_batch_size(), WriteChunk));
  return Status::OK();
}

// ----------------------------------------------------------------------
// Write Arrow to FIXED_LEN_BYTE_ARRAY

template <typename ParquetType, typename ArrowType>
struct SerializeFunctor<ParquetType, ArrowType,
                        arrow::enable_if_fixed_size_binary<ArrowType>> {
  Status Serialize(const arrow::FixedSizeBinaryArray& array, ArrowWriteContext*,
                   FLBA* out) {
    if (array.null_count() == 0) {
      // no nulls, just dump the data
      // todo(advancedxy): use a writeBatch to avoid this step
      for (int64_t i = 0; i < array.length(); i++) {
        out[i] = FixedLenByteArray(array.GetValue(i));
      }
    } else {
      for (int64_t i = 0; i < array.length(); i++) {
        if (array.IsValid(i)) {
          out[i] = FixedLenByteArray(array.GetValue(i));
        }
      }
    }
    return Status::OK();
  }
};

template <>
Status WriteArrowSerialize<FLBAType, arrow::Decimal128Type>(
    const arrow::Array& array, int64_t num_levels, const int16_t* def_levels,
    const int16_t* rep_levels, ArrowWriteContext* ctx,
    TypedColumnWriter<FLBAType>* writer) {
  const auto& data = static_cast<const arrow::Decimal128Array&>(array);
  const int64_t length = data.length();

  FLBA* buffer;
  RETURN_NOT_OK(ctx->GetScratchData<FLBA>(num_levels, &buffer));

  const auto& decimal_type = static_cast<const arrow::Decimal128Type&>(*data.type());
  const int32_t offset =
      decimal_type.byte_width() - internal::DecimalSize(decimal_type.precision());

  const bool does_not_have_nulls =
      writer->descr()->schema_node()->is_required() || data.null_count() == 0;

  const auto valid_value_count = static_cast<size_t>(length - data.null_count()) * 2;
  std::vector<uint64_t> big_endian_values(valid_value_count);

  // TODO(phillipc): Look into whether our compilers will perform loop unswitching so we
  // don't have to keep writing two loops to handle the case where we know there are no
  // nulls
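  // Arrow Decimal128 values are two little-endian 64-bit words, while Parquet
  // stores decimals as big-endian two's complement trimmed to
  // DecimalSize(precision) bytes; each value is therefore word-swapped,
  // byte-swapped, and read starting at `offset` to drop the redundant leading
  // bytes.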
  if (does_not_have_nulls) {
    // no nulls, just dump the data
    // todo(advancedxy): use a writeBatch to avoid this step
    for (int64_t i = 0, j = 0; i < length; ++i, j += 2) {
      auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
      big_endian_values[j] = arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
      big_endian_values[j + 1] = arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
      buffer[i] = FixedLenByteArray(
          reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
    }
  } else {
    for (int64_t i = 0, buffer_idx = 0, j = 0; i < length; ++i) {
      if (data.IsValid(i)) {
        auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
        big_endian_values[j] = arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
        big_endian_values[j + 1] = arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
        buffer[buffer_idx++] = FixedLenByteArray(
            reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
        j += 2;
      }
    }
  }
  PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer));
  return Status::OK();
}

template <>
Status TypedColumnWriterImpl<FLBAType>::WriteArrowDense(const int16_t* def_levels,
                                                        const int16_t* rep_levels,
                                                        int64_t num_levels,
                                                        const arrow::Array& array,
                                                        ArrowWriteContext* ctx) {
  switch (array.type()->id()) {
    WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
    WRITE_SERIALIZE_CASE(DECIMAL, Decimal128Type, FLBAType)
    default:
      break;
  }
  return Status::OK();
}

// ----------------------------------------------------------------------
// Dynamic column writer constructor

std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata,
                                                 std::unique_ptr<PageWriter> pager,
                                                 const WriterProperties* properties) {
  const ColumnDescriptor* descr = metadata->descr();
  const bool use_dictionary = properties->dictionary_enabled(descr->path()) &&
                              descr->physical_type() != Type::BOOLEAN;
  Encoding::type encoding = properties->encoding(descr->path());
  if (use_dictionary) {
    encoding = properties->dictionary_index_encoding();
  }
  switch (descr->physical_type()) {
    case Type::BOOLEAN:
      return std::make_shared<TypedColumnWriterImpl<BooleanType>>(
          metadata, std::move(pager), use_dictionary, encoding, properties);
    case Type::INT32:
      return std::make_shared<TypedColumnWriterImpl<Int32Type>>(
          metadata, std::move(pager), use_dictionary, encoding, properties);
    case Type::INT64:
      return std::make_shared<TypedColumnWriterImpl<Int64Type>>(
          metadata, std::move(pager), use_dictionary, encoding, properties);
    case Type::INT96:
      return std::make_shared<TypedColumnWriterImpl<Int96Type>>(
          metadata, std::move(pager), use_dictionary, encoding, properties);
    case Type::FLOAT:
      return std::make_shared<TypedColumnWriterImpl<FloatType>>(
          metadata, std::move(pager), use_dictionary, encoding, properties);
    case Type::DOUBLE:
      return std::make_shared<TypedColumnWriterImpl<DoubleType>>(
          metadata, std::move(pager), use_dictionary, encoding, properties);
    case Type::BYTE_ARRAY:
      return std::make_shared<TypedColumnWriterImpl<ByteArrayType>>(
          metadata, std::move(pager), use_dictionary, encoding, properties);
    case Type::FIXED_LEN_BYTE_ARRAY:
      return std::make_shared<TypedColumnWriterImpl<FLBAType>>(
          metadata, std::move(pager), use_dictionary, encoding, properties);
    default:
      ParquetException::NYI("type writer not implemented");
  }
  // Unreachable code, but suppress compiler warning
  return std::shared_ptr<ColumnWriter>(nullptr);
}

}  // namespace parquet