1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #ifndef PARQUET_COLUMN_READER_H |
19 | #define PARQUET_COLUMN_READER_H |
20 | |
21 | #include <algorithm> |
22 | #include <climits> |
23 | #include <cstdint> |
24 | #include <cstring> |
25 | #include <iostream> |
26 | #include <memory> |
27 | #include <unordered_map> |
28 | #include <utility> |
29 | #include <vector> |
30 | |
31 | #include <arrow/buffer.h> |
32 | #include <arrow/builder.h> |
33 | #include <arrow/memory_pool.h> |
34 | #include <arrow/util/bit-util.h> |
35 | |
36 | #include "parquet/column_page.h" |
37 | #include "parquet/encoding.h" |
38 | #include "parquet/exception.h" |
39 | #include "parquet/schema.h" |
40 | #include "parquet/types.h" |
41 | #include "parquet/util/macros.h" |
42 | #include "parquet/util/memory.h" |
43 | #include "parquet/util/visibility.h" |
44 | |
// Forward declarations of the Arrow decoder classes that LevelDecoder refers to
// only through std::unique_ptr members, so an incomplete type suffices here.
namespace arrow {

namespace util {
class RleDecoder;
}  // namespace util

namespace BitUtil {
class BitReader;
}  // namespace BitUtil

}  // namespace arrow
56 | |
57 | namespace parquet { |
58 | |
// 16 MB is the default maximum page header size
static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;

// 16 KB is the default expected page header size
static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
64 | |
// Alias so code in this namespace can name Arrow's BitUtil namespace as
// parquet::BitUtil. The target is fully qualified (::arrow) so it always
// refers to the global Arrow namespace.
namespace BitUtil = ::arrow::BitUtil;
66 | |
// Decodes a stream of Parquet levels (ColumnReader below keeps one instance
// for definition levels and one for repetition levels). All member functions
// are defined out of line.
class PARQUET_EXPORT LevelDecoder {
 public:
  LevelDecoder();
  // Declared here and defined out of line: the unique_ptr members below point
  // at types that are only forward-declared in this header, and destroying a
  // unique_ptr requires the complete type.
  ~LevelDecoder();

  // Initialize the LevelDecoder state with new data
  // and return the number of bytes consumed
  //
  // encoding: encoding of the levels in `data`; presumably selects between the
  //   RLE and bit-packed decoders below — TODO confirm against the .cc file.
  // max_level: largest level value that can appear; presumably used to derive
  //   bit_width_ — TODO confirm.
  // num_buffered_values: number of levels encoded in `data`.
  // data: start of the encoded levels (not owned).
  int SetData(Encoding::type encoding, int16_t max_level, int num_buffered_values,
              const uint8_t* data);

  // Decodes a batch of levels into an array and returns the number of levels decoded
  // (`levels` must have room for at least batch_size entries).
  int Decode(int batch_size, int16_t* levels);

 private:
  int bit_width_;
  int num_values_remaining_;
  // Encoding passed to the most recent SetData call (assumed — TODO confirm).
  Encoding::type encoding_;
  std::unique_ptr<::arrow::util::RleDecoder> rle_decoder_;
  std::unique_ptr<::arrow::BitUtil::BitReader> bit_packed_decoder_;
};
87 | |
88 | // Abstract page iterator interface. This way, we can feed column pages to the |
89 | // ColumnReader through whatever mechanism we choose |
90 | class PARQUET_EXPORT { |
91 | public: |
92 | virtual () = default; |
93 | |
94 | static std::unique_ptr<PageReader> ( |
95 | std::unique_ptr<InputStream> stream, int64_t total_num_rows, |
96 | Compression::type codec, |
97 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); |
98 | |
99 | // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page> |
100 | // containing new Page otherwise |
101 | virtual std::shared_ptr<Page> () = 0; |
102 | |
103 | virtual void (uint32_t size) = 0; |
104 | }; |
105 | |
106 | class PARQUET_EXPORT ColumnReader { |
107 | public: |
108 | ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>, |
109 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); |
110 | virtual ~ColumnReader(); |
111 | |
112 | static std::shared_ptr<ColumnReader> Make( |
113 | const ColumnDescriptor* descr, std::unique_ptr<PageReader> , |
114 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); |
115 | |
116 | // Returns true if there are still values in this column. |
117 | bool HasNext() { |
118 | // Either there is no data page available yet, or the data page has been |
119 | // exhausted |
120 | if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) { |
121 | if (!ReadNewPage() || num_buffered_values_ == 0) { |
122 | return false; |
123 | } |
124 | } |
125 | return true; |
126 | } |
127 | |
128 | Type::type type() const { return descr_->physical_type(); } |
129 | |
130 | const ColumnDescriptor* descr() const { return descr_; } |
131 | |
132 | protected: |
133 | virtual bool ReadNewPage() = 0; |
134 | |
135 | // Read multiple definition levels into preallocated memory |
136 | // |
137 | // Returns the number of decoded definition levels |
138 | int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels); |
139 | |
140 | // Read multiple repetition levels into preallocated memory |
141 | // Returns the number of decoded repetition levels |
142 | int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels); |
143 | |
144 | int64_t available_values_current_page() const { |
145 | return num_buffered_values_ - num_decoded_values_; |
146 | } |
147 | |
148 | void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; } |
149 | |
150 | const ColumnDescriptor* descr_; |
151 | |
152 | std::unique_ptr<PageReader> pager_; |
153 | std::shared_ptr<Page> current_page_; |
154 | |
155 | // Not set if full schema for this field has no optional or repeated elements |
156 | LevelDecoder definition_level_decoder_; |
157 | |
158 | // Not set for flat schemas. |
159 | LevelDecoder repetition_level_decoder_; |
160 | |
161 | // The total number of values stored in the data page. This is the maximum of |
162 | // the number of encoded definition levels or encoded values. For |
163 | // non-repeated, required columns, this is equal to the number of encoded |
164 | // values. For repeated or optional values, there may be fewer data values |
165 | // than levels, and this tells you how many encoded levels there are in that |
166 | // case. |
167 | int64_t num_buffered_values_; |
168 | |
169 | // The number of values from the current data page that have been decoded |
170 | // into memory |
171 | int64_t num_decoded_values_; |
172 | |
173 | ::arrow::MemoryPool* pool_; |
174 | }; |
175 | |
176 | namespace internal { |
177 | |
178 | static inline void DefinitionLevelsToBitmap( |
179 | const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level, |
180 | const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count, |
181 | uint8_t* valid_bits, int64_t valid_bits_offset) { |
182 | // We assume here that valid_bits is large enough to accommodate the |
183 | // additional definition levels and the ones that have already been written |
184 | ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset, |
185 | valid_bits_offset + num_def_levels); |
186 | |
187 | // TODO(itaiin): As an interim solution we are splitting the code path here |
188 | // between repeated+flat column reads, and non-repeated+nested reads. |
189 | // Those paths need to be merged in the future |
190 | for (int i = 0; i < num_def_levels; ++i) { |
191 | if (def_levels[i] == max_definition_level) { |
192 | valid_bits_writer.Set(); |
193 | } else if (max_repetition_level > 0) { |
194 | // repetition+flat case |
195 | if (def_levels[i] == (max_definition_level - 1)) { |
196 | valid_bits_writer.Clear(); |
197 | *null_count += 1; |
198 | } else { |
199 | continue; |
200 | } |
201 | } else { |
202 | // non-repeated+nested case |
203 | if (def_levels[i] < max_definition_level) { |
204 | valid_bits_writer.Clear(); |
205 | *null_count += 1; |
206 | } else { |
207 | throw ParquetException("definition level exceeds maximum" ); |
208 | } |
209 | } |
210 | |
211 | valid_bits_writer.Next(); |
212 | } |
213 | valid_bits_writer.Finish(); |
214 | *values_read = valid_bits_writer.position(); |
215 | } |
216 | |
217 | } // namespace internal |
218 | |
219 | // API to read values from a single column. This is a main client facing API. |
220 | template <typename DType> |
221 | class PARQUET_TEMPLATE_CLASS_EXPORT TypedColumnReader : public ColumnReader { |
222 | public: |
223 | typedef typename DType::c_type T; |
224 | |
225 | TypedColumnReader(const ColumnDescriptor* schema, std::unique_ptr<PageReader> , |
226 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) |
227 | : ColumnReader(schema, std::move(pager), pool), current_decoder_(NULLPTR) {} |
228 | |
229 | // Read a batch of repetition levels, definition levels, and values from the |
230 | // column. |
231 | // |
232 | // Since null values are not stored in the values, the number of values read |
233 | // may be less than the number of repetition and definition levels. With |
234 | // nested data this is almost certainly true. |
235 | // |
236 | // Set def_levels or rep_levels to nullptr if you want to skip reading them. |
237 | // This is only safe if you know through some other source that there are no |
238 | // undefined values. |
239 | // |
240 | // To fully exhaust a row group, you must read batches until the number of |
241 | // values read reaches the number of stored values according to the metadata. |
242 | // |
243 | // This API is the same for both V1 and V2 of the DataPage |
244 | // |
245 | // @returns: actual number of levels read (see values_read for number of values read) |
246 | int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, |
247 | T* values, int64_t* values_read); |
248 | |
249 | /// Read a batch of repetition levels, definition levels, and values from the |
250 | /// column and leave spaces for null entries on the lowest level in the values |
251 | /// buffer. |
252 | /// |
253 | /// In comparision to ReadBatch the length of repetition and definition levels |
254 | /// is the same as of the number of values read for max_definition_level == 1. |
255 | /// In the case of max_definition_level > 1, the repetition and definition |
256 | /// levels are larger than the values but the values include the null entries |
257 | /// with definition_level == (max_definition_level - 1). |
258 | /// |
259 | /// To fully exhaust a row group, you must read batches until the number of |
260 | /// values read reaches the number of stored values according to the metadata. |
261 | /// |
262 | /// @param batch_size the number of levels to read |
263 | /// @param[out] def_levels The Parquet definition levels, output has |
264 | /// the length levels_read. |
265 | /// @param[out] rep_levels The Parquet repetition levels, output has |
266 | /// the length levels_read. |
267 | /// @param[out] values The values in the lowest nested level including |
268 | /// spacing for nulls on the lowest levels; output has the length |
269 | /// values_read. |
270 | /// @param[out] valid_bits Memory allocated for a bitmap that indicates if |
271 | /// the row is null or on the maximum definition level. For performance |
272 | /// reasons the underlying buffer should be able to store 1 bit more than |
273 | /// required. If this requires an additional byte, this byte is only read |
274 | /// but never written to. |
275 | /// @param valid_bits_offset The offset in bits of the valid_bits where the |
276 | /// first relevant bit resides. |
277 | /// @param[out] levels_read The number of repetition/definition levels that were read. |
278 | /// @param[out] values_read The number of values read, this includes all |
279 | /// non-null entries as well as all null-entries on the lowest level |
280 | /// (i.e. definition_level == max_definition_level - 1) |
281 | /// @param[out] null_count The number of nulls on the lowest levels. |
282 | /// (i.e. (values_read - null_count) is total number of non-null entries) |
283 | int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, |
284 | T* values, uint8_t* valid_bits, int64_t valid_bits_offset, |
285 | int64_t* levels_read, int64_t* values_read, |
286 | int64_t* null_count); |
287 | |
288 | // Skip reading levels |
289 | // Returns the number of levels skipped |
290 | int64_t Skip(int64_t num_rows_to_skip); |
291 | |
292 | private: |
293 | typedef Decoder<DType> DecoderType; |
294 | |
295 | // Advance to the next data page |
296 | bool ReadNewPage() override; |
297 | |
298 | // Read up to batch_size values from the current data page into the |
299 | // pre-allocated memory T* |
300 | // |
301 | // @returns: the number of values read into the out buffer |
302 | int64_t ReadValues(int64_t batch_size, T* out); |
303 | |
304 | // Read up to batch_size values from the current data page into the |
305 | // pre-allocated memory T*, leaving spaces for null entries according |
306 | // to the def_levels. |
307 | // |
308 | // @returns: the number of values read into the out buffer |
309 | int64_t ReadValuesSpaced(int64_t batch_size, T* out, int64_t null_count, |
310 | uint8_t* valid_bits, int64_t valid_bits_offset); |
311 | |
312 | // Map of encoding type to the respective decoder object. For example, a |
313 | // column chunk's data pages may include both dictionary-encoded and |
314 | // plain-encoded data. |
315 | std::unordered_map<int, std::shared_ptr<DecoderType>> decoders_; |
316 | |
317 | void ConfigureDictionary(const DictionaryPage* page); |
318 | |
319 | DecoderType* current_decoder_; |
320 | }; |
321 | |
322 | // ---------------------------------------------------------------------- |
// Typed column reader implementations
324 | |
325 | template <typename DType> |
326 | inline int64_t TypedColumnReader<DType>::ReadValues(int64_t batch_size, T* out) { |
327 | int64_t num_decoded = current_decoder_->Decode(out, static_cast<int>(batch_size)); |
328 | return num_decoded; |
329 | } |
330 | |
331 | template <typename DType> |
332 | inline int64_t TypedColumnReader<DType>::ReadValuesSpaced(int64_t batch_size, T* out, |
333 | int64_t null_count, |
334 | uint8_t* valid_bits, |
335 | int64_t valid_bits_offset) { |
336 | return current_decoder_->DecodeSpaced(out, static_cast<int>(batch_size), |
337 | static_cast<int>(null_count), valid_bits, |
338 | valid_bits_offset); |
339 | } |
340 | |
341 | template <typename DType> |
342 | inline int64_t TypedColumnReader<DType>::ReadBatch(int64_t batch_size, |
343 | int16_t* def_levels, |
344 | int16_t* rep_levels, T* values, |
345 | int64_t* values_read) { |
346 | // HasNext invokes ReadNewPage |
347 | if (!HasNext()) { |
348 | *values_read = 0; |
349 | return 0; |
350 | } |
351 | |
352 | // TODO(wesm): keep reading data pages until batch_size is reached, or the |
353 | // row group is finished |
354 | batch_size = std::min(batch_size, num_buffered_values_ - num_decoded_values_); |
355 | |
356 | int64_t num_def_levels = 0; |
357 | int64_t num_rep_levels = 0; |
358 | |
359 | int64_t values_to_read = 0; |
360 | |
361 | // If the field is required and non-repeated, there are no definition levels |
362 | if (descr_->max_definition_level() > 0 && def_levels) { |
363 | num_def_levels = ReadDefinitionLevels(batch_size, def_levels); |
364 | // TODO(wesm): this tallying of values-to-decode can be performed with better |
365 | // cache-efficiency if fused with the level decoding. |
366 | for (int64_t i = 0; i < num_def_levels; ++i) { |
367 | if (def_levels[i] == descr_->max_definition_level()) { |
368 | ++values_to_read; |
369 | } |
370 | } |
371 | } else { |
372 | // Required field, read all values |
373 | values_to_read = batch_size; |
374 | } |
375 | |
376 | // Not present for non-repeated fields |
377 | if (descr_->max_repetition_level() > 0 && rep_levels) { |
378 | num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels); |
379 | if (def_levels && num_def_levels != num_rep_levels) { |
380 | throw ParquetException("Number of decoded rep / def levels did not match" ); |
381 | } |
382 | } |
383 | |
384 | *values_read = ReadValues(values_to_read, values); |
385 | int64_t total_values = std::max(num_def_levels, *values_read); |
386 | ConsumeBufferedValues(total_values); |
387 | |
388 | return total_values; |
389 | } |
390 | |
391 | namespace internal { |
392 | |
393 | // TODO(itaiin): another code path split to merge when the general case is done |
394 | static inline bool HasSpacedValues(const ColumnDescriptor* descr) { |
395 | if (descr->max_repetition_level() > 0) { |
396 | // repeated+flat case |
397 | return !descr->schema_node()->is_required(); |
398 | } else { |
399 | // non-repeated+nested case |
400 | // Find if a node forces nulls in the lowest level along the hierarchy |
401 | const schema::Node* node = descr->schema_node().get(); |
402 | while (node) { |
403 | if (node->is_optional()) { |
404 | return true; |
405 | } |
406 | node = node->parent(); |
407 | } |
408 | return false; |
409 | } |
410 | } |
411 | |
412 | } // namespace internal |
413 | |
414 | template <typename DType> |
415 | inline int64_t TypedColumnReader<DType>::ReadBatchSpaced( |
416 | int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values, |
417 | uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read, |
418 | int64_t* values_read, int64_t* null_count_out) { |
419 | // HasNext invokes ReadNewPage |
420 | if (!HasNext()) { |
421 | *levels_read = 0; |
422 | *values_read = 0; |
423 | *null_count_out = 0; |
424 | return 0; |
425 | } |
426 | |
427 | int64_t total_values; |
428 | // TODO(wesm): keep reading data pages until batch_size is reached, or the |
429 | // row group is finished |
430 | batch_size = std::min(batch_size, num_buffered_values_ - num_decoded_values_); |
431 | |
432 | // If the field is required and non-repeated, there are no definition levels |
433 | if (descr_->max_definition_level() > 0) { |
434 | int64_t num_def_levels = ReadDefinitionLevels(batch_size, def_levels); |
435 | |
436 | // Not present for non-repeated fields |
437 | if (descr_->max_repetition_level() > 0) { |
438 | int64_t num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels); |
439 | if (num_def_levels != num_rep_levels) { |
440 | throw ParquetException("Number of decoded rep / def levels did not match" ); |
441 | } |
442 | } |
443 | |
444 | const bool has_spaced_values = internal::HasSpacedValues(descr_); |
445 | |
446 | int64_t null_count = 0; |
447 | if (!has_spaced_values) { |
448 | int values_to_read = 0; |
449 | for (int64_t i = 0; i < num_def_levels; ++i) { |
450 | if (def_levels[i] == descr_->max_definition_level()) { |
451 | ++values_to_read; |
452 | } |
453 | } |
454 | total_values = ReadValues(values_to_read, values); |
455 | for (int64_t i = 0; i < total_values; i++) { |
456 | ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i); |
457 | } |
458 | *values_read = total_values; |
459 | } else { |
460 | int16_t max_definition_level = descr_->max_definition_level(); |
461 | int16_t max_repetition_level = descr_->max_repetition_level(); |
462 | internal::DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level, |
463 | max_repetition_level, values_read, &null_count, |
464 | valid_bits, valid_bits_offset); |
465 | total_values = ReadValuesSpaced(*values_read, values, static_cast<int>(null_count), |
466 | valid_bits, valid_bits_offset); |
467 | } |
468 | *levels_read = num_def_levels; |
469 | *null_count_out = null_count; |
470 | |
471 | } else { |
472 | // Required field, read all values |
473 | total_values = ReadValues(batch_size, values); |
474 | for (int64_t i = 0; i < total_values; i++) { |
475 | ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i); |
476 | } |
477 | *null_count_out = 0; |
478 | *levels_read = total_values; |
479 | } |
480 | |
481 | ConsumeBufferedValues(*levels_read); |
482 | return total_values; |
483 | } |
484 | |
485 | template <typename DType> |
486 | int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) { |
487 | int64_t rows_to_skip = num_rows_to_skip; |
488 | while (HasNext() && rows_to_skip > 0) { |
489 | // If the number of rows to skip is more than the number of undecoded values, skip the |
490 | // Page. |
491 | if (rows_to_skip > (num_buffered_values_ - num_decoded_values_)) { |
492 | rows_to_skip -= num_buffered_values_ - num_decoded_values_; |
493 | num_decoded_values_ = num_buffered_values_; |
494 | } else { |
495 | // We need to read this Page |
496 | // Jump to the right offset in the Page |
497 | int64_t batch_size = 1024; // ReadBatch with a smaller memory footprint |
498 | int64_t values_read = 0; |
499 | |
500 | std::shared_ptr<ResizableBuffer> vals = AllocateBuffer( |
501 | this->pool_, batch_size * type_traits<DType::type_num>::value_byte_size); |
502 | std::shared_ptr<ResizableBuffer> def_levels = |
503 | AllocateBuffer(this->pool_, batch_size * sizeof(int16_t)); |
504 | |
505 | std::shared_ptr<ResizableBuffer> rep_levels = |
506 | AllocateBuffer(this->pool_, batch_size * sizeof(int16_t)); |
507 | |
508 | do { |
509 | batch_size = std::min(batch_size, rows_to_skip); |
510 | values_read = ReadBatch(static_cast<int>(batch_size), |
511 | reinterpret_cast<int16_t*>(def_levels->mutable_data()), |
512 | reinterpret_cast<int16_t*>(rep_levels->mutable_data()), |
513 | reinterpret_cast<T*>(vals->mutable_data()), &values_read); |
514 | rows_to_skip -= values_read; |
515 | } while (values_read > 0 && rows_to_skip > 0); |
516 | } |
517 | } |
518 | return num_rows_to_skip - rows_to_skip; |
519 | } |
520 | |
521 | // ---------------------------------------------------------------------- |
522 | // Template instantiations |
523 | |
524 | typedef TypedColumnReader<BooleanType> BoolReader; |
525 | typedef TypedColumnReader<Int32Type> Int32Reader; |
526 | typedef TypedColumnReader<Int64Type> Int64Reader; |
527 | typedef TypedColumnReader<Int96Type> Int96Reader; |
528 | typedef TypedColumnReader<FloatType> FloatReader; |
529 | typedef TypedColumnReader<DoubleType> DoubleReader; |
530 | typedef TypedColumnReader<ByteArrayType> ByteArrayReader; |
531 | typedef TypedColumnReader<FLBAType> FixedLenByteArrayReader; |
532 | |
533 | PARQUET_EXTERN_TEMPLATE TypedColumnReader<BooleanType>; |
534 | PARQUET_EXTERN_TEMPLATE TypedColumnReader<Int32Type>; |
535 | PARQUET_EXTERN_TEMPLATE TypedColumnReader<Int64Type>; |
536 | PARQUET_EXTERN_TEMPLATE TypedColumnReader<Int96Type>; |
537 | PARQUET_EXTERN_TEMPLATE TypedColumnReader<FloatType>; |
538 | PARQUET_EXTERN_TEMPLATE TypedColumnReader<DoubleType>; |
539 | PARQUET_EXTERN_TEMPLATE TypedColumnReader<ByteArrayType>; |
540 | PARQUET_EXTERN_TEMPLATE TypedColumnReader<FLBAType>; |
541 | |
542 | } // namespace parquet |
543 | |
544 | #endif // PARQUET_COLUMN_READER_H |
545 | |