1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #ifndef PARQUET_ENCODING_INTERNAL_H |
19 | #define PARQUET_ENCODING_INTERNAL_H |
20 | |
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>
#include <utility>
#include <vector>

#include "arrow/util/bit-stream-utils.h"
#include "arrow/util/bit-util.h"
#include "arrow/util/hashing.h"
#include "arrow/util/macros.h"
#include "arrow/util/rle-encoding.h"

#include "parquet/encoding.h"
#include "parquet/exception.h"
#include "parquet/schema.h"
#include "parquet/types.h"
#include "parquet/util/memory.h"
39 | |
40 | namespace parquet { |
41 | |
42 | namespace BitUtil = ::arrow::BitUtil; |
43 | |
44 | class ColumnDescriptor; |
45 | |
46 | // ---------------------------------------------------------------------- |
47 | // Encoding::PLAIN decoder implementation |
48 | |
49 | template <typename DType> |
50 | class PlainDecoder : public Decoder<DType> { |
51 | public: |
52 | typedef typename DType::c_type T; |
53 | using Decoder<DType>::num_values_; |
54 | |
55 | explicit PlainDecoder(const ColumnDescriptor* descr) |
56 | : Decoder<DType>(descr, Encoding::PLAIN), data_(nullptr), len_(0) { |
57 | if (descr_ && descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) { |
58 | type_length_ = descr_->type_length(); |
59 | } else { |
60 | type_length_ = -1; |
61 | } |
62 | } |
63 | |
64 | virtual void SetData(int num_values, const uint8_t* data, int len) { |
65 | num_values_ = num_values; |
66 | data_ = data; |
67 | len_ = len; |
68 | } |
69 | |
70 | virtual int Decode(T* buffer, int max_values); |
71 | |
72 | private: |
73 | using Decoder<DType>::descr_; |
74 | const uint8_t* data_; |
75 | int len_; |
76 | int type_length_; |
77 | }; |
78 | |
79 | // Decode routine templated on C++ type rather than type enum |
80 | template <typename T> |
81 | inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values, |
82 | int type_length, T* out) { |
83 | int bytes_to_decode = num_values * static_cast<int>(sizeof(T)); |
84 | if (data_size < bytes_to_decode) { |
85 | ParquetException::EofException(); |
86 | } |
87 | // If bytes_to_decode == 0, data could be null |
88 | if (bytes_to_decode > 0) { |
89 | memcpy(out, data, bytes_to_decode); |
90 | } |
91 | return bytes_to_decode; |
92 | } |
93 | |
94 | // Template specialization for BYTE_ARRAY. The written values do not own their |
95 | // own data. |
96 | template <> |
97 | inline int DecodePlain<ByteArray>(const uint8_t* data, int64_t data_size, int num_values, |
98 | int type_length, ByteArray* out) { |
99 | int bytes_decoded = 0; |
100 | int increment; |
101 | for (int i = 0; i < num_values; ++i) { |
102 | uint32_t len = out[i].len = *reinterpret_cast<const uint32_t*>(data); |
103 | increment = static_cast<int>(sizeof(uint32_t) + len); |
104 | if (data_size < increment) ParquetException::EofException(); |
105 | out[i].ptr = data + sizeof(uint32_t); |
106 | data += increment; |
107 | data_size -= increment; |
108 | bytes_decoded += increment; |
109 | } |
110 | return bytes_decoded; |
111 | } |
112 | |
113 | // Template specialization for FIXED_LEN_BYTE_ARRAY. The written values do not |
114 | // own their own data. |
115 | template <> |
116 | inline int DecodePlain<FixedLenByteArray>(const uint8_t* data, int64_t data_size, |
117 | int num_values, int type_length, |
118 | FixedLenByteArray* out) { |
119 | int bytes_to_decode = type_length * num_values; |
120 | if (data_size < bytes_to_decode) { |
121 | ParquetException::EofException(); |
122 | } |
123 | for (int i = 0; i < num_values; ++i) { |
124 | out[i].ptr = data; |
125 | data += type_length; |
126 | data_size -= type_length; |
127 | } |
128 | return bytes_to_decode; |
129 | } |
130 | |
131 | template <typename DType> |
132 | inline int PlainDecoder<DType>::Decode(T* buffer, int max_values) { |
133 | max_values = std::min(max_values, num_values_); |
134 | int bytes_consumed = DecodePlain<T>(data_, len_, max_values, type_length_, buffer); |
135 | data_ += bytes_consumed; |
136 | len_ -= bytes_consumed; |
137 | num_values_ -= max_values; |
138 | return max_values; |
139 | } |
140 | |
141 | template <> |
142 | class PlainDecoder<BooleanType> : public Decoder<BooleanType> { |
143 | public: |
144 | explicit PlainDecoder(const ColumnDescriptor* descr) |
145 | : Decoder<BooleanType>(descr, Encoding::PLAIN) {} |
146 | |
147 | virtual void SetData(int num_values, const uint8_t* data, int len) { |
148 | num_values_ = num_values; |
149 | bit_reader_ = BitUtil::BitReader(data, len); |
150 | } |
151 | |
152 | // Two flavors of bool decoding |
153 | int Decode(uint8_t* buffer, int max_values) { |
154 | max_values = std::min(max_values, num_values_); |
155 | bool val; |
156 | ::arrow::internal::BitmapWriter bit_writer(buffer, 0, max_values); |
157 | for (int i = 0; i < max_values; ++i) { |
158 | if (!bit_reader_.GetValue(1, &val)) { |
159 | ParquetException::EofException(); |
160 | } |
161 | if (val) { |
162 | bit_writer.Set(); |
163 | } |
164 | bit_writer.Next(); |
165 | } |
166 | bit_writer.Finish(); |
167 | num_values_ -= max_values; |
168 | return max_values; |
169 | } |
170 | |
171 | virtual int Decode(bool* buffer, int max_values) { |
172 | max_values = std::min(max_values, num_values_); |
173 | if (bit_reader_.GetBatch(1, buffer, max_values) != max_values) { |
174 | ParquetException::EofException(); |
175 | } |
176 | num_values_ -= max_values; |
177 | return max_values; |
178 | } |
179 | |
180 | private: |
181 | BitUtil::BitReader bit_reader_; |
182 | }; |
183 | |
184 | // ---------------------------------------------------------------------- |
185 | // Encoding::PLAIN encoder implementation |
186 | |
187 | template <typename DType> |
188 | class PlainEncoder : public Encoder<DType> { |
189 | public: |
190 | typedef typename DType::c_type T; |
191 | |
192 | explicit PlainEncoder(const ColumnDescriptor* descr, |
193 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) |
194 | : Encoder<DType>(descr, Encoding::PLAIN, pool) { |
195 | values_sink_.reset(new InMemoryOutputStream(pool)); |
196 | } |
197 | |
198 | int64_t EstimatedDataEncodedSize() override { return values_sink_->Tell(); } |
199 | |
200 | std::shared_ptr<Buffer> FlushValues() override; |
201 | void Put(const T* src, int num_values) override; |
202 | |
203 | protected: |
204 | std::unique_ptr<InMemoryOutputStream> values_sink_; |
205 | }; |
206 | |
207 | template <> |
208 | class PlainEncoder<BooleanType> : public Encoder<BooleanType> { |
209 | public: |
210 | explicit PlainEncoder(const ColumnDescriptor* descr, |
211 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) |
212 | : Encoder<BooleanType>(descr, Encoding::PLAIN, pool), |
213 | bits_available_(kInMemoryDefaultCapacity * 8), |
214 | bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)), |
215 | values_sink_(new InMemoryOutputStream(pool)) { |
216 | bit_writer_.reset(new BitUtil::BitWriter(bits_buffer_->mutable_data(), |
217 | static_cast<int>(bits_buffer_->size()))); |
218 | } |
219 | |
220 | int64_t EstimatedDataEncodedSize() override { |
221 | return values_sink_->Tell() + bit_writer_->bytes_written(); |
222 | } |
223 | |
224 | std::shared_ptr<Buffer> FlushValues() override { |
225 | if (bits_available_ > 0) { |
226 | bit_writer_->Flush(); |
227 | values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); |
228 | bit_writer_->Clear(); |
229 | bits_available_ = static_cast<int>(bits_buffer_->size()) * 8; |
230 | } |
231 | |
232 | std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer(); |
233 | values_sink_.reset(new InMemoryOutputStream(this->pool_)); |
234 | return buffer; |
235 | } |
236 | |
237 | #define PLAINDECODER_BOOLEAN_PUT(input_type, function_attributes) \ |
238 | void Put(input_type src, int num_values) function_attributes { \ |
239 | int bit_offset = 0; \ |
240 | if (bits_available_ > 0) { \ |
241 | int bits_to_write = std::min(bits_available_, num_values); \ |
242 | for (int i = 0; i < bits_to_write; i++) { \ |
243 | bit_writer_->PutValue(src[i], 1); \ |
244 | } \ |
245 | bits_available_ -= bits_to_write; \ |
246 | bit_offset = bits_to_write; \ |
247 | \ |
248 | if (bits_available_ == 0) { \ |
249 | bit_writer_->Flush(); \ |
250 | values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); \ |
251 | bit_writer_->Clear(); \ |
252 | } \ |
253 | } \ |
254 | \ |
255 | int bits_remaining = num_values - bit_offset; \ |
256 | while (bit_offset < num_values) { \ |
257 | bits_available_ = static_cast<int>(bits_buffer_->size()) * 8; \ |
258 | \ |
259 | int bits_to_write = std::min(bits_available_, bits_remaining); \ |
260 | for (int i = bit_offset; i < bit_offset + bits_to_write; i++) { \ |
261 | bit_writer_->PutValue(src[i], 1); \ |
262 | } \ |
263 | bit_offset += bits_to_write; \ |
264 | bits_available_ -= bits_to_write; \ |
265 | bits_remaining -= bits_to_write; \ |
266 | \ |
267 | if (bits_available_ == 0) { \ |
268 | bit_writer_->Flush(); \ |
269 | values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); \ |
270 | bit_writer_->Clear(); \ |
271 | } \ |
272 | } \ |
273 | } |
274 | |
275 | PLAINDECODER_BOOLEAN_PUT(const bool*, override) |
276 | PLAINDECODER_BOOLEAN_PUT(const std::vector<bool>&, ) |
277 | |
278 | protected: |
279 | int bits_available_; |
280 | std::unique_ptr<BitUtil::BitWriter> bit_writer_; |
281 | std::shared_ptr<ResizableBuffer> bits_buffer_; |
282 | std::unique_ptr<InMemoryOutputStream> values_sink_; |
283 | }; |
284 | |
285 | template <typename DType> |
286 | inline std::shared_ptr<Buffer> PlainEncoder<DType>::FlushValues() { |
287 | std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer(); |
288 | values_sink_.reset(new InMemoryOutputStream(this->pool_)); |
289 | return buffer; |
290 | } |
291 | |
292 | template <typename DType> |
293 | inline void PlainEncoder<DType>::Put(const T* buffer, int num_values) { |
294 | values_sink_->Write(reinterpret_cast<const uint8_t*>(buffer), num_values * sizeof(T)); |
295 | } |
296 | |
297 | template <> |
298 | inline void PlainEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) { |
299 | for (int i = 0; i < num_values; ++i) { |
300 | // Write the result to the output stream |
301 | values_sink_->Write(reinterpret_cast<const uint8_t*>(&src[i].len), sizeof(uint32_t)); |
302 | if (src[i].len > 0) { |
303 | DCHECK(nullptr != src[i].ptr) << "Value ptr cannot be NULL" ; |
304 | } |
305 | values_sink_->Write(reinterpret_cast<const uint8_t*>(src[i].ptr), src[i].len); |
306 | } |
307 | } |
308 | |
309 | template <> |
310 | inline void PlainEncoder<FLBAType>::Put(const FixedLenByteArray* src, int num_values) { |
311 | for (int i = 0; i < num_values; ++i) { |
312 | // Write the result to the output stream |
313 | if (descr_->type_length() > 0) { |
314 | DCHECK(nullptr != src[i].ptr) << "Value ptr cannot be NULL" ; |
315 | } |
316 | values_sink_->Write(reinterpret_cast<const uint8_t*>(src[i].ptr), |
317 | descr_->type_length()); |
318 | } |
319 | } |
320 | |
321 | // ---------------------------------------------------------------------- |
322 | // Dictionary encoding and decoding |
323 | |
324 | template <typename Type> |
325 | class DictionaryDecoder : public Decoder<Type> { |
326 | public: |
327 | typedef typename Type::c_type T; |
328 | |
329 | // Initializes the dictionary with values from 'dictionary'. The data in |
330 | // dictionary is not guaranteed to persist in memory after this call so the |
331 | // dictionary decoder needs to copy the data out if necessary. |
332 | explicit DictionaryDecoder(const ColumnDescriptor* descr, |
333 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) |
334 | : Decoder<Type>(descr, Encoding::RLE_DICTIONARY), |
335 | dictionary_(0, pool), |
336 | byte_array_data_(AllocateBuffer(pool, 0)) {} |
337 | |
338 | // Perform type-specific initiatialization |
339 | void SetDict(Decoder<Type>* dictionary); |
340 | |
341 | void SetData(int num_values, const uint8_t* data, int len) override { |
342 | num_values_ = num_values; |
343 | if (len == 0) return; |
344 | uint8_t bit_width = *data; |
345 | ++data; |
346 | --len; |
347 | idx_decoder_ = ::arrow::util::RleDecoder(data, len, bit_width); |
348 | } |
349 | |
350 | int Decode(T* buffer, int max_values) override { |
351 | max_values = std::min(max_values, num_values_); |
352 | int decoded_values = |
353 | idx_decoder_.GetBatchWithDict(dictionary_.data(), buffer, max_values); |
354 | if (decoded_values != max_values) { |
355 | ParquetException::EofException(); |
356 | } |
357 | num_values_ -= max_values; |
358 | return max_values; |
359 | } |
360 | |
361 | int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits, |
362 | int64_t valid_bits_offset) override { |
363 | int decoded_values = |
364 | idx_decoder_.GetBatchWithDictSpaced(dictionary_.data(), buffer, num_values, |
365 | null_count, valid_bits, valid_bits_offset); |
366 | if (decoded_values != num_values) { |
367 | ParquetException::EofException(); |
368 | } |
369 | return decoded_values; |
370 | } |
371 | |
372 | private: |
373 | using Decoder<Type>::num_values_; |
374 | |
375 | // Only one is set. |
376 | Vector<T> dictionary_; |
377 | |
378 | // Data that contains the byte array data (byte_array_dictionary_ just has the |
379 | // pointers). |
380 | std::shared_ptr<ResizableBuffer> byte_array_data_; |
381 | |
382 | ::arrow::util::RleDecoder idx_decoder_; |
383 | }; |
384 | |
385 | template <typename Type> |
386 | inline void DictionaryDecoder<Type>::SetDict(Decoder<Type>* dictionary) { |
387 | int num_dictionary_values = dictionary->values_left(); |
388 | dictionary_.Resize(num_dictionary_values); |
389 | dictionary->Decode(dictionary_.data(), num_dictionary_values); |
390 | } |
391 | |
392 | template <> |
393 | inline void DictionaryDecoder<BooleanType>::SetDict(Decoder<BooleanType>* dictionary) { |
394 | ParquetException::NYI("Dictionary encoding is not implemented for boolean values" ); |
395 | } |
396 | |
397 | template <> |
398 | inline void DictionaryDecoder<ByteArrayType>::SetDict( |
399 | Decoder<ByteArrayType>* dictionary) { |
400 | int num_dictionary_values = dictionary->values_left(); |
401 | dictionary_.Resize(num_dictionary_values); |
402 | dictionary->Decode(&dictionary_[0], num_dictionary_values); |
403 | |
404 | int total_size = 0; |
405 | for (int i = 0; i < num_dictionary_values; ++i) { |
406 | total_size += dictionary_[i].len; |
407 | } |
408 | if (total_size > 0) { |
409 | PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false)); |
410 | } |
411 | |
412 | int offset = 0; |
413 | uint8_t* bytes_data = byte_array_data_->mutable_data(); |
414 | for (int i = 0; i < num_dictionary_values; ++i) { |
415 | memcpy(bytes_data + offset, dictionary_[i].ptr, dictionary_[i].len); |
416 | dictionary_[i].ptr = bytes_data + offset; |
417 | offset += dictionary_[i].len; |
418 | } |
419 | } |
420 | |
421 | template <> |
422 | inline void DictionaryDecoder<FLBAType>::SetDict(Decoder<FLBAType>* dictionary) { |
423 | int num_dictionary_values = dictionary->values_left(); |
424 | dictionary_.Resize(num_dictionary_values); |
425 | dictionary->Decode(&dictionary_[0], num_dictionary_values); |
426 | |
427 | int fixed_len = descr_->type_length(); |
428 | int total_size = num_dictionary_values * fixed_len; |
429 | |
430 | PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false)); |
431 | uint8_t* bytes_data = byte_array_data_->mutable_data(); |
432 | for (int32_t i = 0, offset = 0; i < num_dictionary_values; ++i, offset += fixed_len) { |
433 | memcpy(bytes_data + offset, dictionary_[i].ptr, fixed_len); |
434 | dictionary_[i].ptr = bytes_data + offset; |
435 | } |
436 | } |
437 | |
438 | // ---------------------------------------------------------------------- |
439 | // Dictionary encoder |
440 | |
441 | template <typename DType> |
442 | struct DictEncoderTraits { |
443 | using c_type = typename DType::c_type; |
444 | using MemoTableType = ::arrow::internal::ScalarMemoTable<c_type>; |
445 | }; |
446 | |
447 | template <> |
448 | struct DictEncoderTraits<ByteArrayType> { |
449 | using MemoTableType = ::arrow::internal::BinaryMemoTable; |
450 | }; |
451 | |
452 | template <> |
453 | struct DictEncoderTraits<FLBAType> { |
454 | using MemoTableType = ::arrow::internal::BinaryMemoTable; |
455 | }; |
456 | |
// Initial capacity handed to DictEncoder's memo table: 1024 (2^10) elements.
static constexpr int32_t INITIAL_HASH_TABLE_SIZE = 1024;
459 | |
460 | /// See the dictionary encoding section of https://github.com/Parquet/parquet-format. |
461 | /// The encoding supports streaming encoding. Values are encoded as they are added while |
462 | /// the dictionary is being constructed. At any time, the buffered values can be |
463 | /// written out with the current dictionary size. More values can then be added to |
464 | /// the encoder, including new dictionary entries. |
465 | template <typename DType> |
466 | class DictEncoder : public Encoder<DType> { |
467 | using MemoTableType = typename DictEncoderTraits<DType>::MemoTableType; |
468 | |
469 | public: |
470 | typedef typename DType::c_type T; |
471 | |
472 | explicit DictEncoder(const ColumnDescriptor* desc, |
473 | ::arrow::MemoryPool* allocator = ::arrow::default_memory_pool()) |
474 | : Encoder<DType>(desc, Encoding::PLAIN_DICTIONARY, allocator), |
475 | allocator_(allocator), |
476 | dict_encoded_size_(0), |
477 | type_length_(desc->type_length()), |
478 | memo_table_(INITIAL_HASH_TABLE_SIZE) {} |
479 | |
480 | ~DictEncoder() override { DCHECK(buffered_indices_.empty()); } |
481 | |
482 | void set_type_length(int type_length) { type_length_ = type_length; } |
483 | |
484 | /// Returns a conservative estimate of the number of bytes needed to encode the buffered |
485 | /// indices. Used to size the buffer passed to WriteIndices(). |
486 | int64_t EstimatedDataEncodedSize() override { |
487 | // Note: because of the way RleEncoder::CheckBufferFull() is called, we have to |
488 | // reserve |
489 | // an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used |
490 | // but not reserving them would cause the encoder to fail. |
491 | return 1 + |
492 | ::arrow::util::RleEncoder::MaxBufferSize( |
493 | bit_width(), static_cast<int>(buffered_indices_.size())) + |
494 | ::arrow::util::RleEncoder::MinBufferSize(bit_width()); |
495 | } |
496 | |
497 | /// The minimum bit width required to encode the currently buffered indices. |
498 | int bit_width() const { |
499 | if (ARROW_PREDICT_FALSE(num_entries() == 0)) return 0; |
500 | if (ARROW_PREDICT_FALSE(num_entries() == 1)) return 1; |
501 | return BitUtil::Log2(num_entries()); |
502 | } |
503 | |
504 | /// Writes out any buffered indices to buffer preceded by the bit width of this data. |
505 | /// Returns the number of bytes written. |
506 | /// If the supplied buffer is not big enough, returns -1. |
507 | /// buffer must be preallocated with buffer_len bytes. Use EstimatedDataEncodedSize() |
508 | /// to size buffer. |
509 | int WriteIndices(uint8_t* buffer, int buffer_len); |
510 | |
511 | int dict_encoded_size() { return dict_encoded_size_; } |
512 | |
513 | /// Encode value. Note that this does not actually write any data, just |
514 | /// buffers the value's index to be written later. |
515 | inline void Put(const T& value); |
516 | void Put(const T* values, int num_values) override; |
517 | |
518 | std::shared_ptr<Buffer> FlushValues() override { |
519 | std::shared_ptr<ResizableBuffer> buffer = |
520 | AllocateBuffer(this->allocator_, EstimatedDataEncodedSize()); |
521 | int result_size = WriteIndices(buffer->mutable_data(), |
522 | static_cast<int>(EstimatedDataEncodedSize())); |
523 | PARQUET_THROW_NOT_OK(buffer->Resize(result_size, false)); |
524 | return std::move(buffer); |
525 | } |
526 | |
527 | void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, |
528 | int64_t valid_bits_offset) override { |
529 | ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset, |
530 | num_values); |
531 | for (int32_t i = 0; i < num_values; i++) { |
532 | if (valid_bits_reader.IsSet()) { |
533 | Put(src[i]); |
534 | } |
535 | valid_bits_reader.Next(); |
536 | } |
537 | } |
538 | |
539 | /// Writes out the encoded dictionary to buffer. buffer must be preallocated to |
540 | /// dict_encoded_size() bytes. |
541 | void WriteDict(uint8_t* buffer); |
542 | |
543 | /// The number of entries in the dictionary. |
544 | int num_entries() const { return memo_table_.size(); } |
545 | |
546 | private: |
547 | /// Clears all the indices (but leaves the dictionary). |
548 | void ClearIndices() { buffered_indices_.clear(); } |
549 | |
550 | ::arrow::MemoryPool* allocator_; |
551 | |
552 | /// Indices that have not yet be written out by WriteIndices(). |
553 | std::vector<int> buffered_indices_; |
554 | |
555 | /// The number of bytes needed to encode the dictionary. |
556 | int dict_encoded_size_; |
557 | |
558 | /// Size of each encoded dictionary value. -1 for variable-length types. |
559 | int type_length_; |
560 | |
561 | MemoTableType memo_table_; |
562 | }; |
563 | |
564 | template <typename DType> |
565 | void DictEncoder<DType>::Put(const T* src, int num_values) { |
566 | for (int32_t i = 0; i < num_values; i++) { |
567 | Put(src[i]); |
568 | } |
569 | } |
570 | |
571 | template <typename DType> |
572 | inline void DictEncoder<DType>::Put(const T& v) { |
573 | // Put() implementation for primitive types |
574 | auto on_found = [](int32_t memo_index) {}; |
575 | auto on_not_found = [this](int32_t memo_index) { |
576 | dict_encoded_size_ += static_cast<int>(sizeof(T)); |
577 | }; |
578 | |
579 | auto memo_index = memo_table_.GetOrInsert(v, on_found, on_not_found); |
580 | buffered_indices_.push_back(memo_index); |
581 | } |
582 | |
583 | template <> |
584 | inline void DictEncoder<ByteArrayType>::Put(const ByteArray& v) { |
585 | static const uint8_t empty[] = {0}; |
586 | |
587 | auto on_found = [](int32_t memo_index) {}; |
588 | auto on_not_found = [&](int32_t memo_index) { |
589 | dict_encoded_size_ += static_cast<int>(v.len + sizeof(uint32_t)); |
590 | }; |
591 | |
592 | DCHECK(v.ptr != nullptr || v.len == 0); |
593 | const void* ptr = (v.ptr != nullptr) ? v.ptr : empty; |
594 | auto memo_index = |
595 | memo_table_.GetOrInsert(ptr, static_cast<int32_t>(v.len), on_found, on_not_found); |
596 | buffered_indices_.push_back(memo_index); |
597 | } |
598 | |
599 | template <> |
600 | inline void DictEncoder<FLBAType>::Put(const FixedLenByteArray& v) { |
601 | static const uint8_t empty[] = {0}; |
602 | |
603 | auto on_found = [](int32_t memo_index) {}; |
604 | auto on_not_found = [this](int32_t memo_index) { dict_encoded_size_ += type_length_; }; |
605 | |
606 | DCHECK(v.ptr != nullptr || type_length_ == 0); |
607 | const void* ptr = (v.ptr != nullptr) ? v.ptr : empty; |
608 | auto memo_index = memo_table_.GetOrInsert(ptr, type_length_, on_found, on_not_found); |
609 | buffered_indices_.push_back(memo_index); |
610 | } |
611 | |
612 | template <typename DType> |
613 | inline void DictEncoder<DType>::WriteDict(uint8_t* buffer) { |
614 | // For primitive types, only a memcpy |
615 | DCHECK_EQ(static_cast<size_t>(dict_encoded_size_), sizeof(T) * memo_table_.size()); |
616 | memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast<T*>(buffer)); |
617 | } |
618 | |
619 | // ByteArray and FLBA already have the dictionary encoded in their data heaps |
620 | template <> |
621 | inline void DictEncoder<ByteArrayType>::WriteDict(uint8_t* buffer) { |
622 | memo_table_.VisitValues(0, [&](const ::arrow::util::string_view& v) { |
623 | uint32_t len = static_cast<uint32_t>(v.length()); |
624 | memcpy(buffer, &len, sizeof(uint32_t)); |
625 | buffer += sizeof(uint32_t); |
626 | memcpy(buffer, v.data(), v.length()); |
627 | buffer += v.length(); |
628 | }); |
629 | } |
630 | |
631 | template <> |
632 | inline void DictEncoder<FLBAType>::WriteDict(uint8_t* buffer) { |
633 | memo_table_.VisitValues(0, [&](const ::arrow::util::string_view& v) { |
634 | DCHECK_EQ(v.length(), static_cast<size_t>(type_length_)); |
635 | memcpy(buffer, v.data(), type_length_); |
636 | buffer += type_length_; |
637 | }); |
638 | } |
639 | |
640 | template <typename DType> |
641 | inline int DictEncoder<DType>::WriteIndices(uint8_t* buffer, int buffer_len) { |
642 | // Write bit width in first byte |
643 | *buffer = static_cast<uint8_t>(bit_width()); |
644 | ++buffer; |
645 | --buffer_len; |
646 | |
647 | ::arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width()); |
648 | for (int index : buffered_indices_) { |
649 | if (!encoder.Put(index)) return -1; |
650 | } |
651 | encoder.Flush(); |
652 | |
653 | ClearIndices(); |
654 | return 1 + encoder.len(); |
655 | } |
656 | |
657 | // ---------------------------------------------------------------------- |
658 | // DeltaBitPackDecoder |
659 | |
660 | template <typename DType> |
661 | class DeltaBitPackDecoder : public Decoder<DType> { |
662 | public: |
663 | typedef typename DType::c_type T; |
664 | |
665 | explicit DeltaBitPackDecoder(const ColumnDescriptor* descr, |
666 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) |
667 | : Decoder<DType>(descr, Encoding::DELTA_BINARY_PACKED), pool_(pool) { |
668 | if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) { |
669 | throw ParquetException("Delta bit pack encoding should only be for integer data." ); |
670 | } |
671 | } |
672 | |
673 | virtual void SetData(int num_values, const uint8_t* data, int len) { |
674 | num_values_ = num_values; |
675 | decoder_ = BitUtil::BitReader(data, len); |
676 | values_current_block_ = 0; |
677 | values_current_mini_block_ = 0; |
678 | } |
679 | |
680 | virtual int Decode(T* buffer, int max_values) { |
681 | return GetInternal(buffer, max_values); |
682 | } |
683 | |
684 | private: |
685 | using Decoder<DType>::num_values_; |
686 | |
687 | void InitBlock() { |
688 | int32_t block_size; |
689 | if (!decoder_.GetVlqInt(&block_size)) ParquetException::EofException(); |
690 | if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException(); |
691 | if (!decoder_.GetVlqInt(&values_current_block_)) { |
692 | ParquetException::EofException(); |
693 | } |
694 | if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException(); |
695 | |
696 | delta_bit_widths_ = AllocateBuffer(pool_, num_mini_blocks_); |
697 | uint8_t* bit_width_data = delta_bit_widths_->mutable_data(); |
698 | |
699 | if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException(); |
700 | for (int i = 0; i < num_mini_blocks_; ++i) { |
701 | if (!decoder_.GetAligned<uint8_t>(1, bit_width_data + i)) { |
702 | ParquetException::EofException(); |
703 | } |
704 | } |
705 | values_per_mini_block_ = block_size / num_mini_blocks_; |
706 | mini_block_idx_ = 0; |
707 | delta_bit_width_ = bit_width_data[0]; |
708 | values_current_mini_block_ = values_per_mini_block_; |
709 | } |
710 | |
711 | template <typename T> |
712 | int GetInternal(T* buffer, int max_values) { |
713 | max_values = std::min(max_values, num_values_); |
714 | const uint8_t* bit_width_data = delta_bit_widths_->data(); |
715 | for (int i = 0; i < max_values; ++i) { |
716 | if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) { |
717 | ++mini_block_idx_; |
718 | if (mini_block_idx_ < static_cast<size_t>(delta_bit_widths_->size())) { |
719 | delta_bit_width_ = bit_width_data[mini_block_idx_]; |
720 | values_current_mini_block_ = values_per_mini_block_; |
721 | } else { |
722 | InitBlock(); |
723 | buffer[i] = last_value_; |
724 | continue; |
725 | } |
726 | } |
727 | |
728 | // TODO: the key to this algorithm is to decode the entire miniblock at once. |
729 | int64_t delta; |
730 | if (!decoder_.GetValue(delta_bit_width_, &delta)) ParquetException::EofException(); |
731 | delta += min_delta_; |
732 | last_value_ += static_cast<int32_t>(delta); |
733 | buffer[i] = last_value_; |
734 | --values_current_mini_block_; |
735 | } |
736 | num_values_ -= max_values; |
737 | return max_values; |
738 | } |
739 | |
740 | ::arrow::MemoryPool* pool_; |
741 | BitUtil::BitReader decoder_; |
742 | int32_t values_current_block_; |
743 | int32_t num_mini_blocks_; |
744 | uint64_t values_per_mini_block_; |
745 | uint64_t values_current_mini_block_; |
746 | |
747 | int32_t min_delta_; |
748 | size_t mini_block_idx_; |
749 | std::shared_ptr<ResizableBuffer> delta_bit_widths_; |
750 | int delta_bit_width_; |
751 | |
752 | int32_t last_value_; |
753 | }; |
754 | |
755 | // ---------------------------------------------------------------------- |
756 | // DELTA_LENGTH_BYTE_ARRAY |
757 | |
758 | class DeltaLengthByteArrayDecoder : public Decoder<ByteArrayType> { |
759 | public: |
760 | explicit DeltaLengthByteArrayDecoder( |
761 | const ColumnDescriptor* descr, |
762 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) |
763 | : Decoder<ByteArrayType>(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY), |
764 | len_decoder_(nullptr, pool) {} |
765 | |
766 | virtual void SetData(int num_values, const uint8_t* data, int len) { |
767 | num_values_ = num_values; |
768 | if (len == 0) return; |
769 | int total_lengths_len = *reinterpret_cast<const int*>(data); |
770 | data += 4; |
771 | len_decoder_.SetData(num_values, data, total_lengths_len); |
772 | data_ = data + total_lengths_len; |
773 | len_ = len - 4 - total_lengths_len; |
774 | } |
775 | |
776 | virtual int Decode(ByteArray* buffer, int max_values) { |
777 | max_values = std::min(max_values, num_values_); |
778 | std::vector<int> lengths(max_values); |
779 | len_decoder_.Decode(lengths.data(), max_values); |
780 | for (int i = 0; i < max_values; ++i) { |
781 | buffer[i].len = lengths[i]; |
782 | buffer[i].ptr = data_; |
783 | data_ += lengths[i]; |
784 | len_ -= lengths[i]; |
785 | } |
786 | num_values_ -= max_values; |
787 | return max_values; |
788 | } |
789 | |
790 | private: |
791 | using Decoder<ByteArrayType>::num_values_; |
792 | DeltaBitPackDecoder<Int32Type> len_decoder_; |
793 | const uint8_t* data_; |
794 | int len_; |
795 | }; |
796 | |
797 | // ---------------------------------------------------------------------- |
798 | // DELTA_BYTE_ARRAY |
799 | |
800 | class DeltaByteArrayDecoder : public Decoder<ByteArrayType> { |
801 | public: |
802 | explicit DeltaByteArrayDecoder( |
803 | const ColumnDescriptor* descr, |
804 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) |
805 | : Decoder<ByteArrayType>(descr, Encoding::DELTA_BYTE_ARRAY), |
806 | prefix_len_decoder_(nullptr, pool), |
807 | suffix_decoder_(nullptr, pool), |
808 | last_value_(0, nullptr) {} |
809 | |
810 | virtual void SetData(int num_values, const uint8_t* data, int len) { |
811 | num_values_ = num_values; |
812 | if (len == 0) return; |
813 | int prefix_len_length = *reinterpret_cast<const int*>(data); |
814 | data += 4; |
815 | len -= 4; |
816 | prefix_len_decoder_.SetData(num_values, data, prefix_len_length); |
817 | data += prefix_len_length; |
818 | len -= prefix_len_length; |
819 | suffix_decoder_.SetData(num_values, data, len); |
820 | } |
821 | |
822 | // TODO: this doesn't work and requires memory management. We need to allocate |
823 | // new strings to store the results. |
824 | virtual int Decode(ByteArray* buffer, int max_values) { |
825 | max_values = std::min(max_values, num_values_); |
826 | for (int i = 0; i < max_values; ++i) { |
827 | int prefix_len = 0; |
828 | prefix_len_decoder_.Decode(&prefix_len, 1); |
829 | ByteArray suffix = {0, nullptr}; |
830 | suffix_decoder_.Decode(&suffix, 1); |
831 | buffer[i].len = prefix_len + suffix.len; |
832 | |
833 | uint8_t* result = reinterpret_cast<uint8_t*>(malloc(buffer[i].len)); |
834 | memcpy(result, last_value_.ptr, prefix_len); |
835 | memcpy(result + prefix_len, suffix.ptr, suffix.len); |
836 | |
837 | buffer[i].ptr = result; |
838 | last_value_ = buffer[i]; |
839 | } |
840 | num_values_ -= max_values; |
841 | return max_values; |
842 | } |
843 | |
844 | private: |
845 | using Decoder<ByteArrayType>::num_values_; |
846 | |
847 | DeltaBitPackDecoder<Int32Type> prefix_len_decoder_; |
848 | DeltaLengthByteArrayDecoder suffix_decoder_; |
849 | ByteArray last_value_; |
850 | }; |
851 | |
852 | } // namespace parquet |
853 | |
854 | #endif // PARQUET_ENCODING_INTERNAL_H |
855 | |