1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#ifndef PARQUET_ENCODING_H
19#define PARQUET_ENCODING_H
20
#include <cstdint>
#include <cstring>
#include <memory>
#include <sstream>

#include "arrow/status.h"
#include "arrow/util/bit-util.h"

#include "parquet/exception.h"
#include "parquet/schema.h"
#include "parquet/types.h"
#include "parquet/util/memory.h"
32
33namespace parquet {
34
35class ColumnDescriptor;
36
37// Base class for value encoders. Since encoders may or not have state (e.g.,
38// dictionary encoding) we use a class instance to maintain any state.
39//
40// TODO(wesm): Encode interface API is temporary
41template <typename DType>
42class Encoder {
43 public:
44 typedef typename DType::c_type T;
45
46 virtual ~Encoder() {}
47
48 virtual int64_t EstimatedDataEncodedSize() = 0;
49 virtual std::shared_ptr<Buffer> FlushValues() = 0;
50 virtual void Put(const T* src, int num_values) = 0;
51 virtual void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
52 int64_t valid_bits_offset) {
53 std::shared_ptr<ResizableBuffer> buffer;
54 auto status =
55 ::arrow::AllocateResizableBuffer(pool_, num_values * sizeof(T), &buffer);
56 if (!status.ok()) {
57 std::ostringstream ss;
58 ss << "AllocateResizableBuffer failed in Encoder.PutSpaced in " << __FILE__
59 << ", on line " << __LINE__;
60 throw ParquetException(ss.str());
61 }
62 int32_t num_valid_values = 0;
63 ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
64 num_values);
65 T* data = reinterpret_cast<T*>(buffer->mutable_data());
66 for (int32_t i = 0; i < num_values; i++) {
67 if (valid_bits_reader.IsSet()) {
68 data[num_valid_values++] = src[i];
69 }
70 valid_bits_reader.Next();
71 }
72 Put(data, num_valid_values);
73 }
74
75 Encoding::type encoding() const { return encoding_; }
76
77 protected:
78 explicit Encoder(const ColumnDescriptor* descr, Encoding::type encoding,
79 ::arrow::MemoryPool* pool)
80 : descr_(descr), encoding_(encoding), pool_(pool) {}
81
82 // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
83 const ColumnDescriptor* descr_;
84 const Encoding::type encoding_;
85 ::arrow::MemoryPool* pool_;
86};
87
88// The Decoder template is parameterized on parquet::DataType subclasses
89template <typename DType>
90class Decoder {
91 public:
92 typedef typename DType::c_type T;
93
94 virtual ~Decoder() {}
95
96 // Sets the data for a new page. This will be called multiple times on the same
97 // decoder and should reset all internal state.
98 virtual void SetData(int num_values, const uint8_t* data, int len) = 0;
99
100 // Subclasses should override the ones they support. In each of these functions,
101 // the decoder would decode put to 'max_values', storing the result in 'buffer'.
102 // The function returns the number of values decoded, which should be max_values
103 // except for end of the current data page.
104 virtual int Decode(T* buffer, int max_values) = 0;
105
106 // Decode the values in this data page but leave spaces for null entries.
107 //
108 // num_values is the size of the def_levels and buffer arrays including the number of
109 // null values.
110 virtual int DecodeSpaced(T* buffer, int num_values, int null_count,
111 const uint8_t* valid_bits, int64_t valid_bits_offset) {
112 int values_to_read = num_values - null_count;
113 int values_read = Decode(buffer, values_to_read);
114 if (values_read != values_to_read) {
115 throw ParquetException("Number of values / definition_levels read did not match");
116 }
117
118 // Depending on the number of nulls, some of the value slots in buffer may
119 // be uninitialized, and this will cause valgrind warnings / potentially UB
120 memset(static_cast<void*>(buffer + values_read), 0,
121 (num_values - values_read) * sizeof(T));
122
123 // Add spacing for null entries. As we have filled the buffer from the front,
124 // we need to add the spacing from the back.
125 int values_to_move = values_read;
126 for (int i = num_values - 1; i >= 0; i--) {
127 if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
128 buffer[i] = buffer[--values_to_move];
129 }
130 }
131 return num_values;
132 }
133
134 // Returns the number of values left (for the last call to SetData()). This is
135 // the number of values left in this page.
136 int values_left() const { return num_values_; }
137
138 Encoding::type encoding() const { return encoding_; }
139
140 protected:
141 explicit Decoder(const ColumnDescriptor* descr, Encoding::type encoding)
142 : descr_(descr), encoding_(encoding), num_values_(0) {}
143
144 // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
145 const ColumnDescriptor* descr_;
146
147 const Encoding::type encoding_;
148 int num_values_;
149};
150
151} // namespace parquet
152
153#endif // PARQUET_ENCODING_H
154