| 1 | // Licensed to the Apache Software Foundation (ASF) under one |
| 2 | // or more contributor license agreements. See the NOTICE file |
| 3 | // distributed with this work for additional information |
| 4 | // regarding copyright ownership. The ASF licenses this file |
| 5 | // to you under the Apache License, Version 2.0 (the |
| 6 | // "License"); you may not use this file except in compliance |
| 7 | // with the License. You may obtain a copy of the License at |
| 8 | // |
| 9 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | // |
| 11 | // Unless required by applicable law or agreed to in writing, |
| 12 | // software distributed under the License is distributed on an |
| 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | // KIND, either express or implied. See the License for the |
| 15 | // specific language governing permissions and limitations |
| 16 | // under the License. |
| 17 | |
| 18 | #include "parquet/column_writer.h" |
| 19 | |
| 20 | #include <algorithm> |
| 21 | #include <cstdint> |
| 22 | #include <cstring> |
| 23 | #include <memory> |
| 24 | #include <utility> |
| 25 | #include <vector> |
| 26 | |
| 27 | #include "arrow/array.h" |
| 28 | #include "arrow/buffer_builder.h" |
| 29 | #include "arrow/compute/api.h" |
| 30 | #include "arrow/type.h" |
| 31 | #include "arrow/type_traits.h" |
| 32 | #include "arrow/util/bit_stream_utils.h" |
| 33 | #include "arrow/util/checked_cast.h" |
| 34 | #include "arrow/util/compression.h" |
| 35 | #include "arrow/util/logging.h" |
| 36 | #include "arrow/util/rle_encoding.h" |
| 37 | |
| 38 | #include "parquet/column_page.h" |
| 39 | #include "parquet/encoding.h" |
| 40 | #include "parquet/metadata.h" |
| 41 | #include "parquet/platform.h" |
| 42 | #include "parquet/properties.h" |
| 43 | #include "parquet/schema.h" |
| 44 | #include "parquet/statistics.h" |
| 45 | #include "parquet/thrift_internal.h" |
| 46 | #include "parquet/types.h" |
| 47 | |
| 48 | namespace parquet { |
| 49 | |
| 50 | using arrow::Status; |
| 51 | using arrow::compute::Datum; |
| 52 | using arrow::internal::checked_cast; |
| 53 | |
| 54 | using BitWriter = arrow::BitUtil::BitWriter; |
| 55 | using RleEncoder = arrow::util::RleEncoder; |
| 56 | |
| 57 | LevelEncoder::LevelEncoder() {} |
| 58 | LevelEncoder::~LevelEncoder() {} |
| 59 | |
| 60 | void LevelEncoder::Init(Encoding::type encoding, int16_t max_level, |
| 61 | int num_buffered_values, uint8_t* data, int data_size) { |
| 62 | bit_width_ = BitUtil::Log2(max_level + 1); |
| 63 | encoding_ = encoding; |
| 64 | switch (encoding) { |
| 65 | case Encoding::RLE: { |
| 66 | rle_encoder_.reset(new RleEncoder(data, data_size, bit_width_)); |
| 67 | break; |
| 68 | } |
| 69 | case Encoding::BIT_PACKED: { |
| 70 | int num_bytes = |
| 71 | static_cast<int>(BitUtil::BytesForBits(num_buffered_values * bit_width_)); |
| 72 | bit_packed_encoder_.reset(new BitWriter(data, num_bytes)); |
| 73 | break; |
| 74 | } |
| 75 | default: |
| 76 | throw ParquetException("Unknown encoding type for levels." ); |
| 77 | } |
| 78 | } |
| 79 | |
| 80 | int LevelEncoder::MaxBufferSize(Encoding::type encoding, int16_t max_level, |
| 81 | int num_buffered_values) { |
| 82 | int bit_width = BitUtil::Log2(max_level + 1); |
| 83 | int num_bytes = 0; |
| 84 | switch (encoding) { |
| 85 | case Encoding::RLE: { |
| 86 | // TODO: Due to the way we currently check if the buffer is full enough, |
| 87 | // we need to have MinBufferSize as head room. |
| 88 | num_bytes = RleEncoder::MaxBufferSize(bit_width, num_buffered_values) + |
| 89 | RleEncoder::MinBufferSize(bit_width); |
| 90 | break; |
| 91 | } |
| 92 | case Encoding::BIT_PACKED: { |
| 93 | num_bytes = |
| 94 | static_cast<int>(BitUtil::BytesForBits(num_buffered_values * bit_width)); |
| 95 | break; |
| 96 | } |
| 97 | default: |
| 98 | throw ParquetException("Unknown encoding type for levels." ); |
| 99 | } |
| 100 | return num_bytes; |
| 101 | } |
| 102 | |
| 103 | int LevelEncoder::Encode(int batch_size, const int16_t* levels) { |
| 104 | int num_encoded = 0; |
| 105 | if (!rle_encoder_ && !bit_packed_encoder_) { |
| 106 | throw ParquetException("Level encoders are not initialized." ); |
| 107 | } |
| 108 | |
| 109 | if (encoding_ == Encoding::RLE) { |
| 110 | for (int i = 0; i < batch_size; ++i) { |
| 111 | if (!rle_encoder_->Put(*(levels + i))) { |
| 112 | break; |
| 113 | } |
| 114 | ++num_encoded; |
| 115 | } |
| 116 | rle_encoder_->Flush(); |
| 117 | rle_length_ = rle_encoder_->len(); |
| 118 | } else { |
| 119 | for (int i = 0; i < batch_size; ++i) { |
| 120 | if (!bit_packed_encoder_->PutValue(*(levels + i), bit_width_)) { |
| 121 | break; |
| 122 | } |
| 123 | ++num_encoded; |
| 124 | } |
| 125 | bit_packed_encoder_->Flush(); |
| 126 | } |
| 127 | return num_encoded; |
| 128 | } |
| 129 | |
| 130 | // ---------------------------------------------------------------------- |
| 131 | // PageWriter implementation |
| 132 | |
| 133 | // This subclass delimits pages appearing in a serialized stream, each preceded |
| 134 | // by a serialized Thrift format::PageHeader indicating the type of each page |
| 135 | // and the page metadata. |
| 136 | class SerializedPageWriter : public PageWriter { |
| 137 | public: |
| 138 | SerializedPageWriter(const std::shared_ptr<ArrowOutputStream>& sink, |
| 139 | Compression::type codec, int compression_level, |
| 140 | ColumnChunkMetaDataBuilder* metadata, |
| 141 | MemoryPool* pool = arrow::default_memory_pool()) |
| 142 | : sink_(sink), |
| 143 | metadata_(metadata), |
| 144 | pool_(pool), |
| 145 | num_values_(0), |
| 146 | dictionary_page_offset_(0), |
| 147 | data_page_offset_(0), |
| 148 | total_uncompressed_size_(0), |
| 149 | total_compressed_size_(0) { |
| 150 | compressor_ = GetCodec(codec, compression_level); |
| 151 | thrift_serializer_.reset(new ThriftSerializer); |
| 152 | } |
| 153 | |
| 154 | int64_t WriteDictionaryPage(const DictionaryPage& page) override { |
| 155 | int64_t uncompressed_size = page.size(); |
| 156 | std::shared_ptr<Buffer> compressed_data = nullptr; |
| 157 | if (has_compressor()) { |
| 158 | auto buffer = std::static_pointer_cast<ResizableBuffer>( |
| 159 | AllocateBuffer(pool_, uncompressed_size)); |
| 160 | Compress(*(page.buffer().get()), buffer.get()); |
| 161 | compressed_data = std::static_pointer_cast<Buffer>(buffer); |
| 162 | } else { |
| 163 | compressed_data = page.buffer(); |
| 164 | } |
| 165 | |
| 166 | format::DictionaryPageHeader ; |
| 167 | dict_page_header.__set_num_values(page.num_values()); |
| 168 | dict_page_header.__set_encoding(ToThrift(page.encoding())); |
| 169 | dict_page_header.__set_is_sorted(page.is_sorted()); |
| 170 | |
| 171 | format::PageHeader ; |
| 172 | page_header.__set_type(format::PageType::DICTIONARY_PAGE); |
| 173 | page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size)); |
| 174 | page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size())); |
| 175 | page_header.__set_dictionary_page_header(dict_page_header); |
| 176 | // TODO(PARQUET-594) crc checksum |
| 177 | |
| 178 | int64_t start_pos = -1; |
| 179 | PARQUET_THROW_NOT_OK(sink_->Tell(&start_pos)); |
| 180 | if (dictionary_page_offset_ == 0) { |
| 181 | dictionary_page_offset_ = start_pos; |
| 182 | } |
| 183 | int64_t = thrift_serializer_->Serialize(&page_header, sink_.get()); |
| 184 | PARQUET_THROW_NOT_OK(sink_->Write(compressed_data)); |
| 185 | |
| 186 | total_uncompressed_size_ += uncompressed_size + header_size; |
| 187 | total_compressed_size_ += compressed_data->size() + header_size; |
| 188 | |
| 189 | int64_t final_pos = -1; |
| 190 | PARQUET_THROW_NOT_OK(sink_->Tell(&final_pos)); |
| 191 | return final_pos - start_pos; |
| 192 | } |
| 193 | |
| 194 | void Close(bool has_dictionary, bool fallback) override { |
| 195 | // index_page_offset = -1 since they are not supported |
| 196 | metadata_->Finish(num_values_, dictionary_page_offset_, -1, data_page_offset_, |
| 197 | total_compressed_size_, total_uncompressed_size_, has_dictionary, |
| 198 | fallback); |
| 199 | |
| 200 | // Write metadata at end of column chunk |
| 201 | metadata_->WriteTo(sink_.get()); |
| 202 | } |
| 203 | |
| 204 | /** |
| 205 | * Compress a buffer. |
| 206 | */ |
| 207 | void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override { |
| 208 | DCHECK(compressor_ != nullptr); |
| 209 | |
| 210 | // Compress the data |
| 211 | int64_t max_compressed_size = |
| 212 | compressor_->MaxCompressedLen(src_buffer.size(), src_buffer.data()); |
| 213 | |
| 214 | // Use Arrow::Buffer::shrink_to_fit = false |
| 215 | // underlying buffer only keeps growing. Resize to a smaller size does not reallocate. |
| 216 | PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false)); |
| 217 | |
| 218 | int64_t compressed_size; |
| 219 | PARQUET_THROW_NOT_OK( |
| 220 | compressor_->Compress(src_buffer.size(), src_buffer.data(), max_compressed_size, |
| 221 | dest_buffer->mutable_data(), &compressed_size)); |
| 222 | PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false)); |
| 223 | } |
| 224 | |
| 225 | int64_t WriteDataPage(const CompressedDataPage& page) override { |
| 226 | int64_t uncompressed_size = page.uncompressed_size(); |
| 227 | std::shared_ptr<Buffer> compressed_data = page.buffer(); |
| 228 | |
| 229 | format::DataPageHeader ; |
| 230 | data_page_header.__set_num_values(page.num_values()); |
| 231 | data_page_header.__set_encoding(ToThrift(page.encoding())); |
| 232 | data_page_header.__set_definition_level_encoding( |
| 233 | ToThrift(page.definition_level_encoding())); |
| 234 | data_page_header.__set_repetition_level_encoding( |
| 235 | ToThrift(page.repetition_level_encoding())); |
| 236 | data_page_header.__set_statistics(ToThrift(page.statistics())); |
| 237 | |
| 238 | format::PageHeader ; |
| 239 | page_header.__set_type(format::PageType::DATA_PAGE); |
| 240 | page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size)); |
| 241 | page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size())); |
| 242 | page_header.__set_data_page_header(data_page_header); |
| 243 | // TODO(PARQUET-594) crc checksum |
| 244 | |
| 245 | int64_t start_pos = -1; |
| 246 | PARQUET_THROW_NOT_OK(sink_->Tell(&start_pos)); |
| 247 | if (data_page_offset_ == 0) { |
| 248 | data_page_offset_ = start_pos; |
| 249 | } |
| 250 | |
| 251 | int64_t = thrift_serializer_->Serialize(&page_header, sink_.get()); |
| 252 | PARQUET_THROW_NOT_OK(sink_->Write(compressed_data)); |
| 253 | |
| 254 | total_uncompressed_size_ += uncompressed_size + header_size; |
| 255 | total_compressed_size_ += compressed_data->size() + header_size; |
| 256 | num_values_ += page.num_values(); |
| 257 | |
| 258 | int64_t current_pos = -1; |
| 259 | PARQUET_THROW_NOT_OK(sink_->Tell(¤t_pos)); |
| 260 | return current_pos - start_pos; |
| 261 | } |
| 262 | |
| 263 | bool has_compressor() override { return (compressor_ != nullptr); } |
| 264 | |
| 265 | int64_t num_values() { return num_values_; } |
| 266 | |
| 267 | int64_t dictionary_page_offset() { return dictionary_page_offset_; } |
| 268 | |
| 269 | int64_t data_page_offset() { return data_page_offset_; } |
| 270 | |
| 271 | int64_t total_compressed_size() { return total_compressed_size_; } |
| 272 | |
| 273 | int64_t total_uncompressed_size() { return total_uncompressed_size_; } |
| 274 | |
| 275 | private: |
| 276 | std::shared_ptr<ArrowOutputStream> sink_; |
| 277 | ColumnChunkMetaDataBuilder* metadata_; |
| 278 | MemoryPool* pool_; |
| 279 | int64_t num_values_; |
| 280 | int64_t dictionary_page_offset_; |
| 281 | int64_t data_page_offset_; |
| 282 | int64_t total_uncompressed_size_; |
| 283 | int64_t total_compressed_size_; |
| 284 | |
| 285 | std::unique_ptr<ThriftSerializer> thrift_serializer_; |
| 286 | |
| 287 | // Compression codec to use. |
| 288 | std::unique_ptr<arrow::util::Codec> compressor_; |
| 289 | }; |
| 290 | |
| 291 | // This implementation of the PageWriter writes to the final sink on Close . |
| 292 | class BufferedPageWriter : public PageWriter { |
| 293 | public: |
| 294 | BufferedPageWriter(const std::shared_ptr<ArrowOutputStream>& sink, |
| 295 | Compression::type codec, int compression_level, |
| 296 | ColumnChunkMetaDataBuilder* metadata, |
| 297 | MemoryPool* pool = arrow::default_memory_pool()) |
| 298 | : final_sink_(sink), metadata_(metadata) { |
| 299 | in_memory_sink_ = CreateOutputStream(pool); |
| 300 | pager_ = std::unique_ptr<SerializedPageWriter>(new SerializedPageWriter( |
| 301 | in_memory_sink_, codec, compression_level, metadata, pool)); |
| 302 | } |
| 303 | |
| 304 | int64_t WriteDictionaryPage(const DictionaryPage& page) override { |
| 305 | return pager_->WriteDictionaryPage(page); |
| 306 | } |
| 307 | |
| 308 | void Close(bool has_dictionary, bool fallback) override { |
| 309 | // index_page_offset = -1 since they are not supported |
| 310 | int64_t final_position = -1; |
| 311 | PARQUET_THROW_NOT_OK(final_sink_->Tell(&final_position)); |
| 312 | metadata_->Finish( |
| 313 | pager_->num_values(), pager_->dictionary_page_offset() + final_position, -1, |
| 314 | pager_->data_page_offset() + final_position, pager_->total_compressed_size(), |
| 315 | pager_->total_uncompressed_size(), has_dictionary, fallback); |
| 316 | |
| 317 | // Write metadata at end of column chunk |
| 318 | metadata_->WriteTo(in_memory_sink_.get()); |
| 319 | |
| 320 | // flush everything to the serialized sink |
| 321 | std::shared_ptr<Buffer> buffer; |
| 322 | PARQUET_THROW_NOT_OK(in_memory_sink_->Finish(&buffer)); |
| 323 | PARQUET_THROW_NOT_OK(final_sink_->Write(buffer)); |
| 324 | } |
| 325 | |
| 326 | int64_t WriteDataPage(const CompressedDataPage& page) override { |
| 327 | return pager_->WriteDataPage(page); |
| 328 | } |
| 329 | |
| 330 | void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override { |
| 331 | pager_->Compress(src_buffer, dest_buffer); |
| 332 | } |
| 333 | |
| 334 | bool has_compressor() override { return pager_->has_compressor(); } |
| 335 | |
| 336 | private: |
| 337 | std::shared_ptr<ArrowOutputStream> final_sink_; |
| 338 | ColumnChunkMetaDataBuilder* metadata_; |
| 339 | std::shared_ptr<arrow::io::BufferOutputStream> in_memory_sink_; |
| 340 | std::unique_ptr<SerializedPageWriter> ; |
| 341 | }; |
| 342 | |
| 343 | std::unique_ptr<PageWriter> PageWriter::Open( |
| 344 | const std::shared_ptr<ArrowOutputStream>& sink, Compression::type codec, |
| 345 | int compression_level, ColumnChunkMetaDataBuilder* metadata, MemoryPool* pool, |
| 346 | bool buffered_row_group) { |
| 347 | if (buffered_row_group) { |
| 348 | return std::unique_ptr<PageWriter>( |
| 349 | new BufferedPageWriter(sink, codec, compression_level, metadata, pool)); |
| 350 | } else { |
| 351 | return std::unique_ptr<PageWriter>( |
| 352 | new SerializedPageWriter(sink, codec, compression_level, metadata, pool)); |
| 353 | } |
| 354 | } |
| 355 | |
| 356 | // ---------------------------------------------------------------------- |
| 357 | // ColumnWriter |
| 358 | |
| 359 | std::shared_ptr<WriterProperties> default_writer_properties() { |
| 360 | static std::shared_ptr<WriterProperties> default_writer_properties = |
| 361 | WriterProperties::Builder().build(); |
| 362 | return default_writer_properties; |
| 363 | } |
| 364 | |
| 365 | class ColumnWriterImpl { |
| 366 | public: |
| 367 | ColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata, |
| 368 | std::unique_ptr<PageWriter> , const bool use_dictionary, |
| 369 | Encoding::type encoding, const WriterProperties* properties) |
| 370 | : metadata_(metadata), |
| 371 | descr_(metadata->descr()), |
| 372 | pager_(std::move(pager)), |
| 373 | has_dictionary_(use_dictionary), |
| 374 | encoding_(encoding), |
| 375 | properties_(properties), |
| 376 | allocator_(properties->memory_pool()), |
| 377 | num_buffered_values_(0), |
| 378 | num_buffered_encoded_values_(0), |
| 379 | rows_written_(0), |
| 380 | total_bytes_written_(0), |
| 381 | total_compressed_bytes_(0), |
| 382 | closed_(false), |
| 383 | fallback_(false), |
| 384 | definition_levels_sink_(allocator_), |
| 385 | repetition_levels_sink_(allocator_) { |
| 386 | definition_levels_rle_ = |
| 387 | std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); |
| 388 | repetition_levels_rle_ = |
| 389 | std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); |
| 390 | uncompressed_data_ = |
| 391 | std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); |
| 392 | if (pager_->has_compressor()) { |
| 393 | compressed_data_ = |
| 394 | std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0)); |
| 395 | } |
| 396 | } |
| 397 | |
| 398 | virtual ~ColumnWriterImpl() = default; |
| 399 | |
| 400 | int64_t Close(); |
| 401 | |
| 402 | protected: |
| 403 | virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0; |
| 404 | |
| 405 | // Serializes Dictionary Page if enabled |
| 406 | virtual void WriteDictionaryPage() = 0; |
| 407 | |
| 408 | // Plain-encoded statistics of the current page |
| 409 | virtual EncodedStatistics GetPageStatistics() = 0; |
| 410 | |
| 411 | // Plain-encoded statistics of the whole chunk |
| 412 | virtual EncodedStatistics GetChunkStatistics() = 0; |
| 413 | |
| 414 | // Merges page statistics into chunk statistics, then resets the values |
| 415 | virtual void ResetPageStatistics() = 0; |
| 416 | |
| 417 | // Adds Data Pages to an in memory buffer in dictionary encoding mode |
| 418 | // Serializes the Data Pages in other encoding modes |
| 419 | void AddDataPage(); |
| 420 | |
| 421 | // Serializes Data Pages |
| 422 | void WriteDataPage(const CompressedDataPage& page) { |
| 423 | total_bytes_written_ += pager_->WriteDataPage(page); |
| 424 | } |
| 425 | |
| 426 | // Write multiple definition levels |
| 427 | void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) { |
| 428 | DCHECK(!closed_); |
| 429 | PARQUET_THROW_NOT_OK( |
| 430 | definition_levels_sink_.Append(levels, sizeof(int16_t) * num_levels)); |
| 431 | } |
| 432 | |
| 433 | // Write multiple repetition levels |
| 434 | void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) { |
| 435 | DCHECK(!closed_); |
| 436 | PARQUET_THROW_NOT_OK( |
| 437 | repetition_levels_sink_.Append(levels, sizeof(int16_t) * num_levels)); |
| 438 | } |
| 439 | |
| 440 | // RLE encode the src_buffer into dest_buffer and return the encoded size |
| 441 | int64_t RleEncodeLevels(const void* src_buffer, ResizableBuffer* dest_buffer, |
| 442 | int16_t max_level); |
| 443 | |
| 444 | // Serialize the buffered Data Pages |
| 445 | void FlushBufferedDataPages(); |
| 446 | |
| 447 | ColumnChunkMetaDataBuilder* metadata_; |
| 448 | const ColumnDescriptor* descr_; |
| 449 | |
| 450 | std::unique_ptr<PageWriter> pager_; |
| 451 | |
| 452 | bool has_dictionary_; |
| 453 | Encoding::type encoding_; |
| 454 | const WriterProperties* properties_; |
| 455 | |
| 456 | LevelEncoder level_encoder_; |
| 457 | |
| 458 | MemoryPool* allocator_; |
| 459 | |
| 460 | // The total number of values stored in the data page. This is the maximum of |
| 461 | // the number of encoded definition levels or encoded values. For |
| 462 | // non-repeated, required columns, this is equal to the number of encoded |
| 463 | // values. For repeated or optional values, there may be fewer data values |
| 464 | // than levels, and this tells you how many encoded levels there are in that |
| 465 | // case. |
| 466 | int64_t num_buffered_values_; |
| 467 | |
| 468 | // The total number of stored values. For repeated or optional values, this |
| 469 | // number may be lower than num_buffered_values_. |
| 470 | int64_t num_buffered_encoded_values_; |
| 471 | |
| 472 | // Total number of rows written with this ColumnWriter |
| 473 | int rows_written_; |
| 474 | |
| 475 | // Records the total number of bytes written by the serializer |
| 476 | int64_t total_bytes_written_; |
| 477 | |
| 478 | // Records the current number of compressed bytes in a column |
| 479 | int64_t total_compressed_bytes_; |
| 480 | |
| 481 | // Flag to check if the Writer has been closed |
| 482 | bool closed_; |
| 483 | |
| 484 | // Flag to infer if dictionary encoding has fallen back to PLAIN |
| 485 | bool fallback_; |
| 486 | |
| 487 | arrow::BufferBuilder definition_levels_sink_; |
| 488 | arrow::BufferBuilder repetition_levels_sink_; |
| 489 | |
| 490 | std::shared_ptr<ResizableBuffer> definition_levels_rle_; |
| 491 | std::shared_ptr<ResizableBuffer> repetition_levels_rle_; |
| 492 | |
| 493 | std::shared_ptr<ResizableBuffer> uncompressed_data_; |
| 494 | std::shared_ptr<ResizableBuffer> compressed_data_; |
| 495 | |
| 496 | std::vector<CompressedDataPage> data_pages_; |
| 497 | |
| 498 | private: |
| 499 | void InitSinks() { |
| 500 | definition_levels_sink_.Rewind(0); |
| 501 | repetition_levels_sink_.Rewind(0); |
| 502 | } |
| 503 | }; |
| 504 | |
| 505 | // return the size of the encoded buffer |
| 506 | int64_t ColumnWriterImpl::RleEncodeLevels(const void* src_buffer, |
| 507 | ResizableBuffer* dest_buffer, |
| 508 | int16_t max_level) { |
| 509 | // TODO: This only works with due to some RLE specifics |
| 510 | int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, |
| 511 | static_cast<int>(num_buffered_values_)) + |
| 512 | sizeof(int32_t); |
| 513 | |
| 514 | // Use Arrow::Buffer::shrink_to_fit = false |
| 515 | // underlying buffer only keeps growing. Resize to a smaller size does not reallocate. |
| 516 | PARQUET_THROW_NOT_OK(dest_buffer->Resize(rle_size, false)); |
| 517 | |
| 518 | level_encoder_.Init(Encoding::RLE, max_level, static_cast<int>(num_buffered_values_), |
| 519 | dest_buffer->mutable_data() + sizeof(int32_t), |
| 520 | static_cast<int>(dest_buffer->size() - sizeof(int32_t))); |
| 521 | int encoded = level_encoder_.Encode(static_cast<int>(num_buffered_values_), |
| 522 | reinterpret_cast<const int16_t*>(src_buffer)); |
| 523 | DCHECK_EQ(encoded, num_buffered_values_); |
| 524 | reinterpret_cast<int32_t*>(dest_buffer->mutable_data())[0] = level_encoder_.len(); |
| 525 | int64_t encoded_size = level_encoder_.len() + sizeof(int32_t); |
| 526 | return encoded_size; |
| 527 | } |
| 528 | |
| 529 | void ColumnWriterImpl::AddDataPage() { |
| 530 | int64_t definition_levels_rle_size = 0; |
| 531 | int64_t repetition_levels_rle_size = 0; |
| 532 | |
| 533 | std::shared_ptr<Buffer> values = GetValuesBuffer(); |
| 534 | |
| 535 | if (descr_->max_definition_level() > 0) { |
| 536 | definition_levels_rle_size = |
| 537 | RleEncodeLevels(definition_levels_sink_.data(), definition_levels_rle_.get(), |
| 538 | descr_->max_definition_level()); |
| 539 | } |
| 540 | |
| 541 | if (descr_->max_repetition_level() > 0) { |
| 542 | repetition_levels_rle_size = |
| 543 | RleEncodeLevels(repetition_levels_sink_.data(), repetition_levels_rle_.get(), |
| 544 | descr_->max_repetition_level()); |
| 545 | } |
| 546 | |
| 547 | int64_t uncompressed_size = |
| 548 | definition_levels_rle_size + repetition_levels_rle_size + values->size(); |
| 549 | |
| 550 | // Use Arrow::Buffer::shrink_to_fit = false |
| 551 | // underlying buffer only keeps growing. Resize to a smaller size does not reallocate. |
| 552 | PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false)); |
| 553 | |
| 554 | // Concatenate data into a single buffer |
| 555 | uint8_t* uncompressed_ptr = uncompressed_data_->mutable_data(); |
| 556 | memcpy(uncompressed_ptr, repetition_levels_rle_->data(), repetition_levels_rle_size); |
| 557 | uncompressed_ptr += repetition_levels_rle_size; |
| 558 | memcpy(uncompressed_ptr, definition_levels_rle_->data(), definition_levels_rle_size); |
| 559 | uncompressed_ptr += definition_levels_rle_size; |
| 560 | memcpy(uncompressed_ptr, values->data(), values->size()); |
| 561 | |
| 562 | EncodedStatistics page_stats = GetPageStatistics(); |
| 563 | page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path())); |
| 564 | page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order()); |
| 565 | ResetPageStatistics(); |
| 566 | |
| 567 | std::shared_ptr<Buffer> compressed_data; |
| 568 | if (pager_->has_compressor()) { |
| 569 | pager_->Compress(*(uncompressed_data_.get()), compressed_data_.get()); |
| 570 | compressed_data = compressed_data_; |
| 571 | } else { |
| 572 | compressed_data = uncompressed_data_; |
| 573 | } |
| 574 | |
| 575 | // Write the page to OutputStream eagerly if there is no dictionary or |
| 576 | // if dictionary encoding has fallen back to PLAIN |
| 577 | if (has_dictionary_ && !fallback_) { // Save pages until end of dictionary encoding |
| 578 | std::shared_ptr<Buffer> compressed_data_copy; |
| 579 | PARQUET_THROW_NOT_OK(compressed_data->Copy(0, compressed_data->size(), allocator_, |
| 580 | &compressed_data_copy)); |
| 581 | CompressedDataPage page(compressed_data_copy, |
| 582 | static_cast<int32_t>(num_buffered_values_), encoding_, |
| 583 | Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats); |
| 584 | total_compressed_bytes_ += page.size() + sizeof(format::PageHeader); |
| 585 | data_pages_.push_back(std::move(page)); |
| 586 | } else { // Eagerly write pages |
| 587 | CompressedDataPage page(compressed_data, static_cast<int32_t>(num_buffered_values_), |
| 588 | encoding_, Encoding::RLE, Encoding::RLE, uncompressed_size, |
| 589 | page_stats); |
| 590 | WriteDataPage(page); |
| 591 | } |
| 592 | |
| 593 | // Re-initialize the sinks for next Page. |
| 594 | InitSinks(); |
| 595 | num_buffered_values_ = 0; |
| 596 | num_buffered_encoded_values_ = 0; |
| 597 | } |
| 598 | |
| 599 | int64_t ColumnWriterImpl::Close() { |
| 600 | if (!closed_) { |
| 601 | closed_ = true; |
| 602 | if (has_dictionary_ && !fallback_) { |
| 603 | WriteDictionaryPage(); |
| 604 | } |
| 605 | |
| 606 | FlushBufferedDataPages(); |
| 607 | |
| 608 | EncodedStatistics chunk_statistics = GetChunkStatistics(); |
| 609 | chunk_statistics.ApplyStatSizeLimits( |
| 610 | properties_->max_statistics_size(descr_->path())); |
| 611 | chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order()); |
| 612 | |
| 613 | // Write stats only if the column has at least one row written |
| 614 | if (rows_written_ > 0 && chunk_statistics.is_set()) { |
| 615 | metadata_->SetStatistics(chunk_statistics); |
| 616 | } |
| 617 | pager_->Close(has_dictionary_, fallback_); |
| 618 | } |
| 619 | |
| 620 | return total_bytes_written_; |
| 621 | } |
| 622 | |
| 623 | void ColumnWriterImpl::FlushBufferedDataPages() { |
| 624 | // Write all outstanding data to a new page |
| 625 | if (num_buffered_values_ > 0) { |
| 626 | AddDataPage(); |
| 627 | } |
| 628 | for (size_t i = 0; i < data_pages_.size(); i++) { |
| 629 | WriteDataPage(data_pages_[i]); |
| 630 | } |
| 631 | data_pages_.clear(); |
| 632 | total_compressed_bytes_ = 0; |
| 633 | } |
| 634 | |
| 635 | // ---------------------------------------------------------------------- |
| 636 | // TypedColumnWriter |
| 637 | |
// Invokes `action(offset, length)` for consecutive chunks that cover
// [0, total): full chunks of `batch_size` first, then one shorter chunk for
// the remainder, if any.
//
// All arithmetic is 64-bit, so the batch count is never truncated.
// `batch_size` must be non-zero.
template <typename Action>
inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
  const int64_t num_batches = total / batch_size;
  const int64_t remainder = total % batch_size;
  for (int64_t round = 0; round < num_batches; ++round) {
    action(round * batch_size, batch_size);
  }
  // Write the remaining values
  if (remainder > 0) {
    action(num_batches * batch_size, remainder);
  }
}
| 649 | |
| 650 | bool DictionaryDirectWriteSupported(const arrow::Array& array) { |
| 651 | DCHECK_EQ(array.type_id(), arrow::Type::DICTIONARY); |
| 652 | const arrow::DictionaryType& dict_type = |
| 653 | static_cast<const arrow::DictionaryType&>(*array.type()); |
| 654 | auto id = dict_type.value_type()->id(); |
| 655 | return id == arrow::Type::BINARY || id == arrow::Type::STRING; |
| 656 | } |
| 657 | |
| 658 | Status ConvertDictionaryToDense(const arrow::Array& array, MemoryPool* pool, |
| 659 | std::shared_ptr<arrow::Array>* out) { |
| 660 | const arrow::DictionaryType& dict_type = |
| 661 | static_cast<const arrow::DictionaryType&>(*array.type()); |
| 662 | |
| 663 | // TODO(ARROW-1648): Remove this special handling once we require an Arrow |
| 664 | // version that has this fixed. |
| 665 | if (dict_type.value_type()->id() == arrow::Type::NA) { |
| 666 | *out = std::make_shared<arrow::NullArray>(array.length()); |
| 667 | return Status::OK(); |
| 668 | } |
| 669 | |
| 670 | arrow::compute::FunctionContext ctx(pool); |
| 671 | Datum cast_output; |
| 672 | RETURN_NOT_OK(arrow::compute::Cast(&ctx, Datum(array.data()), dict_type.value_type(), |
| 673 | arrow::compute::CastOptions(), &cast_output)); |
| 674 | *out = cast_output.make_array(); |
| 675 | return Status::OK(); |
| 676 | } |
| 677 | |
| 678 | static inline bool IsDictionaryEncoding(Encoding::type encoding) { |
| 679 | return encoding == Encoding::PLAIN_DICTIONARY; |
| 680 | } |
| 681 | |
| 682 | template <typename DType> |
| 683 | class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<DType> { |
| 684 | public: |
| 685 | using T = typename DType::c_type; |
| 686 | |
| 687 | TypedColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata, |
| 688 | std::unique_ptr<PageWriter> , const bool use_dictionary, |
| 689 | Encoding::type encoding, const WriterProperties* properties) |
| 690 | : ColumnWriterImpl(metadata, std::move(pager), use_dictionary, encoding, |
| 691 | properties) { |
| 692 | current_encoder_ = MakeEncoder(DType::type_num, encoding, use_dictionary, descr_, |
| 693 | properties->memory_pool()); |
| 694 | |
| 695 | if (properties->statistics_enabled(descr_->path()) && |
| 696 | (SortOrder::UNKNOWN != descr_->sort_order())) { |
| 697 | page_statistics_ = MakeStatistics<DType>(descr_, allocator_); |
| 698 | chunk_statistics_ = MakeStatistics<DType>(descr_, allocator_); |
| 699 | } |
| 700 | } |
| 701 | |
| 702 | int64_t Close() override { return ColumnWriterImpl::Close(); } |
| 703 | |
| 704 | void WriteBatch(int64_t num_values, const int16_t* def_levels, |
| 705 | const int16_t* rep_levels, const T* values) override { |
| 706 | // We check for DataPage limits only after we have inserted the values. If a user |
| 707 | // writes a large number of values, the DataPage size can be much above the limit. |
| 708 | // The purpose of this chunking is to bound this. Even if a user writes large number |
| 709 | // of values, the chunking will ensure the AddDataPage() is called at a reasonable |
| 710 | // pagesize limit |
| 711 | int64_t value_offset = 0; |
| 712 | auto WriteChunk = [&](int64_t offset, int64_t batch_size) { |
| 713 | int64_t values_to_write = |
| 714 | WriteLevels(batch_size, def_levels + offset, rep_levels + offset); |
| 715 | // PARQUET-780 |
| 716 | if (values_to_write > 0) { |
| 717 | DCHECK_NE(nullptr, values); |
| 718 | } |
| 719 | WriteValues(values + value_offset, values_to_write, batch_size - values_to_write); |
| 720 | CommitWriteAndCheckPageLimit(batch_size, values_to_write); |
| 721 | value_offset += values_to_write; |
| 722 | |
| 723 | // Dictionary size checked separately from data page size since we |
| 724 | // circumvent this check when writing arrow::DictionaryArray directly |
| 725 | CheckDictionarySizeLimit(); |
| 726 | }; |
| 727 | DoInBatches(num_values, properties_->write_batch_size(), WriteChunk); |
| 728 | } |
| 729 | |
| 730 | void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels, |
| 731 | const int16_t* rep_levels, const uint8_t* valid_bits, |
| 732 | int64_t valid_bits_offset, const T* values) override { |
| 733 | // Like WriteBatch, but for spaced values |
| 734 | int64_t value_offset = 0; |
| 735 | auto WriteChunk = [&](int64_t offset, int64_t batch_size) { |
| 736 | int64_t batch_num_values = 0; |
| 737 | int64_t batch_num_spaced_values = 0; |
| 738 | WriteLevelsSpaced(batch_size, def_levels + offset, rep_levels + offset, |
| 739 | &batch_num_values, &batch_num_spaced_values); |
| 740 | WriteValuesSpaced(values + value_offset, batch_num_values, batch_num_spaced_values, |
| 741 | valid_bits, valid_bits_offset + value_offset); |
| 742 | CommitWriteAndCheckPageLimit(batch_size, batch_num_spaced_values); |
| 743 | value_offset += batch_num_spaced_values; |
| 744 | |
| 745 | // Dictionary size checked separately from data page size since we |
| 746 | // circumvent this check when writing arrow::DictionaryArray directly |
| 747 | CheckDictionarySizeLimit(); |
| 748 | }; |
| 749 | DoInBatches(num_values, properties_->write_batch_size(), WriteChunk); |
| 750 | } |
| 751 | |
| 752 | Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels, |
| 753 | int64_t num_levels, const arrow::Array& array, |
| 754 | ArrowWriteContext* ctx) override { |
| 755 | if (array.type()->id() == arrow::Type::DICTIONARY) { |
| 756 | return WriteArrowDictionary(def_levels, rep_levels, num_levels, array, ctx); |
| 757 | } else { |
| 758 | return WriteArrowDense(def_levels, rep_levels, num_levels, array, ctx); |
| 759 | } |
| 760 | } |
| 761 | |
| 762 | int64_t EstimatedBufferedValueBytes() const override { |
| 763 | return current_encoder_->EstimatedDataEncodedSize(); |
| 764 | } |
| 765 | |
| 766 | protected: |
| 767 | std::shared_ptr<Buffer> GetValuesBuffer() override { |
| 768 | return current_encoder_->FlushValues(); |
| 769 | } |
| 770 | |
// Internal function to handle direct writing of arrow::DictionaryArray,
// since the standard logic concerning dictionary size limits and fallback to
// plain encoding is circumvented. Only declared here; the definition follows
// the class body.
Status WriteArrowDictionary(const int16_t* def_levels, const int16_t* rep_levels,
                            int64_t num_levels, const arrow::Array& array,
                            ArrowWriteContext* context);

