1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#ifndef PARQUET_COLUMN_READER_H
19#define PARQUET_COLUMN_READER_H
20
21#include <algorithm>
22#include <climits>
23#include <cstdint>
24#include <cstring>
25#include <iostream>
26#include <memory>
27#include <unordered_map>
28#include <utility>
29#include <vector>
30
31#include <arrow/buffer.h>
32#include <arrow/builder.h>
33#include <arrow/memory_pool.h>
34#include <arrow/util/bit-util.h>
35
36#include "parquet/column_page.h"
37#include "parquet/encoding.h"
38#include "parquet/exception.h"
39#include "parquet/schema.h"
40#include "parquet/types.h"
41#include "parquet/util/macros.h"
42#include "parquet/util/memory.h"
43#include "parquet/util/visibility.h"
44
// Forward declarations of the Arrow decoder types that LevelDecoder refers to
// only through std::unique_ptr members.
namespace arrow {
namespace util {
class RleDecoder;
}  // namespace util

namespace BitUtil {
class BitReader;
}  // namespace BitUtil
}  // namespace arrow
56
57namespace parquet {
58
// Default maximum page header size: 16 MiB (2^24 bytes).
static constexpr uint32_t kDefaultMaxPageHeaderSize = uint32_t{1} << 24;

// Default expected page header size: 16 KiB (2^14 bytes).
static constexpr uint32_t kDefaultPageHeaderSize = uint32_t{1} << 14;
64
// Alias so code in this namespace can write BitUtil:: for ::arrow::BitUtil.
namespace BitUtil = ::arrow::BitUtil;
66
// Decodes repetition or definition levels from an encoded page buffer.
//
// Holds both an RLE decoder and a bit-packed reader. NOTE(review): which one is
// used is presumably selected by the `encoding` passed to SetData, whose
// definition is out of line — confirm there.
class PARQUET_EXPORT LevelDecoder {
 public:
  LevelDecoder();
  // Declared here but defined out of line, which allows the std::unique_ptr
  // members to refer to the forward-declared Arrow decoder types.
  ~LevelDecoder();

  // Initialize the LevelDecoder state with new data
  // and return the number of bytes consumed
  //
  // @param encoding             encoding of the level data
  // @param max_level            largest level value expected in the data
  //                             (presumably used to derive bit_width_)
  // @param num_buffered_values  number of levels to decode from data
  //                             (presumably; confirm in the implementation)
  // @param data                 start of the encoded levels
  int SetData(Encoding::type encoding, int16_t max_level, int num_buffered_values,
              const uint8_t* data);

  // Decodes a batch of levels into an array and returns the number of levels decoded
  //
  // @param batch_size  maximum number of levels to decode
  // @param levels      output array with room for at least batch_size entries
  int Decode(int batch_size, int16_t* levels);

 private:
  // Bits per encoded level (presumably derived from max_level in SetData).
  int bit_width_;
  // Levels still available for Decode() (presumably; set by SetData).
  int num_values_remaining_;
  // Encoding passed to the most recent SetData() call.
  Encoding::type encoding_;
  std::unique_ptr<::arrow::util::RleDecoder> rle_decoder_;
  std::unique_ptr<::arrow::BitUtil::BitReader> bit_packed_decoder_;
};
87
// Abstract page iterator interface. This way, we can feed column pages to the
// ColumnReader through whatever mechanism we choose
class PARQUET_EXPORT PageReader {
 public:
  virtual ~PageReader() = default;

  // Factory for a concrete PageReader (the implementation is not in this header).
  //
  // @param stream          source of the serialized pages; ownership is
  //                        transferred to the returned reader
  // @param total_num_rows  row count of the column chunk (NOTE(review):
  //                        presumably used to detect the end of the chunk)
  // @param codec           compression codec of the page data
  // @param pool            memory pool used for allocations
  static std::unique_ptr<PageReader> Open(
      std::unique_ptr<InputStream> stream, int64_t total_num_rows,
      Compression::type codec,
      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());

  // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
  // containing new Page otherwise
  virtual std::shared_ptr<Page> NextPage() = 0;

  // Sets the largest serialized page header size, in bytes, the reader accepts.
  // NOTE(review): kDefaultMaxPageHeaderSize is presumably the default — confirm
  // in the implementation.
  virtual void set_max_page_header_size(uint32_t size) = 0;
};
105
// Untyped base class for reading one column chunk page by page.
//
// Owns the page source and the repetition/definition level decoders, and
// tracks how many levels of the current data page have been consumed.
// Subclasses implement ReadNewPage() and the value decoding.
class PARQUET_EXPORT ColumnReader {
 public:
  ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>,
               ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
  virtual ~ColumnReader();

  // Creates a reader for the column described by `descr`, reading from `pager`.
  // NOTE(review): presumably returns the TypedColumnReader matching
  // descr->physical_type(); the definition is not visible here.
  static std::shared_ptr<ColumnReader> Make(
      const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager,
      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());

  // Returns true if there are still values in this column.
  //
  // Not const: when no data page is loaded or the current one is exhausted,
  // this calls ReadNewPage() to advance to the next page.
  bool HasNext() {
    // Either there is no data page available yet, or the data page has been
    // exhausted
    if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
      if (!ReadNewPage() || num_buffered_values_ == 0) {
        return false;
      }
    }
    return true;
  }

  // Physical type of the column, taken from its descriptor.
  Type::type type() const { return descr_->physical_type(); }

  // Descriptor of the column being read (held as a raw pointer).
  const ColumnDescriptor* descr() const { return descr_; }

 protected:
  // Advances to the next data page. Returning false makes HasNext() report
  // that the column has no more values.
  virtual bool ReadNewPage() = 0;

  // Read multiple definition levels into preallocated memory
  //
  // Returns the number of decoded definition levels
  int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels);

  // Read multiple repetition levels into preallocated memory
  // Returns the number of decoded repetition levels
  int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels);

  // Number of levels of the current data page that have not been consumed yet.
  int64_t available_values_current_page() const {
    return num_buffered_values_ - num_decoded_values_;
  }

  // Marks `num_values` more levels of the current data page as consumed.
  void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; }

  const ColumnDescriptor* descr_;

  std::unique_ptr<PageReader> pager_;
  std::shared_ptr<Page> current_page_;

  // Not set if full schema for this field has no optional or repeated elements
  LevelDecoder definition_level_decoder_;

  // Not set for flat schemas.
  LevelDecoder repetition_level_decoder_;

  // The total number of values stored in the data page. This is the maximum of
  // the number of encoded definition levels or encoded values. For
  // non-repeated, required columns, this is equal to the number of encoded
  // values. For repeated or optional values, there may be fewer data values
  // than levels, and this tells you how many encoded levels there are in that
  // case.
  int64_t num_buffered_values_;

  // The number of values from the current data page that have been decoded
  // into memory
  int64_t num_decoded_values_;

  ::arrow::MemoryPool* pool_;
};
175
176namespace internal {
177
178static inline void DefinitionLevelsToBitmap(
179 const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
180 const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
181 uint8_t* valid_bits, int64_t valid_bits_offset) {
182 // We assume here that valid_bits is large enough to accommodate the
183 // additional definition levels and the ones that have already been written
184 ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset,
185 valid_bits_offset + num_def_levels);
186
187 // TODO(itaiin): As an interim solution we are splitting the code path here
188 // between repeated+flat column reads, and non-repeated+nested reads.
189 // Those paths need to be merged in the future
190 for (int i = 0; i < num_def_levels; ++i) {
191 if (def_levels[i] == max_definition_level) {
192 valid_bits_writer.Set();
193 } else if (max_repetition_level > 0) {
194 // repetition+flat case
195 if (def_levels[i] == (max_definition_level - 1)) {
196 valid_bits_writer.Clear();
197 *null_count += 1;
198 } else {
199 continue;
200 }
201 } else {
202 // non-repeated+nested case
203 if (def_levels[i] < max_definition_level) {
204 valid_bits_writer.Clear();
205 *null_count += 1;
206 } else {
207 throw ParquetException("definition level exceeds maximum");
208 }
209 }
210
211 valid_bits_writer.Next();
212 }
213 valid_bits_writer.Finish();
214 *values_read = valid_bits_writer.position();
215}
216
217} // namespace internal
218
// API to read values from a single column. This is a main client facing API.
//
// DType is a Parquet physical type tag (e.g. Int32Type); T is its C++ value type.
template <typename DType>
class PARQUET_TEMPLATE_CLASS_EXPORT TypedColumnReader : public ColumnReader {
 public:
  typedef typename DType::c_type T;

