1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "parquet/column_writer.h" |
19 | |
20 | #include <cstdint> |
21 | #include <memory> |
22 | #include <utility> |
23 | |
24 | #include "arrow/util/bit-util.h" |
25 | #include "arrow/util/compression.h" |
26 | #include "arrow/util/logging.h" |
27 | #include "arrow/util/rle-encoding.h" |
28 | |
29 | #include "parquet/encoding-internal.h" |
30 | #include "parquet/properties.h" |
31 | #include "parquet/statistics.h" |
32 | #include "parquet/thrift.h" |
33 | #include "parquet/util/memory.h" |
34 | |
35 | namespace parquet { |
36 | |
37 | using BitWriter = ::arrow::BitUtil::BitWriter; |
38 | using RleEncoder = ::arrow::util::RleEncoder; |
39 | |
40 | LevelEncoder::LevelEncoder() {} |
41 | LevelEncoder::~LevelEncoder() {} |
42 | |
43 | void LevelEncoder::Init(Encoding::type encoding, int16_t max_level, |
44 | int num_buffered_values, uint8_t* data, int data_size) { |
45 | bit_width_ = BitUtil::Log2(max_level + 1); |
46 | encoding_ = encoding; |
47 | switch (encoding) { |
48 | case Encoding::RLE: { |
49 | rle_encoder_.reset(new RleEncoder(data, data_size, bit_width_)); |
50 | break; |
51 | } |
52 | case Encoding::BIT_PACKED: { |
53 | int num_bytes = |
54 | static_cast<int>(BitUtil::BytesForBits(num_buffered_values * bit_width_)); |
55 | bit_packed_encoder_.reset(new BitWriter(data, num_bytes)); |
56 | break; |
57 | } |
58 | default: |
59 | throw ParquetException("Unknown encoding type for levels." ); |
60 | } |
61 | } |
62 | |
63 | int LevelEncoder::MaxBufferSize(Encoding::type encoding, int16_t max_level, |
64 | int num_buffered_values) { |
65 | int bit_width = BitUtil::Log2(max_level + 1); |
66 | int num_bytes = 0; |
67 | switch (encoding) { |
68 | case Encoding::RLE: { |
69 | // TODO: Due to the way we currently check if the buffer is full enough, |
70 | // we need to have MinBufferSize as head room. |
71 | num_bytes = RleEncoder::MaxBufferSize(bit_width, num_buffered_values) + |
72 | RleEncoder::MinBufferSize(bit_width); |
73 | break; |
74 | } |
75 | case Encoding::BIT_PACKED: { |
76 | num_bytes = |
77 | static_cast<int>(BitUtil::BytesForBits(num_buffered_values * bit_width)); |
78 | break; |
79 | } |
80 | default: |
81 | throw ParquetException("Unknown encoding type for levels." ); |
82 | } |
83 | return num_bytes; |
84 | } |
85 | |
86 | int LevelEncoder::Encode(int batch_size, const int16_t* levels) { |
87 | int num_encoded = 0; |
88 | if (!rle_encoder_ && !bit_packed_encoder_) { |
89 | throw ParquetException("Level encoders are not initialized." ); |
90 | } |
91 | |
92 | if (encoding_ == Encoding::RLE) { |
93 | for (int i = 0; i < batch_size; ++i) { |
94 | if (!rle_encoder_->Put(*(levels + i))) { |
95 | break; |
96 | } |
97 | ++num_encoded; |
98 | } |
99 | rle_encoder_->Flush(); |
100 | rle_length_ = rle_encoder_->len(); |
101 | } else { |
102 | for (int i = 0; i < batch_size; ++i) { |
103 | if (!bit_packed_encoder_->PutValue(*(levels + i), bit_width_)) { |
104 | break; |
105 | } |
106 | ++num_encoded; |
107 | } |
108 | bit_packed_encoder_->Flush(); |
109 | } |
110 | return num_encoded; |
111 | } |
112 | |
113 | // ---------------------------------------------------------------------- |
114 | // PageWriter implementation |
115 | |
116 | static format::Statistics ToThrift(const EncodedStatistics& row_group_statistics) { |
117 | format::Statistics statistics; |
118 | if (row_group_statistics.has_min) statistics.__set_min(row_group_statistics.min()); |
119 | if (row_group_statistics.has_max) statistics.__set_max(row_group_statistics.max()); |
120 | if (row_group_statistics.has_null_count) |
121 | statistics.__set_null_count(row_group_statistics.null_count); |
122 | if (row_group_statistics.has_distinct_count) |
123 | statistics.__set_distinct_count(row_group_statistics.distinct_count); |
124 | return statistics; |
125 | } |
126 | |
127 | // This subclass delimits pages appearing in a serialized stream, each preceded |
128 | // by a serialized Thrift format::PageHeader indicating the type of each page |
129 | // and the page metadata. |
130 | class SerializedPageWriter : public PageWriter { |
131 | public: |
132 | SerializedPageWriter(OutputStream* sink, Compression::type codec, |
133 | ColumnChunkMetaDataBuilder* metadata, |
134 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) |
135 | : sink_(sink), |
136 | metadata_(metadata), |
137 | pool_(pool), |
138 | num_values_(0), |
139 | dictionary_page_offset_(0), |
140 | data_page_offset_(0), |
141 | total_uncompressed_size_(0), |
142 | total_compressed_size_(0) { |
143 | compressor_ = GetCodecFromArrow(codec); |
144 | thrift_serializer_.reset(new ThriftSerializer); |
145 | } |
146 | |
147 | int64_t WriteDictionaryPage(const DictionaryPage& page) override { |
148 | int64_t uncompressed_size = page.size(); |
149 | std::shared_ptr<Buffer> compressed_data = nullptr; |
150 | if (has_compressor()) { |
151 | auto buffer = std::static_pointer_cast<ResizableBuffer>( |
152 | AllocateBuffer(pool_, uncompressed_size)); |
153 | Compress(*(page.buffer().get()), buffer.get()); |
154 | compressed_data = std::static_pointer_cast<Buffer>(buffer); |
155 | } else { |
156 | compressed_data = page.buffer(); |
157 | } |
158 | |
159 | format::DictionaryPageHeader ; |
160 | dict_page_header.__set_num_values(page.num_values()); |
161 | dict_page_header.__set_encoding(ToThrift(page.encoding())); |
162 | dict_page_header.__set_is_sorted(page.is_sorted()); |
163 | |
164 | format::PageHeader ; |
165 | page_header.__set_type(format::PageType::DICTIONARY_PAGE); |
166 | page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size)); |
167 | page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size())); |
168 | page_header.__set_dictionary_page_header(dict_page_header); |
169 | // TODO(PARQUET-594) crc checksum |
170 | |
171 | int64_t start_pos = sink_->Tell(); |
172 | if (dictionary_page_offset_ == 0) { |
173 | dictionary_page_offset_ = start_pos; |
174 | } |
175 | int64_t = thrift_serializer_->Serialize(&page_header, sink_); |
176 | sink_->Write(compressed_data->data(), compressed_data->size()); |
177 | |
178 | total_uncompressed_size_ += uncompressed_size + header_size; |
179 | total_compressed_size_ += compressed_data->size() + header_size; |
180 | |
181 | return sink_->Tell() - start_pos; |
182 | } |
183 | |
184 | void Close(bool has_dictionary, bool fallback) override { |
185 | // index_page_offset = -1 since they are not supported |
186 | metadata_->Finish(num_values_, dictionary_page_offset_, -1, data_page_offset_, |
187 | total_compressed_size_, total_uncompressed_size_, has_dictionary, |
188 | fallback); |
189 | |
190 | // Write metadata at end of column chunk |
191 | metadata_->WriteTo(sink_); |
192 | } |
193 | |
194 | /** |
195 | * Compress a buffer. |
196 | */ |
197 | void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override { |
198 | DCHECK(compressor_ != nullptr); |
199 | |
200 | // Compress the data |
201 | int64_t max_compressed_size = |
202 | compressor_->MaxCompressedLen(src_buffer.size(), src_buffer.data()); |
203 | |
204 | // Use Arrow::Buffer::shrink_to_fit = false |
205 | // underlying buffer only keeps growing. Resize to a smaller size does not reallocate. |
206 | PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false)); |
207 | |
208 | int64_t compressed_size; |
209 | PARQUET_THROW_NOT_OK( |
210 | compressor_->Compress(src_buffer.size(), src_buffer.data(), max_compressed_size, |
211 | dest_buffer->mutable_data(), &compressed_size)); |
212 | PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false)); |
213 | } |
214 | |
215 | int64_t WriteDataPage(const CompressedDataPage& page) override { |
216 | int64_t uncompressed_size = page.uncompressed_size(); |
217 | std::shared_ptr<Buffer> compressed_data = page.buffer(); |
218 | |
219 | format::DataPageHeader ; |
220 | data_page_header.__set_num_values(page.num_values()); |
221 | data_page_header.__set_encoding(ToThrift(page.encoding())); |
222 | data_page_header.__set_definition_level_encoding( |
223 | ToThrift(page.definition_level_encoding())); |
224 | data_page_header.__set_repetition_level_encoding( |
225 | ToThrift(page.repetition_level_encoding())); |
226 | data_page_header.__set_statistics(ToThrift(page.statistics())); |
227 | |
228 | format::PageHeader ; |
229 | page_header.__set_type(format::PageType::DATA_PAGE); |
230 | page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size)); |
231 | page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size())); |
232 | page_header.__set_data_page_header(data_page_header); |
233 | // TODO(PARQUET-594) crc checksum |
234 | |
235 | int64_t start_pos = sink_->Tell(); |
236 | if (data_page_offset_ == 0) { |
237 | data_page_offset_ = start_pos; |
238 | } |
239 | |
240 | int64_t = thrift_serializer_->Serialize(&page_header, sink_); |
241 | sink_->Write(compressed_data->data(), compressed_data->size()); |
242 | |
243 | total_uncompressed_size_ += uncompressed_size + header_size; |
244 | total_compressed_size_ += compressed_data->size() + header_size; |
245 | num_values_ += page.num_values(); |
246 | |
247 | return sink_->Tell() - start_pos; |
248 | } |
249 | |
250 | bool has_compressor() override { return (compressor_ != nullptr); } |
251 | |
252 | int64_t num_values() { return num_values_; } |
253 | |
254 | int64_t dictionary_page_offset() { return dictionary_page_offset_; } |
255 | |
256 | int64_t data_page_offset() { return data_page_offset_; } |
257 | |
258 | int64_t total_compressed_size() { return total_compressed_size_; } |
259 | |
260 | int64_t total_uncompressed_size() { return total_uncompressed_size_; } |
261 | |
262 | private: |
263 | OutputStream* sink_; |
264 | ColumnChunkMetaDataBuilder* metadata_; |
265 | ::arrow::MemoryPool* pool_; |
266 | int64_t num_values_; |
267 | int64_t dictionary_page_offset_; |
268 | int64_t data_page_offset_; |
269 | int64_t total_uncompressed_size_; |
270 | int64_t total_compressed_size_; |
271 | |
272 | std::unique_ptr<ThriftSerializer> thrift_serializer_; |
273 | |
274 | // Compression codec to use. |
275 | std::unique_ptr<::arrow::util::Codec> compressor_; |
276 | }; |
277 | |
278 | // This implementation of the PageWriter writes to the final sink on Close . |
279 | class BufferedPageWriter : public PageWriter { |
280 | public: |
281 | BufferedPageWriter(OutputStream* sink, Compression::type codec, |
282 | ColumnChunkMetaDataBuilder* metadata, |
283 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) |
284 | : final_sink_(sink), |
285 | metadata_(metadata), |
286 | in_memory_sink_(new InMemoryOutputStream(pool)), |
287 | pager_(new SerializedPageWriter(in_memory_sink_.get(), codec, metadata, pool)) {} |
288 | |
289 | int64_t WriteDictionaryPage(const DictionaryPage& page) override { |
290 | return pager_->WriteDictionaryPage(page); |
291 | } |
292 | |
293 | void Close(bool has_dictionary, bool fallback) override { |
294 | // index_page_offset = -1 since they are not supported |
295 | metadata_->Finish( |
296 | pager_->num_values(), pager_->dictionary_page_offset() + final_sink_->Tell(), -1, |
297 | pager_->data_page_offset() + final_sink_->Tell(), pager_->total_compressed_size(), |
298 | pager_->total_uncompressed_size(), has_dictionary, fallback); |
299 | |
300 | // Write metadata at end of column chunk |
301 | metadata_->WriteTo(in_memory_sink_.get()); |
302 | |
303 | // flush everything to the serialized sink |
304 | auto buffer = in_memory_sink_->GetBuffer(); |
305 | final_sink_->Write(buffer->data(), buffer->size()); |
306 | } |
307 | |
308 | int64_t WriteDataPage(const CompressedDataPage& page) override { |
309 | return pager_->WriteDataPage(page); |
310 | } |
311 | |
312 | void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override { |
313 | pager_->Compress(src_buffer, dest_buffer); |
314 | } |
315 | |
316 | bool has_compressor() override { return pager_->has_compressor(); } |
317 | |
318 | private: |
319 | OutputStream* final_sink_; |
320 | ColumnChunkMetaDataBuilder* metadata_; |
321 | std::unique_ptr<InMemoryOutputStream> in_memory_sink_; |
322 | std::unique_ptr<SerializedPageWriter> ; |
323 | }; |
324 | |
325 | std::unique_ptr<PageWriter> PageWriter::Open(OutputStream* sink, Compression::type codec, |
326 | ColumnChunkMetaDataBuilder* metadata, |
327 | ::arrow::MemoryPool* pool, |
328 | bool buffered_row_group) { |
329 | if (buffered_row_group) { |
330 | return std::unique_ptr<PageWriter>( |
331 | new BufferedPageWriter(sink, codec, metadata, pool)); |
332 | } else { |
333 | return std::unique_ptr<PageWriter>( |
334 | new SerializedPageWriter(sink, codec, metadata, pool)); |
335 | } |
336 | } |
337 | |
338 | // ---------------------------------------------------------------------- |
339 | // ColumnWriter |
340 | |
341 | std::shared_ptr<WriterProperties> default_writer_properties() { |
342 | static std::shared_ptr<WriterProperties> default_writer_properties = |
343 | WriterProperties::Builder().build(); |
344 | return default_writer_properties; |
345 | } |
346 | |
347 | ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata, |
348 | std::unique_ptr<PageWriter> , bool has_dictionary, |
349 | Encoding::type encoding, const WriterProperties* properties) |
350 | : metadata_(metadata), |
351 | descr_(metadata->descr()), |
352 | pager_(std::move(pager)), |
353 | has_dictionary_(has_dictionary), |
354 | encoding_(encoding), |
355 | properties_(properties), |
356 | allocator_(properties->memory_pool()), |
357 | num_buffered_values_(0), |
358 | num_buffered_encoded_values_(0), |
359 | rows_written_(0), |
360 | total_bytes_written_(0), |
361 | total_compressed_bytes_(0), |
362 | closed_(false), |
363 | fallback_(false) { |
364 | definition_levels_sink_.reset(new InMemoryOutputStream(allocator_)); |
365 | repetition_levels_sink_.reset(new InMemoryOutputStream(allocator_)); |
366 | definition_levels_rle_ = |
367 | std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); |
368 | repetition_levels_rle_ = |
369 | std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); |
370 | uncompressed_data_ = |
371 | std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); |
372 | if (pager_->has_compressor()) { |
373 | compressed_data_ = |
374 | std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); |
375 | } |
376 | } |
377 | |
378 | void ColumnWriter::InitSinks() { |
379 | definition_levels_sink_->Clear(); |
380 | repetition_levels_sink_->Clear(); |
381 | } |
382 | |
383 | void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) { |
384 | DCHECK(!closed_); |
385 | definition_levels_sink_->Write(reinterpret_cast<const uint8_t*>(levels), |
386 | sizeof(int16_t) * num_levels); |
387 | } |
388 | |
389 | void ColumnWriter::WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) { |
390 | DCHECK(!closed_); |
391 | repetition_levels_sink_->Write(reinterpret_cast<const uint8_t*>(levels), |
392 | sizeof(int16_t) * num_levels); |
393 | } |
394 | |
// RLE-encodes the buffered levels from src_buffer into dest_buffer and
// returns the size of the encoded output: a 4-byte little-endian length
// prefix followed by the RLE-encoded levels (the DataPage V1 level layout).
int64_t ColumnWriter::RleEncodeLevels(const Buffer& src_buffer,
                                      ResizableBuffer* dest_buffer, int16_t max_level) {
  // TODO: This only works due to some RLE specifics; the extra sizeof(int32_t)
  // reserves room for the length prefix written below.
  int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
                                                 static_cast<int>(num_buffered_values_)) +
                     sizeof(int32_t);

  // Use Arrow::Buffer::shrink_to_fit = false
  // underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
  PARQUET_THROW_NOT_OK(dest_buffer->Resize(rle_size, false));

  // Encode past the 4-byte slot reserved for the length prefix.
  level_encoder_.Init(Encoding::RLE, max_level, static_cast<int>(num_buffered_values_),
                      dest_buffer->mutable_data() + sizeof(int32_t),
                      static_cast<int>(dest_buffer->size() - sizeof(int32_t)));
  int encoded =
      level_encoder_.Encode(static_cast<int>(num_buffered_values_),
                            reinterpret_cast<const int16_t*>(src_buffer.data()));
  // MaxBufferSize guarantees the destination is large enough for all levels.
  DCHECK_EQ(encoded, num_buffered_values_);
  // Backfill the length prefix now that the encoded length is known.
  reinterpret_cast<int32_t*>(dest_buffer->mutable_data())[0] = level_encoder_.len();
  int64_t encoded_size = level_encoder_.len() + sizeof(int32_t);
  return encoded_size;
}
418 | |
// Materializes all buffered levels and values into one (optionally
// compressed) data page. While dictionary encoding is active the page is
// queued in data_pages_; otherwise it is written to the sink immediately.
void ColumnWriter::AddDataPage() {
  int64_t definition_levels_rle_size = 0;
  int64_t repetition_levels_rle_size = 0;

  std::shared_ptr<Buffer> values = GetValuesBuffer();

  // Levels are only present when the column can actually be null/repeated.
  if (descr_->max_definition_level() > 0) {
    definition_levels_rle_size =
        RleEncodeLevels(definition_levels_sink_->GetBufferRef(),
                        definition_levels_rle_.get(), descr_->max_definition_level());
  }

  if (descr_->max_repetition_level() > 0) {
    repetition_levels_rle_size =
        RleEncodeLevels(repetition_levels_sink_->GetBufferRef(),
                        repetition_levels_rle_.get(), descr_->max_repetition_level());
  }

  int64_t uncompressed_size =
      definition_levels_rle_size + repetition_levels_rle_size + values->size();

  // Use Arrow::Buffer::shrink_to_fit = false
  // underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
  PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false));

  // Concatenate data into a single buffer
  // (page body layout: repetition levels, definition levels, then values).
  uint8_t* uncompressed_ptr = uncompressed_data_->mutable_data();
  memcpy(uncompressed_ptr, repetition_levels_rle_->data(), repetition_levels_rle_size);
  uncompressed_ptr += repetition_levels_rle_size;
  memcpy(uncompressed_ptr, definition_levels_rle_->data(), definition_levels_rle_size);
  uncompressed_ptr += definition_levels_rle_size;
  memcpy(uncompressed_ptr, values->data(), values->size());

  EncodedStatistics page_stats = GetPageStatistics();
  ResetPageStatistics();

  std::shared_ptr<Buffer> compressed_data;
  if (pager_->has_compressor()) {
    pager_->Compress(*(uncompressed_data_.get()), compressed_data_.get());
    compressed_data = compressed_data_;
  } else {
    compressed_data = uncompressed_data_;
  }

  // Write the page to OutputStream eagerly if there is no dictionary or
  // if dictionary encoding has fallen back to PLAIN
  if (has_dictionary_ && !fallback_) {  // Save pages until end of dictionary encoding
    // The scratch buffers are reused for the next page, so a buffered page
    // needs its own copy of the data.
    std::shared_ptr<Buffer> compressed_data_copy;
    PARQUET_THROW_NOT_OK(compressed_data->Copy(0, compressed_data->size(), allocator_,
                                               &compressed_data_copy));
    CompressedDataPage page(compressed_data_copy,
                            static_cast<int32_t>(num_buffered_values_), encoding_,
                            Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats);
    total_compressed_bytes_ += page.size() + sizeof(format::PageHeader);
    data_pages_.push_back(std::move(page));
  } else {  // Eagerly write pages
    CompressedDataPage page(compressed_data, static_cast<int32_t>(num_buffered_values_),
                            encoding_, Encoding::RLE, Encoding::RLE, uncompressed_size,
                            page_stats);
    WriteDataPage(page);
  }

  // Re-initialize the sinks for next Page.
  InitSinks();
  num_buffered_values_ = 0;
  num_buffered_encoded_values_ = 0;
}
486 | |
487 | void ColumnWriter::WriteDataPage(const CompressedDataPage& page) { |
488 | total_bytes_written_ += pager_->WriteDataPage(page); |
489 | } |
490 | |
// Flushes all remaining pages, records chunk statistics and finalizes the
// column chunk metadata. Idempotent: subsequent calls just return the total
// number of bytes written for this column.
int64_t ColumnWriter::Close() {
  if (!closed_) {
    closed_ = true;
    // The dictionary page must precede the data pages, which are still
    // buffered at this point when dictionary encoding has not fallen back.
    if (has_dictionary_ && !fallback_) {
      WriteDictionaryPage();
    }

    FlushBufferedDataPages();

    EncodedStatistics chunk_statistics = GetChunkStatistics();
    // Write stats only if the column has at least one row written
    // From parquet-mr
    // Don't write stats larger than the max size rather than truncating. The
    // rationale is that some engines may use the minimum value in the page as
    // the true minimum for aggregations and there is no way to mark that a
    // value has been truncated and is a lower bound and not in the page.
    if (rows_written_ > 0 && chunk_statistics.is_set() &&
        chunk_statistics.max_stat_length() <=
            properties_->max_statistics_size(descr_->path())) {
      metadata_->SetStatistics(SortOrder::SIGNED == descr_->sort_order(),
                               chunk_statistics);
    }
    pager_->Close(has_dictionary_, fallback_);
  }

  return total_bytes_written_;
}
518 | |
519 | void ColumnWriter::FlushBufferedDataPages() { |
520 | // Write all outstanding data to a new page |
521 | if (num_buffered_values_ > 0) { |
522 | AddDataPage(); |
523 | } |
524 | for (size_t i = 0; i < data_pages_.size(); i++) { |
525 | WriteDataPage(data_pages_[i]); |
526 | } |
527 | data_pages_.clear(); |
528 | total_compressed_bytes_ = 0; |
529 | } |
530 | |
531 | // ---------------------------------------------------------------------- |
532 | // TypedColumnWriter |
533 | |
534 | template <typename Type> |
535 | TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata, |
536 | std::unique_ptr<PageWriter> , |
537 | const bool use_dictionary, |
538 | Encoding::type encoding, |
539 | const WriterProperties* properties) |
540 | : ColumnWriter(metadata, std::move(pager), use_dictionary, encoding, properties) { |
541 | if (use_dictionary) { |
542 | current_encoder_.reset(new DictEncoder<Type>(descr_, properties->memory_pool())); |
543 | } else if (encoding == Encoding::PLAIN) { |
544 | current_encoder_.reset(new PlainEncoder<Type>(descr_, properties->memory_pool())); |
545 | } else { |
546 | ParquetException::NYI("Selected encoding is not supported" ); |
547 | } |
548 | |
549 | if (properties->statistics_enabled(descr_->path()) && |
550 | (SortOrder::UNKNOWN != descr_->sort_order())) { |
551 | page_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_)); |
552 | chunk_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_)); |
553 | } |
554 | } |
555 | |
// Only one Dictionary Page is written.
// Fallback to PLAIN if dictionary page limit is reached.
template <typename Type>
void TypedColumnWriter<Type>::CheckDictionarySizeLimit() {
  // Cast is safe: callers invoke this only while has_dictionary_ && !fallback_,
  // i.e. while current_encoder_ is the DictEncoder set up in the constructor.
  auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
  if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) {
    WriteDictionaryPage();
    // Serialize the buffered dictionary indices
    FlushBufferedDataPages();
    fallback_ = true;
    // Only PLAIN encoding is supported for fallback in V1
    current_encoder_.reset(new PlainEncoder<Type>(descr_, properties_->memory_pool()));
    encoding_ = Encoding::PLAIN;
  }
}
571 | |
// Dumps the accumulated dictionary into a fresh buffer and writes it out as
// the column's dictionary page via the pager.
template <typename Type>
void TypedColumnWriter<Type>::WriteDictionaryPage() {
  auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
  std::shared_ptr<ResizableBuffer> buffer =
      AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
  dict_encoder->WriteDict(buffer->mutable_data());

  DictionaryPage page(buffer, dict_encoder->num_entries(),
                      properties_->dictionary_page_encoding());
  total_bytes_written_ += pager_->WriteDictionaryPage(page);
}
583 | |
584 | template <typename Type> |
585 | EncodedStatistics TypedColumnWriter<Type>::GetPageStatistics() { |
586 | EncodedStatistics result; |
587 | if (page_statistics_) result = page_statistics_->Encode(); |
588 | return result; |
589 | } |
590 | |
591 | template <typename Type> |
592 | EncodedStatistics TypedColumnWriter<Type>::GetChunkStatistics() { |
593 | EncodedStatistics result; |
594 | if (chunk_statistics_) result = chunk_statistics_->Encode(); |
595 | return result; |
596 | } |
597 | |
// Folds the finished page's statistics into the chunk-level statistics and
// clears them for the next page.
template <typename Type>
void TypedColumnWriter<Type>::ResetPageStatistics() {
  // Checking chunk_statistics_ alone is sufficient: chunk_statistics_ and
  // page_statistics_ are always created together in the constructor.
  if (chunk_statistics_ != nullptr) {
    chunk_statistics_->Merge(*page_statistics_);
    page_statistics_->Reset();
  }
}
605 | |
606 | // ---------------------------------------------------------------------- |
607 | // Dynamic column writer constructor |
608 | |
609 | std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata, |
610 | std::unique_ptr<PageWriter> , |
611 | const WriterProperties* properties) { |
612 | const ColumnDescriptor* descr = metadata->descr(); |
613 | const bool use_dictionary = properties->dictionary_enabled(descr->path()) && |
614 | descr->physical_type() != Type::BOOLEAN; |
615 | Encoding::type encoding = properties->encoding(descr->path()); |
616 | if (use_dictionary) { |
617 | encoding = properties->dictionary_index_encoding(); |
618 | } |
619 | switch (descr->physical_type()) { |
620 | case Type::BOOLEAN: |
621 | return std::make_shared<BoolWriter>(metadata, std::move(pager), use_dictionary, |
622 | encoding, properties); |
623 | case Type::INT32: |
624 | return std::make_shared<Int32Writer>(metadata, std::move(pager), use_dictionary, |
625 | encoding, properties); |
626 | case Type::INT64: |
627 | return std::make_shared<Int64Writer>(metadata, std::move(pager), use_dictionary, |
628 | encoding, properties); |
629 | case Type::INT96: |
630 | return std::make_shared<Int96Writer>(metadata, std::move(pager), use_dictionary, |
631 | encoding, properties); |
632 | case Type::FLOAT: |
633 | return std::make_shared<FloatWriter>(metadata, std::move(pager), use_dictionary, |
634 | encoding, properties); |
635 | case Type::DOUBLE: |
636 | return std::make_shared<DoubleWriter>(metadata, std::move(pager), use_dictionary, |
637 | encoding, properties); |
638 | case Type::BYTE_ARRAY: |
639 | return std::make_shared<ByteArrayWriter>(metadata, std::move(pager), use_dictionary, |
640 | encoding, properties); |
641 | case Type::FIXED_LEN_BYTE_ARRAY: |
642 | return std::make_shared<FixedLenByteArrayWriter>( |
643 | metadata, std::move(pager), use_dictionary, encoding, properties); |
644 | default: |
645 | ParquetException::NYI("type reader not implemented" ); |
646 | } |
647 | // Unreachable code, but supress compiler warning |
648 | return std::shared_ptr<ColumnWriter>(nullptr); |
649 | } |
650 | |
651 | // ---------------------------------------------------------------------- |
652 | // Instantiate templated classes |
653 | |
// Writes one bounded batch of levels and values and returns the number of
// non-null values consumed from `values` (how far the caller's values
// pointer advances). Called by WriteBatch in write_batch_size chunks.
template <typename DType>
inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
                                                        const int16_t* def_levels,
                                                        const int16_t* rep_levels,
                                                        const T* values) {
  int64_t values_to_write = 0;
  // If the field is required and non-repeated, there are no definition levels
  if (descr_->max_definition_level() > 0) {
    // Only entries at the maximum definition level carry an actual value.
    for (int64_t i = 0; i < num_values; ++i) {
      if (def_levels[i] == descr_->max_definition_level()) {
        ++values_to_write;
      }
    }

    WriteDefinitionLevels(num_values, def_levels);
  } else {
    // Required field, write all values
    values_to_write = num_values;
  }

  // Not present for non-repeated fields
  if (descr_->max_repetition_level() > 0) {
    // A row could include more than one value
    // Count the occasions where we start a new row
    for (int64_t i = 0; i < num_values; ++i) {
      if (rep_levels[i] == 0) {
        rows_written_++;
      }
    }

    WriteRepetitionLevels(num_values, rep_levels);
  } else {
    // Each value is exactly one row
    rows_written_ += static_cast<int>(num_values);
  }

  // PARQUET-780
  if (values_to_write > 0) {
    DCHECK(nullptr != values) << "Values ptr cannot be NULL";
  }

  WriteValues(values_to_write, values);

  if (page_statistics_ != nullptr) {
    // num_values - values_to_write is the number of nulls in this batch.
    page_statistics_->Update(values, values_to_write, num_values - values_to_write);
  }

  num_buffered_values_ += num_values;
  num_buffered_encoded_values_ += values_to_write;

  // Close out the current page once its estimated encoded size exceeds the
  // configured page size; then check whether dictionary encoding must fall
  // back to PLAIN because the dictionary grew past its limit.
  if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
    AddDataPage();
  }
  if (has_dictionary_ && !fallback_) {
    CheckDictionarySizeLimit();
  }

  return values_to_write;
}
713 | |
// Spaced variant of WriteMiniBatch: values for null slots are left as gaps
// described by `valid_bits`. Returns the number of non-null values; the
// number of value slots consumed (including gaps) is reported through
// *num_spaced_written.
template <typename DType>
inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(
    int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
    const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values,
    int64_t* num_spaced_written) {
  int64_t values_to_write = 0;
  int64_t spaced_values_to_write = 0;
  // If the field is required and non-repeated, there are no definition levels
  if (descr_->max_definition_level() > 0) {
    // Minimal definition level for which spaced values are written
    int16_t min_spaced_def_level = descr_->max_definition_level();
    if (descr_->schema_node()->is_optional()) {
      min_spaced_def_level--;
    }
    for (int64_t i = 0; i < num_levels; ++i) {
      // Entries at the max definition level carry a real value ...
      if (def_levels[i] == descr_->max_definition_level()) {
        ++values_to_write;
      }
      // ... while entries at or above min_spaced_def_level occupy a slot in
      // the spaced values array (possibly a null gap).
      if (def_levels[i] >= min_spaced_def_level) {
        ++spaced_values_to_write;
      }
    }

    WriteDefinitionLevels(num_levels, def_levels);
  } else {
    // Required field, write all values
    values_to_write = num_levels;
    spaced_values_to_write = num_levels;
  }

  // Not present for non-repeated fields
  if (descr_->max_repetition_level() > 0) {
    // A row could include more than one value
    // Count the occasions where we start a new row
    for (int64_t i = 0; i < num_levels; ++i) {
      if (rep_levels[i] == 0) {
        rows_written_++;
      }
    }

    WriteRepetitionLevels(num_levels, rep_levels);
  } else {
    // Each value is exactly one row
    rows_written_ += static_cast<int>(num_levels);
  }

  // Optional fields use the spaced (validity-bitmap) write path; for other
  // fields only the dense non-null values are written.
  if (descr_->schema_node()->is_optional()) {
    WriteValuesSpaced(spaced_values_to_write, valid_bits, valid_bits_offset, values);
  } else {
    WriteValues(values_to_write, values);
  }
  *num_spaced_written = spaced_values_to_write;

  if (page_statistics_ != nullptr) {
    // spaced_values_to_write - values_to_write is the number of null slots.
    page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, values_to_write,
                                   spaced_values_to_write - values_to_write);
  }

  num_buffered_values_ += num_levels;
  num_buffered_encoded_values_ += values_to_write;

  // Close out the current page once its estimated encoded size exceeds the
  // configured page size, then check the dictionary-size fallback condition.
  if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
    AddDataPage();
  }
  if (has_dictionary_ && !fallback_) {
    CheckDictionarySizeLimit();
  }

  return values_to_write;
}
784 | |
785 | template <typename DType> |
786 | void TypedColumnWriter<DType>::WriteBatch(int64_t num_values, const int16_t* def_levels, |
787 | const int16_t* rep_levels, const T* values) { |
788 | // We check for DataPage limits only after we have inserted the values. If a user |
789 | // writes a large number of values, the DataPage size can be much above the limit. |
790 | // The purpose of this chunking is to bound this. Even if a user writes large number |
791 | // of values, the chunking will ensure the AddDataPage() is called at a reasonable |
792 | // pagesize limit |
793 | int64_t write_batch_size = properties_->write_batch_size(); |
794 | int num_batches = static_cast<int>(num_values / write_batch_size); |
795 | int64_t num_remaining = num_values % write_batch_size; |
796 | int64_t value_offset = 0; |
797 | for (int round = 0; round < num_batches; round++) { |
798 | int64_t offset = round * write_batch_size; |
799 | int64_t num_values = WriteMiniBatch(write_batch_size, &def_levels[offset], |
800 | &rep_levels[offset], &values[value_offset]); |
801 | value_offset += num_values; |
802 | } |
803 | // Write the remaining values |
804 | int64_t offset = num_batches * write_batch_size; |
805 | WriteMiniBatch(num_remaining, &def_levels[offset], &rep_levels[offset], |
806 | &values[value_offset]); |
807 | } |
808 | |
809 | template <typename DType> |
810 | void TypedColumnWriter<DType>::WriteBatchSpaced( |
811 | int64_t num_values, const int16_t* def_levels, const int16_t* rep_levels, |
812 | const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values) { |
813 | // We check for DataPage limits only after we have inserted the values. If a user |
814 | // writes a large number of values, the DataPage size can be much above the limit. |
815 | // The purpose of this chunking is to bound this. Even if a user writes large number |
816 | // of values, the chunking will ensure the AddDataPage() is called at a reasonable |
817 | // pagesize limit |
818 | int64_t write_batch_size = properties_->write_batch_size(); |
819 | int num_batches = static_cast<int>(num_values / write_batch_size); |
820 | int64_t num_remaining = num_values % write_batch_size; |
821 | int64_t num_spaced_written = 0; |
822 | int64_t values_offset = 0; |
823 | for (int round = 0; round < num_batches; round++) { |
824 | int64_t offset = round * write_batch_size; |
825 | WriteMiniBatchSpaced(write_batch_size, &def_levels[offset], &rep_levels[offset], |
826 | valid_bits, valid_bits_offset + values_offset, |
827 | values + values_offset, &num_spaced_written); |
828 | values_offset += num_spaced_written; |
829 | } |
830 | // Write the remaining values |
831 | int64_t offset = num_batches * write_batch_size; |
832 | WriteMiniBatchSpaced(num_remaining, &def_levels[offset], &rep_levels[offset], |
833 | valid_bits, valid_bits_offset + values_offset, |
834 | values + values_offset, &num_spaced_written); |
835 | } |
836 | |
837 | template <typename DType> |
838 | void TypedColumnWriter<DType>::WriteValues(int64_t num_values, const T* values) { |
839 | current_encoder_->Put(values, static_cast<int>(num_values)); |
840 | } |
841 | |
842 | template <typename DType> |
843 | void TypedColumnWriter<DType>::WriteValuesSpaced(int64_t num_values, |
844 | const uint8_t* valid_bits, |
845 | int64_t valid_bits_offset, |
846 | const T* values) { |
847 | current_encoder_->PutSpaced(values, static_cast<int>(num_values), valid_bits, |
848 | valid_bits_offset); |
849 | } |
850 | |
// Explicit instantiations of TypedColumnWriter for every physical Parquet
// type, exported from the library so other translation units can link
// against them without seeing the template definitions.
template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<BooleanType>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int32Type>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int64Type>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int96Type>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<FloatType>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<DoubleType>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<ByteArrayType>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<FLBAType>;
859 | |
860 | } // namespace parquet |
861 | |