// Internal function that writes a non-dictionary Arrow array. Only declared
// here; explicit specializations for the individual Parquet physical types are
// provided later in this file.
Status WriteArrowDense(const int16_t* def_levels, const int16_t* rep_levels,
                       int64_t num_levels, const arrow::Array& array,
                       ArrowWriteContext* context);
| 781 | |
| 782 | void WriteDictionaryPage() override { |
| 783 | // We have to dynamic cast here because of TypedEncoder<Type> as |
| 784 | // some compilers don't want to cast through virtual inheritance |
| 785 | auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get()); |
| 786 | DCHECK(dict_encoder); |
| 787 | std::shared_ptr<ResizableBuffer> buffer = |
| 788 | AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size()); |
| 789 | dict_encoder->WriteDict(buffer->mutable_data()); |
| 790 | |
| 791 | DictionaryPage page(buffer, dict_encoder->num_entries(), |
| 792 | properties_->dictionary_page_encoding()); |
| 793 | total_bytes_written_ += pager_->WriteDictionaryPage(page); |
| 794 | } |
| 795 | |
| 796 | EncodedStatistics GetPageStatistics() override { |
| 797 | EncodedStatistics result; |
| 798 | if (page_statistics_) result = page_statistics_->Encode(); |
| 799 | return result; |
| 800 | } |
| 801 | |
| 802 | EncodedStatistics GetChunkStatistics() override { |
| 803 | EncodedStatistics result; |
| 804 | if (chunk_statistics_) result = chunk_statistics_->Encode(); |
| 805 | return result; |
| 806 | } |
| 807 | |
| 808 | void ResetPageStatistics() override { |
| 809 | if (chunk_statistics_ != nullptr) { |
| 810 | chunk_statistics_->Merge(*page_statistics_); |
| 811 | page_statistics_->Reset(); |
| 812 | } |
| 813 | } |
| 814 | |
| 815 | Type::type type() const override { return descr_->physical_type(); } |
| 816 | |
| 817 | const ColumnDescriptor* descr() const override { return descr_; } |
| 818 | |
| 819 | int64_t rows_written() const override { return rows_written_; } |
| 820 | |
| 821 | int64_t total_compressed_bytes() const override { return total_compressed_bytes_; } |
| 822 | |
| 823 | int64_t total_bytes_written() const override { return total_bytes_written_; } |
| 824 | |
| 825 | const WriterProperties* properties() override { return properties_; } |
| 826 | |
| 827 | private: |
// Encoder type for this column's values, taken from EncodingTraits<DType>
using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
// Statistics type matching this column's DType
using TypedStats = TypedStatistics<DType>;
// Encoder currently in use; FallbackToPlainEncoding() replaces it with a PLAIN
// encoder
std::unique_ptr<Encoder> current_encoder_;
// Statistics updated as values are written and returned by GetPageStatistics();
// may be null, in which case updates are skipped
std::shared_ptr<TypedStats> page_statistics_;
// Statistics returned by GetChunkStatistics(); ResetPageStatistics() merges the
// page statistics into it. May be null
std::shared_ptr<TypedStats> chunk_statistics_;

