1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "parquet/encoding.h"
19
20#include <algorithm>
21#include <cstdint>
22#include <cstdlib>
23#include <memory>
24#include <utility>
25#include <vector>
26
27#include "arrow/array.h"
28#include "arrow/builder.h"
29#include "arrow/util/bit_stream_utils.h"
30#include "arrow/util/checked_cast.h"
31#include "arrow/util/hashing.h"
32#include "arrow/util/logging.h"
33#include "arrow/util/rle_encoding.h"
34#include "arrow/util/ubsan.h"
35
36#include "parquet/exception.h"
37#include "parquet/platform.h"
38#include "parquet/schema.h"
39#include "parquet/types.h"
40
41using arrow::Status;
42using arrow::internal::checked_cast;
43
44namespace parquet {
45
// Byte capacity of the scratch buffer allocated by PlainBooleanEncoder (1 KiB).
constexpr int64_t kInMemoryDefaultCapacity = int64_t{1} << 10;
47
48class EncoderImpl : virtual public Encoder {
49 public:
50 EncoderImpl(const ColumnDescriptor* descr, Encoding::type encoding, MemoryPool* pool)
51 : descr_(descr),
52 encoding_(encoding),
53 pool_(pool),
54 type_length_(descr ? descr->type_length() : -1) {}
55
56 Encoding::type encoding() const override { return encoding_; }
57
58 MemoryPool* memory_pool() const override { return pool_; }
59
60 protected:
61 // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
62 const ColumnDescriptor* descr_;
63 const Encoding::type encoding_;
64 MemoryPool* pool_;
65
66 /// Type length from descr
67 int type_length_;
68};
69
70// ----------------------------------------------------------------------
71// Plain encoder implementation
72
73template <typename DType>
74class PlainEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
75 public:
76 using T = typename DType::c_type;
77
78 explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
79 : EncoderImpl(descr, Encoding::PLAIN, pool), sink_(pool) {}
80
81 int64_t EstimatedDataEncodedSize() override { return sink_.length(); }
82
83 std::shared_ptr<Buffer> FlushValues() override {
84 std::shared_ptr<Buffer> buffer;
85 PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
86 return buffer;
87 }
88
89 void Put(const T* buffer, int num_values) override;
90
91 void Put(const arrow::Array& values) override;
92
93 void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
94 int64_t valid_bits_offset) override {
95 std::shared_ptr<ResizableBuffer> buffer;
96 PARQUET_THROW_NOT_OK(arrow::AllocateResizableBuffer(this->memory_pool(),
97 num_values * sizeof(T), &buffer));
98 int32_t num_valid_values = 0;
99 arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
100 num_values);
101 T* data = reinterpret_cast<T*>(buffer->mutable_data());
102 for (int32_t i = 0; i < num_values; i++) {
103 if (valid_bits_reader.IsSet()) {
104 data[num_valid_values++] = src[i];
105 }
106 valid_bits_reader.Next();
107 }
108 Put(data, num_valid_values);
109 }
110
111 void UnsafePutByteArray(const void* data, uint32_t length) {
112 DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL";
113 sink_.UnsafeAppend(&length, sizeof(uint32_t));
114 sink_.UnsafeAppend(data, static_cast<int64_t>(length));
115 }
116
117 void Put(const ByteArray& val) {
118 // Write the result to the output stream
119 const int64_t increment = static_cast<int64_t>(val.len + sizeof(uint32_t));
120 if (ARROW_PREDICT_FALSE(sink_.length() + increment > sink_.capacity())) {
121 PARQUET_THROW_NOT_OK(sink_.Reserve(increment));
122 }
123 UnsafePutByteArray(val.ptr, val.len);
124 }
125
126 protected:
127 arrow::BufferBuilder sink_;
128};
129
130template <typename DType>
131void PlainEncoder<DType>::Put(const T* buffer, int num_values) {
132 if (num_values > 0) {
133 PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T)));
134 }
135}
136
137template <>
138inline void PlainEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) {
139 for (int i = 0; i < num_values; ++i) {
140 Put(src[i]);
141 }
142}
143
144template <typename DType>
145void PlainEncoder<DType>::Put(const arrow::Array& values) {
146 ParquetException::NYI(values.type()->ToString());
147}
148
149void AssertBinary(const arrow::Array& values) {
150 if (values.type_id() != arrow::Type::BINARY &&
151 values.type_id() != arrow::Type::STRING) {
152 throw ParquetException("Only BinaryArray and subclasses supported");
153 }
154}
155
156template <>
157void PlainEncoder<ByteArrayType>::Put(const arrow::Array& values) {
158 AssertBinary(values);
159 const auto& data = checked_cast<const arrow::BinaryArray&>(values);
160 const int64_t total_bytes = data.value_offset(data.length()) - data.value_offset(0);
161 PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes + data.length() * sizeof(uint32_t)));
162
163 if (data.null_count() == 0) {
164 // no nulls, just dump the data
165 for (int64_t i = 0; i < data.length(); i++) {
166 auto view = data.GetView(i);
167 UnsafePutByteArray(view.data(), static_cast<uint32_t>(view.size()));
168 }
169 } else {
170 for (int64_t i = 0; i < data.length(); i++) {
171 if (data.IsValid(i)) {
172 auto view = data.GetView(i);
173 UnsafePutByteArray(view.data(), static_cast<uint32_t>(view.size()));
174 }
175 }
176 }
177}
178
179template <>
180inline void PlainEncoder<FLBAType>::Put(const FixedLenByteArray* src, int num_values) {
181 if (descr_->type_length() == 0) {
182 return;
183 }
184 for (int i = 0; i < num_values; ++i) {
185 // Write the result to the output stream
186 DCHECK(src[i].ptr != nullptr) << "Value ptr cannot be NULL";
187 PARQUET_THROW_NOT_OK(sink_.Append(src[i].ptr, descr_->type_length()));
188 }
189}
190
191class PlainFLBAEncoder : public PlainEncoder<FLBAType>, virtual public FLBAEncoder {
192 public:
193 using BASE = PlainEncoder<FLBAType>;
194 using BASE::PlainEncoder;
195};
196
197class PlainBooleanEncoder : public EncoderImpl,
198 virtual public TypedEncoder<BooleanType>,
199 virtual public BooleanEncoder {
200 public:
201 explicit PlainBooleanEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
202 : EncoderImpl(descr, Encoding::PLAIN, pool),
203 bits_available_(kInMemoryDefaultCapacity * 8),
204 bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
205 sink_(pool),
206 bit_writer_(bits_buffer_->mutable_data(),
207 static_cast<int>(bits_buffer_->size())) {}
208
209 int64_t EstimatedDataEncodedSize() override;
210 std::shared_ptr<Buffer> FlushValues() override;
211
212 void Put(const bool* src, int num_values) override;
213 void Put(const std::vector<bool>& src, int num_values) override;
214
215 void PutSpaced(const bool* src, int num_values, const uint8_t* valid_bits,
216 int64_t valid_bits_offset) override {
217 std::shared_ptr<ResizableBuffer> buffer;
218 PARQUET_THROW_NOT_OK(arrow::AllocateResizableBuffer(this->memory_pool(),
219 num_values * sizeof(T), &buffer));
220 int32_t num_valid_values = 0;
221 arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
222 num_values);
223 T* data = reinterpret_cast<T*>(buffer->mutable_data());
224 for (int32_t i = 0; i < num_values; i++) {
225 if (valid_bits_reader.IsSet()) {
226 data[num_valid_values++] = src[i];
227 }
228 valid_bits_reader.Next();
229 }
230 Put(data, num_valid_values);
231 }
232
233 void Put(const arrow::Array& values) override {
234 ParquetException::NYI("Direct Arrow to Boolean writes not implemented");
235 }
236
237 private:
238 int bits_available_;
239 std::shared_ptr<ResizableBuffer> bits_buffer_;
240 arrow::BufferBuilder sink_;
241 arrow::BitUtil::BitWriter bit_writer_;
242
243 template <typename SequenceType>
244 void PutImpl(const SequenceType& src, int num_values);
245};
246
247template <typename SequenceType>
248void PlainBooleanEncoder::PutImpl(const SequenceType& src, int num_values) {
249 int bit_offset = 0;
250 if (bits_available_ > 0) {
251 int bits_to_write = std::min(bits_available_, num_values);
252 for (int i = 0; i < bits_to_write; i++) {
253 bit_writer_.PutValue(src[i], 1);
254 }
255 bits_available_ -= bits_to_write;
256 bit_offset = bits_to_write;
257
258 if (bits_available_ == 0) {
259 bit_writer_.Flush();
260 PARQUET_THROW_NOT_OK(
261 sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
262 bit_writer_.Clear();
263 }
264 }
265
266 int bits_remaining = num_values - bit_offset;
267 while (bit_offset < num_values) {
268 bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;
269
270 int bits_to_write = std::min(bits_available_, bits_remaining);
271 for (int i = bit_offset; i < bit_offset + bits_to_write; i++) {
272 bit_writer_.PutValue(src[i], 1);
273 }
274 bit_offset += bits_to_write;
275 bits_available_ -= bits_to_write;
276 bits_remaining -= bits_to_write;
277
278 if (bits_available_ == 0) {
279 bit_writer_.Flush();
280 PARQUET_THROW_NOT_OK(
281 sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
282 bit_writer_.Clear();
283 }
284 }
285}
286
287int64_t PlainBooleanEncoder::EstimatedDataEncodedSize() {
288 int64_t position = sink_.length();
289 return position + bit_writer_.bytes_written();
290}
291
292std::shared_ptr<Buffer> PlainBooleanEncoder::FlushValues() {
293 if (bits_available_ > 0) {
294 bit_writer_.Flush();
295 PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
296 bit_writer_.Clear();
297 bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;
298 }
299
300 std::shared_ptr<Buffer> buffer;
301 PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
302 return buffer;
303}
304
305void PlainBooleanEncoder::Put(const bool* src, int num_values) {
306 PutImpl(src, num_values);
307}
308
309void PlainBooleanEncoder::Put(const std::vector<bool>& src, int num_values) {
310 PutImpl(src, num_values);
311}
312
313// ----------------------------------------------------------------------
314// DictEncoder<T> implementations
315
316template <typename DType>
317struct DictEncoderTraits {
318 using c_type = typename DType::c_type;
319 using MemoTableType = arrow::internal::ScalarMemoTable<c_type>;
320};
321
322template <>
323struct DictEncoderTraits<ByteArrayType> {
324 using MemoTableType = arrow::internal::BinaryMemoTable;
325};
326
327template <>
328struct DictEncoderTraits<FLBAType> {
329 using MemoTableType = arrow::internal::BinaryMemoTable;
330};
331
// Initial capacity (in elements) of a dictionary encoder's memo table: 1024.
static constexpr int32_t kInitialHashTableSize = 1024;
334
335/// See the dictionary encoding section of
336/// https://github.com/Parquet/parquet-format. The encoding supports
337/// streaming encoding. Values are encoded as they are added while the
338/// dictionary is being constructed. At any time, the buffered values
339/// can be written out with the current dictionary size. More values
340/// can then be added to the encoder, including new dictionary
341/// entries.
342template <typename DType>
343class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
344 using MemoTableType = typename DictEncoderTraits<DType>::MemoTableType;
345
346 public:
347 typedef typename DType::c_type T;
348
349 explicit DictEncoderImpl(const ColumnDescriptor* desc, MemoryPool* pool)
350 : EncoderImpl(desc, Encoding::PLAIN_DICTIONARY, pool),
351 dict_encoded_size_(0),
352 memo_table_(pool, kInitialHashTableSize) {}
353
354 ~DictEncoderImpl() override { DCHECK(buffered_indices_.empty()); }
355
356 int dict_encoded_size() override { return dict_encoded_size_; }
357
358 int WriteIndices(uint8_t* buffer, int buffer_len) override {
359 // Write bit width in first byte
360 *buffer = static_cast<uint8_t>(bit_width());
361 ++buffer;
362 --buffer_len;
363
364 arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width());
365
366 for (int32_t index : buffered_indices_) {
367 if (!encoder.Put(index)) return -1;
368 }
369 encoder.Flush();
370
371 ClearIndices();
372 return 1 + encoder.len();
373 }
374
375 void set_type_length(int type_length) { this->type_length_ = type_length; }
376
377 /// Returns a conservative estimate of the number of bytes needed to encode the buffered
378 /// indices. Used to size the buffer passed to WriteIndices().
379 int64_t EstimatedDataEncodedSize() override {
380 // Note: because of the way RleEncoder::CheckBufferFull() is called, we have to
381 // reserve
382 // an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used
383 // but not reserving them would cause the encoder to fail.
384 return 1 +
385 arrow::util::RleEncoder::MaxBufferSize(
386 bit_width(), static_cast<int>(buffered_indices_.size())) +
387 arrow::util::RleEncoder::MinBufferSize(bit_width());
388 }
389
390 /// The minimum bit width required to encode the currently buffered indices.
391 int bit_width() const override {
392 if (ARROW_PREDICT_FALSE(num_entries() == 0)) return 0;
393 if (ARROW_PREDICT_FALSE(num_entries() == 1)) return 1;
394 return BitUtil::Log2(num_entries());
395 }
396
397 /// Encode value. Note that this does not actually write any data, just
398 /// buffers the value's index to be written later.
399 inline void Put(const T& value);
400
401 // Not implemented for other data types
402 inline void PutByteArray(const void* ptr, int32_t length);
403
404 void Put(const T* src, int num_values) override {
405 for (int32_t i = 0; i < num_values; i++) {
406 Put(src[i]);
407 }
408 }
409
410 void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
411 int64_t valid_bits_offset) override {
412 arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
413 num_values);
414 for (int32_t i = 0; i < num_values; i++) {
415 if (valid_bits_reader.IsSet()) {
416 Put(src[i]);
417 }
418 valid_bits_reader.Next();
419 }
420 }
421
422 void Put(const arrow::Array& values) override;
423 void PutDictionary(const arrow::Array& values) override;
424
425 template <typename ArrowType>
426 void PutIndicesTyped(const arrow::Array& data) {
427 using ArrayType = typename arrow::TypeTraits<ArrowType>::ArrayType;
428 const auto& indices = checked_cast<const ArrayType&>(data);
429 auto values = indices.raw_values();
430
431 size_t buffer_position = buffered_indices_.size();
432 buffered_indices_.resize(
433 buffer_position + static_cast<size_t>(indices.length() - indices.null_count()));
434 if (indices.null_count() > 0) {
435 arrow::internal::BitmapReader valid_bits_reader(indices.null_bitmap_data(),
436 indices.offset(), indices.length());
437 for (int64_t i = 0; i < indices.length(); ++i) {
438 if (valid_bits_reader.IsSet()) {
439 buffered_indices_[buffer_position++] = static_cast<int32_t>(values[i]);
440 }
441 valid_bits_reader.Next();
442 }
443 } else {
444 for (int64_t i = 0; i < indices.length(); ++i) {
445 buffered_indices_[buffer_position++] = static_cast<int32_t>(values[i]);
446 }
447 }
448 }
449
450 void PutIndices(const arrow::Array& data) override {
451 switch (data.type()->id()) {
452 case arrow::Type::INT8:
453 return PutIndicesTyped<arrow::Int8Type>(data);
454 case arrow::Type::INT16:
455 return PutIndicesTyped<arrow::Int16Type>(data);
456 case arrow::Type::INT32:
457 return PutIndicesTyped<arrow::Int32Type>(data);
458 case arrow::Type::INT64:
459 return PutIndicesTyped<arrow::Int64Type>(data);
460 default:
461 throw ParquetException("Dictionary indices were not signed integer");
462 }
463 }
464
465 std::shared_ptr<Buffer> FlushValues() override {
466 std::shared_ptr<ResizableBuffer> buffer =
467 AllocateBuffer(this->pool_, EstimatedDataEncodedSize());
468 int result_size = WriteIndices(buffer->mutable_data(),
469 static_cast<int>(EstimatedDataEncodedSize()));
470 PARQUET_THROW_NOT_OK(buffer->Resize(result_size, false));
471 return std::move(buffer);
472 }
473
474 /// Writes out the encoded dictionary to buffer. buffer must be preallocated to
475 /// dict_encoded_size() bytes.
476 void WriteDict(uint8_t* buffer) override;
477
478 /// The number of entries in the dictionary.
479 int num_entries() const override { return memo_table_.size(); }
480
481 private:
482 /// Clears all the indices (but leaves the dictionary).
483 void ClearIndices() { buffered_indices_.clear(); }
484
485 /// Indices that have not yet be written out by WriteIndices().
486 std::vector<int32_t> buffered_indices_;
487
488 /// The number of bytes needed to encode the dictionary.
489 int dict_encoded_size_;
490
491 MemoTableType memo_table_;
492};
493
494template <typename DType>
495void DictEncoderImpl<DType>::WriteDict(uint8_t* buffer) {
496 // For primitive types, only a memcpy
497 DCHECK_EQ(static_cast<size_t>(dict_encoded_size_), sizeof(T) * memo_table_.size());
498 memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast<T*>(buffer));
499}
500
501// ByteArray and FLBA already have the dictionary encoded in their data heaps
502template <>
503void DictEncoderImpl<ByteArrayType>::WriteDict(uint8_t* buffer) {
504 memo_table_.VisitValues(0, [&buffer](const arrow::util::string_view& v) {
505 uint32_t len = static_cast<uint32_t>(v.length());
506 memcpy(buffer, &len, sizeof(len));
507 buffer += sizeof(len);
508 memcpy(buffer, v.data(), len);
509 buffer += len;
510 });
511}
512
513template <>
514void DictEncoderImpl<FLBAType>::WriteDict(uint8_t* buffer) {
515 memo_table_.VisitValues(0, [&](const arrow::util::string_view& v) {
516 DCHECK_EQ(v.length(), static_cast<size_t>(type_length_));
517 memcpy(buffer, v.data(), type_length_);
518 buffer += type_length_;
519 });
520}
521
522template <typename DType>
523inline void DictEncoderImpl<DType>::Put(const T& v) {
524 // Put() implementation for primitive types
525 auto on_found = [](int32_t memo_index) {};
526 auto on_not_found = [this](int32_t memo_index) {
527 dict_encoded_size_ += static_cast<int>(sizeof(T));
528 };
529
530 auto memo_index = memo_table_.GetOrInsert(v, on_found, on_not_found);
531 buffered_indices_.push_back(memo_index);
532}
533
534template <typename DType>
535inline void DictEncoderImpl<DType>::PutByteArray(const void* ptr, int32_t length) {
536 DCHECK(false);
537}
538
539template <>
540inline void DictEncoderImpl<ByteArrayType>::PutByteArray(const void* ptr,
541 int32_t length) {
542 static const uint8_t empty[] = {0};
543
544 auto on_found = [](int32_t memo_index) {};
545 auto on_not_found = [&](int32_t memo_index) {
546 dict_encoded_size_ += static_cast<int>(length + sizeof(uint32_t));
547 };
548
549 DCHECK(ptr != nullptr || length == 0);
550 ptr = (ptr != nullptr) ? ptr : empty;
551 auto memo_index = memo_table_.GetOrInsert(ptr, length, on_found, on_not_found);
552 buffered_indices_.push_back(memo_index);
553}
554
555template <>
556inline void DictEncoderImpl<ByteArrayType>::Put(const ByteArray& val) {
557 return PutByteArray(val.ptr, static_cast<int32_t>(val.len));
558}
559
560template <>
561inline void DictEncoderImpl<FLBAType>::Put(const FixedLenByteArray& v) {
562 static const uint8_t empty[] = {0};
563
564 auto on_found = [](int32_t memo_index) {};
565 auto on_not_found = [this](int32_t memo_index) { dict_encoded_size_ += type_length_; };
566
567 DCHECK(v.ptr != nullptr || type_length_ == 0);
568 const void* ptr = (v.ptr != nullptr) ? v.ptr : empty;
569 auto memo_index = memo_table_.GetOrInsert(ptr, type_length_, on_found, on_not_found);
570 buffered_indices_.push_back(memo_index);
571}
572
573template <typename DType>
574void DictEncoderImpl<DType>::Put(const arrow::Array& values) {
575 ParquetException::NYI(values.type()->ToString());
576}
577
578template <>
579void DictEncoderImpl<ByteArrayType>::Put(const arrow::Array& values) {
580 AssertBinary(values);
581 const auto& data = checked_cast<const arrow::BinaryArray&>(values);
582 if (data.null_count() == 0) {
583 // no nulls, just dump the data
584 for (int64_t i = 0; i < data.length(); i++) {
585 auto view = data.GetView(i);
586 PutByteArray(view.data(), static_cast<int32_t>(view.size()));
587 }
588 } else {
589 for (int64_t i = 0; i < data.length(); i++) {
590 if (data.IsValid(i)) {
591 auto view = data.GetView(i);
592 PutByteArray(view.data(), static_cast<int32_t>(view.size()));
593 }
594 }
595 }
596}
597
598template <typename DType>
599void DictEncoderImpl<DType>::PutDictionary(const arrow::Array& values) {
600 ParquetException::NYI(values.type()->ToString());
601}
602
603template <>
604void DictEncoderImpl<ByteArrayType>::PutDictionary(const arrow::Array& values) {
605 AssertBinary(values);
606 if (this->num_entries() > 0) {
607 throw ParquetException("Can only call PutDictionary on an empty DictEncoder");
608 }
609
610 const auto& data = checked_cast<const arrow::BinaryArray&>(values);
611 if (data.null_count() > 0) {
612 throw ParquetException("Inserted binary dictionary cannot cannot contain nulls");
613 }
614 for (int64_t i = 0; i < data.length(); i++) {
615 auto v = data.GetView(i);
616 dict_encoded_size_ += static_cast<int>(v.size() + sizeof(uint32_t));
617 ARROW_IGNORE_EXPR(
618 memo_table_.GetOrInsert(v.data(), static_cast<int32_t>(v.size()),
619 /*on_found=*/[](int32_t memo_index) {},
620 /*on_not_found=*/[](int32_t memo_index) {}));
621 }
622}
623
624// ----------------------------------------------------------------------
625// Encoder and decoder factory functions
626
627std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encoding,
628 bool use_dictionary, const ColumnDescriptor* descr,
629 MemoryPool* pool) {
630 if (use_dictionary) {
631 switch (type_num) {
632 case Type::INT32:
633 return std::unique_ptr<Encoder>(new DictEncoderImpl<Int32Type>(descr, pool));
634 case Type::INT64:
635 return std::unique_ptr<Encoder>(new DictEncoderImpl<Int64Type>(descr, pool));
636 case Type::INT96:
637 return std::unique_ptr<Encoder>(new DictEncoderImpl<Int96Type>(descr, pool));
638 case Type::FLOAT:
639 return std::unique_ptr<Encoder>(new DictEncoderImpl<FloatType>(descr, pool));
640 case Type::DOUBLE:
641 return std::unique_ptr<Encoder>(new DictEncoderImpl<DoubleType>(descr, pool));
642 case Type::BYTE_ARRAY:
643 return std::unique_ptr<Encoder>(new DictEncoderImpl<ByteArrayType>(descr, pool));
644 case Type::FIXED_LEN_BYTE_ARRAY:
645 return std::unique_ptr<Encoder>(new DictEncoderImpl<FLBAType>(descr, pool));
646 default:
647 DCHECK(false) << "Encoder not implemented";
648 break;
649 }
650 } else if (encoding == Encoding::PLAIN) {
651 switch (type_num) {
652 case Type::BOOLEAN:
653 return std::unique_ptr<Encoder>(new PlainBooleanEncoder(descr, pool));
654 case Type::INT32:
655 return std::unique_ptr<Encoder>(new PlainEncoder<Int32Type>(descr, pool));
656 case Type::INT64:
657 return std::unique_ptr<Encoder>(new PlainEncoder<Int64Type>(descr, pool));
658 case Type::INT96:
659 return std::unique_ptr<Encoder>(new PlainEncoder<Int96Type>(descr, pool));
660 case Type::FLOAT:
661 return std::unique_ptr<Encoder>(new PlainEncoder<FloatType>(descr, pool));
662 case Type::DOUBLE:
663 return std::unique_ptr<Encoder>(new PlainEncoder<DoubleType>(descr, pool));
664 case Type::BYTE_ARRAY:
665 return std::unique_ptr<Encoder>(new PlainEncoder<ByteArrayType>(descr, pool));
666 case Type::FIXED_LEN_BYTE_ARRAY:
667 return std::unique_ptr<Encoder>(new PlainEncoder<FLBAType>(descr, pool));
668 default:
669 DCHECK(false) << "Encoder not implemented";
670 break;
671 }
672 } else {
673 ParquetException::NYI("Selected encoding is not supported");
674 }
675 DCHECK(false) << "Should not be able to reach this code";
676 return nullptr;
677}
678
679class DecoderImpl : virtual public Decoder {
680 public:
681 void SetData(int num_values, const uint8_t* data, int len) override {
682 num_values_ = num_values;
683 data_ = data;
684 len_ = len;
685 }
686
687 int values_left() const override { return num_values_; }
688 Encoding::type encoding() const override { return encoding_; }
689
690 protected:
691 explicit DecoderImpl(const ColumnDescriptor* descr, Encoding::type encoding)
692 : descr_(descr), encoding_(encoding), num_values_(0), data_(NULLPTR), len_(0) {}
693
694 // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
695 const ColumnDescriptor* descr_;
696
697 const Encoding::type encoding_;
698 int num_values_;
699 const uint8_t* data_;
700 int len_;
701 int type_length_;
702};
703
704template <typename DType>
705class PlainDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
706 public:
707 using T = typename DType::c_type;
708 explicit PlainDecoder(const ColumnDescriptor* descr);
709
710 int Decode(T* buffer, int max_values) override;
711};
712
713template <typename DType>
714PlainDecoder<DType>::PlainDecoder(const ColumnDescriptor* descr)
715 : DecoderImpl(descr, Encoding::PLAIN) {
716 if (descr_ && descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
717 type_length_ = descr_->type_length();
718 } else {
719 type_length_ = -1;
720 }
721}
722
723// Decode routine templated on C++ type rather than type enum
724template <typename T>
725inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values,
726 int type_length, T* out) {
727 int bytes_to_decode = num_values * static_cast<int>(sizeof(T));
728 if (data_size < bytes_to_decode) {
729 ParquetException::EofException();
730 }
731 // If bytes_to_decode == 0, data could be null
732 if (bytes_to_decode > 0) {
733 memcpy(out, data, bytes_to_decode);
734 }
735 return bytes_to_decode;
736}
737
738// Template specialization for BYTE_ARRAY. The written values do not own their
739// own data.
740template <>
741inline int DecodePlain<ByteArray>(const uint8_t* data, int64_t data_size, int num_values,
742 int type_length, ByteArray* out) {
743 int bytes_decoded = 0;
744 int increment;
745 for (int i = 0; i < num_values; ++i) {
746 uint32_t len = out[i].len = arrow::util::SafeLoadAs<uint32_t>(data);
747 increment = static_cast<int>(sizeof(uint32_t) + len);
748 if (data_size < increment) ParquetException::EofException();
749 out[i].ptr = data + sizeof(uint32_t);
750 data += increment;
751 data_size -= increment;
752 bytes_decoded += increment;
753 }
754 return bytes_decoded;
755}
756
757// Template specialization for FIXED_LEN_BYTE_ARRAY. The written values do not
758// own their own data.
759template <>
760inline int DecodePlain<FixedLenByteArray>(const uint8_t* data, int64_t data_size,
761 int num_values, int type_length,
762 FixedLenByteArray* out) {
763 int bytes_to_decode = type_length * num_values;
764 if (data_size < bytes_to_decode) {
765 ParquetException::EofException();
766 }
767 for (int i = 0; i < num_values; ++i) {
768 out[i].ptr = data;
769 data += type_length;
770 data_size -= type_length;
771 }
772 return bytes_to_decode;
773}
774
775template <typename DType>
776int PlainDecoder<DType>::Decode(T* buffer, int max_values) {
777 max_values = std::min(max_values, num_values_);
778 int bytes_consumed = DecodePlain<T>(data_, len_, max_values, type_length_, buffer);
779 data_ += bytes_consumed;
780 len_ -= bytes_consumed;
781 num_values_ -= max_values;
782 return max_values;
783}
784
785class PlainBooleanDecoder : public DecoderImpl,
786 virtual public TypedDecoder<BooleanType>,
787 virtual public BooleanDecoder {
788 public:
789 explicit PlainBooleanDecoder(const ColumnDescriptor* descr);
790 void SetData(int num_values, const uint8_t* data, int len) override;
791
792 // Two flavors of bool decoding
793 int Decode(uint8_t* buffer, int max_values) override;
794 int Decode(bool* buffer, int max_values) override;
795
796 private:
797 std::unique_ptr<arrow::BitUtil::BitReader> bit_reader_;
798};
799
800PlainBooleanDecoder::PlainBooleanDecoder(const ColumnDescriptor* descr)
801 : DecoderImpl(descr, Encoding::PLAIN) {}
802
803void PlainBooleanDecoder::SetData(int num_values, const uint8_t* data, int len) {
804 num_values_ = num_values;
805 bit_reader_.reset(new BitUtil::BitReader(data, len));
806}
807
808int PlainBooleanDecoder::Decode(uint8_t* buffer, int max_values) {
809 max_values = std::min(max_values, num_values_);
810 bool val;
811 arrow::internal::BitmapWriter bit_writer(buffer, 0, max_values);
812 for (int i = 0; i < max_values; ++i) {
813 if (!bit_reader_->GetValue(1, &val)) {
814 ParquetException::EofException();
815 }
816 if (val) {
817 bit_writer.Set();
818 }
819 bit_writer.Next();
820 }
821 bit_writer.Finish();
822 num_values_ -= max_values;
823 return max_values;
824}
825
826int PlainBooleanDecoder::Decode(bool* buffer, int max_values) {
827 max_values = std::min(max_values, num_values_);
828 if (bit_reader_->GetBatch(1, buffer, max_values) != max_values) {
829 ParquetException::EofException();
830 }
831 num_values_ -= max_values;
832 return max_values;
833}
834
835struct ArrowBinaryHelper {
836 explicit ArrowBinaryHelper(ArrowBinaryAccumulator* out) {
837 this->out = out;
838 this->builder = out->builder.get();
839 this->chunk_space_remaining =
840 ::arrow::kBinaryMemoryLimit - this->builder->value_data_length();
841 }
842
843 Status PushChunk() {
844 std::shared_ptr<::arrow::Array> result;
845 RETURN_NOT_OK(builder->Finish(&result));
846 out->chunks.push_back(result);
847 chunk_space_remaining = ::arrow::kBinaryMemoryLimit;
848 return Status::OK();
849 }
850
851 bool CanFit(int64_t length) const { return length <= chunk_space_remaining; }
852
853 void UnsafeAppend(const uint8_t* data, int32_t length) {
854 chunk_space_remaining -= length;
855 builder->UnsafeAppend(data, length);
856 }
857
858 void UnsafeAppendNull() { builder->UnsafeAppendNull(); }
859
860 Status Append(const uint8_t* data, int32_t length) {
861 chunk_space_remaining -= length;
862 return builder->Append(data, length);
863 }
864
865 Status AppendNull() { return builder->AppendNull(); }
866
867 ArrowBinaryAccumulator* out;
868 arrow::BinaryBuilder* builder;
869 int64_t chunk_space_remaining;
870};
871
872class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>,
873 virtual public ByteArrayDecoder {
874 public:
875 using Base = PlainDecoder<ByteArrayType>;
876 using Base::DecodeSpaced;
877 using Base::PlainDecoder;
878
879 // ----------------------------------------------------------------------
880 // Dictionary read paths
881
882 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
883 int64_t valid_bits_offset,
884 arrow::BinaryDictionary32Builder* builder) override {
885 int result = 0;
886 PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
887 valid_bits_offset, builder, &result));
888 return result;
889 }
890
891 int DecodeArrowNonNull(int num_values,
892 arrow::BinaryDictionary32Builder* builder) override {
893 int result = 0;
894 PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result));
895 return result;
896 }
897
898 // ----------------------------------------------------------------------
899 // Optimized dense binary read paths
900
901 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
902 int64_t valid_bits_offset, ArrowBinaryAccumulator* out) override {
903 int result = 0;
904 PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
905 valid_bits_offset, out, &result));
906 return result;
907 }
908
909 int DecodeArrowNonNull(int num_values, ArrowBinaryAccumulator* out) override {
910 int result = 0;
911 PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull(num_values, out, &result));
912 return result;
913 }
914
915 private:
916 Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
917 int64_t valid_bits_offset, ArrowBinaryAccumulator* out,
918 int* out_values_decoded) {
919 ArrowBinaryHelper helper(out);
920 arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
921 int values_decoded = 0;
922
923 RETURN_NOT_OK(helper.builder->Reserve(num_values));
924 RETURN_NOT_OK(helper.builder->ReserveData(
925 std::min<int64_t>(len_, helper.chunk_space_remaining)));
926 for (int i = 0; i < num_values; ++i) {
927 if (bit_reader.IsSet()) {
928 auto value_len = static_cast<int32_t>(arrow::util::SafeLoadAs<uint32_t>(data_));
929 int increment = static_cast<int>(sizeof(uint32_t) + value_len);
930 if (ARROW_PREDICT_FALSE(len_ < increment)) ParquetException::EofException();
931 if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) {
932 // This element would exceed the capacity of a chunk
933 RETURN_NOT_OK(helper.PushChunk());
934 RETURN_NOT_OK(helper.builder->Reserve(num_values - i));
935 RETURN_NOT_OK(helper.builder->ReserveData(
936 std::min<int64_t>(len_, helper.chunk_space_remaining)));
937 }
938 helper.UnsafeAppend(data_ + sizeof(uint32_t), value_len);
939 data_ += increment;
940 len_ -= increment;
941 ++values_decoded;
942 } else {
943 helper.UnsafeAppendNull();
944 }
945 bit_reader.Next();
946 }
947
948 num_values_ -= values_decoded;
949 *out_values_decoded = values_decoded;
950 return Status::OK();
951 }
952
953 Status DecodeArrowDenseNonNull(int num_values, ArrowBinaryAccumulator* out,
954 int* values_decoded) {
955 ArrowBinaryHelper helper(out);
956 num_values = std::min(num_values, num_values_);
957 RETURN_NOT_OK(helper.builder->Reserve(num_values));
958 RETURN_NOT_OK(helper.builder->ReserveData(
959 std::min<int64_t>(len_, helper.chunk_space_remaining)));
960 for (int i = 0; i < num_values; ++i) {
961 int32_t value_len = static_cast<int32_t>(arrow::util::SafeLoadAs<uint32_t>(data_));
962 int increment = static_cast<int>(sizeof(uint32_t) + value_len);
963 if (ARROW_PREDICT_FALSE(len_ < increment)) ParquetException::EofException();
964 if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) {
965 // This element would exceed the capacity of a chunk
966 RETURN_NOT_OK(helper.PushChunk());
967 RETURN_NOT_OK(helper.builder->Reserve(num_values - i));
968 RETURN_NOT_OK(helper.builder->ReserveData(
969 std::min<int64_t>(len_, helper.chunk_space_remaining)));
970 }
971 helper.UnsafeAppend(data_ + sizeof(uint32_t), value_len);
972 data_ += increment;
973 len_ -= increment;
974 }
975
976 num_values_ -= num_values;
977 *values_decoded = num_values;
978 return Status::OK();
979 }
980
981 template <typename BuilderType>
982 Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
983 int64_t valid_bits_offset, BuilderType* builder,
984 int* out_values_decoded) {
985 RETURN_NOT_OK(builder->Reserve(num_values));
986 arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
987 int values_decoded = 0;
988 for (int i = 0; i < num_values; ++i) {
989 if (bit_reader.IsSet()) {
990 uint32_t value_len = arrow::util::SafeLoadAs<uint32_t>(data_);
991 int increment = static_cast<int>(sizeof(uint32_t) + value_len);
992 if (len_ < increment) {
993 ParquetException::EofException();
994 }
995 RETURN_NOT_OK(builder->Append(data_ + sizeof(uint32_t), value_len));
996 data_ += increment;
997 len_ -= increment;
998 ++values_decoded;
999 } else {
1000 RETURN_NOT_OK(builder->AppendNull());
1001 }
1002 bit_reader.Next();
1003 }
1004 num_values_ -= values_decoded;
1005 *out_values_decoded = values_decoded;
1006 return Status::OK();
1007 }
1008
1009 template <typename BuilderType>
1010 Status DecodeArrowNonNull(int num_values, BuilderType* builder, int* values_decoded) {
1011 num_values = std::min(num_values, num_values_);
1012 RETURN_NOT_OK(builder->Reserve(num_values));
1013 for (int i = 0; i < num_values; ++i) {
1014 uint32_t value_len = arrow::util::SafeLoadAs<uint32_t>(data_);
1015 int increment = static_cast<int>(sizeof(uint32_t) + value_len);
1016 if (len_ < increment) ParquetException::EofException();
1017 RETURN_NOT_OK(builder->Append(data_ + sizeof(uint32_t), value_len));
1018 data_ += increment;
1019 len_ -= increment;
1020 }
1021 num_values_ -= num_values;
1022 *values_decoded = num_values;
1023 return Status::OK();
1024 }
1025};
1026
1027class PlainFLBADecoder : public PlainDecoder<FLBAType>, virtual public FLBADecoder {
1028 public:
1029 using Base = PlainDecoder<FLBAType>;
1030 using Base::PlainDecoder;
1031};
1032
1033// ----------------------------------------------------------------------
1034// Dictionary encoding and decoding
1035
1036template <typename Type>
1037class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder<Type> {
1038 public:
1039 typedef typename Type::c_type T;
1040
1041 // Initializes the dictionary with values from 'dictionary'. The data in
1042 // dictionary is not guaranteed to persist in memory after this call so the
1043 // dictionary decoder needs to copy the data out if necessary.
1044 explicit DictDecoderImpl(const ColumnDescriptor* descr,
1045 MemoryPool* pool = arrow::default_memory_pool())
1046 : DecoderImpl(descr, Encoding::RLE_DICTIONARY),
1047 dictionary_(AllocateBuffer(pool, 0)),
1048 dictionary_length_(0),
1049 byte_array_data_(AllocateBuffer(pool, 0)),
1050 byte_array_offsets_(AllocateBuffer(pool, 0)),
1051 indices_scratch_space_(AllocateBuffer(pool, 0)) {}
1052
1053 // Perform type-specific initiatialization
1054 void SetDict(TypedDecoder<Type>* dictionary) override;
1055
1056 void SetData(int num_values, const uint8_t* data, int len) override {
1057 num_values_ = num_values;
1058 if (len == 0) return;
1059 uint8_t bit_width = *data;
1060 ++data;
1061 --len;
1062 idx_decoder_ = arrow::util::RleDecoder(data, len, bit_width);
1063 }
1064
1065 int Decode(T* buffer, int num_values) override {
1066 num_values = std::min(num_values, num_values_);
1067 int decoded_values = idx_decoder_.GetBatchWithDict(
1068 reinterpret_cast<const T*>(dictionary_->data()), buffer, num_values);
1069 if (decoded_values != num_values) {
1070 ParquetException::EofException();
1071 }
1072 num_values_ -= num_values;
1073 return num_values;
1074 }
1075
1076 int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits,
1077 int64_t valid_bits_offset) override {
1078 num_values = std::min(num_values, num_values_);
1079 if (num_values != idx_decoder_.GetBatchWithDictSpaced(
1080 reinterpret_cast<const T*>(dictionary_->data()), buffer,
1081 num_values, null_count, valid_bits, valid_bits_offset)) {
1082 ParquetException::EofException();
1083 }
1084 num_values_ -= num_values;
1085 return num_values;
1086 }
1087
1088 void InsertDictionary(arrow::ArrayBuilder* builder) override;
1089
1090 int DecodeIndicesSpaced(int num_values, int null_count, const uint8_t* valid_bits,
1091 int64_t valid_bits_offset,
1092 arrow::ArrayBuilder* builder) override {
1093 if (num_values > 0) {
1094 // TODO(wesm): Refactor to batch reads for improved memory use. It is not
1095 // trivial because the null_count is relative to the entire bitmap
1096 PARQUET_THROW_NOT_OK(indices_scratch_space_->TypedResize<int32_t>(
1097 num_values, /*shrink_to_fit=*/false));
1098 }
1099
1100 auto indices_buffer =
1101 reinterpret_cast<int32_t*>(indices_scratch_space_->mutable_data());
1102
1103 if (num_values != idx_decoder_.GetBatchSpaced(num_values, null_count, valid_bits,
1104 valid_bits_offset, indices_buffer)) {
1105 ParquetException::EofException();
1106 }
1107
1108 /// XXX(wesm): Cannot append "valid bits" directly to the builder
1109 std::vector<uint8_t> valid_bytes(num_values);
1110 arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
1111 for (int64_t i = 0; i < num_values; ++i) {
1112 valid_bytes[i] = static_cast<uint8_t>(bit_reader.IsSet());
1113 bit_reader.Next();
1114 }
1115
1116 auto binary_builder = checked_cast<arrow::BinaryDictionary32Builder*>(builder);
1117 PARQUET_THROW_NOT_OK(
1118 binary_builder->AppendIndices(indices_buffer, num_values, valid_bytes.data()));
1119 num_values_ -= num_values - null_count;
1120 return num_values - null_count;
1121 }
1122
1123 int DecodeIndices(int num_values, arrow::ArrayBuilder* builder) override {
1124 num_values = std::min(num_values, num_values_);
1125 num_values = std::min(num_values, num_values_);
1126 if (num_values > 0) {
1127 // TODO(wesm): Refactor to batch reads for improved memory use. This is
1128 // relatively simple here because we don't have to do any bookkeeping of
1129 // nulls
1130 PARQUET_THROW_NOT_OK(indices_scratch_space_->TypedResize<int32_t>(
1131 num_values, /*shrink_to_fit=*/false));
1132 }
1133 auto indices_buffer =
1134 reinterpret_cast<int32_t*>(indices_scratch_space_->mutable_data());
1135 if (num_values != idx_decoder_.GetBatch(indices_buffer, num_values)) {
1136 ParquetException::EofException();
1137 }
1138 auto binary_builder = checked_cast<arrow::BinaryDictionary32Builder*>(builder);
1139 PARQUET_THROW_NOT_OK(binary_builder->AppendIndices(indices_buffer, num_values));
1140 num_values_ -= num_values;
1141 return num_values;
1142 }
1143
1144 protected:
1145 inline void DecodeDict(TypedDecoder<Type>* dictionary) {
1146 dictionary_length_ = static_cast<int32_t>(dictionary->values_left());
1147 PARQUET_THROW_NOT_OK(dictionary_->Resize(dictionary_length_ * sizeof(T),
1148 /*shrink_to_fit=*/false));
1149 dictionary->Decode(reinterpret_cast<T*>(dictionary_->mutable_data()),
1150 dictionary_length_);
1151 }
1152
1153 // Only one is set.
1154 std::shared_ptr<ResizableBuffer> dictionary_;
1155
1156 int32_t dictionary_length_;
1157
1158 // Data that contains the byte array data (byte_array_dictionary_ just has the
1159 // pointers).
1160 std::shared_ptr<ResizableBuffer> byte_array_data_;
1161
1162 // Arrow-style byte offsets for each dictionary value. We maintain two
1163 // representations of the dictionary, one as ByteArray* for non-Arrow
1164 // consumers and this one for Arrow conumers. Since dictionaries are
1165 // generally pretty small to begin with this doesn't mean too much extra
1166 // memory use in most cases
1167 std::shared_ptr<ResizableBuffer> byte_array_offsets_;
1168
1169 // Reusable buffer for decoding dictionary indices to be appended to a
1170 // BinaryDictionary32Builder
1171 std::shared_ptr<ResizableBuffer> indices_scratch_space_;
1172
1173 arrow::util::RleDecoder idx_decoder_;
1174};
1175
1176template <typename Type>
1177void DictDecoderImpl<Type>::SetDict(TypedDecoder<Type>* dictionary) {
1178 DecodeDict(dictionary);
1179}
1180
1181template <>
1182void DictDecoderImpl<BooleanType>::SetDict(TypedDecoder<BooleanType>* dictionary) {
1183 ParquetException::NYI("Dictionary encoding is not implemented for boolean values");
1184}
1185
1186template <>
1187void DictDecoderImpl<ByteArrayType>::SetDict(TypedDecoder<ByteArrayType>* dictionary) {
1188 DecodeDict(dictionary);
1189
1190 auto dict_values = reinterpret_cast<ByteArray*>(dictionary_->mutable_data());
1191
1192 int total_size = 0;
1193 for (int i = 0; i < dictionary_length_; ++i) {
1194 total_size += dict_values[i].len;
1195 }
1196 if (total_size > 0) {
1197 PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size,
1198 /*shrink_to_fit=*/false));
1199 PARQUET_THROW_NOT_OK(
1200 byte_array_offsets_->Resize((dictionary_length_ + 1) * sizeof(int32_t),
1201 /*shrink_to_fit=*/false));
1202 }
1203
1204 int32_t offset = 0;
1205 uint8_t* bytes_data = byte_array_data_->mutable_data();
1206 int32_t* bytes_offsets =
1207 reinterpret_cast<int32_t*>(byte_array_offsets_->mutable_data());
1208 for (int i = 0; i < dictionary_length_; ++i) {
1209 memcpy(bytes_data + offset, dict_values[i].ptr, dict_values[i].len);
1210 bytes_offsets[i] = offset;
1211 dict_values[i].ptr = bytes_data + offset;
1212 offset += dict_values[i].len;
1213 }
1214 bytes_offsets[dictionary_length_] = offset;
1215}
1216
1217template <>
1218inline void DictDecoderImpl<FLBAType>::SetDict(TypedDecoder<FLBAType>* dictionary) {
1219 DecodeDict(dictionary);
1220
1221 auto dict_values = reinterpret_cast<FLBA*>(dictionary_->mutable_data());
1222
1223 int fixed_len = descr_->type_length();
1224 int total_size = dictionary_length_ * fixed_len;
1225
1226 PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size,
1227 /*shrink_to_fit=*/false));
1228 uint8_t* bytes_data = byte_array_data_->mutable_data();
1229 for (int32_t i = 0, offset = 0; i < dictionary_length_; ++i, offset += fixed_len) {
1230 memcpy(bytes_data + offset, dict_values[i].ptr, fixed_len);
1231 dict_values[i].ptr = bytes_data + offset;
1232 }
1233}
1234
1235template <typename Type>
1236void DictDecoderImpl<Type>::InsertDictionary(arrow::ArrayBuilder* builder) {
1237 ParquetException::NYI("InsertDictionary only implemented for BYTE_ARRAY types");
1238}
1239
1240template <>
1241void DictDecoderImpl<ByteArrayType>::InsertDictionary(arrow::ArrayBuilder* builder) {
1242 auto binary_builder = checked_cast<arrow::BinaryDictionary32Builder*>(builder);
1243
1244 // Make an BinaryArray referencing the internal dictionary data
1245 auto arr = std::make_shared<arrow::BinaryArray>(dictionary_length_, byte_array_offsets_,
1246 byte_array_data_);
1247 PARQUET_THROW_NOT_OK(binary_builder->InsertMemoValues(*arr));
1248}
1249
1250class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
1251 virtual public ByteArrayDecoder {
1252 public:
1253 using BASE = DictDecoderImpl<ByteArrayType>;
1254 using BASE::DictDecoderImpl;
1255
1256 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
1257 int64_t valid_bits_offset,
1258 arrow::BinaryDictionary32Builder* builder) override {
1259 int result = 0;
1260 PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
1261 valid_bits_offset, builder, &result));
1262 return result;
1263 }
1264
1265 int DecodeArrowNonNull(int num_values,
1266 arrow::BinaryDictionary32Builder* builder) override {
1267 int result = 0;
1268 PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, builder, &result));
1269 return result;
1270 }
1271
1272 int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
1273 int64_t valid_bits_offset, ArrowBinaryAccumulator* out) override {
1274 int result = 0;
1275 PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits,
1276 valid_bits_offset, out, &result));
1277 return result;
1278 }
1279
1280 int DecodeArrowNonNull(int num_values, ArrowBinaryAccumulator* out) override {
1281 int result = 0;
1282 PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull(num_values, out, &result));
1283 return result;
1284 }
1285
1286 private:
1287 Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
1288 int64_t valid_bits_offset, ArrowBinaryAccumulator* out,
1289 int* out_num_values) {
1290 constexpr int32_t buffer_size = 1024;
1291 int32_t indices_buffer[buffer_size];
1292
1293 ArrowBinaryHelper helper(out);
1294
1295 arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
1296
1297 auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
1298 int values_decoded = 0;
1299 int num_appended = 0;
1300 while (num_appended < num_values) {
1301 bool is_valid = bit_reader.IsSet();
1302 bit_reader.Next();
1303
1304 if (is_valid) {
1305 int32_t batch_size =
1306 std::min<int32_t>(buffer_size, num_values - num_appended - null_count);
1307 int num_indices = idx_decoder_.GetBatch(indices_buffer, batch_size);
1308
1309 int i = 0;
1310 while (true) {
1311 // Consume all indices
1312 if (is_valid) {
1313 const auto& val = dict_values[indices_buffer[i]];
1314 if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
1315 RETURN_NOT_OK(helper.PushChunk());
1316 }
1317 RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
1318 ++i;
1319 ++values_decoded;
1320 } else {
1321 RETURN_NOT_OK(helper.AppendNull());
1322 --null_count;
1323 }
1324 ++num_appended;
1325 if (i == num_indices) {
1326 // Do not advance the bit_reader if we have fulfilled the decode
1327 // request
1328 break;
1329 }
1330 is_valid = bit_reader.IsSet();
1331 bit_reader.Next();
1332 }
1333 } else {
1334 RETURN_NOT_OK(helper.AppendNull());
1335 --null_count;
1336 ++num_appended;
1337 }
1338 }
1339 *out_num_values = values_decoded;
1340 return Status::OK();
1341 }
1342
1343 Status DecodeArrowDenseNonNull(int num_values, ArrowBinaryAccumulator* out,
1344 int* out_num_values) {
1345 constexpr int32_t buffer_size = 2048;
1346 int32_t indices_buffer[buffer_size];
1347 int values_decoded = 0;
1348
1349 ArrowBinaryHelper helper(out);
1350 auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
1351
1352 while (values_decoded < num_values) {
1353 int32_t batch_size = std::min<int32_t>(buffer_size, num_values - values_decoded);
1354 int num_indices = idx_decoder_.GetBatch(indices_buffer, batch_size);
1355 if (num_indices == 0) ParquetException::EofException();
1356 for (int i = 0; i < num_indices; ++i) {
1357 const auto& val = dict_values[indices_buffer[i]];
1358 if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) {
1359 RETURN_NOT_OK(helper.PushChunk());
1360 }
1361 RETURN_NOT_OK(helper.Append(val.ptr, static_cast<int32_t>(val.len)));
1362 }
1363 values_decoded += num_indices;
1364 }
1365 *out_num_values = values_decoded;
1366 return Status::OK();
1367 }
1368
1369 template <typename BuilderType>
1370 Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
1371 int64_t valid_bits_offset, BuilderType* builder,
1372 int* out_num_values) {
1373 constexpr int32_t buffer_size = 1024;
1374 int32_t indices_buffer[buffer_size];
1375
1376 RETURN_NOT_OK(builder->Reserve(num_values));
1377 arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values);
1378
1379 auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
1380
1381 int values_decoded = 0;
1382 int num_appended = 0;
1383 while (num_appended < num_values) {
1384 bool is_valid = bit_reader.IsSet();
1385 bit_reader.Next();
1386
1387 if (is_valid) {
1388 int32_t batch_size =
1389 std::min<int32_t>(buffer_size, num_values - num_appended - null_count);
1390 int num_indices = idx_decoder_.GetBatch(indices_buffer, batch_size);
1391
1392 int i = 0;
1393 while (true) {
1394 // Consume all indices
1395 if (is_valid) {
1396 const auto& val = dict_values[indices_buffer[i]];
1397 RETURN_NOT_OK(builder->Append(val.ptr, val.len));
1398 ++i;
1399 ++values_decoded;
1400 } else {
1401 RETURN_NOT_OK(builder->AppendNull());
1402 --null_count;
1403 }
1404 ++num_appended;
1405 if (i == num_indices) {
1406 // Do not advance the bit_reader if we have fulfilled the decode
1407 // request
1408 break;
1409 }
1410 is_valid = bit_reader.IsSet();
1411 bit_reader.Next();
1412 }
1413 } else {
1414 RETURN_NOT_OK(builder->AppendNull());
1415 --null_count;
1416 ++num_appended;
1417 }
1418 }
1419 *out_num_values = values_decoded;
1420 return Status::OK();
1421 }
1422
1423 template <typename BuilderType>
1424 Status DecodeArrowNonNull(int num_values, BuilderType* builder, int* out_num_values) {
1425 constexpr int32_t buffer_size = 2048;
1426 int32_t indices_buffer[buffer_size];
1427 int values_decoded = 0;
1428 RETURN_NOT_OK(builder->Reserve(num_values));
1429
1430 auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());
1431
1432 while (values_decoded < num_values) {
1433 int32_t batch_size = std::min<int32_t>(buffer_size, num_values - values_decoded);
1434 int num_indices = idx_decoder_.GetBatch(indices_buffer, batch_size);
1435 if (num_indices == 0) ParquetException::EofException();
1436 for (int i = 0; i < num_indices; ++i) {
1437 const auto& val = dict_values[indices_buffer[i]];
1438 RETURN_NOT_OK(builder->Append(val.ptr, val.len));
1439 }
1440 values_decoded += num_indices;
1441 }
1442 *out_num_values = values_decoded;
1443 return Status::OK();
1444 }
1445};
1446
1447class DictFLBADecoder : public DictDecoderImpl<FLBAType>, virtual public FLBADecoder {
1448 public:
1449 using BASE = DictDecoderImpl<FLBAType>;
1450 using BASE::DictDecoderImpl;
1451};
1452
1453// ----------------------------------------------------------------------
1454// DeltaBitPackDecoder
1455
1456template <typename DType>
1457class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
1458 public:
1459 typedef typename DType::c_type T;
1460
1461 explicit DeltaBitPackDecoder(const ColumnDescriptor* descr,
1462 MemoryPool* pool = arrow::default_memory_pool())
1463 : DecoderImpl(descr, Encoding::DELTA_BINARY_PACKED), pool_(pool) {
1464 if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) {
1465 throw ParquetException("Delta bit pack encoding should only be for integer data.");
1466 }
1467 }
1468
1469 virtual void SetData(int num_values, const uint8_t* data, int len) {
1470 this->num_values_ = num_values;
1471 decoder_ = arrow::BitUtil::BitReader(data, len);
1472 values_current_block_ = 0;
1473 values_current_mini_block_ = 0;
1474 }
1475
1476 virtual int Decode(T* buffer, int max_values) {
1477 return GetInternal(buffer, max_values);
1478 }
1479
1480 private:
1481 void InitBlock() {
1482 int32_t block_size;
1483 if (!decoder_.GetVlqInt(&block_size)) ParquetException::EofException();
1484 if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException();
1485 if (!decoder_.GetVlqInt(&values_current_block_)) {
1486 ParquetException::EofException();
1487 }
1488 if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException();
1489
1490 delta_bit_widths_ = AllocateBuffer(pool_, num_mini_blocks_);
1491 uint8_t* bit_width_data = delta_bit_widths_->mutable_data();
1492
1493 if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
1494 for (int i = 0; i < num_mini_blocks_; ++i) {
1495 if (!decoder_.GetAligned<uint8_t>(1, bit_width_data + i)) {
1496 ParquetException::EofException();
1497 }
1498 }
1499 values_per_mini_block_ = block_size / num_mini_blocks_;
1500 mini_block_idx_ = 0;
1501 delta_bit_width_ = bit_width_data[0];
1502 values_current_mini_block_ = values_per_mini_block_;
1503 }
1504
1505 template <typename T>
1506 int GetInternal(T* buffer, int max_values) {
1507 max_values = std::min(max_values, this->num_values_);
1508 const uint8_t* bit_width_data = delta_bit_widths_->data();
1509 for (int i = 0; i < max_values; ++i) {
1510 if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) {
1511 ++mini_block_idx_;
1512 if (mini_block_idx_ < static_cast<size_t>(delta_bit_widths_->size())) {
1513 delta_bit_width_ = bit_width_data[mini_block_idx_];
1514 values_current_mini_block_ = values_per_mini_block_;
1515 } else {
1516 InitBlock();
1517 buffer[i] = last_value_;
1518 continue;
1519 }
1520 }
1521
1522 // TODO: the key to this algorithm is to decode the entire miniblock at once.
1523 int64_t delta;
1524 if (!decoder_.GetValue(delta_bit_width_, &delta)) ParquetException::EofException();
1525 delta += min_delta_;
1526 last_value_ += static_cast<int32_t>(delta);
1527 buffer[i] = last_value_;
1528 --values_current_mini_block_;
1529 }
1530 this->num_values_ -= max_values;
1531 return max_values;
1532 }
1533
1534 MemoryPool* pool_;
1535 arrow::BitUtil::BitReader decoder_;
1536 int32_t values_current_block_;
1537 int32_t num_mini_blocks_;
1538 uint64_t values_per_mini_block_;
1539 uint64_t values_current_mini_block_;
1540
1541 int32_t min_delta_;
1542 size_t mini_block_idx_;
1543 std::shared_ptr<ResizableBuffer> delta_bit_widths_;
1544 int delta_bit_width_;
1545
1546 int32_t last_value_;
1547};
1548
1549// ----------------------------------------------------------------------
1550// DELTA_LENGTH_BYTE_ARRAY
1551
1552class DeltaLengthByteArrayDecoder : public DecoderImpl,
1553 virtual public TypedDecoder<ByteArrayType> {
1554 public:
1555 explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr,
1556 MemoryPool* pool = arrow::default_memory_pool())
1557 : DecoderImpl(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
1558 len_decoder_(nullptr, pool) {}
1559
1560 virtual void SetData(int num_values, const uint8_t* data, int len) {
1561 num_values_ = num_values;
1562 if (len == 0) return;
1563 int total_lengths_len = arrow::util::SafeLoadAs<int32_t>(data);
1564 data += 4;
1565 this->len_decoder_.SetData(num_values, data, total_lengths_len);
1566 data_ = data + total_lengths_len;
1567 this->len_ = len - 4 - total_lengths_len;
1568 }
1569
1570 virtual int Decode(ByteArray* buffer, int max_values) {
1571 max_values = std::min(max_values, num_values_);
1572 std::vector<int> lengths(max_values);
1573 len_decoder_.Decode(lengths.data(), max_values);
1574 for (int i = 0; i < max_values; ++i) {
1575 buffer[i].len = lengths[i];
1576 buffer[i].ptr = data_;
1577 this->data_ += lengths[i];
1578 this->len_ -= lengths[i];
1579 }
1580 this->num_values_ -= max_values;
1581 return max_values;
1582 }
1583
1584 private:
1585 DeltaBitPackDecoder<Int32Type> len_decoder_;
1586};
1587
1588// ----------------------------------------------------------------------
1589// DELTA_BYTE_ARRAY
1590
1591class DeltaByteArrayDecoder : public DecoderImpl,
1592 virtual public TypedDecoder<ByteArrayType> {
1593 public:
1594 explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
1595 MemoryPool* pool = arrow::default_memory_pool())
1596 : DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY),
1597 prefix_len_decoder_(nullptr, pool),
1598 suffix_decoder_(nullptr, pool),
1599 last_value_(0, nullptr) {}
1600
1601 virtual void SetData(int num_values, const uint8_t* data, int len) {
1602 num_values_ = num_values;
1603 if (len == 0) return;
1604 int prefix_len_length = arrow::util::SafeLoadAs<int32_t>(data);
1605 data += 4;
1606 len -= 4;
1607 prefix_len_decoder_.SetData(num_values, data, prefix_len_length);
1608 data += prefix_len_length;
1609 len -= prefix_len_length;
1610 suffix_decoder_.SetData(num_values, data, len);
1611 }
1612
1613 // TODO: this doesn't work and requires memory management. We need to allocate
1614 // new strings to store the results.
1615 virtual int Decode(ByteArray* buffer, int max_values) {
1616 max_values = std::min(max_values, this->num_values_);
1617 for (int i = 0; i < max_values; ++i) {
1618 int prefix_len = 0;
1619 prefix_len_decoder_.Decode(&prefix_len, 1);
1620 ByteArray suffix = {0, nullptr};
1621 suffix_decoder_.Decode(&suffix, 1);
1622 buffer[i].len = prefix_len + suffix.len;
1623
1624 uint8_t* result = reinterpret_cast<uint8_t*>(malloc(buffer[i].len));
1625 memcpy(result, last_value_.ptr, prefix_len);
1626 memcpy(result + prefix_len, suffix.ptr, suffix.len);
1627
1628 buffer[i].ptr = result;
1629 last_value_ = buffer[i];
1630 }
1631 this->num_values_ -= max_values;
1632 return max_values;
1633 }
1634
1635 private:
1636 DeltaBitPackDecoder<Int32Type> prefix_len_decoder_;
1637 DeltaLengthByteArrayDecoder suffix_decoder_;
1638 ByteArray last_value_;
1639};
1640
1641// ----------------------------------------------------------------------
1642
1643std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encoding,
1644 const ColumnDescriptor* descr) {
1645 if (encoding == Encoding::PLAIN) {
1646 switch (type_num) {
1647 case Type::BOOLEAN:
1648 return std::unique_ptr<Decoder>(new PlainBooleanDecoder(descr));
1649 case Type::INT32:
1650 return std::unique_ptr<Decoder>(new PlainDecoder<Int32Type>(descr));
1651 case Type::INT64:
1652 return std::unique_ptr<Decoder>(new PlainDecoder<Int64Type>(descr));
1653 case Type::INT96:
1654 return std::unique_ptr<Decoder>(new PlainDecoder<Int96Type>(descr));
1655 case Type::FLOAT:
1656 return std::unique_ptr<Decoder>(new PlainDecoder<FloatType>(descr));
1657 case Type::DOUBLE:
1658 return std::unique_ptr<Decoder>(new PlainDecoder<DoubleType>(descr));
1659 case Type::BYTE_ARRAY:
1660 return std::unique_ptr<Decoder>(new PlainByteArrayDecoder(descr));
1661 case Type::FIXED_LEN_BYTE_ARRAY:
1662 return std::unique_ptr<Decoder>(new PlainFLBADecoder(descr));
1663 default:
1664 break;
1665 }
1666 } else {
1667 ParquetException::NYI("Selected encoding is not supported");
1668 }
1669 DCHECK(false) << "Should not be able to reach this code";
1670 return nullptr;
1671}
1672
1673namespace detail {
1674
1675std::unique_ptr<Decoder> MakeDictDecoder(Type::type type_num,
1676 const ColumnDescriptor* descr,
1677 MemoryPool* pool) {
1678 switch (type_num) {
1679 case Type::BOOLEAN:
1680 ParquetException::NYI("Dictionary encoding not implemented for boolean type");
1681 case Type::INT32:
1682 return std::unique_ptr<Decoder>(new DictDecoderImpl<Int32Type>(descr, pool));
1683 case Type::INT64:
1684 return std::unique_ptr<Decoder>(new DictDecoderImpl<Int64Type>(descr, pool));
1685 case Type::INT96:
1686 return std::unique_ptr<Decoder>(new DictDecoderImpl<Int96Type>(descr, pool));
1687 case Type::FLOAT:
1688 return std::unique_ptr<Decoder>(new DictDecoderImpl<FloatType>(descr, pool));
1689 case Type::DOUBLE:
1690 return std::unique_ptr<Decoder>(new DictDecoderImpl<DoubleType>(descr, pool));
1691 case Type::BYTE_ARRAY:
1692 return std::unique_ptr<Decoder>(new DictByteArrayDecoderImpl(descr, pool));
1693 case Type::FIXED_LEN_BYTE_ARRAY:
1694 return std::unique_ptr<Decoder>(new DictFLBADecoder(descr, pool));
1695 default:
1696 break;
1697 }
1698 DCHECK(false) << "Should not be able to reach this code";
1699 return nullptr;
1700}
1701
1702} // namespace detail
1703} // namespace parquet
1704