1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "parquet/column_reader.h"
19
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <exception>
#include <limits>
#include <memory>
#include <sstream>
#include <utility>
24
25#include <arrow/buffer.h>
26#include <arrow/memory_pool.h>
27#include <arrow/util/bit-util.h>
28#include <arrow/util/compression.h>
29#include <arrow/util/rle-encoding.h>
30
31#include "parquet/column_page.h"
32#include "parquet/encoding-internal.h"
33#include "parquet/properties.h"
34#include "parquet/thrift.h"
35
36using arrow::MemoryPool;
37
38namespace parquet {
39
40LevelDecoder::LevelDecoder() : num_values_remaining_(0) {}
41
42LevelDecoder::~LevelDecoder() {}
43
44int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
45 int num_buffered_values, const uint8_t* data) {
46 int32_t num_bytes = 0;
47 encoding_ = encoding;
48 num_values_remaining_ = num_buffered_values;
49 bit_width_ = BitUtil::Log2(max_level + 1);
50 switch (encoding) {
51 case Encoding::RLE: {
52 num_bytes = *reinterpret_cast<const int32_t*>(data);
53 const uint8_t* decoder_data = data + sizeof(int32_t);
54 if (!rle_decoder_) {
55 rle_decoder_.reset(
56 new ::arrow::util::RleDecoder(decoder_data, num_bytes, bit_width_));
57 } else {
58 rle_decoder_->Reset(decoder_data, num_bytes, bit_width_);
59 }
60 return static_cast<int>(sizeof(int32_t)) + num_bytes;
61 }
62 case Encoding::BIT_PACKED: {
63 num_bytes =
64 static_cast<int32_t>(BitUtil::BytesForBits(num_buffered_values * bit_width_));
65 if (!bit_packed_decoder_) {
66 bit_packed_decoder_.reset(new ::arrow::BitUtil::BitReader(data, num_bytes));
67 } else {
68 bit_packed_decoder_->Reset(data, num_bytes);
69 }
70 return num_bytes;
71 }
72 default:
73 throw ParquetException("Unknown encoding type for levels.");
74 }
75 return -1;
76}
77
78int LevelDecoder::Decode(int batch_size, int16_t* levels) {
79 int num_decoded = 0;
80
81 int num_values = std::min(num_values_remaining_, batch_size);
82 if (encoding_ == Encoding::RLE) {
83 num_decoded = rle_decoder_->GetBatch(levels, num_values);
84 } else {
85 num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values);
86 }
87 num_values_remaining_ -= num_decoded;
88 return num_decoded;
89}
90
91ReaderProperties default_reader_properties() {
92 static ReaderProperties default_reader_properties;
93 return default_reader_properties;
94}
95
96// ----------------------------------------------------------------------
97// SerializedPageReader deserializes Thrift metadata and pages that have been
98// assembled in a serialized stream for storing in a Parquet files
99
100// This subclass delimits pages appearing in a serialized stream, each preceded
101// by a serialized Thrift format::PageHeader indicating the type of each page
102// and the page metadata.
103class SerializedPageReader : public PageReader {
104 public:
105 SerializedPageReader(std::unique_ptr<InputStream> stream, int64_t total_num_rows,
106 Compression::type codec, ::arrow::MemoryPool* pool)
107 : stream_(std::move(stream)),
108 decompression_buffer_(AllocateBuffer(pool, 0)),
109 seen_num_rows_(0),
110 total_num_rows_(total_num_rows) {
111 max_page_header_size_ = kDefaultMaxPageHeaderSize;
112 decompressor_ = GetCodecFromArrow(codec);
113 }
114
115 // Implement the PageReader interface
116 std::shared_ptr<Page> NextPage() override;
117
118 void set_max_page_header_size(uint32_t size) override { max_page_header_size_ = size; }
119
120 private:
121 std::unique_ptr<InputStream> stream_;
122
123 format::PageHeader current_page_header_;
124 std::shared_ptr<Page> current_page_;
125
126 // Compression codec to use.
127 std::unique_ptr<::arrow::util::Codec> decompressor_;
128 std::shared_ptr<ResizableBuffer> decompression_buffer_;
129
130 // Maximum allowed page size
131 uint32_t max_page_header_size_;
132
133 // Number of rows read in data pages so far
134 int64_t seen_num_rows_;
135
136 // Number of rows in all the data pages
137 int64_t total_num_rows_;
138};
139
140std::shared_ptr<Page> SerializedPageReader::NextPage() {
141 // Loop here because there may be unhandled page types that we skip until
142 // finding a page that we do know what to do with
143 while (seen_num_rows_ < total_num_rows_) {
144 int64_t bytes_read = 0;
145 int64_t bytes_available = 0;
146 uint32_t header_size = 0;
147 const uint8_t* buffer;
148 uint32_t allowed_page_size = kDefaultPageHeaderSize;
149
150 // Page headers can be very large because of page statistics
151 // We try to deserialize a larger buffer progressively
152 // until a maximum allowed header limit
153 while (true) {
154 buffer = stream_->Peek(allowed_page_size, &bytes_available);
155 if (bytes_available == 0) {
156 return std::shared_ptr<Page>(nullptr);
157 }
158
159 // This gets used, then set by DeserializeThriftMsg
160 header_size = static_cast<uint32_t>(bytes_available);
161 try {
162 DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
163 break;
164 } catch (std::exception& e) {
165 // Failed to deserialize. Double the allowed page header size and try again
166 std::stringstream ss;
167 ss << e.what();
168 allowed_page_size *= 2;
169 if (allowed_page_size > max_page_header_size_) {
170 ss << "Deserializing page header failed.\n";
171 throw ParquetException(ss.str());
172 }
173 }
174 }
175 // Advance the stream offset
176 stream_->Advance(header_size);
177
178 int compressed_len = current_page_header_.compressed_page_size;
179 int uncompressed_len = current_page_header_.uncompressed_page_size;
180
181 // Read the compressed data page.
182 buffer = stream_->Read(compressed_len, &bytes_read);
183 if (bytes_read != compressed_len) {
184 std::stringstream ss;
185 ss << "Page was smaller (" << bytes_read << ") than expected (" << compressed_len
186 << ")";
187 ParquetException::EofException(ss.str());
188 }
189
190 // Uncompress it if we need to
191 if (decompressor_ != nullptr) {
192 // Grow the uncompressed buffer if we need to.
193 if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
194 PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false));
195 }
196 PARQUET_THROW_NOT_OK(
197 decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
198 decompression_buffer_->mutable_data()));
199 buffer = decompression_buffer_->data();
200 }
201
202 auto page_buffer = std::make_shared<Buffer>(buffer, uncompressed_len);
203
204 if (current_page_header_.type == format::PageType::DICTIONARY_PAGE) {
205 const format::DictionaryPageHeader& dict_header =
206 current_page_header_.dictionary_page_header;
207
208 bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false;
209
210 return std::make_shared<DictionaryPage>(page_buffer, dict_header.num_values,
211 FromThrift(dict_header.encoding),
212 is_sorted);
213 } else if (current_page_header_.type == format::PageType::DATA_PAGE) {
214 const format::DataPageHeader& header = current_page_header_.data_page_header;
215
216 EncodedStatistics page_statistics;
217 if (header.__isset.statistics) {
218 const format::Statistics& stats = header.statistics;
219 if (stats.__isset.max) {
220 page_statistics.set_max(stats.max);
221 }
222 if (stats.__isset.min) {
223 page_statistics.set_min(stats.min);
224 }
225 if (stats.__isset.null_count) {
226 page_statistics.set_null_count(stats.null_count);
227 }
228 if (stats.__isset.distinct_count) {
229 page_statistics.set_distinct_count(stats.distinct_count);
230 }
231 }
232
233 seen_num_rows_ += header.num_values;
234
235 return std::make_shared<DataPage>(
236 page_buffer, header.num_values, FromThrift(header.encoding),
237 FromThrift(header.definition_level_encoding),
238 FromThrift(header.repetition_level_encoding), page_statistics);
239 } else if (current_page_header_.type == format::PageType::DATA_PAGE_V2) {
240 const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
241 bool is_compressed = header.__isset.is_compressed ? header.is_compressed : false;
242
243 seen_num_rows_ += header.num_values;
244
245 return std::make_shared<DataPageV2>(
246 page_buffer, header.num_values, header.num_nulls, header.num_rows,
247 FromThrift(header.encoding), header.definition_levels_byte_length,
248 header.repetition_levels_byte_length, is_compressed);
249 } else {
250 // We don't know what this page type is. We're allowed to skip non-data
251 // pages.
252 continue;
253 }
254 }
255 return std::shared_ptr<Page>(nullptr);
256}
257
258std::unique_ptr<PageReader> PageReader::Open(std::unique_ptr<InputStream> stream,
259 int64_t total_num_rows,
260 Compression::type codec,
261 ::arrow::MemoryPool* pool) {
262 return std::unique_ptr<PageReader>(
263 new SerializedPageReader(std::move(stream), total_num_rows, codec, pool));
264}
265
266// ----------------------------------------------------------------------
267
268ColumnReader::ColumnReader(const ColumnDescriptor* descr,
269 std::unique_ptr<PageReader> pager, MemoryPool* pool)
270 : descr_(descr),
271 pager_(std::move(pager)),
272 num_buffered_values_(0),
273 num_decoded_values_(0),
274 pool_(pool) {}
275
276ColumnReader::~ColumnReader() {}
277
278template <typename DType>
279void TypedColumnReader<DType>::ConfigureDictionary(const DictionaryPage* page) {
280 int encoding = static_cast<int>(page->encoding());
281 if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
282 page->encoding() == Encoding::PLAIN) {
283 encoding = static_cast<int>(Encoding::RLE_DICTIONARY);
284 }
285
286 auto it = decoders_.find(encoding);
287 if (it != decoders_.end()) {
288 throw ParquetException("Column cannot have more than one dictionary.");
289 }
290
291 if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
292 page->encoding() == Encoding::PLAIN) {
293 PlainDecoder<DType> dictionary(descr_);
294 dictionary.SetData(page->num_values(), page->data(), page->size());
295
296 // The dictionary is fully decoded during DictionaryDecoder::Init, so the
297 // DictionaryPage buffer is no longer required after this step
298 //
299 // TODO(wesm): investigate whether this all-or-nothing decoding of the
300 // dictionary makes sense and whether performance can be improved
301
302 auto decoder = std::make_shared<DictionaryDecoder<DType>>(descr_, pool_);
303 decoder->SetDict(&dictionary);
304 decoders_[encoding] = decoder;
305 } else {
306 ParquetException::NYI("only plain dictionary encoding has been implemented");
307 }
308
309 current_decoder_ = decoders_[encoding].get();
310}
311
312// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
313// encoding.
314static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
315 return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
316}
317
318template <typename DType>
319bool TypedColumnReader<DType>::ReadNewPage() {
320 // Loop until we find the next data page.
321 const uint8_t* buffer;
322
323 while (true) {
324 current_page_ = pager_->NextPage();
325 if (!current_page_) {
326 // EOS
327 return false;
328 }
329
330 if (current_page_->type() == PageType::DICTIONARY_PAGE) {
331 ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
332 continue;
333 } else if (current_page_->type() == PageType::DATA_PAGE) {
334 const DataPage* page = static_cast<const DataPage*>(current_page_.get());
335
336 // Read a data page.
337 num_buffered_values_ = page->num_values();
338
339 // Have not decoded any values from the data page yet
340 num_decoded_values_ = 0;
341
342 buffer = page->data();
343
344 // If the data page includes repetition and definition levels, we
345 // initialize the level decoder and subtract the encoded level bytes from
346 // the page size to determine the number of bytes in the encoded data.
347 int64_t data_size = page->size();
348
349 // Data page Layout: Repetition Levels - Definition Levels - encoded values.
350 // Levels are encoded as rle or bit-packed.
351 // Init repetition levels
352 if (descr_->max_repetition_level() > 0) {
353 int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
354 page->repetition_level_encoding(), descr_->max_repetition_level(),
355 static_cast<int>(num_buffered_values_), buffer);
356 buffer += rep_levels_bytes;
357 data_size -= rep_levels_bytes;
358 }
359 // TODO figure a way to set max_definition_level_ to 0
360 // if the initial value is invalid
361
362 // Init definition levels
363 if (descr_->max_definition_level() > 0) {
364 int64_t def_levels_bytes = definition_level_decoder_.SetData(
365 page->definition_level_encoding(), descr_->max_definition_level(),
366 static_cast<int>(num_buffered_values_), buffer);
367 buffer += def_levels_bytes;
368 data_size -= def_levels_bytes;
369 }
370
371 // Get a decoder object for this page or create a new decoder if this is the
372 // first page with this encoding.
373 Encoding::type encoding = page->encoding();
374
375 if (IsDictionaryIndexEncoding(encoding)) {
376 encoding = Encoding::RLE_DICTIONARY;
377 }
378
379 auto it = decoders_.find(static_cast<int>(encoding));
380 if (it != decoders_.end()) {
381 if (encoding == Encoding::RLE_DICTIONARY) {
382 DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
383 }
384 current_decoder_ = it->second.get();
385 } else {
386 switch (encoding) {
387 case Encoding::PLAIN: {
388 std::shared_ptr<DecoderType> decoder(new PlainDecoder<DType>(descr_));
389 decoders_[static_cast<int>(encoding)] = decoder;
390 current_decoder_ = decoder.get();
391 break;
392 }
393 case Encoding::RLE_DICTIONARY:
394 throw ParquetException("Dictionary page must be before data page.");
395
396 case Encoding::DELTA_BINARY_PACKED:
397 case Encoding::DELTA_LENGTH_BYTE_ARRAY:
398 case Encoding::DELTA_BYTE_ARRAY:
399 ParquetException::NYI("Unsupported encoding");
400
401 default:
402 throw ParquetException("Unknown encoding type.");
403 }
404 }
405 current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
406 static_cast<int>(data_size));
407 return true;
408 } else {
409 // We don't know what this page type is. We're allowed to skip non-data
410 // pages.
411 continue;
412 }
413 }
414 return true;
415}
416
417// ----------------------------------------------------------------------
418// Batch read APIs
419
420int64_t ColumnReader::ReadDefinitionLevels(int64_t batch_size, int16_t* levels) {
421 if (descr_->max_definition_level() == 0) {
422 return 0;
423 }
424 return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
425}
426
427int64_t ColumnReader::ReadRepetitionLevels(int64_t batch_size, int16_t* levels) {
428 if (descr_->max_repetition_level() == 0) {
429 return 0;
430 }
431 return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
432}
433
434// ----------------------------------------------------------------------
435// Dynamic column reader constructor
436
437std::shared_ptr<ColumnReader> ColumnReader::Make(const ColumnDescriptor* descr,
438 std::unique_ptr<PageReader> pager,
439 MemoryPool* pool) {
440 switch (descr->physical_type()) {
441 case Type::BOOLEAN:
442 return std::make_shared<BoolReader>(descr, std::move(pager), pool);
443 case Type::INT32:
444 return std::make_shared<Int32Reader>(descr, std::move(pager), pool);
445 case Type::INT64:
446 return std::make_shared<Int64Reader>(descr, std::move(pager), pool);
447 case Type::INT96:
448 return std::make_shared<Int96Reader>(descr, std::move(pager), pool);
449 case Type::FLOAT:
450 return std::make_shared<FloatReader>(descr, std::move(pager), pool);
451 case Type::DOUBLE:
452 return std::make_shared<DoubleReader>(descr, std::move(pager), pool);
453 case Type::BYTE_ARRAY:
454 return std::make_shared<ByteArrayReader>(descr, std::move(pager), pool);
455 case Type::FIXED_LEN_BYTE_ARRAY:
456 return std::make_shared<FixedLenByteArrayReader>(descr, std::move(pager), pool);
457 default:
458 ParquetException::NYI("type reader not implemented");
459 }
460 // Unreachable code, but supress compiler warning
461 return std::shared_ptr<ColumnReader>(nullptr);
462}
463
464// ----------------------------------------------------------------------
465// Instantiate templated classes
466
// Explicit instantiation definitions: emit TypedColumnReader (including the
// ConfigureDictionary/ReadNewPage members defined in this file) for each of
// these element types in this translation unit.
template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<BooleanType>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<Int32Type>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<Int64Type>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<Int96Type>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<FloatType>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<DoubleType>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<ByteArrayType>;
template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<FLBAType>;
475
476} // namespace parquet
477