// If writing a sequence of arrow::DictionaryArray to the writer, we keep the
// dictionary passed to DictEncoder<T>::PutDictionary so we can check
// subsequent array chunks to see whether materialization is required (in
// which case we fall back to the dense write path)
std::shared_ptr<arrow::Array> preserved_dictionary_;
| 839 | |
| 840 | int64_t WriteLevels(int64_t num_values, const int16_t* def_levels, |
| 841 | const int16_t* rep_levels) { |
| 842 | int64_t values_to_write = 0; |
| 843 | // If the field is required and non-repeated, there are no definition levels |
| 844 | if (descr_->max_definition_level() > 0) { |
| 845 | for (int64_t i = 0; i < num_values; ++i) { |
| 846 | if (def_levels[i] == descr_->max_definition_level()) { |
| 847 | ++values_to_write; |
| 848 | } |
| 849 | } |
| 850 | |
| 851 | WriteDefinitionLevels(num_values, def_levels); |
| 852 | } else { |
| 853 | // Required field, write all values |
| 854 | values_to_write = num_values; |
| 855 | } |
| 856 | |
| 857 | // Not present for non-repeated fields |
| 858 | if (descr_->max_repetition_level() > 0) { |
| 859 | // A row could include more than one value |
| 860 | // Count the occasions where we start a new row |
| 861 | for (int64_t i = 0; i < num_values; ++i) { |
| 862 | if (rep_levels[i] == 0) { |
| 863 | rows_written_++; |
| 864 | } |
| 865 | } |
| 866 | |
| 867 | WriteRepetitionLevels(num_values, rep_levels); |
| 868 | } else { |
| 869 | // Each value is exactly one row |
| 870 | rows_written_ += static_cast<int>(num_values); |
| 871 | } |
| 872 | return values_to_write; |
| 873 | } |
| 874 | |
| 875 | void WriteLevelsSpaced(int64_t num_levels, const int16_t* def_levels, |
| 876 | const int16_t* rep_levels, int64_t* out_values_to_write, |
| 877 | int64_t* out_spaced_values_to_write) { |
| 878 | int64_t values_to_write = 0; |
| 879 | int64_t spaced_values_to_write = 0; |
| 880 | // If the field is required and non-repeated, there are no definition levels |
| 881 | if (descr_->max_definition_level() > 0) { |
| 882 | // Minimal definition level for which spaced values are written |
| 883 | int16_t min_spaced_def_level = descr_->max_definition_level(); |
| 884 | if (descr_->schema_node()->is_optional()) { |
| 885 | min_spaced_def_level--; |
| 886 | } |
| 887 | for (int64_t i = 0; i < num_levels; ++i) { |
| 888 | if (def_levels[i] == descr_->max_definition_level()) { |
| 889 | ++values_to_write; |
| 890 | } |
| 891 | if (def_levels[i] >= min_spaced_def_level) { |
| 892 | ++spaced_values_to_write; |
| 893 | } |
| 894 | } |
| 895 | |
| 896 | WriteDefinitionLevels(num_levels, def_levels); |
| 897 | } else { |
| 898 | // Required field, write all values |
| 899 | values_to_write = num_levels; |
| 900 | spaced_values_to_write = num_levels; |
| 901 | } |
| 902 | |
| 903 | // Not present for non-repeated fields |
| 904 | if (descr_->max_repetition_level() > 0) { |
| 905 | // A row could include more than one value |
| 906 | // Count the occasions where we start a new row |
| 907 | for (int64_t i = 0; i < num_levels; ++i) { |
| 908 | if (rep_levels[i] == 0) { |
| 909 | rows_written_++; |
| 910 | } |
| 911 | } |
| 912 | |
| 913 | WriteRepetitionLevels(num_levels, rep_levels); |
| 914 | } else { |
| 915 | // Each value is exactly one row |
| 916 | rows_written_ += static_cast<int>(num_levels); |
| 917 | } |
| 918 | |
| 919 | *out_values_to_write = values_to_write; |
| 920 | *out_spaced_values_to_write = spaced_values_to_write; |
| 921 | } |
| 922 | |
| 923 | void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values) { |
| 924 | num_buffered_values_ += num_levels; |
| 925 | num_buffered_encoded_values_ += num_values; |
| 926 | |
| 927 | if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) { |
| 928 | AddDataPage(); |
| 929 | } |
| 930 | } |
| 931 | |
| 932 | void FallbackToPlainEncoding() { |
| 933 | if (IsDictionaryEncoding(current_encoder_->encoding())) { |
| 934 | WriteDictionaryPage(); |
| 935 | // Serialize the buffered Dictionary Indicies |
| 936 | FlushBufferedDataPages(); |
| 937 | fallback_ = true; |
| 938 | // Only PLAIN encoding is supported for fallback in V1 |
| 939 | current_encoder_ = MakeEncoder(DType::type_num, Encoding::PLAIN, false, descr_, |
| 940 | properties_->memory_pool()); |
| 941 | encoding_ = Encoding::PLAIN; |
| 942 | } |
| 943 | } |
| 944 | |
| 945 | // Checks if the Dictionary Page size limit is reached |
| 946 | // If the limit is reached, the Dictionary and Data Pages are serialized |
| 947 | // The encoding is switched to PLAIN |
| 948 | // |
| 949 | // Only one Dictionary Page is written. |
| 950 | // Fallback to PLAIN if dictionary page limit is reached. |
| 951 | void CheckDictionarySizeLimit() { |
| 952 | if (!has_dictionary_ || fallback_) { |
| 953 | // Either not using dictionary encoding, or we have already fallen back |
| 954 | // to PLAIN encoding because the size threshold was reached |
| 955 | return; |
| 956 | } |
| 957 | |
| 958 | // We have to dynamic cast here because TypedEncoder<Type> as some compilers |
| 959 | // don't want to cast through virtual inheritance |
| 960 | auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get()); |
| 961 | if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) { |
| 962 | FallbackToPlainEncoding(); |
| 963 | } |
| 964 | } |
| 965 | |
| 966 | void WriteValues(const T* values, int64_t num_values, int64_t num_nulls) { |
| 967 | dynamic_cast<ValueEncoderType*>(current_encoder_.get()) |
| 968 | ->Put(values, static_cast<int>(num_values)); |
| 969 | if (page_statistics_ != nullptr) { |
| 970 | page_statistics_->Update(values, num_values, num_nulls); |
| 971 | } |
| 972 | } |
| 973 | |
| 974 | void WriteValuesSpaced(const T* values, int64_t num_values, int64_t num_spaced_values, |
| 975 | const uint8_t* valid_bits, int64_t valid_bits_offset) { |
| 976 | if (descr_->schema_node()->is_optional()) { |
| 977 | dynamic_cast<ValueEncoderType*>(current_encoder_.get()) |
| 978 | ->PutSpaced(values, static_cast<int>(num_spaced_values), valid_bits, |
| 979 | valid_bits_offset); |
| 980 | } else { |
| 981 | dynamic_cast<ValueEncoderType*>(current_encoder_.get()) |
| 982 | ->Put(values, static_cast<int>(num_values)); |
| 983 | } |
| 984 | if (page_statistics_ != nullptr) { |
| 985 | const int64_t num_nulls = num_spaced_values - num_values; |
| 986 | page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, num_values, |
| 987 | num_nulls); |
| 988 | } |
| 989 | } |
| 990 | }; |
| 991 | |
| 992 | template <typename DType> |
| 993 | Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(const int16_t* def_levels, |
| 994 | const int16_t* rep_levels, |
| 995 | int64_t num_levels, |
| 996 | const arrow::Array& array, |
| 997 | ArrowWriteContext* ctx) { |
| 998 | // If this is the first time writing a DictionaryArray, then there's |
| 999 | // a few possible paths to take: |
| 1000 | // |
| 1001 | // - If dictionary encoding is not enabled, convert to densely |
| 1002 | // encoded and call WriteArrow |
| 1003 | // - Dictionary encoding enabled |
| 1004 | // - If this is the first time this is called, then we call |
| 1005 | // PutDictionary into the encoder and then PutIndices on each |
| 1006 | // chunk. We store the dictionary that was written in |
| 1007 | // preserved_dictionary_ so that subsequent calls to this method |
| 1008 | // can make sure the dictionary has not changed |
| 1009 | // - On subsequent calls, we have to check whether the dictionary |
| 1010 | // has changed. If it has, then we trigger the varying |
| 1011 | // dictionary path and materialize each chunk and then call |
| 1012 | // WriteArrow with that |
| 1013 | auto WriteDense = [&] { |
| 1014 | std::shared_ptr<arrow::Array> dense_array; |
| 1015 | RETURN_NOT_OK( |
| 1016 | ConvertDictionaryToDense(array, properties_->memory_pool(), &dense_array)); |
| 1017 | return WriteArrowDense(def_levels, rep_levels, num_levels, *dense_array, ctx); |
| 1018 | }; |
| 1019 | |
| 1020 | if (!IsDictionaryEncoding(current_encoder_->encoding()) || |
| 1021 | !DictionaryDirectWriteSupported(array)) { |
| 1022 | // No longer dictionary-encoding for whatever reason, maybe we never were |
| 1023 | // or we decided to stop. Note that WriteArrow can be invoked multiple |
| 1024 | // times with both dense and dictionary-encoded versions of the same data |
| 1025 | // without a problem. Any dense data will be hashed to indices until the |
| 1026 | // dictionary page limit is reached, at which everything (dictionary and |
| 1027 | // dense) will fall back to plain encoding |
| 1028 | return WriteDense(); |
| 1029 | } |
| 1030 | |
| 1031 | auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get()); |
| 1032 | const auto& data = checked_cast<const arrow::DictionaryArray&>(array); |
| 1033 | std::shared_ptr<arrow::Array> dictionary = data.dictionary(); |
| 1034 | std::shared_ptr<arrow::Array> indices = data.indices(); |
| 1035 | |
| 1036 | int64_t value_offset = 0; |
| 1037 | auto WriteIndicesChunk = [&](int64_t offset, int64_t batch_size) { |
| 1038 | int64_t batch_num_values = 0; |
| 1039 | int64_t batch_num_spaced_values = 0; |
| 1040 | WriteLevelsSpaced(batch_size, def_levels + offset, rep_levels + offset, |
| 1041 | &batch_num_values, &batch_num_spaced_values); |
| 1042 | dict_encoder->PutIndices(*indices->Slice(value_offset, batch_num_spaced_values)); |
| 1043 | CommitWriteAndCheckPageLimit(batch_size, batch_num_values); |
| 1044 | value_offset += batch_num_spaced_values; |
| 1045 | }; |
| 1046 | |
| 1047 | // Handle seeing dictionary for the first time |
| 1048 | if (!preserved_dictionary_) { |
| 1049 | // It's a new dictionary. Call PutDictionary and keep track of it |
| 1050 | PARQUET_CATCH_NOT_OK(dict_encoder->PutDictionary(*dictionary)); |
| 1051 | |
| 1052 | // TODO(wesm): If some dictionary values are unobserved, then the |
| 1053 | // statistics will be inaccurate. Do we care enough to fix it? |
| 1054 | if (page_statistics_ != nullptr) { |
| 1055 | PARQUET_CATCH_NOT_OK(page_statistics_->Update(*dictionary)); |
| 1056 | } |
| 1057 | preserved_dictionary_ = dictionary; |
| 1058 | } else if (!dictionary->Equals(*preserved_dictionary_)) { |
| 1059 | // Dictionary has changed |
| 1060 | PARQUET_CATCH_NOT_OK(FallbackToPlainEncoding()); |
| 1061 | return WriteDense(); |
| 1062 | } |
| 1063 | |
| 1064 | PARQUET_CATCH_NOT_OK( |
| 1065 | DoInBatches(num_levels, properties_->write_batch_size(), WriteIndicesChunk)); |
| 1066 | return Status::OK(); |
| 1067 | } |
| 1068 | |
| 1069 | // ---------------------------------------------------------------------- |
| 1070 | // Direct Arrow write path |
| 1071 | |
| 1072 | template <typename ParquetType, typename ArrowType, typename Enable = void> |
| 1073 | struct SerializeFunctor { |
| 1074 | using ArrowCType = typename ArrowType::c_type; |
| 1075 | using ArrayType = typename arrow::TypeTraits<ArrowType>::ArrayType; |
| 1076 | using ParquetCType = typename ParquetType::c_type; |
| 1077 | Status Serialize(const ArrayType& array, ArrowWriteContext*, ParquetCType* out) { |
| 1078 | const ArrowCType* input = array.raw_values(); |
| 1079 | if (array.null_count() > 0) { |
| 1080 | for (int i = 0; i < array.length(); i++) { |
| 1081 | out[i] = static_cast<ParquetCType>(input[i]); |
| 1082 | } |
| 1083 | } else { |
| 1084 | std::copy(input, input + array.length(), out); |
| 1085 | } |
| 1086 | return Status::OK(); |
| 1087 | } |
| 1088 | }; |
| 1089 | |
| 1090 | template <typename ParquetType, typename ArrowType> |
| 1091 | inline Status SerializeData(const arrow::Array& array, ArrowWriteContext* ctx, |
| 1092 | typename ParquetType::c_type* out) { |
| 1093 | using ArrayType = typename arrow::TypeTraits<ArrowType>::ArrayType; |
| 1094 | SerializeFunctor<ParquetType, ArrowType> functor; |
| 1095 | return functor.Serialize(checked_cast<const ArrayType&>(array), ctx, out); |
| 1096 | } |
| 1097 | |
| 1098 | template <typename ParquetType, typename ArrowType> |
| 1099 | Status WriteArrowSerialize(const arrow::Array& array, int64_t num_levels, |
| 1100 | const int16_t* def_levels, const int16_t* rep_levels, |
| 1101 | ArrowWriteContext* ctx, |
| 1102 | TypedColumnWriter<ParquetType>* writer) { |
| 1103 | using ParquetCType = typename ParquetType::c_type; |
| 1104 | |
| 1105 | ParquetCType* buffer; |
| 1106 | PARQUET_THROW_NOT_OK(ctx->GetScratchData<ParquetCType>(array.length(), &buffer)); |
| 1107 | |
| 1108 | bool no_nulls = |
| 1109 | writer->descr()->schema_node()->is_required() || (array.null_count() == 0); |
| 1110 | |
| 1111 | Status s = SerializeData<ParquetType, ArrowType>(array, ctx, buffer); |
| 1112 | RETURN_NOT_OK(s); |
| 1113 | if (no_nulls) { |
| 1114 | PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer)); |
| 1115 | } else { |
| 1116 | PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels, |
| 1117 | array.null_bitmap_data(), |
| 1118 | array.offset(), buffer)); |
| 1119 | } |
| 1120 | return Status::OK(); |
| 1121 | } |
| 1122 | |
| 1123 | template <typename ParquetType> |
| 1124 | Status WriteArrowZeroCopy(const arrow::Array& array, int64_t num_levels, |
| 1125 | const int16_t* def_levels, const int16_t* rep_levels, |
| 1126 | ArrowWriteContext* ctx, |
| 1127 | TypedColumnWriter<ParquetType>* writer) { |
| 1128 | using T = typename ParquetType::c_type; |
| 1129 | const auto& data = static_cast<const arrow::PrimitiveArray&>(array); |
| 1130 | const T* values = nullptr; |
| 1131 | // The values buffer may be null if the array is empty (ARROW-2744) |
| 1132 | if (data.values() != nullptr) { |
| 1133 | values = reinterpret_cast<const T*>(data.values()->data()) + data.offset(); |
| 1134 | } else { |
| 1135 | DCHECK_EQ(data.length(), 0); |
| 1136 | } |
| 1137 | if (writer->descr()->schema_node()->is_required() || (data.null_count() == 0)) { |
| 1138 | PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, values)); |
| 1139 | } else { |
| 1140 | PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels, |
| 1141 | data.null_bitmap_data(), data.offset(), |
| 1142 | values)); |
| 1143 | } |
| 1144 | return Status::OK(); |
| 1145 | } |
| 1146 | |
// Expands to a `case arrow::Type::ArrowEnum:` label that returns the result of
// WriteArrowSerialize<ParquetType, arrow::ArrowType>. The expansion refers to
// `array`, `num_levels`, `def_levels`, `rep_levels`, `ctx` and `this`, so it can
// only be used inside a switch in a member function where those names exist.
#define WRITE_SERIALIZE_CASE(ArrowEnum, ArrowType, ParquetType) \
  case arrow::Type::ArrowEnum: \
    return WriteArrowSerialize<ParquetType, arrow::ArrowType>( \
        array, num_levels, def_levels, rep_levels, ctx, this);

