| 1 | // Licensed to the Apache Software Foundation (ASF) under one |
| 2 | // or more contributor license agreements. See the NOTICE file |
| 3 | // distributed with this work for additional information |
| 4 | // regarding copyright ownership. The ASF licenses this file |
| 5 | // to you under the Apache License, Version 2.0 (the |
| 6 | // "License"); you may not use this file except in compliance |
| 7 | // with the License. You may obtain a copy of the License at |
| 8 | // |
| 9 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | // |
| 11 | // Unless required by applicable law or agreed to in writing, |
| 12 | // software distributed under the License is distributed on an |
| 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | // KIND, either express or implied. See the License for the |
| 15 | // specific language governing permissions and limitations |
| 16 | // under the License. |
| 17 | |
| 18 | #include "parquet/column_reader.h" |
| 19 | |
| 20 | #include <algorithm> |
| 21 | #include <cstdint> |
| 22 | #include <cstring> |
| 23 | #include <exception> |
| 24 | #include <iostream> |
| 25 | #include <memory> |
| 26 | #include <unordered_map> |
| 27 | #include <utility> |
| 28 | |
| 29 | #include "arrow/array.h" |
| 30 | #include "arrow/builder.h" |
| 31 | #include "arrow/table.h" |
| 32 | #include "arrow/type.h" |
| 33 | #include "arrow/util/bit_stream_utils.h" |
| 34 | #include "arrow/util/checked_cast.h" |
| 35 | #include "arrow/util/compression.h" |
| 36 | #include "arrow/util/logging.h" |
| 37 | #include "arrow/util/rle_encoding.h" |
| 38 | |
| 39 | #include "parquet/column_page.h" |
| 40 | #include "parquet/encoding.h" |
| 41 | #include "parquet/properties.h" |
| 42 | #include "parquet/statistics.h" |
| 43 | #include "parquet/thrift_internal.h" // IWYU pragma: keep |
| 44 | |
| 45 | using arrow::MemoryPool; |
| 46 | using arrow::internal::checked_cast; |
| 47 | |
| 48 | namespace parquet { |
| 49 | |
| 50 | LevelDecoder::LevelDecoder() : num_values_remaining_(0) {} |
| 51 | |
| 52 | LevelDecoder::~LevelDecoder() {} |
| 53 | |
| 54 | int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level, |
| 55 | int num_buffered_values, const uint8_t* data) { |
| 56 | int32_t num_bytes = 0; |
| 57 | encoding_ = encoding; |
| 58 | num_values_remaining_ = num_buffered_values; |
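  // Levels take values in [0, max_level], so encoding a level requires
  // ceil(log2(max_level + 1)) bits: e.g. max_level == 3 needs 2 bits and
  // max_level == 1 needs a single bit.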
| 59 | bit_width_ = BitUtil::Log2(max_level + 1); |
| 60 | switch (encoding) { |
| 61 | case Encoding::RLE: { |
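      // The RLE-encoded levels are preceded by a 4-byte little-endian length
      // prefix, so the level data itself starts sizeof(int32_t) bytes in.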
| 62 | num_bytes = arrow::util::SafeLoadAs<int32_t>(data); |
| 63 | const uint8_t* decoder_data = data + sizeof(int32_t); |
| 64 | if (!rle_decoder_) { |
| 65 | rle_decoder_.reset( |
| 66 | new ::arrow::util::RleDecoder(decoder_data, num_bytes, bit_width_)); |
| 67 | } else { |
| 68 | rle_decoder_->Reset(decoder_data, num_bytes, bit_width_); |
| 69 | } |
| 70 | return static_cast<int>(sizeof(int32_t)) + num_bytes; |
| 71 | } |
| 72 | case Encoding::BIT_PACKED: { |
| 73 | num_bytes = |
| 74 | static_cast<int32_t>(BitUtil::BytesForBits(num_buffered_values * bit_width_)); |
| 75 | if (!bit_packed_decoder_) { |
| 76 | bit_packed_decoder_.reset(new ::arrow::BitUtil::BitReader(data, num_bytes)); |
| 77 | } else { |
| 78 | bit_packed_decoder_->Reset(data, num_bytes); |
| 79 | } |
| 80 | return num_bytes; |
| 81 | } |
| 82 | default: |
      throw ParquetException("Unknown encoding type for levels.");
| 84 | } |
| 85 | return -1; |
| 86 | } |
| 87 | |
| 88 | int LevelDecoder::Decode(int batch_size, int16_t* levels) { |
| 89 | int num_decoded = 0; |
| 90 | |
| 91 | int num_values = std::min(num_values_remaining_, batch_size); |
| 92 | if (encoding_ == Encoding::RLE) { |
| 93 | num_decoded = rle_decoder_->GetBatch(levels, num_values); |
| 94 | } else { |
| 95 | num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values); |
| 96 | } |
| 97 | num_values_remaining_ -= num_decoded; |
| 98 | return num_decoded; |
| 99 | } |
| 100 | |
| 101 | ReaderProperties default_reader_properties() { |
| 102 | static ReaderProperties default_reader_properties; |
| 103 | return default_reader_properties; |
| 104 | } |
| 105 | |
| 106 | // ---------------------------------------------------------------------- |
| 107 | // SerializedPageReader deserializes Thrift metadata and pages that have been |
// assembled in a serialized stream for storage in a Parquet file
| 109 | |
| 110 | // This subclass delimits pages appearing in a serialized stream, each preceded |
| 111 | // by a serialized Thrift format::PageHeader indicating the type of each page |
| 112 | // and the page metadata. |
class SerializedPageReader : public PageReader {
 public:
  SerializedPageReader(const std::shared_ptr<ArrowInputStream>& stream,
| 116 | int64_t total_num_rows, Compression::type codec, |
| 117 | ::arrow::MemoryPool* pool) |
| 118 | : stream_(stream), |
| 119 | decompression_buffer_(AllocateBuffer(pool, 0)), |
| 120 | seen_num_rows_(0), |
| 121 | total_num_rows_(total_num_rows) { |
| 122 | max_page_header_size_ = kDefaultMaxPageHeaderSize; |
| 123 | decompressor_ = GetCodec(codec); |
| 124 | } |
| 125 | |
| 126 | // Implement the PageReader interface |
| 127 | std::shared_ptr<Page> NextPage() override; |
| 128 | |
  void set_max_page_header_size(uint32_t size) override { max_page_header_size_ = size; }
| 130 | |
| 131 | private: |
  std::shared_ptr<ArrowInputStream> stream_;
| 133 | |
  format::PageHeader current_page_header_;
  std::shared_ptr<Page> current_page_;
| 136 | |
| 137 | // Compression codec to use. |
  std::unique_ptr<::arrow::util::Codec> decompressor_;
  std::shared_ptr<ResizableBuffer> decompression_buffer_;
| 140 | |
| 141 | // Maximum allowed page size |
  uint32_t max_page_header_size_;
| 143 | |
| 144 | // Number of rows read in data pages so far |
  int64_t seen_num_rows_;
| 146 | |
| 147 | // Number of rows in all the data pages |
  int64_t total_num_rows_;
| 149 | }; |
| 150 | |
std::shared_ptr<Page> SerializedPageReader::NextPage() {
  // Loop here because there may be unhandled page types that we skip until
  // finding a page that we know how to handle
| 154 | while (seen_num_rows_ < total_num_rows_) { |
    uint32_t header_size = 0;
| 156 | uint32_t allowed_page_size = kDefaultPageHeaderSize; |
| 157 | |
| 158 | // Page headers can be very large because of page statistics |
| 159 | // We try to deserialize a larger buffer progressively |
| 160 | // until a maximum allowed header limit |
| 161 | while (true) { |
| 162 | string_view buffer; |
| 163 | PARQUET_THROW_NOT_OK(stream_->Peek(allowed_page_size, &buffer)); |
| 164 | if (buffer.size() == 0) { |
| 165 | return std::shared_ptr<Page>(nullptr); |
| 166 | } |
| 167 | |
      // header_size is an in/out parameter: it passes in the number of bytes
      // available and DeserializeThriftMsg sets it to the number consumed
| 169 | header_size = static_cast<uint32_t>(buffer.size()); |
| 170 | try { |
| 171 | DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(buffer.data()), |
| 172 | &header_size, ¤t_page_header_); |
| 173 | break; |
| 174 | } catch (std::exception& e) { |
| 175 | // Failed to deserialize. Double the allowed page header size and try again |
| 176 | std::stringstream ss; |
| 177 | ss << e.what(); |
| 178 | allowed_page_size *= 2; |
| 179 | if (allowed_page_size > max_page_header_size_) { |
| 180 | ss << "Deserializing page header failed.\n" ; |
| 181 | throw ParquetException(ss.str()); |
| 182 | } |
| 183 | } |
| 184 | } |
| 185 | // Advance the stream offset |
| 186 | PARQUET_THROW_NOT_OK(stream_->Advance(header_size)); |
| 187 | |
| 188 | int compressed_len = current_page_header_.compressed_page_size; |
| 189 | int uncompressed_len = current_page_header_.uncompressed_page_size; |
| 190 | |
| 191 | // Read the compressed data page. |
| 192 | std::shared_ptr<Buffer> page_buffer; |
| 193 | PARQUET_THROW_NOT_OK(stream_->Read(compressed_len, &page_buffer)); |
| 194 | if (page_buffer->size() != compressed_len) { |
| 195 | std::stringstream ss; |
| 196 | ss << "Page was smaller (" << page_buffer->size() << ") than expected (" |
         << compressed_len << ")";
| 198 | ParquetException::EofException(ss.str()); |
| 199 | } |
| 200 | |
| 201 | // Uncompress it if we need to |
| 202 | if (decompressor_ != nullptr) { |
| 203 | // Grow the uncompressed buffer if we need to. |
| 204 | if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) { |
| 205 | PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false)); |
| 206 | } |
| 207 | PARQUET_THROW_NOT_OK( |
| 208 | decompressor_->Decompress(compressed_len, page_buffer->data(), uncompressed_len, |
| 209 | decompression_buffer_->mutable_data())); |
| 210 | page_buffer = decompression_buffer_; |
| 211 | } |
| 212 | |
| 213 | if (current_page_header_.type == format::PageType::DICTIONARY_PAGE) { |
      const format::DictionaryPageHeader& dict_header =
| 215 | current_page_header_.dictionary_page_header; |
| 216 | |
| 217 | bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false; |
| 218 | |
| 219 | return std::make_shared<DictionaryPage>(page_buffer, dict_header.num_values, |
| 220 | FromThrift(dict_header.encoding), |
| 221 | is_sorted); |
| 222 | } else if (current_page_header_.type == format::PageType::DATA_PAGE) { |
      const format::DataPageHeader& header = current_page_header_.data_page_header;
| 224 | |
| 225 | EncodedStatistics page_statistics; |
| 226 | if (header.__isset.statistics) { |
| 227 | const format::Statistics& stats = header.statistics; |
| 228 | if (stats.__isset.max) { |
| 229 | page_statistics.set_max(stats.max); |
| 230 | } |
| 231 | if (stats.__isset.min) { |
| 232 | page_statistics.set_min(stats.min); |
| 233 | } |
| 234 | if (stats.__isset.null_count) { |
| 235 | page_statistics.set_null_count(stats.null_count); |
| 236 | } |
| 237 | if (stats.__isset.distinct_count) { |
| 238 | page_statistics.set_distinct_count(stats.distinct_count); |
| 239 | } |
| 240 | } |
| 241 | |
| 242 | seen_num_rows_ += header.num_values; |
| 243 | |
| 244 | return std::make_shared<DataPageV1>( |
| 245 | page_buffer, header.num_values, FromThrift(header.encoding), |
| 246 | FromThrift(header.definition_level_encoding), |
| 247 | FromThrift(header.repetition_level_encoding), page_statistics); |
| 248 | } else if (current_page_header_.type == format::PageType::DATA_PAGE_V2) { |
      const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
| 250 | bool is_compressed = header.__isset.is_compressed ? header.is_compressed : false; |
| 251 | |
| 252 | seen_num_rows_ += header.num_values; |
| 253 | |
| 254 | return std::make_shared<DataPageV2>( |
| 255 | page_buffer, header.num_values, header.num_nulls, header.num_rows, |
| 256 | FromThrift(header.encoding), header.definition_levels_byte_length, |
| 257 | header.repetition_levels_byte_length, is_compressed); |
| 258 | } else { |
| 259 | // We don't know what this page type is. We're allowed to skip non-data |
| 260 | // pages. |
| 261 | continue; |
| 262 | } |
| 263 | } |
| 264 | return std::shared_ptr<Page>(nullptr); |
| 265 | } |
| 266 | |
std::unique_ptr<PageReader> PageReader::Open(
| 268 | const std::shared_ptr<ArrowInputStream>& stream, int64_t total_num_rows, |
| 269 | Compression::type codec, ::arrow::MemoryPool* pool) { |
| 270 | return std::unique_ptr<PageReader>( |
| 271 | new SerializedPageReader(stream, total_num_rows, codec, pool)); |
| 272 | } |
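
// A minimal usage sketch (hypothetical caller; `stream`, `num_rows`, and the
// codec would normally come from the row group metadata):
//
//   std::unique_ptr<PageReader> pager = PageReader::Open(
//       stream, num_rows, Compression::SNAPPY, ::arrow::default_memory_pool());
//   while (std::shared_ptr<Page> page = pager->NextPage()) {
//     // Dispatch on page->type(): DICTIONARY_PAGE, DATA_PAGE, DATA_PAGE_V2...
//   }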
| 273 | |
| 274 | // ---------------------------------------------------------------------- |
| 275 | // Impl base class for TypedColumnReader and RecordReader |
| 276 | |
// PLAIN_DICTIONARY is deprecated but was formerly used as a dictionary index
// encoding.
| 279 | static bool IsDictionaryIndexEncoding(const Encoding::type& e) { |
| 280 | return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY; |
| 281 | } |
| 282 | |
| 283 | template <typename DType> |
| 284 | class ColumnReaderImplBase { |
| 285 | public: |
| 286 | using T = typename DType::c_type; |
| 287 | |
| 288 | ColumnReaderImplBase(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) |
| 289 | : descr_(descr), |
| 290 | max_def_level_(descr->max_definition_level()), |
| 291 | max_rep_level_(descr->max_repetition_level()), |
| 292 | num_buffered_values_(0), |
| 293 | num_decoded_values_(0), |
| 294 | pool_(pool), |
| 295 | current_decoder_(nullptr), |
| 296 | current_encoding_(Encoding::UNKNOWN) {} |
| 297 | |
| 298 | virtual ~ColumnReaderImplBase() = default; |
| 299 | |
| 300 | protected: |
| 301 | // Read up to batch_size values from the current data page into the |
| 302 | // pre-allocated memory T* |
| 303 | // |
| 304 | // @returns: the number of values read into the out buffer |
| 305 | int64_t ReadValues(int64_t batch_size, T* out) { |
| 306 | int64_t num_decoded = current_decoder_->Decode(out, static_cast<int>(batch_size)); |
| 307 | return num_decoded; |
| 308 | } |
| 309 | |
| 310 | // Read up to batch_size values from the current data page into the |
| 311 | // pre-allocated memory T*, leaving spaces for null entries according |
| 312 | // to the def_levels. |
| 313 | // |
| 314 | // @returns: the number of values read into the out buffer |
| 315 | int64_t ReadValuesSpaced(int64_t batch_size, T* out, int64_t null_count, |
| 316 | uint8_t* valid_bits, int64_t valid_bits_offset) { |
| 317 | return current_decoder_->DecodeSpaced(out, static_cast<int>(batch_size), |
| 318 | static_cast<int>(null_count), valid_bits, |
| 319 | valid_bits_offset); |
| 320 | } |
| 321 | |
| 322 | // Read multiple definition levels into preallocated memory |
| 323 | // |
| 324 | // Returns the number of decoded definition levels |
| 325 | int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) { |
| 326 | if (max_def_level_ == 0) { |
| 327 | return 0; |
| 328 | } |
| 329 | return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels); |
| 330 | } |
| 331 | |
| 332 | bool HasNextInternal() { |
| 333 | // Either there is no data page available yet, or the data page has been |
| 334 | // exhausted |
| 335 | if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) { |
| 336 | if (!ReadNewPage() || num_buffered_values_ == 0) { |
| 337 | return false; |
| 338 | } |
| 339 | } |
| 340 | return true; |
| 341 | } |
| 342 | |
| 343 | // Read multiple repetition levels into preallocated memory |
| 344 | // Returns the number of decoded repetition levels |
| 345 | int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) { |
| 346 | if (max_rep_level_ == 0) { |
| 347 | return 0; |
| 348 | } |
| 349 | return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels); |
| 350 | } |
| 351 | |
| 352 | // Advance to the next data page |
| 353 | bool ReadNewPage() { |
| 354 | // Loop until we find the next data page. |
| 355 | while (true) { |
| 356 | current_page_ = pager_->NextPage(); |
| 357 | if (!current_page_) { |
| 358 | // EOS |
| 359 | return false; |
| 360 | } |
| 361 | |
| 362 | if (current_page_->type() == PageType::DICTIONARY_PAGE) { |
| 363 | ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get())); |
| 364 | continue; |
| 365 | } else if (current_page_->type() == PageType::DATA_PAGE) { |
| 366 | const auto page = std::static_pointer_cast<DataPageV1>(current_page_); |
| 367 | const int64_t levels_byte_size = InitializeLevelDecoders( |
| 368 | *page, page->repetition_level_encoding(), page->definition_level_encoding()); |
| 369 | InitializeDataDecoder(*page, levels_byte_size); |
| 370 | return true; |
| 371 | } else if (current_page_->type() == PageType::DATA_PAGE_V2) { |
| 372 | const auto page = std::static_pointer_cast<DataPageV2>(current_page_); |
| 373 | // Repetition and definition levels are always encoded using RLE encoding |
| 374 | // in the DataPageV2 format. |
| 375 | const int64_t levels_byte_size = |
| 376 | InitializeLevelDecoders(*page, Encoding::RLE, Encoding::RLE); |
| 377 | InitializeDataDecoder(*page, levels_byte_size); |
| 378 | return true; |
| 379 | } else { |
| 380 | // We don't know what this page type is. We're allowed to skip non-data |
| 381 | // pages. |
| 382 | continue; |
| 383 | } |
| 384 | } |
| 385 | return true; |
| 386 | } |
| 387 | |
| 388 | void ConfigureDictionary(const DictionaryPage* page) { |
| 389 | int encoding = static_cast<int>(page->encoding()); |
| 390 | if (page->encoding() == Encoding::PLAIN_DICTIONARY || |
| 391 | page->encoding() == Encoding::PLAIN) { |
| 392 | encoding = static_cast<int>(Encoding::RLE_DICTIONARY); |
| 393 | } |
| 394 | |
| 395 | auto it = decoders_.find(encoding); |
| 396 | if (it != decoders_.end()) { |
      throw ParquetException("Column cannot have more than one dictionary.");
| 398 | } |
| 399 | |
| 400 | if (page->encoding() == Encoding::PLAIN_DICTIONARY || |
| 401 | page->encoding() == Encoding::PLAIN) { |
| 402 | auto dictionary = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_); |
| 403 | dictionary->SetData(page->num_values(), page->data(), page->size()); |
| 404 | |
| 405 | // The dictionary is fully decoded during DictionaryDecoder::Init, so the |
| 406 | // DictionaryPage buffer is no longer required after this step |
| 407 | // |
| 408 | // TODO(wesm): investigate whether this all-or-nothing decoding of the |
| 409 | // dictionary makes sense and whether performance can be improved |
| 410 | |
| 411 | std::unique_ptr<DictDecoder<DType>> decoder = MakeDictDecoder<DType>(descr_, pool_); |
| 412 | decoder->SetDict(dictionary.get()); |
| 413 | decoders_[encoding] = |
| 414 | std::unique_ptr<DecoderType>(dynamic_cast<DecoderType*>(decoder.release())); |
| 415 | } else { |
      ParquetException::NYI("only plain dictionary encoding has been implemented");
| 417 | } |
| 418 | |
| 419 | new_dictionary_ = true; |
| 420 | current_decoder_ = decoders_[encoding].get(); |
| 421 | DCHECK(current_decoder_); |
| 422 | } |
| 423 | |
  // Initialize repetition and definition level decoders on the next data page.
  //
  // If the data page includes repetition and definition levels, we
| 427 | // initialize the level decoders and return the number of encoded level bytes. |
| 428 | // The return value helps determine the number of bytes in the encoded data. |
| 429 | int64_t InitializeLevelDecoders(const DataPage& page, |
| 430 | Encoding::type repetition_level_encoding, |
| 431 | Encoding::type definition_level_encoding) { |
| 432 | // Read a data page. |
| 433 | num_buffered_values_ = page.num_values(); |
| 434 | |
| 435 | // Have not decoded any values from the data page yet |
| 436 | num_decoded_values_ = 0; |
| 437 | |
| 438 | const uint8_t* buffer = page.data(); |
| 439 | int64_t levels_byte_size = 0; |
| 440 | |
| 441 | // Data page Layout: Repetition Levels - Definition Levels - encoded values. |
    // Levels are encoded as RLE or bit-packed.
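    // For example, with max_rep_level_ == 1 and max_def_level_ == 2 a page
    // stores the repetition level run first, then the definition level run,
    // then the encoded values; levels_byte_size accumulates the size of the
    // level runs so that InitializeDataDecoder can skip past them.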
| 443 | // Init repetition levels |
| 444 | if (max_rep_level_ > 0) { |
| 445 | int64_t rep_levels_bytes = repetition_level_decoder_.SetData( |
| 446 | repetition_level_encoding, max_rep_level_, |
| 447 | static_cast<int>(num_buffered_values_), buffer); |
| 448 | buffer += rep_levels_bytes; |
| 449 | levels_byte_size += rep_levels_bytes; |
| 450 | } |
    // TODO: figure out a way to set max_def_level_ to 0
    // if the initial value is invalid
| 453 | |
| 454 | // Init definition levels |
| 455 | if (max_def_level_ > 0) { |
| 456 | int64_t def_levels_bytes = definition_level_decoder_.SetData( |
| 457 | definition_level_encoding, max_def_level_, |
| 458 | static_cast<int>(num_buffered_values_), buffer); |
| 459 | levels_byte_size += def_levels_bytes; |
| 460 | } |
| 461 | |
| 462 | return levels_byte_size; |
| 463 | } |
| 464 | |
| 465 | // Get a decoder object for this page or create a new decoder if this is the |
| 466 | // first page with this encoding. |
| 467 | void InitializeDataDecoder(const DataPage& page, int64_t levels_byte_size) { |
| 468 | const uint8_t* buffer = page.data() + levels_byte_size; |
| 469 | const int64_t data_size = page.size() - levels_byte_size; |
| 470 | |
| 471 | Encoding::type encoding = page.encoding(); |
| 472 | |
| 473 | if (IsDictionaryIndexEncoding(encoding)) { |
| 474 | encoding = Encoding::RLE_DICTIONARY; |
| 475 | } |
| 476 | |
| 477 | auto it = decoders_.find(static_cast<int>(encoding)); |
| 478 | if (it != decoders_.end()) { |
| 479 | DCHECK(it->second.get() != nullptr); |
| 480 | if (encoding == Encoding::RLE_DICTIONARY) { |
| 481 | DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY); |
| 482 | } |
| 483 | current_decoder_ = it->second.get(); |
| 484 | } else { |
| 485 | switch (encoding) { |
| 486 | case Encoding::PLAIN: { |
| 487 | auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_); |
| 488 | current_decoder_ = decoder.get(); |
| 489 | decoders_[static_cast<int>(encoding)] = std::move(decoder); |
| 490 | break; |
| 491 | } |
| 492 | case Encoding::RLE_DICTIONARY: |
        throw ParquetException("Dictionary page must be before data page.");
| 494 | |
| 495 | case Encoding::DELTA_BINARY_PACKED: |
| 496 | case Encoding::DELTA_LENGTH_BYTE_ARRAY: |
| 497 | case Encoding::DELTA_BYTE_ARRAY: |
          ParquetException::NYI("Unsupported encoding");
| 499 | |
| 500 | default: |
          throw ParquetException("Unknown encoding type.");
| 502 | } |
| 503 | } |
| 504 | current_encoding_ = encoding; |
| 505 | current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer, |
| 506 | static_cast<int>(data_size)); |
| 507 | } |
| 508 | |
| 509 | const ColumnDescriptor* descr_; |
| 510 | const int16_t max_def_level_; |
| 511 | const int16_t max_rep_level_; |
| 512 | |
| 513 | std::unique_ptr<PageReader> pager_; |
| 514 | std::shared_ptr<Page> current_page_; |
| 515 | |
  // Not set if the full schema for this field has no optional or repeated elements
| 517 | LevelDecoder definition_level_decoder_; |
| 518 | |
| 519 | // Not set for flat schemas. |
| 520 | LevelDecoder repetition_level_decoder_; |
| 521 | |
| 522 | // The total number of values stored in the data page. This is the maximum of |
| 523 | // the number of encoded definition levels or encoded values. For |
| 524 | // non-repeated, required columns, this is equal to the number of encoded |
| 525 | // values. For repeated or optional values, there may be fewer data values |
| 526 | // than levels, and this tells you how many encoded levels there are in that |
| 527 | // case. |
| 528 | int64_t num_buffered_values_; |
| 529 | |
| 530 | // The number of values from the current data page that have been decoded |
| 531 | // into memory |
| 532 | int64_t num_decoded_values_; |
| 533 | |
| 534 | ::arrow::MemoryPool* pool_; |
| 535 | |
| 536 | using DecoderType = typename EncodingTraits<DType>::Decoder; |
| 537 | DecoderType* current_decoder_; |
| 538 | Encoding::type current_encoding_; |
| 539 | |
| 540 | /// Flag to signal when a new dictionary has been set, for the benefit of |
| 541 | /// DictionaryRecordReader |
| 542 | bool new_dictionary_; |
| 543 | |
| 544 | // Map of encoding type to the respective decoder object. For example, a |
| 545 | // column chunk's data pages may include both dictionary-encoded and |
| 546 | // plain-encoded data. |
| 547 | std::unordered_map<int, std::unique_ptr<DecoderType>> decoders_; |
| 548 | |
| 549 | void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; } |
| 550 | }; |
| 551 | |
| 552 | // ---------------------------------------------------------------------- |
| 553 | // TypedColumnReader implementations |
| 554 | |
| 555 | template <typename DType> |
| 556 | class TypedColumnReaderImpl : public TypedColumnReader<DType>, |
| 557 | public ColumnReaderImplBase<DType> { |
| 558 | public: |
| 559 | using T = typename DType::c_type; |
| 560 | |
  TypedColumnReaderImpl(const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager,
| 562 | ::arrow::MemoryPool* pool) |
| 563 | : ColumnReaderImplBase<DType>(descr, pool) { |
| 564 | this->pager_ = std::move(pager); |
| 565 | } |
| 566 | |
| 567 | bool HasNext() override { return this->HasNextInternal(); } |
| 568 | |
| 569 | int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, |
| 570 | T* values, int64_t* values_read) override; |
| 571 | |
| 572 | int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, |
| 573 | T* values, uint8_t* valid_bits, int64_t valid_bits_offset, |
| 574 | int64_t* levels_read, int64_t* values_read, |
| 575 | int64_t* null_count) override; |
| 576 | |
| 577 | int64_t Skip(int64_t num_rows_to_skip) override; |
| 578 | |
| 579 | Type::type type() const override { return this->descr_->physical_type(); } |
| 580 | |
| 581 | const ColumnDescriptor* descr() const override { return this->descr_; } |
| 582 | }; |
| 583 | |
| 584 | template <typename DType> |
| 585 | int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def_levels, |
| 586 | int16_t* rep_levels, T* values, |
| 587 | int64_t* values_read) { |
| 588 | // HasNext invokes ReadNewPage |
| 589 | if (!HasNext()) { |
| 590 | *values_read = 0; |
| 591 | return 0; |
| 592 | } |
| 593 | |
| 594 | // TODO(wesm): keep reading data pages until batch_size is reached, or the |
| 595 | // row group is finished |
| 596 | batch_size = |
| 597 | std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); |
| 598 | |
| 599 | int64_t num_def_levels = 0; |
| 600 | int64_t num_rep_levels = 0; |
| 601 | |
| 602 | int64_t values_to_read = 0; |
| 603 | |
| 604 | // If the field is required and non-repeated, there are no definition levels |
| 605 | if (this->max_def_level_ > 0 && def_levels) { |
| 606 | num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); |
| 607 | // TODO(wesm): this tallying of values-to-decode can be performed with better |
| 608 | // cache-efficiency if fused with the level decoding. |
| 609 | for (int64_t i = 0; i < num_def_levels; ++i) { |
| 610 | if (def_levels[i] == this->max_def_level_) { |
| 611 | ++values_to_read; |
| 612 | } |
| 613 | } |
| 614 | } else { |
| 615 | // Required field, read all values |
| 616 | values_to_read = batch_size; |
| 617 | } |
| 618 | |
| 619 | // Not present for non-repeated fields |
| 620 | if (this->max_rep_level_ > 0 && rep_levels) { |
| 621 | num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels); |
| 622 | if (def_levels && num_def_levels != num_rep_levels) { |
      throw ParquetException("Number of decoded rep / def levels did not match");
| 624 | } |
| 625 | } |
| 626 | |
| 627 | *values_read = this->ReadValues(values_to_read, values); |
| 628 | int64_t total_values = std::max(num_def_levels, *values_read); |
| 629 | this->ConsumeBufferedValues(total_values); |
| 630 | |
| 631 | return total_values; |
| 632 | } |
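
// A minimal ReadBatch sketch for an INT32 column (hypothetical caller; the
// batch size and buffer handling are the caller's choice):
//
//   constexpr int64_t kBatchSize = 1024;
//   std::vector<int16_t> def_levels(kBatchSize), rep_levels(kBatchSize);
//   std::vector<int32_t> values(kBatchSize);
//   int64_t values_read = 0;
//   while (int32_reader->HasNext()) {
//     int32_reader->ReadBatch(kBatchSize, def_levels.data(), rep_levels.data(),
//                             values.data(), &values_read);
//     // values[0..values_read) now holds the decoded non-null values
//   }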
| 633 | |
| 634 | template <typename DType> |
| 635 | int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced( |
| 636 | int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values, |
| 637 | uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read, |
| 638 | int64_t* values_read, int64_t* null_count_out) { |
| 639 | // HasNext invokes ReadNewPage |
| 640 | if (!HasNext()) { |
| 641 | *levels_read = 0; |
| 642 | *values_read = 0; |
| 643 | *null_count_out = 0; |
| 644 | return 0; |
| 645 | } |
| 646 | |
| 647 | int64_t total_values; |
| 648 | // TODO(wesm): keep reading data pages until batch_size is reached, or the |
| 649 | // row group is finished |
| 650 | batch_size = |
| 651 | std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); |
| 652 | |
| 653 | // If the field is required and non-repeated, there are no definition levels |
| 654 | if (this->max_def_level_ > 0) { |
| 655 | int64_t num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); |
| 656 | |
| 657 | // Not present for non-repeated fields |
| 658 | if (this->max_rep_level_ > 0) { |
| 659 | int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels); |
| 660 | if (num_def_levels != num_rep_levels) { |
        throw ParquetException("Number of decoded rep / def levels did not match");
| 662 | } |
| 663 | } |
| 664 | |
| 665 | const bool has_spaced_values = internal::HasSpacedValues(this->descr_); |
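    // "Spaced" reads leave a gap in the output for each null so that values
    // line up position-for-position with the validity bitmap; otherwise
    // values are densely packed and every bit in the bitmap is set.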
| 666 | |
| 667 | int64_t null_count = 0; |
| 668 | if (!has_spaced_values) { |
| 669 | int values_to_read = 0; |
| 670 | for (int64_t i = 0; i < num_def_levels; ++i) { |
| 671 | if (def_levels[i] == this->max_def_level_) { |
| 672 | ++values_to_read; |
| 673 | } |
| 674 | } |
| 675 | total_values = this->ReadValues(values_to_read, values); |
| 676 | for (int64_t i = 0; i < total_values; i++) { |
| 677 | ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i); |
| 678 | } |
| 679 | *values_read = total_values; |
| 680 | } else { |
| 681 | internal::DefinitionLevelsToBitmap(def_levels, num_def_levels, this->max_def_level_, |
| 682 | this->max_rep_level_, values_read, &null_count, |
| 683 | valid_bits, valid_bits_offset); |
| 684 | total_values = |
| 685 | this->ReadValuesSpaced(*values_read, values, static_cast<int>(null_count), |
| 686 | valid_bits, valid_bits_offset); |
| 687 | } |
| 688 | *levels_read = num_def_levels; |
| 689 | *null_count_out = null_count; |
| 690 | |
| 691 | } else { |
| 692 | // Required field, read all values |
| 693 | total_values = this->ReadValues(batch_size, values); |
| 694 | for (int64_t i = 0; i < total_values; i++) { |
| 695 | ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i); |
| 696 | } |
| 697 | *null_count_out = 0; |
| 698 | *levels_read = total_values; |
| 699 | } |
| 700 | |
| 701 | this->ConsumeBufferedValues(*levels_read); |
| 702 | return total_values; |
| 703 | } |
| 704 | |
| 705 | template <typename DType> |
| 706 | int64_t TypedColumnReaderImpl<DType>::Skip(int64_t num_rows_to_skip) { |
| 707 | int64_t rows_to_skip = num_rows_to_skip; |
| 708 | while (HasNext() && rows_to_skip > 0) { |
| 709 | // If the number of rows to skip is more than the number of undecoded values, skip the |
| 710 | // Page. |
| 711 | if (rows_to_skip > (this->num_buffered_values_ - this->num_decoded_values_)) { |
| 712 | rows_to_skip -= this->num_buffered_values_ - this->num_decoded_values_; |
| 713 | this->num_decoded_values_ = this->num_buffered_values_; |
| 714 | } else { |
| 715 | // We need to read this Page |
| 716 | // Jump to the right offset in the Page |
| 717 | int64_t batch_size = 1024; // ReadBatch with a smaller memory footprint |
| 718 | int64_t values_read = 0; |
| 719 | |
| 720 | // This will be enough scratch space to accommodate 16-bit levels or any |
| 721 | // value type |
| 722 | std::shared_ptr<ResizableBuffer> scratch = AllocateBuffer( |
| 723 | this->pool_, batch_size * type_traits<DType::type_num>::value_byte_size); |
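
      // The same scratch block is passed for the def levels, rep levels, and
      // values below; the decoded contents are discarded, so the three
      // buffers may safely alias.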
| 724 | |
| 725 | do { |
| 726 | batch_size = std::min(batch_size, rows_to_skip); |
| 727 | values_read = |
| 728 | ReadBatch(static_cast<int>(batch_size), |
| 729 | reinterpret_cast<int16_t*>(scratch->mutable_data()), |
| 730 | reinterpret_cast<int16_t*>(scratch->mutable_data()), |
| 731 | reinterpret_cast<T*>(scratch->mutable_data()), &values_read); |
| 732 | rows_to_skip -= values_read; |
| 733 | } while (values_read > 0 && rows_to_skip > 0); |
| 734 | } |
| 735 | } |
| 736 | return num_rows_to_skip - rows_to_skip; |
| 737 | } |
| 738 | |
| 739 | // ---------------------------------------------------------------------- |
| 740 | // Dynamic column reader constructor |
| 741 | |
| 742 | std::shared_ptr<ColumnReader> ColumnReader::Make(const ColumnDescriptor* descr, |
                                                 std::unique_ptr<PageReader> pager,
| 744 | MemoryPool* pool) { |
| 745 | switch (descr->physical_type()) { |
| 746 | case Type::BOOLEAN: |
| 747 | return std::make_shared<TypedColumnReaderImpl<BooleanType>>(descr, std::move(pager), |
| 748 | pool); |
| 749 | case Type::INT32: |
| 750 | return std::make_shared<TypedColumnReaderImpl<Int32Type>>(descr, std::move(pager), |
| 751 | pool); |
| 752 | case Type::INT64: |
| 753 | return std::make_shared<TypedColumnReaderImpl<Int64Type>>(descr, std::move(pager), |
| 754 | pool); |
| 755 | case Type::INT96: |
| 756 | return std::make_shared<TypedColumnReaderImpl<Int96Type>>(descr, std::move(pager), |
| 757 | pool); |
| 758 | case Type::FLOAT: |
| 759 | return std::make_shared<TypedColumnReaderImpl<FloatType>>(descr, std::move(pager), |
| 760 | pool); |
| 761 | case Type::DOUBLE: |
| 762 | return std::make_shared<TypedColumnReaderImpl<DoubleType>>(descr, std::move(pager), |
| 763 | pool); |
| 764 | case Type::BYTE_ARRAY: |
| 765 | return std::make_shared<TypedColumnReaderImpl<ByteArrayType>>( |
| 766 | descr, std::move(pager), pool); |
| 767 | case Type::FIXED_LEN_BYTE_ARRAY: |
| 768 | return std::make_shared<TypedColumnReaderImpl<FLBAType>>(descr, std::move(pager), |
| 769 | pool); |
| 770 | default: |
      ParquetException::NYI("type reader not implemented");
| 772 | } |
  // Unreachable code, but suppress compiler warning
| 774 | return std::shared_ptr<ColumnReader>(nullptr); |
| 775 | } |
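
// A minimal usage sketch (hypothetical caller; `pager` is the PageReader for
// the corresponding column chunk):
//
//   std::shared_ptr<ColumnReader> reader =
//       ColumnReader::Make(descr, std::move(pager));
//   auto* int32_reader = static_cast<Int32Reader*>(reader.get());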
| 776 | |
| 777 | // ---------------------------------------------------------------------- |
| 778 | // RecordReader |
| 779 | |
| 780 | namespace internal { |
| 781 | |
| 782 | // The minimum number of repetition/definition levels to decode at a time, for |
| 783 | // better vectorized performance when doing many smaller record reads |
| 784 | constexpr int64_t kMinLevelBatchSize = 1024; |
| 785 | |
| 786 | template <typename DType> |
| 787 | class TypedRecordReader : public ColumnReaderImplBase<DType>, |
| 788 | virtual public RecordReader { |
| 789 | public: |
| 790 | using T = typename DType::c_type; |
| 791 | using BASE = ColumnReaderImplBase<DType>; |
| 792 | TypedRecordReader(const ColumnDescriptor* descr, MemoryPool* pool) : BASE(descr, pool) { |
| 793 | nullable_values_ = internal::HasSpacedValues(descr); |
| 794 | at_record_start_ = true; |
| 795 | records_read_ = 0; |
| 796 | values_written_ = 0; |
| 797 | values_capacity_ = 0; |
| 798 | null_count_ = 0; |
| 799 | levels_written_ = 0; |
| 800 | levels_position_ = 0; |
| 801 | levels_capacity_ = 0; |
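    // BYTE_ARRAY values are appended directly into Arrow builders by the
    // BinaryRecordReader subclasses below, so those readers do not use the
    // values_ buffer.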
    uses_values_ = descr->physical_type() != Type::BYTE_ARRAY;
| 803 | |
| 804 | if (uses_values_) { |
| 805 | values_ = AllocateBuffer(pool); |
| 806 | } |
| 807 | valid_bits_ = AllocateBuffer(pool); |
| 808 | def_levels_ = AllocateBuffer(pool); |
| 809 | rep_levels_ = AllocateBuffer(pool); |
| 810 | Reset(); |
| 811 | } |
| 812 | |
| 813 | int64_t available_values_current_page() const { |
| 814 | return this->num_buffered_values_ - this->num_decoded_values_; |
| 815 | } |
| 816 | |
| 817 | int64_t ReadRecords(int64_t num_records) override { |
| 818 | // Delimit records, then read values at the end |
| 819 | int64_t records_read = 0; |
| 820 | |
| 821 | if (levels_position_ < levels_written_) { |
| 822 | records_read += ReadRecordData(num_records); |
| 823 | } |
| 824 | |
| 825 | int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records); |
| 826 | |
    // We continue reading until we have delimited the desired number of
    // records; if we stop in the middle of a record, we keep reading until
    // we reach the end of that record
| 830 | while (!at_record_start_ || records_read < num_records) { |
| 831 | // Is there more data to read in this row group? |
| 832 | if (!this->HasNextInternal()) { |
| 833 | if (!at_record_start_) { |
| 834 | // We ended the row group while inside a record that we haven't seen |
| 835 | // the end of yet. So increment the record count for the last record in |
| 836 | // the row group |
| 837 | ++records_read; |
| 838 | at_record_start_ = true; |
| 839 | } |
| 840 | break; |
| 841 | } |
| 842 | |
| 843 | /// We perform multiple batch reads until we either exhaust the row group |
| 844 | /// or observe the desired number of records |
| 845 | int64_t batch_size = std::min(level_batch_size, available_values_current_page()); |
| 846 | |
| 847 | // No more data in column |
| 848 | if (batch_size == 0) { |
| 849 | break; |
| 850 | } |
| 851 | |
| 852 | if (this->max_def_level_ > 0) { |
| 853 | ReserveLevels(batch_size); |
| 854 | |
| 855 | int16_t* def_levels = this->def_levels() + levels_written_; |
| 856 | int16_t* rep_levels = this->rep_levels() + levels_written_; |
| 857 | |
| 858 | // Not present for non-repeated fields |
| 859 | int64_t levels_read = 0; |
| 860 | if (this->max_rep_level_ > 0) { |
| 861 | levels_read = this->ReadDefinitionLevels(batch_size, def_levels); |
| 862 | if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { |
          throw ParquetException("Number of decoded rep / def levels did not match");
| 864 | } |
| 865 | } else if (this->max_def_level_ > 0) { |
| 866 | levels_read = this->ReadDefinitionLevels(batch_size, def_levels); |
| 867 | } |
| 868 | |
| 869 | // Exhausted column chunk |
| 870 | if (levels_read == 0) { |
| 871 | break; |
| 872 | } |
| 873 | |
| 874 | levels_written_ += levels_read; |
| 875 | records_read += ReadRecordData(num_records - records_read); |
| 876 | } else { |
| 877 | // No repetition or definition levels |
| 878 | batch_size = std::min(num_records - records_read, batch_size); |
| 879 | records_read += ReadRecordData(batch_size); |
| 880 | } |
| 881 | } |
| 882 | |
| 883 | return records_read; |
| 884 | } |
| 885 | |
| 886 | // We may outwardly have the appearance of having exhausted a column chunk |
| 887 | // when in fact we are in the middle of processing the last batch |
| 888 | bool has_values_to_process() const { return levels_position_ < levels_written_; } |
| 889 | |
| 890 | std::shared_ptr<ResizableBuffer> ReleaseValues() override { |
| 891 | if (uses_values_) { |
| 892 | auto result = values_; |
| 893 | values_ = AllocateBuffer(this->pool_); |
| 894 | return result; |
| 895 | } else { |
| 896 | return nullptr; |
| 897 | } |
| 898 | } |
| 899 | |
| 900 | std::shared_ptr<ResizableBuffer> ReleaseIsValid() override { |
| 901 | if (nullable_values_) { |
| 902 | auto result = valid_bits_; |
| 903 | valid_bits_ = AllocateBuffer(this->pool_); |
| 904 | return result; |
| 905 | } else { |
| 906 | return nullptr; |
| 907 | } |
| 908 | } |
| 909 | |
| 910 | // Process written repetition/definition levels to reach the end of |
| 911 | // records. Process no more levels than necessary to delimit the indicated |
| 912 | // number of logical records. Updates internal state of RecordReader |
| 913 | // |
| 914 | // \return Number of records delimited |
| 915 | int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) { |
| 916 | int64_t values_to_read = 0; |
| 917 | int64_t records_read = 0; |
| 918 | |
| 919 | const int16_t* def_levels = this->def_levels() + levels_position_; |
| 920 | const int16_t* rep_levels = this->rep_levels() + levels_position_; |
| 921 | |
| 922 | DCHECK_GT(this->max_rep_level_, 0); |
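
    // A repetition level of 0 marks the start of a new record. For example,
    // the rep levels {0, 1, 1, 0, 1} delimit two records: the first spans
    // the first three values and the second begins at the fourth.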
| 923 | |
| 924 | // Count logical records and number of values to read |
| 925 | while (levels_position_ < levels_written_) { |
| 926 | if (*rep_levels++ == 0) { |
| 927 | // If at_record_start_ is true, we are seeing the start of a record |
| 928 | // for the second time, such as after repeated calls to |
        // DelimitRecords. In this case we must continue until we find
        // another record start or exhaust the ColumnChunk
| 931 | if (!at_record_start_) { |
| 932 | // We've reached the end of a record; increment the record count. |
| 933 | ++records_read; |
| 934 | if (records_read == num_records) { |
| 935 | // We've found the number of records we were looking for. Set |
| 936 | // at_record_start_ to true and break |
| 937 | at_record_start_ = true; |
| 938 | break; |
| 939 | } |
| 940 | } |
| 941 | } |
| 942 | |
| 943 | // We have decided to consume the level at this position; therefore we |
| 944 | // must advance until we find another record boundary |
| 945 | at_record_start_ = false; |
| 946 | |
| 947 | if (*def_levels++ == this->max_def_level_) { |
| 948 | ++values_to_read; |
| 949 | } |
| 950 | ++levels_position_; |
| 951 | } |
| 952 | *values_seen = values_to_read; |
| 953 | return records_read; |
| 954 | } |
| 955 | |
| 956 | void Reserve(int64_t capacity) override { |
| 957 | ReserveLevels(capacity); |
| 958 | ReserveValues(capacity); |
| 959 | } |
| 960 | |
| 961 | void ReserveLevels(int64_t capacity) { |
| 962 | if (this->max_def_level_ > 0 && (levels_written_ + capacity > levels_capacity_)) { |
| 963 | int64_t new_levels_capacity = BitUtil::NextPower2(levels_capacity_ + 1); |
| 964 | while (levels_written_ + capacity > new_levels_capacity) { |
| 965 | new_levels_capacity = BitUtil::NextPower2(new_levels_capacity + 1); |
| 966 | } |
| 967 | PARQUET_THROW_NOT_OK( |
| 968 | def_levels_->Resize(new_levels_capacity * sizeof(int16_t), false)); |
| 969 | if (this->max_rep_level_ > 0) { |
| 970 | PARQUET_THROW_NOT_OK( |
| 971 | rep_levels_->Resize(new_levels_capacity * sizeof(int16_t), false)); |
| 972 | } |
| 973 | levels_capacity_ = new_levels_capacity; |
| 974 | } |
| 975 | } |
| 976 | |
| 977 | void ReserveValues(int64_t capacity) { |
| 978 | if (values_written_ + capacity > values_capacity_) { |
| 979 | int64_t new_values_capacity = BitUtil::NextPower2(values_capacity_ + 1); |
| 980 | while (values_written_ + capacity > new_values_capacity) { |
| 981 | new_values_capacity = BitUtil::NextPower2(new_values_capacity + 1); |
| 982 | } |
| 983 | |
| 984 | int type_size = GetTypeByteSize(this->descr_->physical_type()); |
| 985 | |
| 986 | // XXX(wesm): A hack to avoid memory allocation when reading directly |
| 987 | // into builder classes |
| 988 | if (uses_values_) { |
| 989 | PARQUET_THROW_NOT_OK(values_->Resize(new_values_capacity * type_size, false)); |
| 990 | } |
| 991 | |
| 992 | values_capacity_ = new_values_capacity; |
| 993 | } |
| 994 | if (nullable_values_) { |
| 995 | int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_); |
| 996 | if (valid_bits_->size() < valid_bytes_new) { |
| 997 | int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_); |
| 998 | PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false)); |
| 999 | |
| 1000 | // Avoid valgrind warnings |
| 1001 | memset(valid_bits_->mutable_data() + valid_bytes_old, 0, |
| 1002 | valid_bytes_new - valid_bytes_old); |
| 1003 | } |
| 1004 | } |
| 1005 | } |
| 1006 | |
| 1007 | void Reset() override { |
| 1008 | ResetValues(); |
| 1009 | |
| 1010 | if (levels_written_ > 0) { |
| 1011 | const int64_t levels_remaining = levels_written_ - levels_position_; |
| 1012 | // Shift remaining levels to beginning of buffer and trim to only the number |
| 1013 | // of decoded levels remaining |
| 1014 | int16_t* def_data = def_levels(); |
| 1015 | int16_t* rep_data = rep_levels(); |
| 1016 | |
| 1017 | std::copy(def_data + levels_position_, def_data + levels_written_, def_data); |
| 1018 | PARQUET_THROW_NOT_OK( |
| 1019 | def_levels_->Resize(levels_remaining * sizeof(int16_t), false)); |
| 1020 | |
| 1021 | if (this->max_rep_level_ > 0) { |
| 1022 | std::copy(rep_data + levels_position_, rep_data + levels_written_, rep_data); |
| 1023 | PARQUET_THROW_NOT_OK( |
| 1024 | rep_levels_->Resize(levels_remaining * sizeof(int16_t), false)); |
| 1025 | } |
| 1026 | |
| 1027 | levels_written_ -= levels_position_; |
| 1028 | levels_position_ = 0; |
| 1029 | levels_capacity_ = levels_remaining; |
| 1030 | } |
| 1031 | |
| 1032 | records_read_ = 0; |
| 1033 | |
| 1034 | // Call Finish on the binary builders to reset them |
| 1035 | } |
| 1036 | |
  void SetPageReader(std::unique_ptr<PageReader> reader) override {
| 1038 | at_record_start_ = true; |
| 1039 | this->pager_ = std::move(reader); |
| 1040 | ResetDecoders(); |
| 1041 | } |
| 1042 | |
| 1043 | bool HasMoreData() const override { return this->pager_ != nullptr; } |
| 1044 | |
| 1045 | // Dictionary decoders must be reset when advancing row groups |
| 1046 | void ResetDecoders() { this->decoders_.clear(); } |
| 1047 | |
| 1048 | virtual void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) { |
| 1049 | uint8_t* valid_bits = valid_bits_->mutable_data(); |
| 1050 | const int64_t valid_bits_offset = values_written_; |
| 1051 | |
| 1052 | int64_t num_decoded = this->current_decoder_->DecodeSpaced( |
| 1053 | ValuesHead<T>(), static_cast<int>(values_with_nulls), |
| 1054 | static_cast<int>(null_count), valid_bits, valid_bits_offset); |
| 1055 | DCHECK_EQ(num_decoded, values_with_nulls); |
| 1056 | } |
| 1057 | |
| 1058 | virtual void ReadValuesDense(int64_t values_to_read) { |
| 1059 | int64_t num_decoded = |
| 1060 | this->current_decoder_->Decode(ValuesHead<T>(), static_cast<int>(values_to_read)); |
| 1061 | DCHECK_EQ(num_decoded, values_to_read); |
| 1062 | } |
| 1063 | |
| 1064 | // Return number of logical records read |
| 1065 | int64_t ReadRecordData(int64_t num_records) { |
| 1066 | // Conservative upper bound |
| 1067 | const int64_t possible_num_values = |
| 1068 | std::max(num_records, levels_written_ - levels_position_); |
| 1069 | ReserveValues(possible_num_values); |
| 1070 | |
| 1071 | const int64_t start_levels_position = levels_position_; |
| 1072 | |
| 1073 | int64_t values_to_read = 0; |
| 1074 | int64_t records_read = 0; |
| 1075 | if (this->max_rep_level_ > 0) { |
| 1076 | records_read = DelimitRecords(num_records, &values_to_read); |
| 1077 | } else if (this->max_def_level_ > 0) { |
| 1078 | // No repetition levels, skip delimiting logic. Each level represents a |
| 1079 | // null or not null entry |
| 1080 | records_read = std::min(levels_written_ - levels_position_, num_records); |
| 1081 | |
| 1082 | // This is advanced by DelimitRecords, which we skipped |
| 1083 | levels_position_ += records_read; |
| 1084 | } else { |
| 1085 | records_read = values_to_read = num_records; |
| 1086 | } |
| 1087 | |
| 1088 | int64_t null_count = 0; |
| 1089 | if (nullable_values_) { |
| 1090 | int64_t values_with_nulls = 0; |
| 1091 | internal::DefinitionLevelsToBitmap( |
| 1092 | def_levels() + start_levels_position, levels_position_ - start_levels_position, |
| 1093 | this->max_def_level_, this->max_rep_level_, &values_with_nulls, &null_count, |
| 1094 | valid_bits_->mutable_data(), values_written_); |
| 1095 | values_to_read = values_with_nulls - null_count; |
| 1096 | ReadValuesSpaced(values_with_nulls, null_count); |
| 1097 | } else { |
| 1098 | ReadValuesDense(values_to_read); |
| 1099 | } |
| 1100 | if (this->max_def_level_ > 0) { |
| 1101 | // Optional, repeated, or some mix thereof |
| 1102 | this->ConsumeBufferedValues(levels_position_ - start_levels_position); |
| 1103 | } else { |
| 1104 | // Flat, non-repeated |
| 1105 | this->ConsumeBufferedValues(values_to_read); |
| 1106 | } |
| 1107 | // Total values, including null spaces, if any |
| 1108 | values_written_ += values_to_read + null_count; |
| 1109 | null_count_ += null_count; |
| 1110 | |
| 1111 | return records_read; |
| 1112 | } |
| 1113 | |
| 1114 | void DebugPrintState() override { |
| 1115 | const int16_t* def_levels = this->def_levels(); |
| 1116 | const int16_t* rep_levels = this->rep_levels(); |
| 1117 | const int64_t total_levels_read = levels_position_; |
| 1118 | |
| 1119 | const T* vals = reinterpret_cast<const T*>(this->values()); |
| 1120 | |
| 1121 | std::cout << "def levels: " ; |
| 1122 | for (int64_t i = 0; i < total_levels_read; ++i) { |
| 1123 | std::cout << def_levels[i] << " " ; |
| 1124 | } |
| 1125 | std::cout << std::endl; |
| 1126 | |
| 1127 | std::cout << "rep levels: " ; |
| 1128 | for (int64_t i = 0; i < total_levels_read; ++i) { |
| 1129 | std::cout << rep_levels[i] << " " ; |
| 1130 | } |
| 1131 | std::cout << std::endl; |
| 1132 | |
| 1133 | std::cout << "values: " ; |
| 1134 | for (int64_t i = 0; i < this->values_written(); ++i) { |
| 1135 | std::cout << vals[i] << " " ; |
| 1136 | } |
| 1137 | std::cout << std::endl; |
| 1138 | } |
| 1139 | |
| 1140 | void ResetValues() { |
| 1141 | if (values_written_ > 0) { |
| 1142 | // Resize to 0, but do not shrink to fit |
| 1143 | if (uses_values_) { |
| 1144 | PARQUET_THROW_NOT_OK(values_->Resize(0, false)); |
| 1145 | } |
| 1146 | PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false)); |
| 1147 | values_written_ = 0; |
| 1148 | values_capacity_ = 0; |
| 1149 | null_count_ = 0; |
| 1150 | } |
| 1151 | } |
| 1152 | |
| 1153 | protected: |
| 1154 | template <typename T> |
| 1155 | T* ValuesHead() { |
| 1156 | return reinterpret_cast<T*>(values_->mutable_data()) + values_written_; |
| 1157 | } |
| 1158 | }; |
| 1159 | |
| 1160 | class FLBARecordReader : public TypedRecordReader<FLBAType>, |
| 1161 | virtual public BinaryRecordReader { |
| 1162 | public: |
| 1163 | FLBARecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) |
| 1164 | : TypedRecordReader<FLBAType>(descr, pool), builder_(nullptr) { |
| 1165 | DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY); |
| 1166 | int byte_width = descr_->type_length(); |
| 1167 | std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width); |
| 1168 | builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, this->pool_)); |
| 1169 | } |
| 1170 | |
| 1171 | ::arrow::ArrayVector GetBuilderChunks() override { |
| 1172 | std::shared_ptr<::arrow::Array> chunk; |
| 1173 | PARQUET_THROW_NOT_OK(builder_->Finish(&chunk)); |
| 1174 | return ::arrow::ArrayVector({chunk}); |
| 1175 | } |
| 1176 | |
| 1177 | void ReadValuesDense(int64_t values_to_read) override { |
| 1178 | auto values = ValuesHead<FLBA>(); |
| 1179 | int64_t num_decoded = |
| 1180 | this->current_decoder_->Decode(values, static_cast<int>(values_to_read)); |
| 1181 | DCHECK_EQ(num_decoded, values_to_read); |
| 1182 | |
| 1183 | for (int64_t i = 0; i < num_decoded; i++) { |
| 1184 | PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); |
| 1185 | } |
| 1186 | ResetValues(); |
| 1187 | } |
| 1188 | |
| 1189 | void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { |
| 1190 | uint8_t* valid_bits = valid_bits_->mutable_data(); |
| 1191 | const int64_t valid_bits_offset = values_written_; |
| 1192 | auto values = ValuesHead<FLBA>(); |
| 1193 | |
| 1194 | int64_t num_decoded = this->current_decoder_->DecodeSpaced( |
| 1195 | values, static_cast<int>(values_to_read), static_cast<int>(null_count), |
| 1196 | valid_bits, valid_bits_offset); |
| 1197 | DCHECK_EQ(num_decoded, values_to_read); |
| 1198 | |
| 1199 | for (int64_t i = 0; i < num_decoded; i++) { |
| 1200 | if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) { |
| 1201 | PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); |
| 1202 | } else { |
| 1203 | PARQUET_THROW_NOT_OK(builder_->AppendNull()); |
| 1204 | } |
| 1205 | } |
| 1206 | ResetValues(); |
| 1207 | } |
| 1208 | |
| 1209 | private: |
| 1210 | std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder_; |
| 1211 | }; |
| 1212 | |
| 1213 | class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType>, |
| 1214 | virtual public BinaryRecordReader { |
| 1215 | public: |
| 1216 | ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) |
| 1217 | : TypedRecordReader<ByteArrayType>(descr, pool) { |
| 1218 | DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); |
| 1219 | accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool)); |
| 1220 | } |
| 1221 | |
| 1222 | ::arrow::ArrayVector GetBuilderChunks() override { |
| 1223 | ::arrow::ArrayVector result = accumulator_.chunks; |
| 1224 | if (result.size() == 0 || accumulator_.builder->length() > 0) { |
| 1225 | std::shared_ptr<::arrow::Array> last_chunk; |
| 1226 | PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk)); |
| 1227 | result.push_back(last_chunk); |
| 1228 | } |
| 1229 | accumulator_.chunks = {}; |
| 1230 | return result; |
| 1231 | } |
| 1232 | |
| 1233 | void ReadValuesDense(int64_t values_to_read) override { |
| 1234 | int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull( |
| 1235 | static_cast<int>(values_to_read), &accumulator_); |
| 1236 | DCHECK_EQ(num_decoded, values_to_read); |
| 1237 | ResetValues(); |
| 1238 | } |
| 1239 | |
| 1240 | void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { |
| 1241 | int64_t num_decoded = this->current_decoder_->DecodeArrow( |
| 1242 | static_cast<int>(values_to_read), static_cast<int>(null_count), |
| 1243 | valid_bits_->mutable_data(), values_written_, &accumulator_); |
| 1244 | DCHECK_EQ(num_decoded, values_to_read - null_count); |
| 1245 | ResetValues(); |
| 1246 | } |
| 1247 | |
| 1248 | private: |
| 1249 | // Helper data structure for accumulating builder chunks |
| 1250 | ArrowBinaryAccumulator accumulator_; |
| 1251 | }; |
| 1252 | |
| 1253 | class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType>, |
| 1254 | virtual public DictionaryRecordReader { |
| 1255 | public: |
| 1256 | ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr, |
| 1257 | ::arrow::MemoryPool* pool) |
| 1258 | : TypedRecordReader<ByteArrayType>(descr, pool), builder_(pool) { |
| 1259 | this->read_dictionary_ = true; |
| 1260 | } |
| 1261 | |
| 1262 | std::shared_ptr<::arrow::ChunkedArray> GetResult() override { |
| 1263 | FlushBuilder(); |
| 1264 | return std::make_shared<::arrow::ChunkedArray>(result_chunks_, builder_.type()); |
| 1265 | } |
| 1266 | |
| 1267 | void FlushBuilder() { |
| 1268 | if (builder_.length() > 0) { |
| 1269 | std::shared_ptr<::arrow::Array> chunk; |
| 1270 | PARQUET_THROW_NOT_OK(builder_.Finish(&chunk)); |
| 1271 | result_chunks_.emplace_back(std::move(chunk)); |
| 1272 | |
| 1273 | // Also clears the dictionary memo table |
| 1274 | builder_.ResetFull(); |
| 1275 | } |
| 1276 | } |
| 1277 | |
| 1278 | void MaybeWriteNewDictionary() { |
| 1279 | if (this->new_dictionary_) { |
| 1280 | /// If there is a new dictionary, we may need to flush the builder, then |
| 1281 | /// insert the new dictionary values |
| 1282 | FlushBuilder(); |
| 1283 | auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_); |
| 1284 | decoder->InsertDictionary(&builder_); |
| 1285 | this->new_dictionary_ = false; |
| 1286 | } |
| 1287 | } |
| 1288 | |
| 1289 | void ReadValuesDense(int64_t values_to_read) override { |
| 1290 | int64_t num_decoded = 0; |
| 1291 | if (current_encoding_ == Encoding::RLE_DICTIONARY) { |
| 1292 | MaybeWriteNewDictionary(); |
| 1293 | auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_); |
| 1294 | num_decoded = decoder->DecodeIndices(static_cast<int>(values_to_read), &builder_); |
| 1295 | } else { |
| 1296 | num_decoded = this->current_decoder_->DecodeArrowNonNull( |
| 1297 | static_cast<int>(values_to_read), &builder_); |
| 1298 | |
| 1299 | /// Flush values since they have been copied into the builder |
| 1300 | ResetValues(); |
| 1301 | } |
| 1302 | DCHECK_EQ(num_decoded, values_to_read); |
| 1303 | } |
| 1304 | |
| 1305 | void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { |
| 1306 | int64_t num_decoded = 0; |
| 1307 | if (current_encoding_ == Encoding::RLE_DICTIONARY) { |
| 1308 | MaybeWriteNewDictionary(); |
| 1309 | auto decoder = dynamic_cast<BinaryDictDecoder*>(this->current_decoder_); |
| 1310 | num_decoded = decoder->DecodeIndicesSpaced( |
| 1311 | static_cast<int>(values_to_read), static_cast<int>(null_count), |
| 1312 | valid_bits_->mutable_data(), values_written_, &builder_); |
| 1313 | } else { |
| 1314 | num_decoded = this->current_decoder_->DecodeArrow( |
| 1315 | static_cast<int>(values_to_read), static_cast<int>(null_count), |
| 1316 | valid_bits_->mutable_data(), values_written_, &builder_); |
| 1317 | |
| 1318 | /// Flush values since they have been copied into the builder |
| 1319 | ResetValues(); |
| 1320 | } |
| 1321 | DCHECK_EQ(num_decoded, values_to_read - null_count); |
| 1322 | } |
| 1323 | |
| 1324 | private: |
| 1325 | using BinaryDictDecoder = DictDecoder<ByteArrayType>; |
| 1326 | |
| 1327 | ::arrow::BinaryDictionary32Builder builder_; |
| 1328 | std::vector<std::shared_ptr<::arrow::Array>> result_chunks_; |
| 1329 | }; |
| 1330 | |
| 1331 | // TODO(wesm): Implement these to some satisfaction |
| 1332 | template <> |
| 1333 | void TypedRecordReader<Int96Type>::DebugPrintState() {} |
| 1334 | |
| 1335 | template <> |
| 1336 | void TypedRecordReader<ByteArrayType>::DebugPrintState() {} |
| 1337 | |
| 1338 | template <> |
| 1339 | void TypedRecordReader<FLBAType>::DebugPrintState() {} |
| 1340 | |
| 1341 | std::shared_ptr<RecordReader> MakeByteArrayRecordReader(const ColumnDescriptor* descr, |
| 1342 | arrow::MemoryPool* pool, |
| 1343 | bool read_dictionary) { |
| 1344 | if (read_dictionary) { |
| 1345 | return std::make_shared<ByteArrayDictionaryRecordReader>(descr, pool); |
| 1346 | } else { |
| 1347 | return std::make_shared<ByteArrayChunkedRecordReader>(descr, pool); |
| 1348 | } |
| 1349 | } |
| 1350 | |
| 1351 | std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr, |
| 1352 | MemoryPool* pool, |
| 1353 | const bool read_dictionary) { |
| 1354 | switch (descr->physical_type()) { |
| 1355 | case Type::BOOLEAN: |
| 1356 | return std::make_shared<TypedRecordReader<BooleanType>>(descr, pool); |
| 1357 | case Type::INT32: |
| 1358 | return std::make_shared<TypedRecordReader<Int32Type>>(descr, pool); |
| 1359 | case Type::INT64: |
| 1360 | return std::make_shared<TypedRecordReader<Int64Type>>(descr, pool); |
| 1361 | case Type::INT96: |
| 1362 | return std::make_shared<TypedRecordReader<Int96Type>>(descr, pool); |
| 1363 | case Type::FLOAT: |
| 1364 | return std::make_shared<TypedRecordReader<FloatType>>(descr, pool); |
| 1365 | case Type::DOUBLE: |
| 1366 | return std::make_shared<TypedRecordReader<DoubleType>>(descr, pool); |
| 1367 | case Type::BYTE_ARRAY: |
| 1368 | return MakeByteArrayRecordReader(descr, pool, read_dictionary); |
| 1369 | case Type::FIXED_LEN_BYTE_ARRAY: |
| 1370 | return std::make_shared<FLBARecordReader>(descr, pool); |
| 1371 | default: { |
| 1372 | // PARQUET-1481: This can occur if the file is corrupt |
| 1373 | std::stringstream ss; |
| 1374 | ss << "Invalid physical column type: " << static_cast<int>(descr->physical_type()); |
| 1375 | throw ParquetException(ss.str()); |
| 1376 | } |
| 1377 | } |
  // Unreachable code, but suppress compiler warning
| 1379 | return nullptr; |
| 1380 | } |
| 1381 | |
| 1382 | } // namespace internal |
| 1383 | } // namespace parquet |
| 1384 | |