// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "parquet/column_writer.h"

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/buffer_builder.h"
#include "arrow/compute/api.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_stream_utils.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/compression.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding.h"

#include "parquet/column_page.h"
#include "parquet/encoding.h"
#include "parquet/metadata.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/statistics.h"
#include "parquet/thrift_internal.h"
#include "parquet/types.h"

namespace parquet {

using arrow::Status;
using arrow::compute::Datum;
using arrow::internal::checked_cast;

using BitWriter = arrow::BitUtil::BitWriter;
using RleEncoder = arrow::util::RleEncoder;

LevelEncoder::LevelEncoder() {}
LevelEncoder::~LevelEncoder() {}

void LevelEncoder::Init(Encoding::type encoding, int16_t max_level,
                        int num_buffered_values, uint8_t* data, int data_size) {
  bit_width_ = BitUtil::Log2(max_level + 1);
  encoding_ = encoding;
  switch (encoding) {
    case Encoding::RLE: {
      rle_encoder_.reset(new RleEncoder(data, data_size, bit_width_));
      break;
    }
    case Encoding::BIT_PACKED: {
      int num_bytes =
          static_cast<int>(BitUtil::BytesForBits(num_buffered_values * bit_width_));
      bit_packed_encoder_.reset(new BitWriter(data, num_bytes));
      break;
    }
    default:
      throw ParquetException("Unknown encoding type for levels.");
  }
}

int LevelEncoder::MaxBufferSize(Encoding::type encoding, int16_t max_level,
                                int num_buffered_values) {
  int bit_width = BitUtil::Log2(max_level + 1);
  int num_bytes = 0;
  switch (encoding) {
    case Encoding::RLE: {
      // TODO: Due to the way we currently check whether the buffer is full,
      // we need MinBufferSize as headroom.
      num_bytes = RleEncoder::MaxBufferSize(bit_width, num_buffered_values) +
                  RleEncoder::MinBufferSize(bit_width);
      break;
    }
    case Encoding::BIT_PACKED: {
      num_bytes =
          static_cast<int>(BitUtil::BytesForBits(num_buffered_values * bit_width));
      break;
    }
    default:
      throw ParquetException("Unknown encoding type for levels.");
  }
  return num_bytes;
}

int LevelEncoder::Encode(int batch_size, const int16_t* levels) {
  int num_encoded = 0;
  if (!rle_encoder_ && !bit_packed_encoder_) {
    throw ParquetException("Level encoders are not initialized.");
  }

  if (encoding_ == Encoding::RLE) {
    for (int i = 0; i < batch_size; ++i) {
      if (!rle_encoder_->Put(*(levels + i))) {
        break;
      }
      ++num_encoded;
    }
    rle_encoder_->Flush();
    rle_length_ = rle_encoder_->len();
  } else {
    for (int i = 0; i < batch_size; ++i) {
      if (!bit_packed_encoder_->PutValue(*(levels + i), bit_width_)) {
        break;
      }
      ++num_encoded;
    }
    bit_packed_encoder_->Flush();
  }
  return num_encoded;
}
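
// Illustrative usage sketch (not part of the library): a typical caller sizes
// the output buffer with MaxBufferSize, then calls Init followed by Encode.
// The level values and max_level here are assumptions for illustration:
//
//   int16_t levels[] = {1, 1, 0, 1};
//   int size = LevelEncoder::MaxBufferSize(Encoding::RLE, /*max_level=*/1, 4);
//   std::vector<uint8_t> buf(size);
//   LevelEncoder encoder;
//   encoder.Init(Encoding::RLE, /*max_level=*/1, 4, buf.data(), size);
//   int num_encoded = encoder.Encode(4, levels);  // 4 when everything fits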

// ----------------------------------------------------------------------
// PageWriter implementation

// This subclass delimits pages appearing in a serialized stream, each preceded
// by a serialized Thrift format::PageHeader indicating the type of each page
// and the page metadata.
class SerializedPageWriter : public PageWriter {
 public:
  SerializedPageWriter(const std::shared_ptr<ArrowOutputStream>& sink,
                       Compression::type codec, int compression_level,
                       ColumnChunkMetaDataBuilder* metadata,
                       MemoryPool* pool = arrow::default_memory_pool())
      : sink_(sink),
        metadata_(metadata),
        pool_(pool),
        num_values_(0),
        dictionary_page_offset_(0),
        data_page_offset_(0),
        total_uncompressed_size_(0),
        total_compressed_size_(0) {
    compressor_ = GetCodec(codec, compression_level);
    thrift_serializer_.reset(new ThriftSerializer);
  }

  int64_t WriteDictionaryPage(const DictionaryPage& page) override {
    int64_t uncompressed_size = page.size();
    std::shared_ptr<Buffer> compressed_data = nullptr;
    if (has_compressor()) {
      auto buffer = std::static_pointer_cast<ResizableBuffer>(
          AllocateBuffer(pool_, uncompressed_size));
      Compress(*(page.buffer().get()), buffer.get());
      compressed_data = std::static_pointer_cast<Buffer>(buffer);
    } else {
      compressed_data = page.buffer();
    }

    format::DictionaryPageHeader dict_page_header;
    dict_page_header.__set_num_values(page.num_values());
    dict_page_header.__set_encoding(ToThrift(page.encoding()));
    dict_page_header.__set_is_sorted(page.is_sorted());

    format::PageHeader page_header;
    page_header.__set_type(format::PageType::DICTIONARY_PAGE);
    page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
    page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
    page_header.__set_dictionary_page_header(dict_page_header);
    // TODO(PARQUET-594) crc checksum

    int64_t start_pos = -1;
    PARQUET_THROW_NOT_OK(sink_->Tell(&start_pos));
    if (dictionary_page_offset_ == 0) {
      dictionary_page_offset_ = start_pos;
    }
    int64_t header_size = thrift_serializer_->Serialize(&page_header, sink_.get());
    PARQUET_THROW_NOT_OK(sink_->Write(compressed_data));

    total_uncompressed_size_ += uncompressed_size + header_size;
    total_compressed_size_ += compressed_data->size() + header_size;

    int64_t final_pos = -1;
    PARQUET_THROW_NOT_OK(sink_->Tell(&final_pos));
    return final_pos - start_pos;
  }

  void Close(bool has_dictionary, bool fallback) override {
    // index_page_offset = -1 since they are not supported
    metadata_->Finish(num_values_, dictionary_page_offset_, -1, data_page_offset_,
                      total_compressed_size_, total_uncompressed_size_, has_dictionary,
                      fallback);

    // Write metadata at end of column chunk
    metadata_->WriteTo(sink_.get());
  }

  /**
   * Compress a buffer.
   */
  void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override {
    DCHECK(compressor_ != nullptr);

    // Compress the data
    int64_t max_compressed_size =
        compressor_->MaxCompressedLen(src_buffer.size(), src_buffer.data());

    // Resize with shrink_to_fit = false: the underlying buffer only keeps
    // growing, so resizing to a smaller size later does not reallocate.
    PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false));

    int64_t compressed_size;
    PARQUET_THROW_NOT_OK(
        compressor_->Compress(src_buffer.size(), src_buffer.data(), max_compressed_size,
                              dest_buffer->mutable_data(), &compressed_size));
    PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false));
  }
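
  // Illustrative sketch (an assumption, mirroring WriteDictionaryPage above):
  // callers compress a page into a scratch ResizableBuffer before writing:
  //
  //   auto scratch = std::static_pointer_cast<ResizableBuffer>(
  //       AllocateBuffer(pool_, page.size()));
  //   Compress(*page.buffer(), scratch.get());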

  int64_t WriteDataPage(const CompressedDataPage& page) override {
    int64_t uncompressed_size = page.uncompressed_size();
    std::shared_ptr<Buffer> compressed_data = page.buffer();

    format::DataPageHeader data_page_header;
    data_page_header.__set_num_values(page.num_values());
    data_page_header.__set_encoding(ToThrift(page.encoding()));
    data_page_header.__set_definition_level_encoding(
        ToThrift(page.definition_level_encoding()));
    data_page_header.__set_repetition_level_encoding(
        ToThrift(page.repetition_level_encoding()));
    data_page_header.__set_statistics(ToThrift(page.statistics()));

    format::PageHeader page_header;
    page_header.__set_type(format::PageType::DATA_PAGE);
    page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
    page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
    page_header.__set_data_page_header(data_page_header);
    // TODO(PARQUET-594) crc checksum

    int64_t start_pos = -1;
    PARQUET_THROW_NOT_OK(sink_->Tell(&start_pos));
    if (data_page_offset_ == 0) {
      data_page_offset_ = start_pos;
    }

    int64_t header_size = thrift_serializer_->Serialize(&page_header, sink_.get());
    PARQUET_THROW_NOT_OK(sink_->Write(compressed_data));

    total_uncompressed_size_ += uncompressed_size + header_size;
    total_compressed_size_ += compressed_data->size() + header_size;
    num_values_ += page.num_values();

    int64_t current_pos = -1;
    PARQUET_THROW_NOT_OK(sink_->Tell(&current_pos));
    return current_pos - start_pos;
  }

  bool has_compressor() override { return (compressor_ != nullptr); }

  int64_t num_values() { return num_values_; }

  int64_t dictionary_page_offset() { return dictionary_page_offset_; }

  int64_t data_page_offset() { return data_page_offset_; }

  int64_t total_compressed_size() { return total_compressed_size_; }

  int64_t total_uncompressed_size() { return total_uncompressed_size_; }

 private:
  std::shared_ptr<ArrowOutputStream> sink_;
  ColumnChunkMetaDataBuilder* metadata_;
  MemoryPool* pool_;
  int64_t num_values_;
  int64_t dictionary_page_offset_;
  int64_t data_page_offset_;
  int64_t total_uncompressed_size_;
  int64_t total_compressed_size_;

  std::unique_ptr<ThriftSerializer> thrift_serializer_;

  // Compression codec to use.
  std::unique_ptr<arrow::util::Codec> compressor_;
};

// This implementation of the PageWriter writes to the final sink on Close.
class BufferedPageWriter : public PageWriter {
 public:
  BufferedPageWriter(const std::shared_ptr<ArrowOutputStream>& sink,
                     Compression::type codec, int compression_level,
                     ColumnChunkMetaDataBuilder* metadata,
                     MemoryPool* pool = arrow::default_memory_pool())
      : final_sink_(sink), metadata_(metadata) {
    in_memory_sink_ = CreateOutputStream(pool);
    pager_ = std::unique_ptr<SerializedPageWriter>(new SerializedPageWriter(
        in_memory_sink_, codec, compression_level, metadata, pool));
  }

  int64_t WriteDictionaryPage(const DictionaryPage& page) override {
    return pager_->WriteDictionaryPage(page);
  }

  void Close(bool has_dictionary, bool fallback) override {
    // index_page_offset = -1 since they are not supported
    int64_t final_position = -1;
    PARQUET_THROW_NOT_OK(final_sink_->Tell(&final_position));
    metadata_->Finish(
        pager_->num_values(), pager_->dictionary_page_offset() + final_position, -1,
        pager_->data_page_offset() + final_position, pager_->total_compressed_size(),
        pager_->total_uncompressed_size(), has_dictionary, fallback);

    // Write metadata at end of column chunk
    metadata_->WriteTo(in_memory_sink_.get());

    // flush everything to the serialized sink
    std::shared_ptr<Buffer> buffer;
    PARQUET_THROW_NOT_OK(in_memory_sink_->Finish(&buffer));
    PARQUET_THROW_NOT_OK(final_sink_->Write(buffer));
  }

  int64_t WriteDataPage(const CompressedDataPage& page) override {
    return pager_->WriteDataPage(page);
  }

  void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override {
    pager_->Compress(src_buffer, dest_buffer);
  }

  bool has_compressor() override { return pager_->has_compressor(); }

 private:
  std::shared_ptr<ArrowOutputStream> final_sink_;
  ColumnChunkMetaDataBuilder* metadata_;
  std::shared_ptr<arrow::io::BufferOutputStream> in_memory_sink_;
  std::unique_ptr<SerializedPageWriter> pager_;
};

std::unique_ptr<PageWriter> PageWriter::Open(
    const std::shared_ptr<ArrowOutputStream>& sink, Compression::type codec,
    int compression_level, ColumnChunkMetaDataBuilder* metadata, MemoryPool* pool,
    bool buffered_row_group) {
  if (buffered_row_group) {
    return std::unique_ptr<PageWriter>(
        new BufferedPageWriter(sink, codec, compression_level, metadata, pool));
  } else {
    return std::unique_ptr<PageWriter>(
        new SerializedPageWriter(sink, codec, compression_level, metadata, pool));
  }
}
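
// Illustrative usage sketch (not part of the library): `sink` and `builder`
// are assumed to come from the surrounding file writer, and we assume this
// Arrow version provides arrow::util::Codec::UseDefaultCompressionLevel():
//
//   std::unique_ptr<PageWriter> pager = PageWriter::Open(
//       sink, Compression::SNAPPY,
//       arrow::util::Codec::UseDefaultCompressionLevel(), builder,
//       arrow::default_memory_pool(), /*buffered_row_group=*/false);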

// ----------------------------------------------------------------------
// ColumnWriter

std::shared_ptr<WriterProperties> default_writer_properties() {
  static std::shared_ptr<WriterProperties> default_writer_properties =
      WriterProperties::Builder().build();
  return default_writer_properties;
}

class ColumnWriterImpl {
 public:
  ColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
                   std::unique_ptr<PageWriter> pager, const bool use_dictionary,
                   Encoding::type encoding, const WriterProperties* properties)
      : metadata_(metadata),
        descr_(metadata->descr()),
        pager_(std::move(pager)),
        has_dictionary_(use_dictionary),
        encoding_(encoding),
        properties_(properties),
        allocator_(properties->memory_pool()),
        num_buffered_values_(0),
        num_buffered_encoded_values_(0),
        rows_written_(0),
        total_bytes_written_(0),
        total_compressed_bytes_(0),
        closed_(false),
        fallback_(false),
        definition_levels_sink_(allocator_),
        repetition_levels_sink_(allocator_) {
    definition_levels_rle_ =
        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
    repetition_levels_rle_ =
        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
    uncompressed_data_ =
        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
    if (pager_->has_compressor()) {
      compressed_data_ =
          std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
    }
  }

  virtual ~ColumnWriterImpl() = default;

  int64_t Close();

 protected:
  virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;

  // Serializes Dictionary Page if enabled
  virtual void WriteDictionaryPage() = 0;

  // Plain-encoded statistics of the current page
  virtual EncodedStatistics GetPageStatistics() = 0;

  // Plain-encoded statistics of the whole chunk
  virtual EncodedStatistics GetChunkStatistics() = 0;

  // Merges page statistics into chunk statistics, then resets the values
  virtual void ResetPageStatistics() = 0;

  // Adds Data Pages to an in memory buffer in dictionary encoding mode
  // Serializes the Data Pages in other encoding modes
  void AddDataPage();

  // Serializes Data Pages
  void WriteDataPage(const CompressedDataPage& page) {
    total_bytes_written_ += pager_->WriteDataPage(page);
  }

  // Write multiple definition levels
  void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
    DCHECK(!closed_);
    PARQUET_THROW_NOT_OK(
        definition_levels_sink_.Append(levels, sizeof(int16_t) * num_levels));
  }

  // Write multiple repetition levels
  void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) {
    DCHECK(!closed_);
    PARQUET_THROW_NOT_OK(
        repetition_levels_sink_.Append(levels, sizeof(int16_t) * num_levels));
  }

  // RLE encode the src_buffer into dest_buffer and return the encoded size
  int64_t RleEncodeLevels(const void* src_buffer, ResizableBuffer* dest_buffer,
                          int16_t max_level);

  // Serialize the buffered Data Pages
  void FlushBufferedDataPages();

  ColumnChunkMetaDataBuilder* metadata_;
  const ColumnDescriptor* descr_;

  std::unique_ptr<PageWriter> pager_;

  bool has_dictionary_;
  Encoding::type encoding_;
  const WriterProperties* properties_;

  LevelEncoder level_encoder_;

  MemoryPool* allocator_;

  // The total number of values stored in the data page. This is the maximum of
  // the number of encoded definition levels or encoded values. For
  // non-repeated, required columns, this is equal to the number of encoded
  // values. For repeated or optional values, there may be fewer data values
  // than levels, and this tells you how many encoded levels there are in that
  // case.
  int64_t num_buffered_values_;

  // The total number of stored values. For repeated or optional values, this
  // number may be lower than num_buffered_values_.
  int64_t num_buffered_encoded_values_;

  // Total number of rows written with this ColumnWriter
  int rows_written_;

  // Records the total number of bytes written by the serializer
  int64_t total_bytes_written_;

  // Records the current number of compressed bytes in a column
  int64_t total_compressed_bytes_;

  // Flag to check if the Writer has been closed
  bool closed_;

  // Flag to infer if dictionary encoding has fallen back to PLAIN
  bool fallback_;

  arrow::BufferBuilder definition_levels_sink_;
  arrow::BufferBuilder repetition_levels_sink_;

  std::shared_ptr<ResizableBuffer> definition_levels_rle_;
  std::shared_ptr<ResizableBuffer> repetition_levels_rle_;

  std::shared_ptr<ResizableBuffer> uncompressed_data_;
  std::shared_ptr<ResizableBuffer> compressed_data_;

  std::vector<CompressedDataPage> data_pages_;

 private:
  void InitSinks() {
    definition_levels_sink_.Rewind(0);
    repetition_levels_sink_.Rewind(0);
  }
};

// return the size of the encoded buffer
int64_t ColumnWriterImpl::RleEncodeLevels(const void* src_buffer,
                                          ResizableBuffer* dest_buffer,
                                          int16_t max_level) {
  // TODO: This only works with RLE encoding due to some RLE specifics; the
  // extra sizeof(int32_t) reserves room for the length prefix written below.
  int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
                                                 static_cast<int>(num_buffered_values_)) +
                     sizeof(int32_t);

  // Resize with shrink_to_fit = false: the underlying buffer only keeps
  // growing, so resizing to a smaller size later does not reallocate.
  PARQUET_THROW_NOT_OK(dest_buffer->Resize(rle_size, false));

  level_encoder_.Init(Encoding::RLE, max_level, static_cast<int>(num_buffered_values_),
                      dest_buffer->mutable_data() + sizeof(int32_t),
                      static_cast<int>(dest_buffer->size() - sizeof(int32_t)));
  int encoded = level_encoder_.Encode(static_cast<int>(num_buffered_values_),
                                      reinterpret_cast<const int16_t*>(src_buffer));
  DCHECK_EQ(encoded, num_buffered_values_);
  reinterpret_cast<int32_t*>(dest_buffer->mutable_data())[0] = level_encoder_.len();
  int64_t encoded_size = level_encoder_.len() + sizeof(int32_t);
  return encoded_size;
}
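
// A note on the buffer layout produced above (assuming a little-endian
// platform, which is what the Parquet v1 data page format expects): the
// first four bytes hold the RLE byte length, followed by the RLE data.
// A reader-side sketch:
//
//   int32_t rle_len;
//   std::memcpy(&rle_len, buf->data(), sizeof(int32_t));
//   const uint8_t* rle_data = buf->data() + sizeof(int32_t);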

void ColumnWriterImpl::AddDataPage() {
  int64_t definition_levels_rle_size = 0;
  int64_t repetition_levels_rle_size = 0;

  std::shared_ptr<Buffer> values = GetValuesBuffer();

  if (descr_->max_definition_level() > 0) {
    definition_levels_rle_size =
        RleEncodeLevels(definition_levels_sink_.data(), definition_levels_rle_.get(),
                        descr_->max_definition_level());
  }

  if (descr_->max_repetition_level() > 0) {
    repetition_levels_rle_size =
        RleEncodeLevels(repetition_levels_sink_.data(), repetition_levels_rle_.get(),
                        descr_->max_repetition_level());
  }

  int64_t uncompressed_size =
      definition_levels_rle_size + repetition_levels_rle_size + values->size();

  // Resize with shrink_to_fit = false: the underlying buffer only keeps
  // growing, so resizing to a smaller size later does not reallocate.
  PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false));

  // Concatenate data into a single buffer
  uint8_t* uncompressed_ptr = uncompressed_data_->mutable_data();
  memcpy(uncompressed_ptr, repetition_levels_rle_->data(), repetition_levels_rle_size);
  uncompressed_ptr += repetition_levels_rle_size;
  memcpy(uncompressed_ptr, definition_levels_rle_->data(), definition_levels_rle_size);
  uncompressed_ptr += definition_levels_rle_size;
  memcpy(uncompressed_ptr, values->data(), values->size());

  EncodedStatistics page_stats = GetPageStatistics();
  page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
  page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
  ResetPageStatistics();

  std::shared_ptr<Buffer> compressed_data;
  if (pager_->has_compressor()) {
    pager_->Compress(*(uncompressed_data_.get()), compressed_data_.get());
    compressed_data = compressed_data_;
  } else {
    compressed_data = uncompressed_data_;
  }

  // Write the page to OutputStream eagerly if there is no dictionary or
  // if dictionary encoding has fallen back to PLAIN
  if (has_dictionary_ && !fallback_) {  // Save pages until end of dictionary encoding
    std::shared_ptr<Buffer> compressed_data_copy;
    PARQUET_THROW_NOT_OK(compressed_data->Copy(0, compressed_data->size(), allocator_,
                                               &compressed_data_copy));
    CompressedDataPage page(compressed_data_copy,
                            static_cast<int32_t>(num_buffered_values_), encoding_,
                            Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats);
    total_compressed_bytes_ += page.size() + sizeof(format::PageHeader);
    data_pages_.push_back(std::move(page));
  } else {  // Eagerly write pages
    CompressedDataPage page(compressed_data, static_cast<int32_t>(num_buffered_values_),
                            encoding_, Encoding::RLE, Encoding::RLE, uncompressed_size,
                            page_stats);
    WriteDataPage(page);
  }

  // Re-initialize the sinks for next Page.
  InitSinks();
  num_buffered_values_ = 0;
  num_buffered_encoded_values_ = 0;
}

int64_t ColumnWriterImpl::Close() {
  if (!closed_) {
    closed_ = true;
    if (has_dictionary_ && !fallback_) {
      WriteDictionaryPage();
    }

    FlushBufferedDataPages();

    EncodedStatistics chunk_statistics = GetChunkStatistics();
    chunk_statistics.ApplyStatSizeLimits(
        properties_->max_statistics_size(descr_->path()));
    chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order());

    // Write stats only if the column has at least one row written
    if (rows_written_ > 0 && chunk_statistics.is_set()) {
      metadata_->SetStatistics(chunk_statistics);
    }
    pager_->Close(has_dictionary_, fallback_);
  }

  return total_bytes_written_;
}

void ColumnWriterImpl::FlushBufferedDataPages() {
  // Write all outstanding data to a new page
  if (num_buffered_values_ > 0) {
    AddDataPage();
  }
  for (size_t i = 0; i < data_pages_.size(); i++) {
    WriteDataPage(data_pages_[i]);
  }
  data_pages_.clear();
  total_compressed_bytes_ = 0;
}

// ----------------------------------------------------------------------
// TypedColumnWriter

template <typename Action>
inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
  int64_t num_batches = total / batch_size;
  for (int64_t round = 0; round < num_batches; round++) {
    action(round * batch_size, batch_size);
  }
  // Write the remaining values
  if (total % batch_size > 0) {
    action(num_batches * batch_size, total % batch_size);
  }
}
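
// Illustrative sketch (values are assumptions): with total = 2500 and
// batch_size = 1000, the action is invoked with (offset, batch_size) pairs
// (0, 1000), (1000, 1000), then (2000, 500) for the remainder:
//
//   DoInBatches(2500, 1000, [](int64_t offset, int64_t batch_size) {
//     // process values [offset, offset + batch_size)
//   });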

bool DictionaryDirectWriteSupported(const arrow::Array& array) {
  DCHECK_EQ(array.type_id(), arrow::Type::DICTIONARY);
  const arrow::DictionaryType& dict_type =
      static_cast<const arrow::DictionaryType&>(*array.type());
  auto id = dict_type.value_type()->id();
  return id == arrow::Type::BINARY || id == arrow::Type::STRING;
}

Status ConvertDictionaryToDense(const arrow::Array& array, MemoryPool* pool,
                                std::shared_ptr<arrow::Array>* out) {
  const arrow::DictionaryType& dict_type =
      static_cast<const arrow::DictionaryType&>(*array.type());

  // TODO(ARROW-1648): Remove this special handling once we require an Arrow
  // version that has this fixed.
  if (dict_type.value_type()->id() == arrow::Type::NA) {
    *out = std::make_shared<arrow::NullArray>(array.length());
    return Status::OK();
  }

  arrow::compute::FunctionContext ctx(pool);
  Datum cast_output;
  RETURN_NOT_OK(arrow::compute::Cast(&ctx, Datum(array.data()), dict_type.value_type(),
                                     arrow::compute::CastOptions(), &cast_output));
  *out = cast_output.make_array();
  return Status::OK();
}

static inline bool IsDictionaryEncoding(Encoding::type encoding) {
  return encoding == Encoding::PLAIN_DICTIONARY;
}

template <typename DType>
class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<DType> {
 public:
  using T = typename DType::c_type;

  TypedColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
                        std::unique_ptr<PageWriter> pager, const bool use_dictionary,
                        Encoding::type encoding, const WriterProperties* properties)
      : ColumnWriterImpl(metadata, std::move(pager), use_dictionary, encoding,
                         properties) {
    current_encoder_ = MakeEncoder(DType::type_num, encoding, use_dictionary, descr_,
                                   properties->memory_pool());

    if (properties->statistics_enabled(descr_->path()) &&
        (SortOrder::UNKNOWN != descr_->sort_order())) {
      page_statistics_ = MakeStatistics<DType>(descr_, allocator_);
      chunk_statistics_ = MakeStatistics<DType>(descr_, allocator_);
    }
  }

  int64_t Close() override { return ColumnWriterImpl::Close(); }

  void WriteBatch(int64_t num_values, const int16_t* def_levels,
                  const int16_t* rep_levels, const T* values) override {
    // We check for DataPage limits only after we have inserted the values. If a user
    // writes a large number of values, the DataPage size can be much above the limit.
    // The purpose of this chunking is to bound this. Even if a user writes a large
    // number of values, the chunking ensures that AddDataPage() is called at a
    // reasonable page size limit.
    int64_t value_offset = 0;
    auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
      int64_t values_to_write =
          WriteLevels(batch_size, def_levels + offset, rep_levels + offset);
      // PARQUET-780
      if (values_to_write > 0) {
        DCHECK_NE(nullptr, values);
      }
      WriteValues(values + value_offset, values_to_write, batch_size - values_to_write);
      CommitWriteAndCheckPageLimit(batch_size, values_to_write);
      value_offset += values_to_write;

      // Dictionary size checked separately from data page size since we
      // circumvent this check when writing arrow::DictionaryArray directly
      CheckDictionarySizeLimit();
    };
    DoInBatches(num_values, properties_->write_batch_size(), WriteChunk);
  }

  void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
                        const int16_t* rep_levels, const uint8_t* valid_bits,
                        int64_t valid_bits_offset, const T* values) override {
    // Like WriteBatch, but for spaced values
    int64_t value_offset = 0;
    auto WriteChunk = [&](int64_t offset, int64_t batch_size) {
      int64_t batch_num_values = 0;
      int64_t batch_num_spaced_values = 0;
      WriteLevelsSpaced(batch_size, def_levels + offset, rep_levels + offset,
                        &batch_num_values, &batch_num_spaced_values);
      WriteValuesSpaced(values + value_offset, batch_num_values, batch_num_spaced_values,
                        valid_bits, valid_bits_offset + value_offset);
      CommitWriteAndCheckPageLimit(batch_size, batch_num_spaced_values);
      value_offset += batch_num_spaced_values;

      // Dictionary size checked separately from data page size since we
      // circumvent this check when writing arrow::DictionaryArray directly
      CheckDictionarySizeLimit();
    };
    DoInBatches(num_values, properties_->write_batch_size(), WriteChunk);
  }

  Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                    int64_t num_levels, const arrow::Array& array,
                    ArrowWriteContext* ctx) override {
    if (array.type()->id() == arrow::Type::DICTIONARY) {
      return WriteArrowDictionary(def_levels, rep_levels, num_levels, array, ctx);
    } else {
      return WriteArrowDense(def_levels, rep_levels, num_levels, array, ctx);
    }
  }

  int64_t EstimatedBufferedValueBytes() const override {
    return current_encoder_->EstimatedDataEncodedSize();
  }

 protected:
  std::shared_ptr<Buffer> GetValuesBuffer() override {
    return current_encoder_->FlushValues();
  }

  // Internal function to handle direct writing of arrow::DictionaryArray,
  // since the standard logic concerning dictionary size limits and fallback to
  // plain encoding is circumvented
  Status WriteArrowDictionary(const int16_t* def_levels, const int16_t* rep_levels,
                              int64_t num_levels, const arrow::Array& array,
                              ArrowWriteContext* context);

  Status WriteArrowDense(const int16_t* def_levels, const int16_t* rep_levels,
                         int64_t num_levels, const arrow::Array& array,
                         ArrowWriteContext* context);

  void WriteDictionaryPage() override {
    // We have to dynamic cast here because of TypedEncoder<Type>, as
    // some compilers don't want to cast through virtual inheritance
    auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
    DCHECK(dict_encoder);
    std::shared_ptr<ResizableBuffer> buffer =
        AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
    dict_encoder->WriteDict(buffer->mutable_data());

    DictionaryPage page(buffer, dict_encoder->num_entries(),
                        properties_->dictionary_page_encoding());
    total_bytes_written_ += pager_->WriteDictionaryPage(page);
  }

  EncodedStatistics GetPageStatistics() override {
    EncodedStatistics result;
    if (page_statistics_) result = page_statistics_->Encode();
    return result;
  }

  EncodedStatistics GetChunkStatistics() override {
    EncodedStatistics result;
    if (chunk_statistics_) result = chunk_statistics_->Encode();
    return result;
  }

  void ResetPageStatistics() override {
    if (chunk_statistics_ != nullptr) {
      chunk_statistics_->Merge(*page_statistics_);
      page_statistics_->Reset();
    }
  }

  Type::type type() const override { return descr_->physical_type(); }

  const ColumnDescriptor* descr() const override { return descr_; }

  int64_t rows_written() const override { return rows_written_; }

  int64_t total_compressed_bytes() const override { return total_compressed_bytes_; }

  int64_t total_bytes_written() const override { return total_bytes_written_; }

  const WriterProperties* properties() override { return properties_; }

 private:
  using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
  using TypedStats = TypedStatistics<DType>;
  std::unique_ptr<Encoder> current_encoder_;
  std::shared_ptr<TypedStats> page_statistics_;
  std::shared_ptr<TypedStats> chunk_statistics_;

  // If writing a sequence of arrow::DictionaryArray to the writer, we keep the
  // dictionary passed to DictEncoder<T>::PutDictionary so we can check
  // subsequent array chunks to see whether materialization is required (in
  // which case we fall back to the dense write path)
  std::shared_ptr<arrow::Array> preserved_dictionary_;

  int64_t WriteLevels(int64_t num_values, const int16_t* def_levels,
                      const int16_t* rep_levels) {
    int64_t values_to_write = 0;
    // If the field is required and non-repeated, there are no definition levels
    if (descr_->max_definition_level() > 0) {
      for (int64_t i = 0; i < num_values; ++i) {
        if (def_levels[i] == descr_->max_definition_level()) {
          ++values_to_write;
        }
      }

      WriteDefinitionLevels(num_values, def_levels);
    } else {
      // Required field, write all values
      values_to_write = num_values;
    }

    // Not present for non-repeated fields
    if (descr_->max_repetition_level() > 0) {
      // A row could include more than one value
      // Count the occasions where we start a new row
      for (int64_t i = 0; i < num_values; ++i) {
        if (rep_levels[i] == 0) {
          rows_written_++;
        }
      }

      WriteRepetitionLevels(num_values, rep_levels);
    } else {
      // Each value is exactly one row
      rows_written_ += static_cast<int>(num_values);
    }
    return values_to_write;
  }

  void WriteLevelsSpaced(int64_t num_levels, const int16_t* def_levels,
                         const int16_t* rep_levels, int64_t* out_values_to_write,
                         int64_t* out_spaced_values_to_write) {
    int64_t values_to_write = 0;
    int64_t spaced_values_to_write = 0;
    // If the field is required and non-repeated, there are no definition levels
    if (descr_->max_definition_level() > 0) {
      // Minimal definition level for which spaced values are written
      int16_t min_spaced_def_level = descr_->max_definition_level();
      if (descr_->schema_node()->is_optional()) {
        min_spaced_def_level--;
      }
      for (int64_t i = 0; i < num_levels; ++i) {
        if (def_levels[i] == descr_->max_definition_level()) {
          ++values_to_write;
        }
        if (def_levels[i] >= min_spaced_def_level) {
          ++spaced_values_to_write;
        }
      }

      WriteDefinitionLevels(num_levels, def_levels);
    } else {
      // Required field, write all values
      values_to_write = num_levels;
      spaced_values_to_write = num_levels;
    }

    // Not present for non-repeated fields
    if (descr_->max_repetition_level() > 0) {
      // A row could include more than one value
      // Count the occasions where we start a new row
      for (int64_t i = 0; i < num_levels; ++i) {
        if (rep_levels[i] == 0) {
          rows_written_++;
        }
      }

      WriteRepetitionLevels(num_levels, rep_levels);
    } else {
      // Each value is exactly one row
      rows_written_ += static_cast<int>(num_levels);
    }

    *out_values_to_write = values_to_write;
    *out_spaced_values_to_write = spaced_values_to_write;
  }

  void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values) {
    num_buffered_values_ += num_levels;
    num_buffered_encoded_values_ += num_values;

    if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
      AddDataPage();
    }
  }

  void FallbackToPlainEncoding() {
    if (IsDictionaryEncoding(current_encoder_->encoding())) {
      WriteDictionaryPage();
      // Serialize the buffered dictionary indices
      FlushBufferedDataPages();
      fallback_ = true;
      // Only PLAIN encoding is supported for fallback in V1
      current_encoder_ = MakeEncoder(DType::type_num, Encoding::PLAIN, false, descr_,
                                     properties_->memory_pool());
      encoding_ = Encoding::PLAIN;
    }
  }

  // Checks if the Dictionary Page size limit is reached
  // If the limit is reached, the Dictionary and Data Pages are serialized
  // The encoding is switched to PLAIN
  //
  // Only one Dictionary Page is written.
  // Fallback to PLAIN if dictionary page limit is reached.
  void CheckDictionarySizeLimit() {
    if (!has_dictionary_ || fallback_) {
      // Either not using dictionary encoding, or we have already fallen back
      // to PLAIN encoding because the size threshold was reached
      return;
    }

    // We have to dynamic cast here because of TypedEncoder<Type>, as some
    // compilers don't want to cast through virtual inheritance
    auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
    if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) {
      FallbackToPlainEncoding();
    }
  }

  void WriteValues(const T* values, int64_t num_values, int64_t num_nulls) {
    dynamic_cast<ValueEncoderType*>(current_encoder_.get())
        ->Put(values, static_cast<int>(num_values));
    if (page_statistics_ != nullptr) {
      page_statistics_->Update(values, num_values, num_nulls);
    }
  }

  void WriteValuesSpaced(const T* values, int64_t num_values, int64_t num_spaced_values,
                         const uint8_t* valid_bits, int64_t valid_bits_offset) {
    if (descr_->schema_node()->is_optional()) {
      dynamic_cast<ValueEncoderType*>(current_encoder_.get())
          ->PutSpaced(values, static_cast<int>(num_spaced_values), valid_bits,
                      valid_bits_offset);
    } else {
      dynamic_cast<ValueEncoderType*>(current_encoder_.get())
          ->Put(values, static_cast<int>(num_values));
    }
    if (page_statistics_ != nullptr) {
      const int64_t num_nulls = num_spaced_values - num_values;
      page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, num_values,
                                     num_nulls);
    }
  }
};

template <typename DType>
Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(const int16_t* def_levels,
                                                          const int16_t* rep_levels,
                                                          int64_t num_levels,
                                                          const arrow::Array& array,
                                                          ArrowWriteContext* ctx) {
  // If this is the first time writing a DictionaryArray, then there are
  // a few possible paths to take:
  //
  // - If dictionary encoding is not enabled, convert to densely
  //   encoded and call WriteArrow
  // - Dictionary encoding enabled
  //   - If this is the first time this is called, then we call
  //     PutDictionary into the encoder and then PutIndices on each
  //     chunk. We store the dictionary that was written in
  //     preserved_dictionary_ so that subsequent calls to this method
  //     can make sure the dictionary has not changed
  //   - On subsequent calls, we have to check whether the dictionary
  //     has changed. If it has, then we trigger the varying
  //     dictionary path and materialize each chunk and then call
  //     WriteArrow with that
  auto WriteDense = [&] {
    std::shared_ptr<arrow::Array> dense_array;
    RETURN_NOT_OK(
        ConvertDictionaryToDense(array, properties_->memory_pool(), &dense_array));
    return WriteArrowDense(def_levels, rep_levels, num_levels, *dense_array, ctx);
  };

  if (!IsDictionaryEncoding(current_encoder_->encoding()) ||
      !DictionaryDirectWriteSupported(array)) {
    // No longer dictionary-encoding for whatever reason, maybe we never were
    // or we decided to stop. Note that WriteArrow can be invoked multiple
    // times with both dense and dictionary-encoded versions of the same data
    // without a problem. Any dense data will be hashed to indices until the
    // dictionary page limit is reached, at which point everything (dictionary
    // and dense) will fall back to plain encoding
    return WriteDense();
  }

  auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
  const auto& data = checked_cast<const arrow::DictionaryArray&>(array);
  std::shared_ptr<arrow::Array> dictionary = data.dictionary();
  std::shared_ptr<arrow::Array> indices = data.indices();

  int64_t value_offset = 0;
  auto WriteIndicesChunk = [&](int64_t offset, int64_t batch_size) {
    int64_t batch_num_values = 0;
    int64_t batch_num_spaced_values = 0;
    WriteLevelsSpaced(batch_size, def_levels + offset, rep_levels + offset,
                      &batch_num_values, &batch_num_spaced_values);
    dict_encoder->PutIndices(*indices->Slice(value_offset, batch_num_spaced_values));
    CommitWriteAndCheckPageLimit(batch_size, batch_num_values);
    value_offset += batch_num_spaced_values;
  };

  // Handle seeing dictionary for the first time
  if (!preserved_dictionary_) {
    // It's a new dictionary. Call PutDictionary and keep track of it
    PARQUET_CATCH_NOT_OK(dict_encoder->PutDictionary(*dictionary));

    // TODO(wesm): If some dictionary values are unobserved, then the
    // statistics will be inaccurate. Do we care enough to fix it?
    if (page_statistics_ != nullptr) {
      PARQUET_CATCH_NOT_OK(page_statistics_->Update(*dictionary));
    }
    preserved_dictionary_ = dictionary;
  } else if (!dictionary->Equals(*preserved_dictionary_)) {
    // Dictionary has changed
    PARQUET_CATCH_NOT_OK(FallbackToPlainEncoding());
    return WriteDense();
  }

  PARQUET_CATCH_NOT_OK(
      DoInBatches(num_levels, properties_->write_batch_size(), WriteIndicesChunk));
  return Status::OK();
}

// ----------------------------------------------------------------------
// Direct Arrow write path

template <typename ParquetType, typename ArrowType, typename Enable = void>
struct SerializeFunctor {
  using ArrowCType = typename ArrowType::c_type;
  using ArrayType = typename arrow::TypeTraits<ArrowType>::ArrayType;
  using ParquetCType = typename ParquetType::c_type;
  Status Serialize(const ArrayType& array, ArrowWriteContext*, ParquetCType* out) {
    const ArrowCType* input = array.raw_values();
    if (array.null_count() > 0) {
      for (int i = 0; i < array.length(); i++) {
        out[i] = static_cast<ParquetCType>(input[i]);
      }
    } else {
      std::copy(input, input + array.length(), out);
    }
    return Status::OK();
  }
};

template <typename ParquetType, typename ArrowType>
inline Status SerializeData(const arrow::Array& array, ArrowWriteContext* ctx,
                            typename ParquetType::c_type* out) {
  using ArrayType = typename arrow::TypeTraits<ArrowType>::ArrayType;
  SerializeFunctor<ParquetType, ArrowType> functor;
  return functor.Serialize(checked_cast<const ArrayType&>(array), ctx, out);
}

template <typename ParquetType, typename ArrowType>
Status WriteArrowSerialize(const arrow::Array& array, int64_t num_levels,
                           const int16_t* def_levels, const int16_t* rep_levels,
                           ArrowWriteContext* ctx,
                           TypedColumnWriter<ParquetType>* writer) {
  using ParquetCType = typename ParquetType::c_type;

  ParquetCType* buffer;
  PARQUET_THROW_NOT_OK(ctx->GetScratchData<ParquetCType>(array.length(), &buffer));

  bool no_nulls =
      writer->descr()->schema_node()->is_required() || (array.null_count() == 0);

  Status s = SerializeData<ParquetType, ArrowType>(array, ctx, buffer);
  RETURN_NOT_OK(s);
  if (no_nulls) {
    PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer));
  } else {
    PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
                                                  array.null_bitmap_data(),
                                                  array.offset(), buffer));
  }
  return Status::OK();
}

template <typename ParquetType>
Status WriteArrowZeroCopy(const arrow::Array& array, int64_t num_levels,
                          const int16_t* def_levels, const int16_t* rep_levels,
                          ArrowWriteContext* ctx,
                          TypedColumnWriter<ParquetType>* writer) {
  using T = typename ParquetType::c_type;
  const auto& data = static_cast<const arrow::PrimitiveArray&>(array);
  const T* values = nullptr;
  // The values buffer may be null if the array is empty (ARROW-2744)
  if (data.values() != nullptr) {
    values = reinterpret_cast<const T*>(data.values()->data()) + data.offset();
  } else {
    DCHECK_EQ(data.length(), 0);
  }
  if (writer->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
    PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, values));
  } else {
    PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
                                                  data.null_bitmap_data(), data.offset(),
                                                  values));
  }
  return Status::OK();
}

#define WRITE_SERIALIZE_CASE(ArrowEnum, ArrowType, ParquetType) \
  case arrow::Type::ArrowEnum:                                  \
    return WriteArrowSerialize<ParquetType, arrow::ArrowType>(  \
        array, num_levels, def_levels, rep_levels, ctx, this);

#define WRITE_ZERO_COPY_CASE(ArrowEnum, ArrowType, ParquetType)                       \
  case arrow::Type::ArrowEnum:                                                        \
    return WriteArrowZeroCopy<ParquetType>(array, num_levels, def_levels, rep_levels, \
                                           ctx, this);

#define ARROW_UNSUPPORTED()                                          \
  std::stringstream ss;                                              \
  ss << "Arrow type " << array.type()->ToString()                    \
     << " cannot be written to Parquet type " << descr_->ToString(); \
  return Status::Invalid(ss.str());

// ----------------------------------------------------------------------
// Write Arrow to BooleanType

template <>
struct SerializeFunctor<BooleanType, arrow::BooleanType> {
  Status Serialize(const arrow::BooleanArray& data, ArrowWriteContext*, bool* out) {
    for (int i = 0; i < data.length(); i++) {
      *out++ = data.Value(i);
    }
    return Status::OK();
  }
};

template <>
Status TypedColumnWriterImpl<BooleanType>::WriteArrowDense(const int16_t* def_levels,
                                                           const int16_t* rep_levels,
                                                           int64_t num_levels,
                                                           const arrow::Array& array,
                                                           ArrowWriteContext* ctx) {
  if (array.type_id() != arrow::Type::BOOL) {
    ARROW_UNSUPPORTED();
  }
  return WriteArrowSerialize<BooleanType, arrow::BooleanType>(
      array, num_levels, def_levels, rep_levels, ctx, this);
}

// ----------------------------------------------------------------------
// Write Arrow types to INT32

template <>
struct SerializeFunctor<Int32Type, arrow::Date64Type> {
  Status Serialize(const arrow::Date64Array& array, ArrowWriteContext*, int32_t* out) {
    const int64_t* input = array.raw_values();
    for (int i = 0; i < array.length(); i++) {
      *out++ = static_cast<int32_t>(*input++ / 86400000);
    }
    return Status::OK();
  }
};
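
// For example (values are assumptions): a Date64 value of 172800000
// milliseconds divides by 86400000 (milliseconds per day) to the INT32
// date value 2, i.e. two days since the Unix epoch.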

template <>
struct SerializeFunctor<Int32Type, arrow::Time32Type> {
  Status Serialize(const arrow::Time32Array& array, ArrowWriteContext*, int32_t* out) {
    const int32_t* input = array.raw_values();
    const auto& type = static_cast<const arrow::Time32Type&>(*array.type());
    if (type.unit() == arrow::TimeUnit::SECOND) {
      for (int i = 0; i < array.length(); i++) {
        out[i] = input[i] * 1000;
      }
    } else {
      std::copy(input, input + array.length(), out);
    }
    return Status::OK();
  }
};

template <>
Status TypedColumnWriterImpl<Int32Type>::WriteArrowDense(const int16_t* def_levels,
                                                         const int16_t* rep_levels,
                                                         int64_t num_levels,
                                                         const arrow::Array& array,
                                                         ArrowWriteContext* ctx) {
  switch (array.type()->id()) {
    case arrow::Type::NA: {
      PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, nullptr));
    } break;
    WRITE_SERIALIZE_CASE(INT8, Int8Type, Int32Type)
    WRITE_SERIALIZE_CASE(UINT8, UInt8Type, Int32Type)
    WRITE_SERIALIZE_CASE(INT16, Int16Type, Int32Type)
    WRITE_SERIALIZE_CASE(UINT16, UInt16Type, Int32Type)
    WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int32Type)
    WRITE_ZERO_COPY_CASE(INT32, Int32Type, Int32Type)
    WRITE_ZERO_COPY_CASE(DATE32, Date32Type, Int32Type)
    WRITE_SERIALIZE_CASE(DATE64, Date64Type, Int32Type)
    WRITE_SERIALIZE_CASE(TIME32, Time32Type, Int32Type)
    default:
      ARROW_UNSUPPORTED()
  }
  return Status::OK();
}

// ----------------------------------------------------------------------
// Write Arrow to Int64 and Int96

#define INT96_CONVERT_LOOP(ConversionFunction) \
  for (int64_t i = 0; i < array.length(); i++) ConversionFunction(input[i], &out[i]);

template <>
struct SerializeFunctor<Int96Type, arrow::TimestampType> {
  Status Serialize(const arrow::TimestampArray& array, ArrowWriteContext*, Int96* out) {
    const int64_t* input = array.raw_values();
    const auto& type = static_cast<const arrow::TimestampType&>(*array.type());
    switch (type.unit()) {
      case arrow::TimeUnit::NANO:
        INT96_CONVERT_LOOP(internal::NanosecondsToImpalaTimestamp);
        break;
      case arrow::TimeUnit::MICRO:
        INT96_CONVERT_LOOP(internal::MicrosecondsToImpalaTimestamp);
        break;
      case arrow::TimeUnit::MILLI:
        INT96_CONVERT_LOOP(internal::MillisecondsToImpalaTimestamp);
        break;
      case arrow::TimeUnit::SECOND:
        INT96_CONVERT_LOOP(internal::SecondsToImpalaTimestamp);
        break;
    }
    return Status::OK();
  }
};

#define COERCE_DIVIDE -1
#define COERCE_INVALID 0
#define COERCE_MULTIPLY +1

static std::pair<int, int64_t> kTimestampCoercionFactors[4][4] = {
    // from seconds ...
    {{COERCE_INVALID, 0},                      // ... to seconds
     {COERCE_MULTIPLY, 1000},                  // ... to millis
     {COERCE_MULTIPLY, 1000000},               // ... to micros
     {COERCE_MULTIPLY, INT64_C(1000000000)}},  // ... to nanos
    // from millis ...
    {{COERCE_INVALID, 0},
     {COERCE_MULTIPLY, 1},
     {COERCE_MULTIPLY, 1000},
     {COERCE_MULTIPLY, 1000000}},
    // from micros ...
    {{COERCE_INVALID, 0},
     {COERCE_DIVIDE, 1000},
     {COERCE_MULTIPLY, 1},
     {COERCE_MULTIPLY, 1000}},
    // from nanos ...
    {{COERCE_INVALID, 0},
     {COERCE_DIVIDE, 1000000},
     {COERCE_DIVIDE, 1000},
     {COERCE_MULTIPLY, 1}}};
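
// Reading the table (an illustrative example): rows and columns are indexed
// by arrow::TimeUnit, so coercing TimeUnit::SECOND (row 0) to TimeUnit::MILLI
// (column 1) yields {COERCE_MULTIPLY, 1000}, i.e. out[i] = values[i] * 1000,
// while TimeUnit::NANO (row 3) to TimeUnit::MICRO (column 2) yields
// {COERCE_DIVIDE, 1000}, i.e. out[i] = values[i] / 1000, with a data-loss
// check unless truncation is allowed.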

template <>
struct SerializeFunctor<Int64Type, arrow::TimestampType> {
  Status Serialize(const arrow::TimestampArray& array, ArrowWriteContext* ctx,
                   int64_t* out) {
    const auto& source_type = static_cast<const arrow::TimestampType&>(*array.type());
    auto source_unit = source_type.unit();
    const int64_t* values = array.raw_values();

    arrow::TimeUnit::type target_unit = ctx->properties->coerce_timestamps_unit();
    auto target_type = arrow::timestamp(target_unit);
    bool truncation_allowed = ctx->properties->truncated_timestamps_allowed();

    auto DivideBy = [&](const int64_t factor) {
      for (int64_t i = 0; i < array.length(); i++) {
        if (!truncation_allowed && array.IsValid(i) && (values[i] % factor != 0)) {
          return Status::Invalid("Casting from ", source_type.ToString(), " to ",
                                 target_type->ToString(),
                                 " would lose data: ", values[i]);
        }
        out[i] = values[i] / factor;
      }
      return Status::OK();
    };

    auto MultiplyBy = [&](const int64_t factor) {
      for (int64_t i = 0; i < array.length(); i++) {
        out[i] = values[i] * factor;
      }
      return Status::OK();
    };

    const auto& coercion = kTimestampCoercionFactors[static_cast<int>(source_unit)]
                                                    [static_cast<int>(target_unit)];

    // .first -> coercion operation; .second -> scale factor
    DCHECK_NE(coercion.first, COERCE_INVALID);
    return coercion.first == COERCE_DIVIDE ? DivideBy(coercion.second)
                                           : MultiplyBy(coercion.second);
  }
};

#undef COERCE_DIVIDE
#undef COERCE_INVALID
#undef COERCE_MULTIPLY
1343 | |
1344 | Status WriteTimestamps(const arrow::Array& values, int64_t num_levels, |
1345 | const int16_t* def_levels, const int16_t* rep_levels, |
1346 | ArrowWriteContext* ctx, TypedColumnWriter<Int64Type>* writer) { |
1347 | const auto& source_type = static_cast<const arrow::TimestampType&>(*values.type()); |
1348 | |
1349 | auto WriteCoerce = [&](const ArrowWriterProperties* properties) { |
1350 | ArrowWriteContext temp_ctx = *ctx; |
1351 | temp_ctx.properties = properties; |
1352 | return WriteArrowSerialize<Int64Type, arrow::TimestampType>( |
1353 | values, num_levels, def_levels, rep_levels, &temp_ctx, writer); |
1354 | }; |
1355 | |
1356 | if (ctx->properties->coerce_timestamps_enabled()) { |
    // The user explicitly requested coercion to a specific unit
1358 | if (source_type.unit() == ctx->properties->coerce_timestamps_unit()) { |
1359 | // No data conversion necessary |
1360 | return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels, |
1361 | ctx, writer); |
1362 | } else { |
1363 | return WriteCoerce(ctx->properties); |
1364 | } |
1365 | } else if (writer->properties()->version() == ParquetVersion::PARQUET_1_0 && |
1366 | source_type.unit() == arrow::TimeUnit::NANO) { |
1367 | // Absent superseding user instructions, when writing Parquet version 1.0 files, |
1368 | // timestamps in nanoseconds are coerced to microseconds |
1369 | std::shared_ptr<ArrowWriterProperties> properties = |
1370 | (ArrowWriterProperties::Builder()) |
1371 | .coerce_timestamps(arrow::TimeUnit::MICRO) |
1372 | ->disallow_truncated_timestamps() |
1373 | ->build(); |
1374 | return WriteCoerce(properties.get()); |
1375 | } else if (source_type.unit() == arrow::TimeUnit::SECOND) { |
1376 | // Absent superseding user instructions, timestamps in seconds are coerced to |
1377 | // milliseconds |
1378 | std::shared_ptr<ArrowWriterProperties> properties = |
1379 | (ArrowWriterProperties::Builder()) |
1380 | .coerce_timestamps(arrow::TimeUnit::MILLI) |
1381 | ->build(); |
1382 | return WriteCoerce(properties.get()); |
1383 | } else { |
1384 | // No data conversion necessary |
1385 | return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels, ctx, |
1386 | writer); |
1387 | } |
1388 | } |
1389 | |
1390 | template <> |
1391 | Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(const int16_t* def_levels, |
1392 | const int16_t* rep_levels, |
1393 | int64_t num_levels, |
1394 | const arrow::Array& array, |
1395 | ArrowWriteContext* ctx) { |
1396 | switch (array.type()->id()) { |
1397 | case arrow::Type::TIMESTAMP: |
1398 | return WriteTimestamps(array, num_levels, def_levels, rep_levels, ctx, this); |
1399 | WRITE_ZERO_COPY_CASE(INT64, Int64Type, Int64Type) |
1400 | WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int64Type) |
1401 | WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type) |
1402 | WRITE_ZERO_COPY_CASE(TIME64, Time64Type, Int64Type) |
1403 | default: |
1404 | ARROW_UNSUPPORTED(); |
1405 | } |
1406 | } |
1407 | |
1408 | template <> |
1409 | Status TypedColumnWriterImpl<Int96Type>::WriteArrowDense(const int16_t* def_levels, |
1410 | const int16_t* rep_levels, |
1411 | int64_t num_levels, |
1412 | const arrow::Array& array, |
1413 | ArrowWriteContext* ctx) { |
1414 | if (array.type_id() != arrow::Type::TIMESTAMP) { |
1415 | ARROW_UNSUPPORTED(); |
1416 | } |
1417 | return WriteArrowSerialize<Int96Type, arrow::TimestampType>( |
1418 | array, num_levels, def_levels, rep_levels, ctx, this); |
1419 | } |
1420 | |
1421 | // ---------------------------------------------------------------------- |
1422 | // Floating point types |
1423 | |
1424 | template <> |
1425 | Status TypedColumnWriterImpl<FloatType>::WriteArrowDense(const int16_t* def_levels, |
1426 | const int16_t* rep_levels, |
1427 | int64_t num_levels, |
1428 | const arrow::Array& array, |
1429 | ArrowWriteContext* ctx) { |
1430 | if (array.type_id() != arrow::Type::FLOAT) { |
1431 | ARROW_UNSUPPORTED(); |
1432 | } |
1433 | return WriteArrowZeroCopy<FloatType>(array, num_levels, def_levels, rep_levels, ctx, |
1434 | this); |
1435 | } |
1436 | |
1437 | template <> |
1438 | Status TypedColumnWriterImpl<DoubleType>::WriteArrowDense(const int16_t* def_levels, |
1439 | const int16_t* rep_levels, |
1440 | int64_t num_levels, |
1441 | const arrow::Array& array, |
1442 | ArrowWriteContext* ctx) { |
1443 | if (array.type_id() != arrow::Type::DOUBLE) { |
1444 | ARROW_UNSUPPORTED(); |
1445 | } |
1446 | return WriteArrowZeroCopy<DoubleType>(array, num_levels, def_levels, rep_levels, ctx, |
1447 | this); |
1448 | } |
1449 | |
1450 | // ---------------------------------------------------------------------- |
1451 | // Write Arrow to BYTE_ARRAY |
1452 | |
1453 | template <> |
1454 | Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(const int16_t* def_levels, |
1455 | const int16_t* rep_levels, |
1456 | int64_t num_levels, |
1457 | const arrow::Array& array, |
1458 | ArrowWriteContext* ctx) { |
1459 | if (array.type()->id() != arrow::Type::BINARY && |
1460 | array.type()->id() != arrow::Type::STRING) { |
1461 | ARROW_UNSUPPORTED(); |
1462 | } |
1463 | |
1464 | int64_t value_offset = 0; |
1465 | auto WriteChunk = [&](int64_t offset, int64_t batch_size) { |
1466 | int64_t batch_num_values = 0; |
1467 | int64_t batch_num_spaced_values = 0; |
1468 | WriteLevelsSpaced(batch_size, def_levels + offset, rep_levels + offset, |
1469 | &batch_num_values, &batch_num_spaced_values); |
1470 | std::shared_ptr<arrow::Array> data_slice = |
1471 | array.Slice(value_offset, batch_num_spaced_values); |
1472 | current_encoder_->Put(*data_slice); |
1473 | if (page_statistics_ != nullptr) { |
1474 | page_statistics_->Update(*data_slice); |
1475 | } |
1476 | CommitWriteAndCheckPageLimit(batch_size, batch_num_values); |
1477 | CheckDictionarySizeLimit(); |
1478 | value_offset += batch_num_spaced_values; |
1479 | }; |
1480 | |
1481 | PARQUET_CATCH_NOT_OK( |
1482 | DoInBatches(num_levels, properties_->write_batch_size(), WriteChunk)); |
1483 | return Status::OK(); |
1484 | } |
1485 | |
1486 | // ---------------------------------------------------------------------- |
1487 | // Write Arrow to FIXED_LEN_BYTE_ARRAY |
1488 | |
1489 | template <typename ParquetType, typename ArrowType> |
1490 | struct SerializeFunctor<ParquetType, ArrowType, |
1491 | arrow::enable_if_fixed_size_binary<ArrowType>> { |
1492 | Status Serialize(const arrow::FixedSizeBinaryArray& array, ArrowWriteContext*, |
1493 | FLBA* out) { |
1494 | if (array.null_count() == 0) { |
1495 | // no nulls, just dump the data |
1496 | // todo(advancedxy): use a writeBatch to avoid this step |
1497 | for (int64_t i = 0; i < array.length(); i++) { |
1498 | out[i] = FixedLenByteArray(array.GetValue(i)); |
1499 | } |
1500 | } else { |
1501 | for (int64_t i = 0; i < array.length(); i++) { |
1502 | if (array.IsValid(i)) { |
1503 | out[i] = FixedLenByteArray(array.GetValue(i)); |
1504 | } |
1505 | } |
1506 | } |
1507 | return Status::OK(); |
1508 | } |
1509 | }; |
1510 | |
1511 | template <> |
1512 | Status WriteArrowSerialize<FLBAType, arrow::Decimal128Type>( |
1513 | const arrow::Array& array, int64_t num_levels, const int16_t* def_levels, |
1514 | const int16_t* rep_levels, ArrowWriteContext* ctx, |
1515 | TypedColumnWriter<FLBAType>* writer) { |
1516 | const auto& data = static_cast<const arrow::Decimal128Array&>(array); |
1517 | const int64_t length = data.length(); |
1518 | |
1519 | FLBA* buffer; |
1520 | RETURN_NOT_OK(ctx->GetScratchData<FLBA>(num_levels, &buffer)); |
1521 | |
1522 | const auto& decimal_type = static_cast<const arrow::Decimal128Type&>(*data.type()); |
1523 | const int32_t offset = |
1524 | decimal_type.byte_width() - internal::DecimalSize(decimal_type.precision()); |
1525 | |
1526 | const bool does_not_have_nulls = |
1527 | writer->descr()->schema_node()->is_required() || data.null_count() == 0; |
1528 | |
1529 | const auto valid_value_count = static_cast<size_t>(length - data.null_count()) * 2; |
1530 | std::vector<uint64_t> big_endian_values(valid_value_count); |
1531 | |
1532 | // TODO(phillipc): Look into whether our compilers will perform loop unswitching so we |
1533 | // don't have to keep writing two loops to handle the case where we know there are no |
1534 | // nulls |
1535 | if (does_not_have_nulls) { |
1536 | // no nulls, just dump the data |
1537 | // todo(advancedxy): use a writeBatch to avoid this step |
1538 | for (int64_t i = 0, j = 0; i < length; ++i, j += 2) { |
1539 | auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i)); |
1540 | big_endian_values[j] = arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]); |
1541 | big_endian_values[j + 1] = arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]); |
1542 | buffer[i] = FixedLenByteArray( |
1543 | reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset); |
1544 | } |
1545 | } else { |
1546 | for (int64_t i = 0, buffer_idx = 0, j = 0; i < length; ++i) { |
1547 | if (data.IsValid(i)) { |
1548 | auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i)); |
1549 | big_endian_values[j] = arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]); |
1550 | big_endian_values[j + 1] = arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]); |
1551 | buffer[buffer_idx++] = FixedLenByteArray( |
1552 | reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset); |
1553 | j += 2; |
1554 | } |
1555 | } |
1556 | } |
1557 | PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer)); |
1558 | return Status::OK(); |
1559 | } |
1560 | |
1561 | template <> |
1562 | Status TypedColumnWriterImpl<FLBAType>::WriteArrowDense(const int16_t* def_levels, |
1563 | const int16_t* rep_levels, |
1564 | int64_t num_levels, |
1565 | const arrow::Array& array, |
1566 | ArrowWriteContext* ctx) { |
1567 | switch (array.type()->id()) { |
1568 | WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType) |
1569 | WRITE_SERIALIZE_CASE(DECIMAL, Decimal128Type, FLBAType) |
    default:
      ARROW_UNSUPPORTED();
  }
1574 | } |
1575 | |
1576 | // ---------------------------------------------------------------------- |
1577 | // Dynamic column writer constructor |
1578 | |
1579 | std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata, |
                                                 std::unique_ptr<PageWriter> pager,
1581 | const WriterProperties* properties) { |
1582 | const ColumnDescriptor* descr = metadata->descr(); |
1583 | const bool use_dictionary = properties->dictionary_enabled(descr->path()) && |
1584 | descr->physical_type() != Type::BOOLEAN; |
1585 | Encoding::type encoding = properties->encoding(descr->path()); |
1586 | if (use_dictionary) { |
1587 | encoding = properties->dictionary_index_encoding(); |
1588 | } |
1589 | switch (descr->physical_type()) { |
1590 | case Type::BOOLEAN: |
1591 | return std::make_shared<TypedColumnWriterImpl<BooleanType>>( |
1592 | metadata, std::move(pager), use_dictionary, encoding, properties); |
1593 | case Type::INT32: |
1594 | return std::make_shared<TypedColumnWriterImpl<Int32Type>>( |
1595 | metadata, std::move(pager), use_dictionary, encoding, properties); |
1596 | case Type::INT64: |
1597 | return std::make_shared<TypedColumnWriterImpl<Int64Type>>( |
1598 | metadata, std::move(pager), use_dictionary, encoding, properties); |
1599 | case Type::INT96: |
1600 | return std::make_shared<TypedColumnWriterImpl<Int96Type>>( |
1601 | metadata, std::move(pager), use_dictionary, encoding, properties); |
1602 | case Type::FLOAT: |
1603 | return std::make_shared<TypedColumnWriterImpl<FloatType>>( |
1604 | metadata, std::move(pager), use_dictionary, encoding, properties); |
1605 | case Type::DOUBLE: |
1606 | return std::make_shared<TypedColumnWriterImpl<DoubleType>>( |
1607 | metadata, std::move(pager), use_dictionary, encoding, properties); |
1608 | case Type::BYTE_ARRAY: |
1609 | return std::make_shared<TypedColumnWriterImpl<ByteArrayType>>( |
1610 | metadata, std::move(pager), use_dictionary, encoding, properties); |
1611 | case Type::FIXED_LEN_BYTE_ARRAY: |
1612 | return std::make_shared<TypedColumnWriterImpl<FLBAType>>( |
1613 | metadata, std::move(pager), use_dictionary, encoding, properties); |
1614 | default: |
1615 | ParquetException::NYI("type reader not implemented" ); |
1616 | } |
  // Unreachable code, but suppress compiler warning
1618 | return std::shared_ptr<ColumnWriter>(nullptr); |
1619 | } |
1620 | |
1621 | } // namespace parquet |
1622 | |