// Same shape as WRITE_SERIALIZE_CASE, but returns the result of
// WriteArrowZeroCopy<ParquetType>. The ArrowType argument is not used by the
// expansion.
#define WRITE_ZERO_COPY_CASE(ArrowEnum, ArrowType, ParquetType) \
  case arrow::Type::ArrowEnum: \
    return WriteArrowZeroCopy<ParquetType>(array, num_levels, def_levels, rep_levels, \
                                           ctx, this);

// Builds an error message naming the Arrow type of `array` and the Parquet
// column `descr_`, then returns Status::Invalid from the enclosing function.
// Expands to several statements (including a local named `ss`), so it must be
// used inside a braced block or as the last label of a switch.
#define ARROW_UNSUPPORTED() \
  std::stringstream ss; \
  ss << "Arrow type " << array.type()->ToString() \
     << " cannot be written to Parquet type " << descr_->ToString(); \
  return Status::Invalid(ss.str());
| 1162 | |
| 1163 | // ---------------------------------------------------------------------- |
| 1164 | // Write Arrow to BooleanType |
| 1165 | |
| 1166 | template <> |
| 1167 | struct SerializeFunctor<BooleanType, arrow::BooleanType> { |
| 1168 | Status Serialize(const arrow::BooleanArray& data, ArrowWriteContext*, bool* out) { |
| 1169 | for (int i = 0; i < data.length(); i++) { |
| 1170 | *out++ = data.Value(i); |
| 1171 | } |
| 1172 | return Status::OK(); |
| 1173 | } |
| 1174 | }; |
| 1175 | |
| 1176 | template <> |
| 1177 | Status TypedColumnWriterImpl<BooleanType>::WriteArrowDense(const int16_t* def_levels, |
| 1178 | const int16_t* rep_levels, |
| 1179 | int64_t num_levels, |
| 1180 | const arrow::Array& array, |
| 1181 | ArrowWriteContext* ctx) { |
| 1182 | if (array.type_id() != arrow::Type::BOOL) { |
| 1183 | ARROW_UNSUPPORTED(); |
| 1184 | } |
| 1185 | return WriteArrowSerialize<BooleanType, arrow::BooleanType>( |
| 1186 | array, num_levels, def_levels, rep_levels, ctx, this); |
| 1187 | } |
| 1188 | |
| 1189 | // ---------------------------------------------------------------------- |
| 1190 | // Write Arrow types to INT32 |
| 1191 | |
| 1192 | template <> |
| 1193 | struct SerializeFunctor<Int32Type, arrow::Date64Type> { |
| 1194 | Status Serialize(const arrow::Date64Array& array, ArrowWriteContext*, int32_t* out) { |
| 1195 | const int64_t* input = array.raw_values(); |
| 1196 | for (int i = 0; i < array.length(); i++) { |
| 1197 | *out++ = static_cast<int32_t>(*input++ / 86400000); |
| 1198 | } |
| 1199 | return Status::OK(); |
| 1200 | } |
| 1201 | }; |
| 1202 | |
| 1203 | template <> |
| 1204 | struct SerializeFunctor<Int32Type, arrow::Time32Type> { |
| 1205 | Status Serialize(const arrow::Time32Array& array, ArrowWriteContext*, int32_t* out) { |
| 1206 | const int32_t* input = array.raw_values(); |
| 1207 | const auto& type = static_cast<const arrow::Time32Type&>(*array.type()); |
| 1208 | if (type.unit() == arrow::TimeUnit::SECOND) { |
| 1209 | for (int i = 0; i < array.length(); i++) { |
| 1210 | out[i] = input[i] * 1000; |
| 1211 | } |
| 1212 | } else { |
| 1213 | std::copy(input, input + array.length(), out); |
| 1214 | } |
| 1215 | return Status::OK(); |
| 1216 | } |
| 1217 | }; |
| 1218 | |
// Dense Arrow write path for INT32 columns. Dispatches on the Arrow type id:
// INT32 and DATE32 arrays are written without conversion, the other listed
// types are converted through WriteArrowSerialize, and any unlisted type yields
// Status::Invalid.
template <>
Status TypedColumnWriterImpl<Int32Type>::WriteArrowDense(const int16_t* def_levels,
                                                         const int16_t* rep_levels,
                                                         int64_t num_levels,
                                                         const arrow::Array& array,
                                                         ArrowWriteContext* ctx) {
  switch (array.type()->id()) {
    // NA arrays: WriteBatch is called with a null values pointer, so presumably
    // only levels get written. This is the only case that falls through to the
    // final return; every macro case below returns on its own.
    case arrow::Type::NA: {
      PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, nullptr));
    } break;
      WRITE_SERIALIZE_CASE(INT8, Int8Type, Int32Type)
      WRITE_SERIALIZE_CASE(UINT8, UInt8Type, Int32Type)
      WRITE_SERIALIZE_CASE(INT16, Int16Type, Int32Type)
      WRITE_SERIALIZE_CASE(UINT16, UInt16Type, Int32Type)
      WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int32Type)
      WRITE_ZERO_COPY_CASE(INT32, Int32Type, Int32Type)
      WRITE_ZERO_COPY_CASE(DATE32, Date32Type, Int32Type)
      WRITE_SERIALIZE_CASE(DATE64, Date64Type, Int32Type)
      WRITE_SERIALIZE_CASE(TIME32, Time32Type, Int32Type)
    default:
      ARROW_UNSUPPORTED()
  }
  return Status::OK();
}
| 1243 | |
| 1244 | // ---------------------------------------------------------------------- |
| 1245 | // Write Arrow to Int64 and Int96 |
| 1246 | |
// Applies ConversionFunction(input[i], &out[i]) to every index of `array`.
// Relies on `array`, `input` and `out` being defined at the point of use.
#define INT96_CONVERT_LOOP(ConversionFunction) \
  for (int64_t i = 0; i < array.length(); i++) ConversionFunction(input[i], &out[i]);
