1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#pragma once
19
20#include <memory>
21#include <vector>
22
23#include "parquet/column_page.h"
24#include "parquet/encoding.h"
25#include "parquet/metadata.h"
26#include "parquet/properties.h"
27#include "parquet/schema.h"
28#include "parquet/statistics.h"
29#include "parquet/types.h"
30#include "parquet/util/macros.h"
31#include "parquet/util/memory.h"
32#include "parquet/util/visibility.h"
33
// Forward declarations of the Arrow encoder classes. This header only stores
// them behind std::unique_ptr (see LevelEncoder), so the complete types are
// not required here.
namespace arrow {

namespace util {
class RleEncoder;
}  // namespace util

namespace BitUtil {
class BitWriter;
}  // namespace BitUtil

}  // namespace arrow
45
46namespace parquet {
47
48class PARQUET_EXPORT LevelEncoder {
49 public:
50 LevelEncoder();
51 ~LevelEncoder();
52
53 static int MaxBufferSize(Encoding::type encoding, int16_t max_level,
54 int num_buffered_values);
55
56 // Initialize the LevelEncoder.
57 void Init(Encoding::type encoding, int16_t max_level, int num_buffered_values,
58 uint8_t* data, int data_size);
59
60 // Encodes a batch of levels from an array and returns the number of levels encoded
61 int Encode(int batch_size, const int16_t* levels);
62
63 int32_t len() {
64 if (encoding_ != Encoding::RLE) {
65 throw ParquetException("Only implemented for RLE encoding");
66 }
67 return rle_length_;
68 }
69
70 private:
71 int bit_width_;
72 int rle_length_;
73 Encoding::type encoding_;
74 std::unique_ptr<::arrow::util::RleEncoder> rle_encoder_;
75 std::unique_ptr<::arrow::BitUtil::BitWriter> bit_packed_encoder_;
76};
77
78class PARQUET_EXPORT PageWriter {
79 public:
80 virtual ~PageWriter() {}
81
82 static std::unique_ptr<PageWriter> Open(
83 OutputStream* sink, Compression::type codec, ColumnChunkMetaDataBuilder* metadata,
84 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
85 bool buffered_row_group = false);
86
87 // The Column Writer decides if dictionary encoding is used if set and
88 // if the dictionary encoding has fallen back to default encoding on reaching dictionary
89 // page limit
90 virtual void Close(bool has_dictionary, bool fallback) = 0;
91
92 virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0;
93
94 virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;
95
96 virtual bool has_compressor() = 0;
97
98 virtual void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) = 0;
99};
100
101static constexpr int WRITE_BATCH_SIZE = 1000;
102class PARQUET_EXPORT ColumnWriter {
103 public:
104 ColumnWriter(ColumnChunkMetaDataBuilder*, std::unique_ptr<PageWriter>,
105 bool has_dictionary, Encoding::type encoding,
106 const WriterProperties* properties);
107
108 virtual ~ColumnWriter() = default;
109
110 static std::shared_ptr<ColumnWriter> Make(ColumnChunkMetaDataBuilder*,
111 std::unique_ptr<PageWriter>,
112 const WriterProperties* properties);
113
114 Type::type type() const { return descr_->physical_type(); }
115
116 const ColumnDescriptor* descr() const { return descr_; }
117
118 /**
119 * Closes the ColumnWriter, commits any buffered values to pages.
120 *
121 * @return Total size of the column in bytes
122 */
123 int64_t Close();
124
125 int64_t rows_written() const { return rows_written_; }
126
127 // Only considers the size of the compressed pages + page header
128 // Some values might be still buffered an not written to a page yet
129 int64_t total_compressed_bytes() const { return total_compressed_bytes_; }
130
131 int64_t total_bytes_written() const { return total_bytes_written_; }
132
133 const WriterProperties* properties() { return properties_; }
134
135 protected:
136 virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
137
138 // Serializes Dictionary Page if enabled
139 virtual void WriteDictionaryPage() = 0;
140
141 // Checks if the Dictionary Page size limit is reached
142 // If the limit is reached, the Dictionary and Data Pages are serialized
143 // The encoding is switched to PLAIN
144
145 virtual void CheckDictionarySizeLimit() = 0;
146
147 // Plain-encoded statistics of the current page
148 virtual EncodedStatistics GetPageStatistics() = 0;
149
150 // Plain-encoded statistics of the whole chunk
151 virtual EncodedStatistics GetChunkStatistics() = 0;
152
153 // Merges page statistics into chunk statistics, then resets the values
154 virtual void ResetPageStatistics() = 0;
155
156 // Adds Data Pages to an in memory buffer in dictionary encoding mode
157 // Serializes the Data Pages in other encoding modes
158 void AddDataPage();
159
160 // Serializes Data Pages
161 void WriteDataPage(const CompressedDataPage& page);
162
163 // Write multiple definition levels
164 void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels);
165
166 // Write multiple repetition levels
167 void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels);
168
169 // RLE encode the src_buffer into dest_buffer and return the encoded size
170 int64_t RleEncodeLevels(const Buffer& src_buffer, ResizableBuffer* dest_buffer,
171 int16_t max_level);
172
173 // Serialize the buffered Data Pages
174 void FlushBufferedDataPages();
175
176 ColumnChunkMetaDataBuilder* metadata_;
177 const ColumnDescriptor* descr_;
178
179 std::unique_ptr<PageWriter> pager_;
180
181 bool has_dictionary_;
182 Encoding::type encoding_;
183 const WriterProperties* properties_;
184
185 LevelEncoder level_encoder_;
186
187 ::arrow::MemoryPool* allocator_;
188
189 // The total number of values stored in the data page. This is the maximum of
190 // the number of encoded definition levels or encoded values. For
191 // non-repeated, required columns, this is equal to the number of encoded
192 // values. For repeated or optional values, there may be fewer data values
193 // than levels, and this tells you how many encoded levels there are in that
194 // case.
195 int64_t num_buffered_values_;
196
197 // The total number of stored values. For repeated or optional values, this
198 // number may be lower than num_buffered_values_.
199 int64_t num_buffered_encoded_values_;
200
201 // Total number of rows written with this ColumnWriter
202 int rows_written_;
203
204 // Records the total number of bytes written by the serializer
205 int64_t total_bytes_written_;
206
207 // Records the current number of compressed bytes in a column
208 int64_t total_compressed_bytes_;
209
210 // Flag to check if the Writer has been closed
211 bool closed_;
212
213 // Flag to infer if dictionary encoding has fallen back to PLAIN
214 bool fallback_;
215
216 std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
217 std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
218
219 std::shared_ptr<ResizableBuffer> definition_levels_rle_;
220 std::shared_ptr<ResizableBuffer> repetition_levels_rle_;
221
222 std::shared_ptr<ResizableBuffer> uncompressed_data_;
223 std::shared_ptr<ResizableBuffer> compressed_data_;
224
225 std::vector<CompressedDataPage> data_pages_;
226
227 private:
228 void InitSinks();
229};
230
231// API to write values to a single column. This is the main client facing API.
232template <typename DType>
233class PARQUET_TEMPLATE_CLASS_EXPORT TypedColumnWriter : public ColumnWriter {
234 public:
235 typedef typename DType::c_type T;
236
237 TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
238 std::unique_ptr<PageWriter> pager, const bool use_dictionary,
239 Encoding::type encoding, const WriterProperties* properties);
240
241 // Write a batch of repetition levels, definition levels, and values to the
242 // column.
243 void WriteBatch(int64_t num_values, const int16_t* def_levels,
244 const int16_t* rep_levels, const T* values);
245
246 /// Write a batch of repetition levels, definition levels, and values to the
247 /// column.
248 ///
249 /// In comparision to WriteBatch the length of repetition and definition levels
250 /// is the same as of the number of values read for max_definition_level == 1.
251 /// In the case of max_definition_level > 1, the repetition and definition
252 /// levels are larger than the values but the values include the null entries
253 /// with definition_level == (max_definition_level - 1). Thus we have to differentiate
254 /// in the parameters of this function if the input has the length of num_values or the
255 /// _number of rows in the lowest nesting level_.
256 ///
257 /// In the case that the most inner node in the Parquet is required, the _number of rows
258 /// in the lowest nesting level_ is equal to the number of non-null values. If the
259 /// inner-most schema node is optional, the _number of rows in the lowest nesting level_
260 /// also includes all values with definition_level == (max_definition_level - 1).
261 ///
262 /// @param num_values number of levels to write.
263 /// @param def_levels The Parquet definiton levels, length is num_values
264 /// @param rep_levels The Parquet repetition levels, length is num_values
265 /// @param valid_bits Bitmap that indicates if the row is null on the lowest nesting
266 /// level. The length is number of rows in the lowest nesting level.
267 /// @param valid_bits_offset The offset in bits of the valid_bits where the
268 /// first relevant bit resides.
269 /// @param values The values in the lowest nested level including
270 /// spacing for nulls on the lowest levels; input has the length
271 /// of the number of rows on the lowest nesting level.
272 void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
273 const int16_t* rep_levels, const uint8_t* valid_bits,
274 int64_t valid_bits_offset, const T* values);
275
276 // Estimated size of the values that are not written to a page yet
277 int64_t EstimatedBufferedValueBytes() const {
278 return current_encoder_->EstimatedDataEncodedSize();
279 }
280
281 protected:
282 std::shared_ptr<Buffer> GetValuesBuffer() override {
283 return current_encoder_->FlushValues();
284 }
285 void WriteDictionaryPage() override;
286 void CheckDictionarySizeLimit() override;
287 EncodedStatistics GetPageStatistics() override;
288 EncodedStatistics GetChunkStatistics() override;
289 void ResetPageStatistics() override;
290
291 private:
292 int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
293 const int16_t* rep_levels, const T* values);
294
295 int64_t WriteMiniBatchSpaced(int64_t num_values, const int16_t* def_levels,
296 const int16_t* rep_levels, const uint8_t* valid_bits,
297 int64_t valid_bits_offset, const T* values,
298 int64_t* num_spaced_written);
299
300 typedef Encoder<DType> EncoderType;
301
302 // Write values to a temporary buffer before they are encoded into pages
303 void WriteValues(int64_t num_values, const T* values);
304 void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits,
305 int64_t valid_bits_offset, const T* values);
306 std::unique_ptr<EncoderType> current_encoder_;
307
308 typedef TypedRowGroupStatistics<DType> TypedStats;
309 std::unique_ptr<TypedStats> page_statistics_;
310 std::unique_ptr<TypedStats> chunk_statistics_;
311};
312
313typedef TypedColumnWriter<BooleanType> BoolWriter;
314typedef TypedColumnWriter<Int32Type> Int32Writer;
315typedef TypedColumnWriter<Int64Type> Int64Writer;
316typedef TypedColumnWriter<Int96Type> Int96Writer;
317typedef TypedColumnWriter<FloatType> FloatWriter;
318typedef TypedColumnWriter<DoubleType> DoubleWriter;
319typedef TypedColumnWriter<ByteArrayType> ByteArrayWriter;
320typedef TypedColumnWriter<FLBAType> FixedLenByteArrayWriter;
321
322PARQUET_EXTERN_TEMPLATE TypedColumnWriter<BooleanType>;
323PARQUET_EXTERN_TEMPLATE TypedColumnWriter<Int32Type>;
324PARQUET_EXTERN_TEMPLATE TypedColumnWriter<Int64Type>;
325PARQUET_EXTERN_TEMPLATE TypedColumnWriter<Int96Type>;
326PARQUET_EXTERN_TEMPLATE TypedColumnWriter<FloatType>;
327PARQUET_EXTERN_TEMPLATE TypedColumnWriter<DoubleType>;
328PARQUET_EXTERN_TEMPLATE TypedColumnWriter<ByteArrayType>;
329PARQUET_EXTERN_TEMPLATE TypedColumnWriter<FLBAType>;
330
331} // namespace parquet
332