// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "parquet/column_writer.h"

#include <cstdint>
#include <memory>
#include <utility>

#include "arrow/util/bit-util.h"
#include "arrow/util/compression.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle-encoding.h"

#include "parquet/encoding-internal.h"
#include "parquet/properties.h"
#include "parquet/statistics.h"
#include "parquet/thrift.h"
#include "parquet/util/memory.h"

namespace parquet {

using BitWriter = ::arrow::BitUtil::BitWriter;
using RleEncoder = ::arrow::util::RleEncoder;

LevelEncoder::LevelEncoder() {}
LevelEncoder::~LevelEncoder() {}

void LevelEncoder::Init(Encoding::type encoding, int16_t max_level,
                        int num_buffered_values, uint8_t* data, int data_size) {
  bit_width_ = BitUtil::Log2(max_level + 1);
  encoding_ = encoding;
  switch (encoding) {
    case Encoding::RLE: {
      rle_encoder_.reset(new RleEncoder(data, data_size, bit_width_));
      break;
    }
    case Encoding::BIT_PACKED: {
      int num_bytes =
          static_cast<int>(BitUtil::BytesForBits(num_buffered_values * bit_width_));
      bit_packed_encoder_.reset(new BitWriter(data, num_bytes));
      break;
    }
    default:
      throw ParquetException("Unknown encoding type for levels.");
  }
}

int LevelEncoder::MaxBufferSize(Encoding::type encoding, int16_t max_level,
                                int num_buffered_values) {
  int bit_width = BitUtil::Log2(max_level + 1);
  int num_bytes = 0;
  switch (encoding) {
    case Encoding::RLE: {
      // TODO: Due to the way we currently check whether the buffer is full,
      // we need MinBufferSize as headroom.
      num_bytes = RleEncoder::MaxBufferSize(bit_width, num_buffered_values) +
                  RleEncoder::MinBufferSize(bit_width);
      break;
    }
    case Encoding::BIT_PACKED: {
      num_bytes =
          static_cast<int>(BitUtil::BytesForBits(num_buffered_values * bit_width));
      break;
    }
    default:
      throw ParquetException("Unknown encoding type for levels.");
  }
  return num_bytes;
}
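
// Worked example of the sizing above: with max_level = 3, each level needs
// bit_width = Log2(4) = 2 bits, so 100 buffered values occupy
// BytesForBits(100 * 2) = 25 bytes when bit-packed; the RLE estimate reserves
// an extra RleEncoder::MinBufferSize(2) bytes of headroom on top of
// MaxBufferSize.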

int LevelEncoder::Encode(int batch_size, const int16_t* levels) {
  int num_encoded = 0;
  if (!rle_encoder_ && !bit_packed_encoder_) {
    throw ParquetException("Level encoders are not initialized.");
  }

  if (encoding_ == Encoding::RLE) {
    for (int i = 0; i < batch_size; ++i) {
      if (!rle_encoder_->Put(*(levels + i))) {
        break;
      }
      ++num_encoded;
    }
    rle_encoder_->Flush();
    rle_length_ = rle_encoder_->len();
  } else {
    for (int i = 0; i < batch_size; ++i) {
      if (!bit_packed_encoder_->PutValue(*(levels + i), bit_width_)) {
        break;
      }
      ++num_encoded;
    }
    bit_packed_encoder_->Flush();
  }
  return num_encoded;
}

// ----------------------------------------------------------------------
// PageWriter implementation

static format::Statistics ToThrift(const EncodedStatistics& row_group_statistics) {
  format::Statistics statistics;
  if (row_group_statistics.has_min) statistics.__set_min(row_group_statistics.min());
  if (row_group_statistics.has_max) statistics.__set_max(row_group_statistics.max());
  if (row_group_statistics.has_null_count)
    statistics.__set_null_count(row_group_statistics.null_count);
  if (row_group_statistics.has_distinct_count)
    statistics.__set_distinct_count(row_group_statistics.distinct_count);
  return statistics;
}

// This subclass delimits pages appearing in a serialized stream, each preceded
// by a serialized Thrift format::PageHeader indicating the type of each page
// and the page metadata.
class SerializedPageWriter : public PageWriter {
 public:
  SerializedPageWriter(OutputStream* sink, Compression::type codec,
                       ColumnChunkMetaDataBuilder* metadata,
                       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
      : sink_(sink),
        metadata_(metadata),
        pool_(pool),
        num_values_(0),
        dictionary_page_offset_(0),
        data_page_offset_(0),
        total_uncompressed_size_(0),
        total_compressed_size_(0) {
    compressor_ = GetCodecFromArrow(codec);
    thrift_serializer_.reset(new ThriftSerializer);
  }

  int64_t WriteDictionaryPage(const DictionaryPage& page) override {
    int64_t uncompressed_size = page.size();
    std::shared_ptr<Buffer> compressed_data = nullptr;
    if (has_compressor()) {
      auto buffer = std::static_pointer_cast<ResizableBuffer>(
          AllocateBuffer(pool_, uncompressed_size));
      Compress(*(page.buffer().get()), buffer.get());
      compressed_data = std::static_pointer_cast<Buffer>(buffer);
    } else {
      compressed_data = page.buffer();
    }

    format::DictionaryPageHeader dict_page_header;
    dict_page_header.__set_num_values(page.num_values());
    dict_page_header.__set_encoding(ToThrift(page.encoding()));
    dict_page_header.__set_is_sorted(page.is_sorted());

    format::PageHeader page_header;
    page_header.__set_type(format::PageType::DICTIONARY_PAGE);
    page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
    page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
    page_header.__set_dictionary_page_header(dict_page_header);
    // TODO(PARQUET-594) crc checksum

    int64_t start_pos = sink_->Tell();
    if (dictionary_page_offset_ == 0) {
      dictionary_page_offset_ = start_pos;
    }
    int64_t header_size = thrift_serializer_->Serialize(&page_header, sink_);
    sink_->Write(compressed_data->data(), compressed_data->size());

    total_uncompressed_size_ += uncompressed_size + header_size;
    total_compressed_size_ += compressed_data->size() + header_size;

    return sink_->Tell() - start_pos;
  }

  void Close(bool has_dictionary, bool fallback) override {
    // index_page_offset = -1 since index pages are not supported
    metadata_->Finish(num_values_, dictionary_page_offset_, -1, data_page_offset_,
                      total_compressed_size_, total_uncompressed_size_, has_dictionary,
                      fallback);

    // Write metadata at end of column chunk
    metadata_->WriteTo(sink_);
  }

  /**
   * Compress a buffer using the configured codec.
   */
  void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override {
    DCHECK(compressor_ != nullptr);

    // Compress the data
    int64_t max_compressed_size =
        compressor_->MaxCompressedLen(src_buffer.size(), src_buffer.data());

    // Pass shrink_to_fit = false: the underlying buffer only ever grows, and
    // resizing to a smaller size does not reallocate.
    PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false));

    int64_t compressed_size;
    PARQUET_THROW_NOT_OK(
        compressor_->Compress(src_buffer.size(), src_buffer.data(), max_compressed_size,
                              dest_buffer->mutable_data(), &compressed_size));
    PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false));
  }
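
  // Example: for a 10 000-byte page, the codec might report a worst-case
  // MaxCompressedLen of, say, 10 100 bytes; dest_buffer is grown to that bound
  // once, then shrunk (without reallocation) to the actual compressed_size.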

  int64_t WriteDataPage(const CompressedDataPage& page) override {
    int64_t uncompressed_size = page.uncompressed_size();
    std::shared_ptr<Buffer> compressed_data = page.buffer();

    format::DataPageHeader data_page_header;
    data_page_header.__set_num_values(page.num_values());
    data_page_header.__set_encoding(ToThrift(page.encoding()));
    data_page_header.__set_definition_level_encoding(
        ToThrift(page.definition_level_encoding()));
    data_page_header.__set_repetition_level_encoding(
        ToThrift(page.repetition_level_encoding()));
    data_page_header.__set_statistics(ToThrift(page.statistics()));

    format::PageHeader page_header;
    page_header.__set_type(format::PageType::DATA_PAGE);
    page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
    page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
    page_header.__set_data_page_header(data_page_header);
    // TODO(PARQUET-594) crc checksum

    int64_t start_pos = sink_->Tell();
    if (data_page_offset_ == 0) {
      data_page_offset_ = start_pos;
    }

    int64_t header_size = thrift_serializer_->Serialize(&page_header, sink_);
    sink_->Write(compressed_data->data(), compressed_data->size());

    total_uncompressed_size_ += uncompressed_size + header_size;
    total_compressed_size_ += compressed_data->size() + header_size;
    num_values_ += page.num_values();

    return sink_->Tell() - start_pos;
  }

  bool has_compressor() override { return (compressor_ != nullptr); }

  int64_t num_values() { return num_values_; }

  int64_t dictionary_page_offset() { return dictionary_page_offset_; }

  int64_t data_page_offset() { return data_page_offset_; }

  int64_t total_compressed_size() { return total_compressed_size_; }

  int64_t total_uncompressed_size() { return total_uncompressed_size_; }

 private:
  OutputStream* sink_;
  ColumnChunkMetaDataBuilder* metadata_;
  ::arrow::MemoryPool* pool_;
  int64_t num_values_;
  int64_t dictionary_page_offset_;
  int64_t data_page_offset_;
  int64_t total_uncompressed_size_;
  int64_t total_compressed_size_;

  std::unique_ptr<ThriftSerializer> thrift_serializer_;

  // Compression codec to use.
  std::unique_ptr<::arrow::util::Codec> compressor_;
};

// This implementation of the PageWriter buffers pages in memory and writes
// them to the final sink on Close().
class BufferedPageWriter : public PageWriter {
 public:
  BufferedPageWriter(OutputStream* sink, Compression::type codec,
                     ColumnChunkMetaDataBuilder* metadata,
                     ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
      : final_sink_(sink),
        metadata_(metadata),
        in_memory_sink_(new InMemoryOutputStream(pool)),
        pager_(new SerializedPageWriter(in_memory_sink_.get(), codec, metadata, pool)) {}

  int64_t WriteDictionaryPage(const DictionaryPage& page) override {
    return pager_->WriteDictionaryPage(page);
  }

  void Close(bool has_dictionary, bool fallback) override {
    // index_page_offset = -1 since index pages are not supported
    metadata_->Finish(
        pager_->num_values(), pager_->dictionary_page_offset() + final_sink_->Tell(), -1,
        pager_->data_page_offset() + final_sink_->Tell(), pager_->total_compressed_size(),
        pager_->total_uncompressed_size(), has_dictionary, fallback);

    // Write metadata at end of column chunk
    metadata_->WriteTo(in_memory_sink_.get());

    // Flush everything to the final sink
    auto buffer = in_memory_sink_->GetBuffer();
    final_sink_->Write(buffer->data(), buffer->size());
  }

  int64_t WriteDataPage(const CompressedDataPage& page) override {
    return pager_->WriteDataPage(page);
  }

  void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override {
    pager_->Compress(src_buffer, dest_buffer);
  }

  bool has_compressor() override { return pager_->has_compressor(); }

 private:
  OutputStream* final_sink_;
  ColumnChunkMetaDataBuilder* metadata_;
  std::unique_ptr<InMemoryOutputStream> in_memory_sink_;
  std::unique_ptr<SerializedPageWriter> pager_;
};

std::unique_ptr<PageWriter> PageWriter::Open(OutputStream* sink, Compression::type codec,
                                             ColumnChunkMetaDataBuilder* metadata,
                                             ::arrow::MemoryPool* pool,
                                             bool buffered_row_group) {
  if (buffered_row_group) {
    return std::unique_ptr<PageWriter>(
        new BufferedPageWriter(sink, codec, metadata, pool));
  } else {
    return std::unique_ptr<PageWriter>(
        new SerializedPageWriter(sink, codec, metadata, pool));
  }
}
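
// A minimal, hypothetical usage sketch (assumes `sink` and `metadata` were
// obtained elsewhere, e.g. from the file writer and its metadata builder):
//
//   std::unique_ptr<PageWriter> pager = PageWriter::Open(
//       sink, Compression::SNAPPY, metadata, ::arrow::default_memory_pool(),
//       /*buffered_row_group=*/false);
//
// With buffered_row_group = true, pages are staged in memory and reach the
// final sink only when Close() is called.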

// ----------------------------------------------------------------------
// ColumnWriter

std::shared_ptr<WriterProperties> default_writer_properties() {
  static std::shared_ptr<WriterProperties> default_writer_properties =
      WriterProperties::Builder().build();
  return default_writer_properties;
}

ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
                           std::unique_ptr<PageWriter> pager, bool has_dictionary,
                           Encoding::type encoding, const WriterProperties* properties)
    : metadata_(metadata),
      descr_(metadata->descr()),
      pager_(std::move(pager)),
      has_dictionary_(has_dictionary),
      encoding_(encoding),
      properties_(properties),
      allocator_(properties->memory_pool()),
      num_buffered_values_(0),
      num_buffered_encoded_values_(0),
      rows_written_(0),
      total_bytes_written_(0),
      total_compressed_bytes_(0),
      closed_(false),
      fallback_(false) {
  definition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
  repetition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
  definition_levels_rle_ =
      std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
  repetition_levels_rle_ =
      std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
  uncompressed_data_ =
      std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
  if (pager_->has_compressor()) {
    compressed_data_ =
        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
  }
}

void ColumnWriter::InitSinks() {
  definition_levels_sink_->Clear();
  repetition_levels_sink_->Clear();
}

void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
  DCHECK(!closed_);
  definition_levels_sink_->Write(reinterpret_cast<const uint8_t*>(levels),
                                 sizeof(int16_t) * num_levels);
}

void ColumnWriter::WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) {
  DCHECK(!closed_);
  repetition_levels_sink_->Write(reinterpret_cast<const uint8_t*>(levels),
                                 sizeof(int16_t) * num_levels);
}

// Returns the size of the encoded buffer
int64_t ColumnWriter::RleEncodeLevels(const Buffer& src_buffer,
                                      ResizableBuffer* dest_buffer, int16_t max_level) {
  // TODO: This only works due to some RLE specifics
  int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
                                                 static_cast<int>(num_buffered_values_)) +
                     sizeof(int32_t);

  // Pass shrink_to_fit = false: the underlying buffer only ever grows, and
  // resizing to a smaller size does not reallocate.
  PARQUET_THROW_NOT_OK(dest_buffer->Resize(rle_size, false));

  level_encoder_.Init(Encoding::RLE, max_level, static_cast<int>(num_buffered_values_),
                      dest_buffer->mutable_data() + sizeof(int32_t),
                      static_cast<int>(dest_buffer->size() - sizeof(int32_t)));
  int encoded =
      level_encoder_.Encode(static_cast<int>(num_buffered_values_),
                            reinterpret_cast<const int16_t*>(src_buffer.data()));
  DCHECK_EQ(encoded, num_buffered_values_);
  reinterpret_cast<int32_t*>(dest_buffer->mutable_data())[0] = level_encoder_.len();
  int64_t encoded_size = level_encoder_.len() + sizeof(int32_t);
  return encoded_size;
}
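
// The destination buffer now has the Parquet v1 level layout:
//
//   [ int32: byte length of RLE data ][ RLE-encoded levels ... ]
//
// For example, four definition levels {1, 1, 1, 1} with max_level = 1 collapse
// into the 4-byte length prefix followed by a single short RLE run.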

void ColumnWriter::AddDataPage() {
  int64_t definition_levels_rle_size = 0;
  int64_t repetition_levels_rle_size = 0;

  std::shared_ptr<Buffer> values = GetValuesBuffer();

  if (descr_->max_definition_level() > 0) {
    definition_levels_rle_size =
        RleEncodeLevels(definition_levels_sink_->GetBufferRef(),
                        definition_levels_rle_.get(), descr_->max_definition_level());
  }

  if (descr_->max_repetition_level() > 0) {
    repetition_levels_rle_size =
        RleEncodeLevels(repetition_levels_sink_->GetBufferRef(),
                        repetition_levels_rle_.get(), descr_->max_repetition_level());
  }

  int64_t uncompressed_size =
      definition_levels_rle_size + repetition_levels_rle_size + values->size();

  // Pass shrink_to_fit = false: the underlying buffer only ever grows, and
  // resizing to a smaller size does not reallocate.
  PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false));

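  // The uncompressed v1 data page body is laid out as
  //   [ repetition levels ][ definition levels ][ encoded values ]
  // in exactly the order the reader consumes them.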
  // Concatenate data into a single buffer
  uint8_t* uncompressed_ptr = uncompressed_data_->mutable_data();
  memcpy(uncompressed_ptr, repetition_levels_rle_->data(), repetition_levels_rle_size);
  uncompressed_ptr += repetition_levels_rle_size;
  memcpy(uncompressed_ptr, definition_levels_rle_->data(), definition_levels_rle_size);
  uncompressed_ptr += definition_levels_rle_size;
  memcpy(uncompressed_ptr, values->data(), values->size());

  EncodedStatistics page_stats = GetPageStatistics();
  ResetPageStatistics();

  std::shared_ptr<Buffer> compressed_data;
  if (pager_->has_compressor()) {
    pager_->Compress(*(uncompressed_data_.get()), compressed_data_.get());
    compressed_data = compressed_data_;
  } else {
    compressed_data = uncompressed_data_;
  }

  // Write the page to the OutputStream eagerly if there is no dictionary or
  // if dictionary encoding has fallen back to PLAIN
  if (has_dictionary_ && !fallback_) {  // Save pages until end of dictionary encoding
    std::shared_ptr<Buffer> compressed_data_copy;
    PARQUET_THROW_NOT_OK(compressed_data->Copy(0, compressed_data->size(), allocator_,
                                               &compressed_data_copy));
    CompressedDataPage page(compressed_data_copy,
                            static_cast<int32_t>(num_buffered_values_), encoding_,
                            Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats);
    total_compressed_bytes_ += page.size() + sizeof(format::PageHeader);
    data_pages_.push_back(std::move(page));
  } else {  // Eagerly write pages
    CompressedDataPage page(compressed_data, static_cast<int32_t>(num_buffered_values_),
                            encoding_, Encoding::RLE, Encoding::RLE, uncompressed_size,
                            page_stats);
    WriteDataPage(page);
  }

  // Re-initialize the sinks for the next page.
  InitSinks();
  num_buffered_values_ = 0;
  num_buffered_encoded_values_ = 0;
}

void ColumnWriter::WriteDataPage(const CompressedDataPage& page) {
  total_bytes_written_ += pager_->WriteDataPage(page);
}

int64_t ColumnWriter::Close() {
  if (!closed_) {
    closed_ = true;
    if (has_dictionary_ && !fallback_) {
      WriteDictionaryPage();
    }

    FlushBufferedDataPages();

    EncodedStatistics chunk_statistics = GetChunkStatistics();
    // Write stats only if the column has at least one row written.
    // From parquet-mr:
    // Don't write stats larger than the max size rather than truncating. The
    // rationale is that some engines may use the minimum value in the page as
    // the true minimum for aggregations and there is no way to mark that a
    // value has been truncated and is a lower bound and not in the page.
    if (rows_written_ > 0 && chunk_statistics.is_set() &&
        chunk_statistics.max_stat_length() <=
            properties_->max_statistics_size(descr_->path())) {
      metadata_->SetStatistics(SortOrder::SIGNED == descr_->sort_order(),
                               chunk_statistics);
    }
    pager_->Close(has_dictionary_, fallback_);
  }

  return total_bytes_written_;
}

void ColumnWriter::FlushBufferedDataPages() {
  // Write all outstanding data to a new page
  if (num_buffered_values_ > 0) {
    AddDataPage();
  }
  for (size_t i = 0; i < data_pages_.size(); i++) {
    WriteDataPage(data_pages_[i]);
  }
  data_pages_.clear();
  total_compressed_bytes_ = 0;
}

// ----------------------------------------------------------------------
// TypedColumnWriter

template <typename Type>
TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
                                           std::unique_ptr<PageWriter> pager,
                                           const bool use_dictionary,
                                           Encoding::type encoding,
                                           const WriterProperties* properties)
    : ColumnWriter(metadata, std::move(pager), use_dictionary, encoding, properties) {
  if (use_dictionary) {
    current_encoder_.reset(new DictEncoder<Type>(descr_, properties->memory_pool()));
  } else if (encoding == Encoding::PLAIN) {
    current_encoder_.reset(new PlainEncoder<Type>(descr_, properties->memory_pool()));
  } else {
    ParquetException::NYI("Selected encoding is not supported");
  }

  if (properties->statistics_enabled(descr_->path()) &&
      (SortOrder::UNKNOWN != descr_->sort_order())) {
    page_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
    chunk_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
  }
}

// Only one dictionary page is written per column chunk.
// Falls back to PLAIN if the dictionary page size limit is reached.
template <typename Type>
void TypedColumnWriter<Type>::CheckDictionarySizeLimit() {
  auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
  if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) {
    WriteDictionaryPage();
    // Serialize the buffered dictionary indices
    FlushBufferedDataPages();
    fallback_ = true;
    // Only PLAIN encoding is supported for fallback in V1
    current_encoder_.reset(new PlainEncoder<Type>(descr_, properties_->memory_pool()));
    encoding_ = Encoding::PLAIN;
  }
}
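
// Example: with a 1 MB dictionary_pagesize_limit(), a column whose distinct
// values outgrow that budget mid-write emits its dictionary page, flushes the
// buffered dictionary-encoded pages, and writes all subsequent values PLAIN.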

template <typename Type>
void TypedColumnWriter<Type>::WriteDictionaryPage() {
  auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
  std::shared_ptr<ResizableBuffer> buffer =
      AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
  dict_encoder->WriteDict(buffer->mutable_data());

  DictionaryPage page(buffer, dict_encoder->num_entries(),
                      properties_->dictionary_page_encoding());
  total_bytes_written_ += pager_->WriteDictionaryPage(page);
}

template <typename Type>
EncodedStatistics TypedColumnWriter<Type>::GetPageStatistics() {
  EncodedStatistics result;
  if (page_statistics_) result = page_statistics_->Encode();
  return result;
}

template <typename Type>
EncodedStatistics TypedColumnWriter<Type>::GetChunkStatistics() {
  EncodedStatistics result;
  if (chunk_statistics_) result = chunk_statistics_->Encode();
  return result;
}

template <typename Type>
void TypedColumnWriter<Type>::ResetPageStatistics() {
  if (chunk_statistics_ != nullptr) {
    chunk_statistics_->Merge(*page_statistics_);
    page_statistics_->Reset();
  }
}

// ----------------------------------------------------------------------
// Dynamic column writer constructor

std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata,
                                                 std::unique_ptr<PageWriter> pager,
                                                 const WriterProperties* properties) {
  const ColumnDescriptor* descr = metadata->descr();
  const bool use_dictionary = properties->dictionary_enabled(descr->path()) &&
                              descr->physical_type() != Type::BOOLEAN;
  Encoding::type encoding = properties->encoding(descr->path());
  if (use_dictionary) {
    encoding = properties->dictionary_index_encoding();
  }
  switch (descr->physical_type()) {
    case Type::BOOLEAN:
      return std::make_shared<BoolWriter>(metadata, std::move(pager), use_dictionary,
                                          encoding, properties);
    case Type::INT32:
      return std::make_shared<Int32Writer>(metadata, std::move(pager), use_dictionary,
                                           encoding, properties);
    case Type::INT64:
      return std::make_shared<Int64Writer>(metadata, std::move(pager), use_dictionary,
                                           encoding, properties);
    case Type::INT96:
      return std::make_shared<Int96Writer>(metadata, std::move(pager), use_dictionary,
                                           encoding, properties);
    case Type::FLOAT:
      return std::make_shared<FloatWriter>(metadata, std::move(pager), use_dictionary,
                                           encoding, properties);
    case Type::DOUBLE:
      return std::make_shared<DoubleWriter>(metadata, std::move(pager), use_dictionary,
                                            encoding, properties);
    case Type::BYTE_ARRAY:
      return std::make_shared<ByteArrayWriter>(metadata, std::move(pager), use_dictionary,
                                               encoding, properties);
    case Type::FIXED_LEN_BYTE_ARRAY:
      return std::make_shared<FixedLenByteArrayWriter>(
          metadata, std::move(pager), use_dictionary, encoding, properties);
    default:
      ParquetException::NYI("type writer not implemented");
  }
  // Unreachable code, but suppress compiler warning
  return std::shared_ptr<ColumnWriter>(nullptr);
}
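
// A minimal, hypothetical usage sketch for the factory above (`metadata`,
// `pager`, and `props` are illustrative names assumed to come from the row
// group writer):
//
//   std::shared_ptr<ColumnWriter> writer =
//       ColumnWriter::Make(metadata, std::move(pager), props);
//   auto typed = static_cast<Int64Writer*>(writer.get());
//   typed->WriteBatch(num_values, def_levels, rep_levels, values);
//   int64_t bytes_written = writer->Close();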

// ----------------------------------------------------------------------
// Instantiate templated classes

template <typename DType>
inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
                                                        const int16_t* def_levels,
                                                        const int16_t* rep_levels,
                                                        const T* values) {
  int64_t values_to_write = 0;
  // If the field is required and non-repeated, there are no definition levels
  if (descr_->max_definition_level() > 0) {
    for (int64_t i = 0; i < num_values; ++i) {
      if (def_levels[i] == descr_->max_definition_level()) {
        ++values_to_write;
      }
    }

    WriteDefinitionLevels(num_values, def_levels);
  } else {
    // Required field, write all values
    values_to_write = num_values;
  }

  // Repetition levels are not present for non-repeated fields
  if (descr_->max_repetition_level() > 0) {
    // A row could include more than one value;
    // count the places where a new row starts
    for (int64_t i = 0; i < num_values; ++i) {
      if (rep_levels[i] == 0) {
        rows_written_++;
      }
    }

    WriteRepetitionLevels(num_values, rep_levels);
  } else {
    // Each value is exactly one row
    rows_written_ += static_cast<int>(num_values);
  }

  // PARQUET-780
  if (values_to_write > 0) {
    DCHECK(nullptr != values) << "Values ptr cannot be NULL";
  }

  WriteValues(values_to_write, values);

  if (page_statistics_ != nullptr) {
    page_statistics_->Update(values, values_to_write, num_values - values_to_write);
  }

  num_buffered_values_ += num_values;
  num_buffered_encoded_values_ += values_to_write;

  if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
    AddDataPage();
  }
  if (has_dictionary_ && !fallback_) {
    CheckDictionarySizeLimit();
  }

  return values_to_write;
}
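
// Example: for an optional, non-repeated column (max_definition_level = 1,
// max_repetition_level = 0), a mini-batch with def_levels = {1, 0, 1} encodes
// {value, null, value}: only two entries are consumed from `values`, while
// rows_written_ advances by three.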

template <typename DType>
inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(
    int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
    const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values,
    int64_t* num_spaced_written) {
  int64_t values_to_write = 0;
  int64_t spaced_values_to_write = 0;
  // If the field is required and non-repeated, there are no definition levels
  if (descr_->max_definition_level() > 0) {
    // Minimal definition level for which spaced values are written
    int16_t min_spaced_def_level = descr_->max_definition_level();
    if (descr_->schema_node()->is_optional()) {
      min_spaced_def_level--;
    }
    for (int64_t i = 0; i < num_levels; ++i) {
      if (def_levels[i] == descr_->max_definition_level()) {
        ++values_to_write;
      }
      if (def_levels[i] >= min_spaced_def_level) {
        ++spaced_values_to_write;
      }
    }

    WriteDefinitionLevels(num_levels, def_levels);
  } else {
    // Required field, write all values
    values_to_write = num_levels;
    spaced_values_to_write = num_levels;
  }

  // Repetition levels are not present for non-repeated fields
  if (descr_->max_repetition_level() > 0) {
    // A row could include more than one value;
    // count the places where a new row starts
    for (int64_t i = 0; i < num_levels; ++i) {
      if (rep_levels[i] == 0) {
        rows_written_++;
      }
    }

    WriteRepetitionLevels(num_levels, rep_levels);
  } else {
    // Each value is exactly one row
    rows_written_ += static_cast<int>(num_levels);
  }

  if (descr_->schema_node()->is_optional()) {
    WriteValuesSpaced(spaced_values_to_write, valid_bits, valid_bits_offset, values);
  } else {
    WriteValues(values_to_write, values);
  }
  *num_spaced_written = spaced_values_to_write;

  if (page_statistics_ != nullptr) {
    page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, values_to_write,
                                   spaced_values_to_write - values_to_write);
  }

  num_buffered_values_ += num_levels;
  num_buffered_encoded_values_ += values_to_write;

  if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
    AddDataPage();
  }
  if (has_dictionary_ && !fallback_) {
    CheckDictionarySizeLimit();
  }

  return values_to_write;
}
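
// Example: for the same optional column, the spaced variant also reserves a
// slot for each null (def_level 0), so `values` holds one entry per level and
// `valid_bits` marks which slots contain real values; with def_levels =
// {1, 0, 1}, spaced_values_to_write is 3 while values_to_write is 2.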

template <typename DType>
void TypedColumnWriter<DType>::WriteBatch(int64_t num_values, const int16_t* def_levels,
                                          const int16_t* rep_levels, const T* values) {
  // We check for DataPage limits only after we have inserted the values. If a user
  // writes a large number of values, the DataPage size can grow well beyond the
  // limit. The purpose of this chunking is to bound that overshoot: even for large
  // writes, chunking ensures AddDataPage() is called at a reasonable page size.
  int64_t write_batch_size = properties_->write_batch_size();
  int num_batches = static_cast<int>(num_values / write_batch_size);
  int64_t num_remaining = num_values % write_batch_size;
  int64_t value_offset = 0;
  for (int round = 0; round < num_batches; round++) {
    int64_t offset = round * write_batch_size;
    int64_t values_written = WriteMiniBatch(write_batch_size, &def_levels[offset],
                                            &rep_levels[offset], &values[value_offset]);
    value_offset += values_written;
  }
  // Write the remaining values
  int64_t offset = num_batches * write_batch_size;
  WriteMiniBatch(num_remaining, &def_levels[offset], &rep_levels[offset],
                 &values[value_offset]);
}
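
// Example of the chunking arithmetic: with write_batch_size = 1024 and
// num_values = 2500, the loop issues two full mini-batches (level offsets 0
// and 1024) followed by a final mini-batch of 452 values.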

template <typename DType>
void TypedColumnWriter<DType>::WriteBatchSpaced(
    int64_t num_values, const int16_t* def_levels, const int16_t* rep_levels,
    const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values) {
  // We check for DataPage limits only after we have inserted the values. If a user
  // writes a large number of values, the DataPage size can grow well beyond the
  // limit. The purpose of this chunking is to bound that overshoot: even for large
  // writes, chunking ensures AddDataPage() is called at a reasonable page size.
  int64_t write_batch_size = properties_->write_batch_size();
  int num_batches = static_cast<int>(num_values / write_batch_size);
  int64_t num_remaining = num_values % write_batch_size;
  int64_t num_spaced_written = 0;
  int64_t values_offset = 0;
  for (int round = 0; round < num_batches; round++) {
    int64_t offset = round * write_batch_size;
    WriteMiniBatchSpaced(write_batch_size, &def_levels[offset], &rep_levels[offset],
                         valid_bits, valid_bits_offset + values_offset,
                         values + values_offset, &num_spaced_written);
    values_offset += num_spaced_written;
  }
  // Write the remaining values
  int64_t offset = num_batches * write_batch_size;
  WriteMiniBatchSpaced(num_remaining, &def_levels[offset], &rep_levels[offset],
                       valid_bits, valid_bits_offset + values_offset,
                       values + values_offset, &num_spaced_written);
}

template <typename DType>
void TypedColumnWriter<DType>::WriteValues(int64_t num_values, const T* values) {
  current_encoder_->Put(values, static_cast<int>(num_values));
}

template <typename DType>
void TypedColumnWriter<DType>::WriteValuesSpaced(int64_t num_values,
                                                 const uint8_t* valid_bits,
                                                 int64_t valid_bits_offset,
                                                 const T* values) {
  current_encoder_->PutSpaced(values, static_cast<int>(num_values), valid_bits,
                              valid_bits_offset);
}

template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<BooleanType>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int32Type>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int64Type>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int96Type>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<FloatType>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<DoubleType>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<ByteArrayType>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<FLBAType>;

}  // namespace parquet