| 1249 | |
// Converts Arrow timestamps to Int96 values, choosing the conversion helper
// that matches the time unit of the array's type.
template <>
struct SerializeFunctor<Int96Type, arrow::TimestampType> {
  // Converts every slot of `array` into `out`, which must have room for
  // array.length() elements. The write context is unused. Always returns
  // Status::OK().
  Status Serialize(const arrow::TimestampArray& array, ArrowWriteContext*, Int96* out) {
    // NOTE: the names `array`, `input` and `out` are used by INT96_CONVERT_LOOP
    const int64_t* input = array.raw_values();
    const auto& type = static_cast<const arrow::TimestampType&>(*array.type());
    switch (type.unit()) {
      case arrow::TimeUnit::NANO:
        INT96_CONVERT_LOOP(internal::NanosecondsToImpalaTimestamp);
        break;
      case arrow::TimeUnit::MICRO:
        INT96_CONVERT_LOOP(internal::MicrosecondsToImpalaTimestamp);
        break;
      case arrow::TimeUnit::MILLI:
        INT96_CONVERT_LOOP(internal::MillisecondsToImpalaTimestamp);
        break;
      case arrow::TimeUnit::SECOND:
        INT96_CONVERT_LOOP(internal::SecondsToImpalaTimestamp);
        break;
    }
    return Status::OK();
  }
};
| 1272 | |
// Operation codes stored in the `.first` member of each coercion table entry
#define COERCE_DIVIDE -1
#define COERCE_INVALID 0
#define COERCE_MULTIPLY +1