  TypedColumnReader(const ColumnDescriptor* schema, std::unique_ptr<PageReader> pager,
                    ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
      : ColumnReader(schema, std::move(pager), pool), current_decoder_(NULLPTR) {}

  // Read a batch of repetition levels, definition levels, and values from the
  // column.
  //
  // Since null values are not stored in the values, the number of values read
  // may be less than the number of repetition and definition levels. With
  // nested data this is almost certainly true.
  //
  // Set def_levels or rep_levels to nullptr if you want to skip reading them.
  // This is only safe if you know through some other source that there are no
  // undefined values.
  //
  // To fully exhaust a row group, you must read batches until the number of
  // values read reaches the number of stored values according to the metadata.
  //
  // This API is the same for both V1 and V2 of the DataPage
  //
  // @returns: actual number of levels read (see values_read for number of values read)
  int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
                    T* values, int64_t* values_read);

  /// Read a batch of repetition levels, definition levels, and values from the
  /// column and leave spaces for null entries on the lowest level in the values
  /// buffer.
  ///
  /// In comparison to ReadBatch the length of repetition and definition levels
  /// is the same as of the number of values read for max_definition_level == 1.
  /// In the case of max_definition_level > 1, the repetition and definition
  /// levels are larger than the values but the values include the null entries
  /// with definition_level == (max_definition_level - 1).
  ///
  /// To fully exhaust a row group, you must read batches until the number of
  /// values read reaches the number of stored values according to the metadata.
  ///
  /// @param batch_size the number of levels to read
  /// @param[out] def_levels The Parquet definition levels, output has
  ///   the length levels_read.
  /// @param[out] rep_levels The Parquet repetition levels, output has
  ///   the length levels_read.
  /// @param[out] values The values in the lowest nested level including
  ///   spacing for nulls on the lowest levels; output has the length
  ///   values_read.
  /// @param[out] valid_bits Memory allocated for a bitmap that indicates if
  ///   the row is null or on the maximum definition level. For performance
  ///   reasons the underlying buffer should be able to store 1 bit more than
  ///   required. If this requires an additional byte, this byte is only read
  ///   but never written to.
  /// @param valid_bits_offset The offset in bits of the valid_bits where the
  ///   first relevant bit resides.
  /// @param[out] levels_read The number of repetition/definition levels that were read.
  /// @param[out] values_read The number of values read, this includes all
  ///   non-null entries as well as all null-entries on the lowest level
  ///   (i.e. definition_level == max_definition_level - 1)
  /// @param[out] null_count The number of nulls on the lowest levels.
  ///   (i.e. (values_read - null_count) is total number of non-null entries)
  int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
                          T* values, uint8_t* valid_bits, int64_t valid_bits_offset,
                          int64_t* levels_read, int64_t* values_read,
                          int64_t* null_count);

  // Skip reading levels
  // Returns the number of levels skipped
  //
  // NOTE(review): despite its name, num_rows_to_skip is counted in page levels
  // (the unit of num_buffered_values_). For repeated columns this is not the
  // number of records — confirm the intended semantics.
  int64_t Skip(int64_t num_rows_to_skip);

 private:
  typedef Decoder<DType> DecoderType;

  // Advance to the next data page
  bool ReadNewPage() override;

  // Read up to batch_size values from the current data page into the
  // pre-allocated memory T*
  //
  // @returns: the number of values read into the out buffer
  int64_t ReadValues(int64_t batch_size, T* out);

  // Read up to batch_size values from the current data page into the
  // pre-allocated memory T*, leaving spaces for null entries according
  // to the def_levels.
  //
  // @returns: the number of values read into the out buffer
  int64_t ReadValuesSpaced(int64_t batch_size, T* out, int64_t null_count,
                           uint8_t* valid_bits, int64_t valid_bits_offset);

  // Map of encoding type to the respective decoder object. For example, a
  // column chunk's data pages may include both dictionary-encoded and
  // plain-encoded data.
  std::unordered_map<int, std::shared_ptr<DecoderType>> decoders_;

  // Sets up decoding from a dictionary page. The definition is not visible
  // here; NOTE(review): presumably it registers a dictionary decoder in decoders_.
  void ConfigureDictionary(const DictionaryPage* page);

  // Decoder that ReadValues/ReadValuesSpaced call for the current data page.
  // Null until set; presumably points at an entry of decoders_ (not owned).
  DecoderType* current_decoder_;
};
321
322// ----------------------------------------------------------------------
// Typed column reader implementations
324
325template <typename DType>
326inline int64_t TypedColumnReader<DType>::ReadValues(int64_t batch_size, T* out) {
327 int64_t num_decoded = current_decoder_->Decode(out, static_cast<int>(batch_size));
328 return num_decoded;
329}
330
331template <typename DType>
332inline int64_t TypedColumnReader<DType>::ReadValuesSpaced(int64_t batch_size, T* out,
333 int64_t null_count,
334 uint8_t* valid_bits,
335 int64_t valid_bits_offset) {
336 return current_decoder_->DecodeSpaced(out, static_cast<int>(batch_size),
337 static_cast<int>(null_count), valid_bits,
338 valid_bits_offset);
339}
340
341template <typename DType>
342inline int64_t TypedColumnReader<DType>::ReadBatch(int64_t batch_size,
343 int16_t* def_levels,
344 int16_t* rep_levels, T* values,
345 int64_t* values_read) {
346 // HasNext invokes ReadNewPage
347 if (!HasNext()) {
348 *values_read = 0;
349 return 0;
350 }
351
352 // TODO(wesm): keep reading data pages until batch_size is reached, or the
353 // row group is finished
354 batch_size = std::min(batch_size, num_buffered_values_ - num_decoded_values_);
355
356 int64_t num_def_levels = 0;
357 int64_t num_rep_levels = 0;
358
359 int64_t values_to_read = 0;
360
361 // If the field is required and non-repeated, there are no definition levels
362 if (descr_->max_definition_level() > 0 && def_levels) {
363 num_def_levels = ReadDefinitionLevels(batch_size, def_levels);
364 // TODO(wesm): this tallying of values-to-decode can be performed with better
365 // cache-efficiency if fused with the level decoding.
366 for (int64_t i = 0; i < num_def_levels; ++i) {
367 if (def_levels[i] == descr_->max_definition_level()) {
368 ++values_to_read;
369 }
370 }
371 } else {
372 // Required field, read all values
373 values_to_read = batch_size;
374 }
375
376 // Not present for non-repeated fields
377 if (descr_->max_repetition_level() > 0 && rep_levels) {
378 num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels);
379 if (def_levels && num_def_levels != num_rep_levels) {
380 throw ParquetException("Number of decoded rep / def levels did not match");
381 }
382 }
383
384 *values_read = ReadValues(values_to_read, values);
385 int64_t total_values = std::max(num_def_levels, *values_read);
386 ConsumeBufferedValues(total_values);
387
388 return total_values;
389}
390
391namespace internal {
392
393// TODO(itaiin): another code path split to merge when the general case is done
394static inline bool HasSpacedValues(const ColumnDescriptor* descr) {
395 if (descr->max_repetition_level() > 0) {
396 // repeated+flat case
397 return !descr->schema_node()->is_required();
398 } else {
399 // non-repeated+nested case
400 // Find if a node forces nulls in the lowest level along the hierarchy
401 const schema::Node* node = descr->schema_node().get();
402 while (node) {
403 if (node->is_optional()) {
404 return true;
405 }
406 node = node->parent();
407 }
408 return false;
409 }
410}
411
412} // namespace internal
413
414template <typename DType>
415inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(
416 int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values,
417 uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read,
418 int64_t* values_read, int64_t* null_count_out) {
419 // HasNext invokes ReadNewPage
420 if (!HasNext()) {
421 *levels_read = 0;
422 *values_read = 0;
423 *null_count_out = 0;
424 return 0;
425 }
426
427 int64_t total_values;
428 // TODO(wesm): keep reading data pages until batch_size is reached, or the
429 // row group is finished
430 batch_size = std::min(batch_size, num_buffered_values_ - num_decoded_values_);
431
432 // If the field is required and non-repeated, there are no definition levels
433 if (descr_->max_definition_level() > 0) {
434 int64_t num_def_levels = ReadDefinitionLevels(batch_size, def_levels);
435
436 // Not present for non-repeated fields
437 if (descr_->max_repetition_level() > 0) {
438 int64_t num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels);
439 if (num_def_levels != num_rep_levels) {
440 throw ParquetException("Number of decoded rep / def levels did not match");
441 }
442 }
443
444 const bool has_spaced_values = internal::HasSpacedValues(descr_);
445
446 int64_t null_count = 0;
447 if (!has_spaced_values) {
448 int values_to_read = 0;
449 for (int64_t i = 0; i < num_def_levels; ++i) {
450 if (def_levels[i] == descr_->max_definition_level()) {
451 ++values_to_read;
452 }
453 }
454 total_values = ReadValues(values_to_read, values);
455 for (int64_t i = 0; i < total_values; i++) {
456 ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
457 }
458 *values_read = total_values;
459 } else {
460 int16_t max_definition_level = descr_->max_definition_level();
461 int16_t max_repetition_level = descr_->max_repetition_level();
462 internal::DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level,
463 max_repetition_level, values_read, &null_count,
464 valid_bits, valid_bits_offset);
465 total_values = ReadValuesSpaced(*values_read, values, static_cast<int>(null_count),
466 valid_bits, valid_bits_offset);
467 }
468 *levels_read = num_def_levels;
469 *null_count_out = null_count;
470
471 } else {
472 // Required field, read all values
473 total_values = ReadValues(batch_size, values);
474 for (int64_t i = 0; i < total_values; i++) {
475 ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
476 }
477 *null_count_out = 0;
478 *levels_read = total_values;
479 }
480
481 ConsumeBufferedValues(*levels_read);
482 return total_values;
483}
484
485template <typename DType>
486int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) {
487 int64_t rows_to_skip = num_rows_to_skip;
488 while (HasNext() && rows_to_skip > 0) {
489 // If the number of rows to skip is more than the number of undecoded values, skip the
490 // Page.
491 if (rows_to_skip > (num_buffered_values_ - num_decoded_values_)) {
492 rows_to_skip -= num_buffered_values_ - num_decoded_values_;
493 num_decoded_values_ = num_buffered_values_;
494 } else {
495 // We need to read this Page
496 // Jump to the right offset in the Page
497 int64_t batch_size = 1024; // ReadBatch with a smaller memory footprint
498 int64_t values_read = 0;
499
500 std::shared_ptr<ResizableBuffer> vals = AllocateBuffer(
501 this->pool_, batch_size * type_traits<DType::type_num>::value_byte_size);
502 std::shared_ptr<ResizableBuffer> def_levels =
503 AllocateBuffer(this->pool_, batch_size * sizeof(int16_t));
504
505 std::shared_ptr<ResizableBuffer> rep_levels =
506 AllocateBuffer(this->pool_, batch_size * sizeof(int16_t));
507
508 do {
509 batch_size = std::min(batch_size, rows_to_skip);
510 values_read = ReadBatch(static_cast<int>(batch_size),
511 reinterpret_cast<int16_t*>(def_levels->mutable_data()),
512 reinterpret_cast<int16_t*>(rep_levels->mutable_data()),
513 reinterpret_cast<T*>(vals->mutable_data()), &values_read);
514 rows_to_skip -= values_read;
515 } while (values_read > 0 && rows_to_skip > 0);
516 }
517 }
518 return num_rows_to_skip - rows_to_skip;
519}
520
521// ----------------------------------------------------------------------
522// Template instantiations
523
524typedef TypedColumnReader<BooleanType> BoolReader;
525typedef TypedColumnReader<Int32Type> Int32Reader;
526typedef TypedColumnReader<Int64Type> Int64Reader;
527typedef TypedColumnReader<Int96Type> Int96Reader;
528typedef TypedColumnReader<FloatType> FloatReader;
529typedef TypedColumnReader<DoubleType> DoubleReader;
530typedef TypedColumnReader<ByteArrayType> ByteArrayReader;
531typedef TypedColumnReader<FLBAType> FixedLenByteArrayReader;
532
533PARQUET_EXTERN_TEMPLATE TypedColumnReader<BooleanType>;
534PARQUET_EXTERN_TEMPLATE TypedColumnReader<Int32Type>;
535PARQUET_EXTERN_TEMPLATE TypedColumnReader<Int64Type>;
536PARQUET_EXTERN_TEMPLATE TypedColumnReader<Int96Type>;
537PARQUET_EXTERN_TEMPLATE TypedColumnReader<FloatType>;
538PARQUET_EXTERN_TEMPLATE TypedColumnReader<DoubleType>;
539PARQUET_EXTERN_TEMPLATE TypedColumnReader<ByteArrayType>;
540PARQUET_EXTERN_TEMPLATE TypedColumnReader<FLBAType>;
541
542} // namespace parquet
543
544#endif // PARQUET_COLUMN_READER_H
545