// Lookup table for timestamp unit coercion, indexed as [source unit][target
// unit] in the order seconds, millis, micros, nanos. Each entry holds the
// operation (`.first`, one of the COERCE_* codes) and the scale factor
// (`.second`). Coercion to seconds is marked invalid for every source unit.
// The table is only ever read, so it is declared const to rule out accidental
// modification.
static const std::pair<int, int64_t> kTimestampCoercionFactors[4][4] = {
    // from seconds ...
    {{COERCE_INVALID, 0},                      // ... to seconds
     {COERCE_MULTIPLY, 1000},                  // ... to millis
     {COERCE_MULTIPLY, 1000000},               // ... to micros
     {COERCE_MULTIPLY, INT64_C(1000000000)}},  // ... to nanos
    // from millis ...
    {{COERCE_INVALID, 0},
     {COERCE_MULTIPLY, 1},
     {COERCE_MULTIPLY, 1000},
     {COERCE_MULTIPLY, 1000000}},
    // from micros ...
    {{COERCE_INVALID, 0},
     {COERCE_DIVIDE, 1000},
     {COERCE_MULTIPLY, 1},
     {COERCE_MULTIPLY, 1000}},
    // from nanos ...
    {{COERCE_INVALID, 0},
     {COERCE_DIVIDE, 1000000},
     {COERCE_DIVIDE, 1000},
     {COERCE_MULTIPLY, 1}}};
| 1298 | |
| 1299 | template <> |
| 1300 | struct SerializeFunctor<Int64Type, arrow::TimestampType> { |
| 1301 | Status Serialize(const arrow::TimestampArray& array, ArrowWriteContext* ctx, |
| 1302 | int64_t* out) { |
| 1303 | const auto& source_type = static_cast<const arrow::TimestampType&>(*array.type()); |
| 1304 | auto source_unit = source_type.unit(); |
| 1305 | const int64_t* values = array.raw_values(); |
| 1306 | |
| 1307 | arrow::TimeUnit::type target_unit = ctx->properties->coerce_timestamps_unit(); |
| 1308 | auto target_type = arrow::timestamp(target_unit); |
| 1309 | bool truncation_allowed = ctx->properties->truncated_timestamps_allowed(); |
| 1310 | |
| 1311 | auto DivideBy = [&](const int64_t factor) { |
| 1312 | for (int64_t i = 0; i < array.length(); i++) { |
| 1313 | if (!truncation_allowed && array.IsValid(i) && (values[i] % factor != 0)) { |
| 1314 | return Status::Invalid("Casting from " , source_type.ToString(), " to " , |
| 1315 | target_type->ToString(), |
| 1316 | " would lose data: " , values[i]); |
| 1317 | } |
| 1318 | out[i] = values[i] / factor; |
| 1319 | } |
| 1320 | return Status::OK(); |
| 1321 | }; |
| 1322 | |
| 1323 | auto MultiplyBy = [&](const int64_t factor) { |
| 1324 | for (int64_t i = 0; i < array.length(); i++) { |
| 1325 | out[i] = values[i] * factor; |
| 1326 | } |
| 1327 | return Status::OK(); |
| 1328 | }; |
| 1329 | |
| 1330 | const auto& coercion = kTimestampCoercionFactors[static_cast<int>(source_unit)] |
| 1331 | [static_cast<int>(target_unit)]; |
| 1332 | |
| 1333 | // .first -> coercion operation; .second -> scale factor |
| 1334 | DCHECK_NE(coercion.first, COERCE_INVALID); |
| 1335 | return coercion.first == COERCE_DIVIDE ? DivideBy(coercion.second) |
| 1336 | : MultiplyBy(coercion.second); |
| 1337 | } |
| 1338 | }; |
| 1339 | |
| 1340 | #undef COERCE_DIVIDE |
| 1341 | #undef COERCE_INVALID |
| 1342 | #undef COERCE_MULTIPLY |
| 1343 | |
| 1344 | Status WriteTimestamps(const arrow::Array& values, int64_t num_levels, |
| 1345 | const int16_t* def_levels, const int16_t* rep_levels, |
| 1346 | ArrowWriteContext* ctx, TypedColumnWriter<Int64Type>* writer) { |
| 1347 | const auto& source_type = static_cast<const arrow::TimestampType&>(*values.type()); |
| 1348 | |
| 1349 | auto WriteCoerce = [&](const ArrowWriterProperties* properties) { |
| 1350 | ArrowWriteContext temp_ctx = *ctx; |
| 1351 | temp_ctx.properties = properties; |
| 1352 | return WriteArrowSerialize<Int64Type, arrow::TimestampType>( |
| 1353 | values, num_levels, def_levels, rep_levels, &temp_ctx, writer); |
| 1354 | }; |
| 1355 | |
| 1356 | if (ctx->properties->coerce_timestamps_enabled()) { |
| 1357 | // User explicitly requested coercion to specific unit |
| 1358 | if (source_type.unit() == ctx->properties->coerce_timestamps_unit()) { |
| 1359 | // No data conversion necessary |
| 1360 | return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels, |
| 1361 | ctx, writer); |
| 1362 | } else { |
| 1363 | return WriteCoerce(ctx->properties); |
| 1364 | } |
| 1365 | } else if (writer->properties()->version() == ParquetVersion::PARQUET_1_0 && |
| 1366 | source_type.unit() == arrow::TimeUnit::NANO) { |
| 1367 | // Absent superseding user instructions, when writing Parquet version 1.0 files, |
| 1368 | // timestamps in nanoseconds are coerced to microseconds |
| 1369 | std::shared_ptr<ArrowWriterProperties> properties = |
| 1370 | (ArrowWriterProperties::Builder()) |
| 1371 | .coerce_timestamps(arrow::TimeUnit::MICRO) |
| 1372 | ->disallow_truncated_timestamps() |
| 1373 | ->build(); |
| 1374 | return WriteCoerce(properties.get()); |
| 1375 | } else if (source_type.unit() == arrow::TimeUnit::SECOND) { |
| 1376 | // Absent superseding user instructions, timestamps in seconds are coerced to |
| 1377 | // milliseconds |
| 1378 | std::shared_ptr<ArrowWriterProperties> properties = |
| 1379 | (ArrowWriterProperties::Builder()) |
| 1380 | .coerce_timestamps(arrow::TimeUnit::MILLI) |
| 1381 | ->build(); |
| 1382 | return WriteCoerce(properties.get()); |
| 1383 | } else { |
| 1384 | // No data conversion necessary |
| 1385 | return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels, ctx, |
| 1386 | writer); |
| 1387 | } |
| 1388 | } |
| 1389 | |
// Dense Arrow write path for INT64 columns. Timestamps go through
// WriteTimestamps (which may coerce the unit), INT64 and TIME64 arrays are
// written without conversion, UINT32 and UINT64 are converted through
// WriteArrowSerialize, and any unlisted type yields Status::Invalid.
template <>
Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(const int16_t* def_levels,
                                                         const int16_t* rep_levels,
                                                         int64_t num_levels,
                                                         const arrow::Array& array,
                                                         ArrowWriteContext* ctx) {
  // Every case returns, so there is no statement after the switch
  switch (array.type()->id()) {
    case arrow::Type::TIMESTAMP:
      return WriteTimestamps(array, num_levels, def_levels, rep_levels, ctx, this);
      WRITE_ZERO_COPY_CASE(INT64, Int64Type, Int64Type)
      WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int64Type)
      WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type)
      WRITE_ZERO_COPY_CASE(TIME64, Time64Type, Int64Type)
    default:
      ARROW_UNSUPPORTED();
  }
}
| 1407 | |
| 1408 | template <> |
| 1409 | Status TypedColumnWriterImpl<Int96Type>::WriteArrowDense(const int16_t* def_levels, |
| 1410 | const int16_t* rep_levels, |
| 1411 | int64_t num_levels, |
| 1412 | const arrow::Array& array, |
| 1413 | ArrowWriteContext* ctx) { |
| 1414 | if (array.type_id() != arrow::Type::TIMESTAMP) { |
| 1415 | ARROW_UNSUPPORTED(); |
| 1416 | } |
| 1417 | return WriteArrowSerialize<Int96Type, arrow::TimestampType>( |
| 1418 | array, num_levels, def_levels, rep_levels, ctx, this); |
| 1419 | } |
| 1420 | |
| 1421 | // ---------------------------------------------------------------------- |
| 1422 | // Floating point types |
| 1423 | |
| 1424 | template <> |
| 1425 | Status TypedColumnWriterImpl<FloatType>::WriteArrowDense(const int16_t* def_levels, |
| 1426 | const int16_t* rep_levels, |
| 1427 | int64_t num_levels, |
| 1428 | const arrow::Array& array, |
| 1429 | ArrowWriteContext* ctx) { |
| 1430 | if (array.type_id() != arrow::Type::FLOAT) { |
| 1431 | ARROW_UNSUPPORTED(); |
| 1432 | } |
| 1433 | return WriteArrowZeroCopy<FloatType>(array, num_levels, def_levels, rep_levels, ctx, |
| 1434 | this); |
| 1435 | } |
| 1436 | |
| 1437 | template <> |
| 1438 | Status TypedColumnWriterImpl<DoubleType>::WriteArrowDense(const int16_t* def_levels, |
| 1439 | const int16_t* rep_levels, |
| 1440 | int64_t num_levels, |
| 1441 | const arrow::Array& array, |
| 1442 | ArrowWriteContext* ctx) { |
| 1443 | if (array.type_id() != arrow::Type::DOUBLE) { |
| 1444 | ARROW_UNSUPPORTED(); |
| 1445 | } |
| 1446 | return WriteArrowZeroCopy<DoubleType>(array, num_levels, def_levels, rep_levels, ctx, |
| 1447 | this); |
| 1448 | } |
| 1449 | |
| 1450 | // ---------------------------------------------------------------------- |
| 1451 | // Write Arrow to BYTE_ARRAY |
| 1452 | |
| 1453 | template <> |
| 1454 | Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(const int16_t* def_levels, |
| 1455 | const int16_t* rep_levels, |
| 1456 | int64_t num_levels, |
| 1457 | const arrow::Array& array, |
| 1458 | ArrowWriteContext* ctx) { |
| 1459 | if (array.type()->id() != arrow::Type::BINARY && |
| 1460 | array.type()->id() != arrow::Type::STRING) { |
| 1461 | ARROW_UNSUPPORTED(); |
| 1462 | } |
| 1463 | |
| 1464 | int64_t value_offset = 0; |
| 1465 | auto WriteChunk = [&](int64_t offset, int64_t batch_size) { |
| 1466 | int64_t batch_num_values = 0; |
| 1467 | int64_t batch_num_spaced_values = 0; |
| 1468 | WriteLevelsSpaced(batch_size, def_levels + offset, rep_levels + offset, |
| 1469 | &batch_num_values, &batch_num_spaced_values); |
| 1470 | std::shared_ptr<arrow::Array> data_slice = |
| 1471 | array.Slice(value_offset, batch_num_spaced_values); |
| 1472 | current_encoder_->Put(*data_slice); |
| 1473 | if (page_statistics_ != nullptr) { |
| 1474 | page_statistics_->Update(*data_slice); |
| 1475 | } |
| 1476 | CommitWriteAndCheckPageLimit(batch_size, batch_num_values); |
| 1477 | CheckDictionarySizeLimit(); |
| 1478 | value_offset += batch_num_spaced_values; |
| 1479 | }; |
| 1480 | |
| 1481 | PARQUET_CATCH_NOT_OK( |
| 1482 | DoInBatches(num_levels, properties_->write_batch_size(), WriteChunk)); |
| 1483 | return Status::OK(); |
| 1484 | } |
| 1485 | |
| 1486 | // ---------------------------------------------------------------------- |
| 1487 | // Write Arrow to FIXED_LEN_BYTE_ARRAY |
| 1488 | |
| 1489 | template <typename ParquetType, typename ArrowType> |
| 1490 | struct SerializeFunctor<ParquetType, ArrowType, |
| 1491 | arrow::enable_if_fixed_size_binary<ArrowType>> { |
| 1492 | Status Serialize(const arrow::FixedSizeBinaryArray& array, ArrowWriteContext*, |
| 1493 | FLBA* out) { |
| 1494 | if (array.null_count() == 0) { |
| 1495 | // no nulls, just dump the data |
| 1496 | // todo(advancedxy): use a writeBatch to avoid this step |
| 1497 | for (int64_t i = 0; i < array.length(); i++) { |
| 1498 | out[i] = FixedLenByteArray(array.GetValue(i)); |
| 1499 | } |
| 1500 | } else { |
| 1501 | for (int64_t i = 0; i < array.length(); i++) { |
| 1502 | if (array.IsValid(i)) { |
| 1503 | out[i] = FixedLenByteArray(array.GetValue(i)); |
| 1504 | } |
| 1505 | } |
| 1506 | } |
| 1507 | return Status::OK(); |
| 1508 | } |
| 1509 | }; |
| 1510 | |
| 1511 | template <> |
| 1512 | Status WriteArrowSerialize<FLBAType, arrow::Decimal128Type>( |
| 1513 | const arrow::Array& array, int64_t num_levels, const int16_t* def_levels, |
| 1514 | const int16_t* rep_levels, ArrowWriteContext* ctx, |
| 1515 | TypedColumnWriter<FLBAType>* writer) { |
| 1516 | const auto& data = static_cast<const arrow::Decimal128Array&>(array); |
| 1517 | const int64_t length = data.length(); |
| 1518 | |
| 1519 | FLBA* buffer; |
| 1520 | RETURN_NOT_OK(ctx->GetScratchData<FLBA>(num_levels, &buffer)); |
| 1521 | |
| 1522 | const auto& decimal_type = static_cast<const arrow::Decimal128Type&>(*data.type()); |
| 1523 | const int32_t offset = |
| 1524 | decimal_type.byte_width() - internal::DecimalSize(decimal_type.precision()); |
| 1525 | |
| 1526 | const bool does_not_have_nulls = |
| 1527 | writer->descr()->schema_node()->is_required() || data.null_count() == 0; |
| 1528 | |
| 1529 | const auto valid_value_count = static_cast<size_t>(length - data.null_count()) * 2; |
| 1530 | std::vector<uint64_t> big_endian_values(valid_value_count); |
| 1531 | |
| 1532 | // TODO(phillipc): Look into whether our compilers will perform loop unswitching so we |
| 1533 | // don't have to keep writing two loops to handle the case where we know there are no |
| 1534 | // nulls |
| 1535 | if (does_not_have_nulls) { |
| 1536 | // no nulls, just dump the data |
| 1537 | // todo(advancedxy): use a writeBatch to avoid this step |
| 1538 | for (int64_t i = 0, j = 0; i < length; ++i, j += 2) { |
| 1539 | auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i)); |
| 1540 | big_endian_values[j] = arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]); |
| 1541 | big_endian_values[j + 1] = arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]); |
| 1542 | buffer[i] = FixedLenByteArray( |
| 1543 | reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset); |
| 1544 | } |
| 1545 | } else { |
| 1546 | for (int64_t i = 0, buffer_idx = 0, j = 0; i < length; ++i) { |
| 1547 | if (data.IsValid(i)) { |
| 1548 | auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i)); |
| 1549 | big_endian_values[j] = arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]); |
| 1550 | big_endian_values[j + 1] = arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]); |
| 1551 | buffer[buffer_idx++] = FixedLenByteArray( |
| 1552 | reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset); |
| 1553 | j += 2; |
| 1554 | } |
| 1555 | } |
| 1556 | } |
| 1557 | PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer)); |
| 1558 | return Status::OK(); |
| 1559 | } |
| 1560 | |
| 1561 | template <> |
| 1562 | Status TypedColumnWriterImpl<FLBAType>::WriteArrowDense(const int16_t* def_levels, |
| 1563 | const int16_t* rep_levels, |
| 1564 | int64_t num_levels, |
| 1565 | const arrow::Array& array, |
| 1566 | ArrowWriteContext* ctx) { |
| 1567 | switch (array.type()->id()) { |
| 1568 | WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType) |
| 1569 | WRITE_SERIALIZE_CASE(DECIMAL, Decimal128Type, FLBAType) |
| 1570 | default: |
| 1571 | break; |
| 1572 | } |
| 1573 | return Status::OK(); |
| 1574 | } |
| 1575 | |
| 1576 | // ---------------------------------------------------------------------- |
| 1577 | // Dynamic column writer constructor |
| 1578 | |
| 1579 | std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata, |
| 1580 | std::unique_ptr<PageWriter> , |
| 1581 | const WriterProperties* properties) { |
| 1582 | const ColumnDescriptor* descr = metadata->descr(); |
| 1583 | const bool use_dictionary = properties->dictionary_enabled(descr->path()) && |
| 1584 | descr->physical_type() != Type::BOOLEAN; |
| 1585 | Encoding::type encoding = properties->encoding(descr->path()); |
| 1586 | if (use_dictionary) { |
| 1587 | encoding = properties->dictionary_index_encoding(); |
| 1588 | } |
| 1589 | switch (descr->physical_type()) { |
| 1590 | case Type::BOOLEAN: |
| 1591 | return std::make_shared<TypedColumnWriterImpl<BooleanType>>( |
| 1592 | metadata, std::move(pager), use_dictionary, encoding, properties); |
| 1593 | case Type::INT32: |
| 1594 | return std::make_shared<TypedColumnWriterImpl<Int32Type>>( |
| 1595 | metadata, std::move(pager), use_dictionary, encoding, properties); |
| 1596 | case Type::INT64: |
| 1597 | return std::make_shared<TypedColumnWriterImpl<Int64Type>>( |
| 1598 | metadata, std::move(pager), use_dictionary, encoding, properties); |
| 1599 | case Type::INT96: |
| 1600 | return std::make_shared<TypedColumnWriterImpl<Int96Type>>( |
| 1601 | metadata, std::move(pager), use_dictionary, encoding, properties); |
| 1602 | case Type::FLOAT: |
| 1603 | return std::make_shared<TypedColumnWriterImpl<FloatType>>( |
| 1604 | metadata, std::move(pager), use_dictionary, encoding, properties); |
| 1605 | case Type::DOUBLE: |
| 1606 | return std::make_shared<TypedColumnWriterImpl<DoubleType>>( |
| 1607 | metadata, std::move(pager), use_dictionary, encoding, properties); |
| 1608 | case Type::BYTE_ARRAY: |
| 1609 | return std::make_shared<TypedColumnWriterImpl<ByteArrayType>>( |
| 1610 | metadata, std::move(pager), use_dictionary, encoding, properties); |
| 1611 | case Type::FIXED_LEN_BYTE_ARRAY: |
| 1612 | return std::make_shared<TypedColumnWriterImpl<FLBAType>>( |
| 1613 | metadata, std::move(pager), use_dictionary, encoding, properties); |
| 1614 | default: |
| 1615 | ParquetException::NYI("type reader not implemented" ); |
| 1616 | } |
| 1617 | // Unreachable code, but supress compiler warning |
| 1618 | return std::shared_ptr<ColumnWriter>(nullptr); |
| 1619 | } |
| 1620 | |
| 1621 | } // namespace parquet |
| 1622 | |