1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
// This header provides test utilities for building Parquet pages in memory: a
// MockPageReader that replays a fixed list of pages, builders for data pages and
// dictionary pages, and helpers that generate random levels and values and split
// them into pages.
21
22#pragma once
23
#include <algorithm>
#include <cstring>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <gtest/gtest.h>

#include "parquet/column_page.h"
#include "parquet/column_reader.h"
#include "parquet/column_writer.h"
#include "parquet/encoding-internal.h"
#include "parquet/util/memory.h"
#include "parquet/util/test-common.h"
39
40using std::shared_ptr;
41using std::vector;
42
43namespace parquet {
44
45static constexpr int FLBA_LENGTH = 12;
46
47bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) {
48 return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH);
49}
50
51namespace test {
52
53template <typename T>
54static void InitValues(int num_values, vector<T>& values, vector<uint8_t>& buffer) {
55 random_numbers(num_values, 0, std::numeric_limits<T>::min(),
56 std::numeric_limits<T>::max(), values.data());
57}
58
59template <typename T>
60static void InitDictValues(int num_values, int num_dicts, vector<T>& values,
61 vector<uint8_t>& buffer) {
62 int repeat_factor = num_values / num_dicts;
63 InitValues<T>(num_dicts, values, buffer);
64 // add some repeated values
65 for (int j = 1; j < repeat_factor; ++j) {
66 for (int i = 0; i < num_dicts; ++i) {
67 std::memcpy(&values[num_dicts * j + i], &values[i], sizeof(T));
68 }
69 }
70 // computed only dict_per_page * repeat_factor - 1 values < num_values
71 // compute remaining
72 for (int i = num_dicts * repeat_factor; i < num_values; ++i) {
73 std::memcpy(&values[i], &values[i - num_dicts * repeat_factor], sizeof(T));
74 }
75}
76
77class MockPageReader : public PageReader {
78 public:
79 explicit MockPageReader(const vector<shared_ptr<Page>>& pages)
80 : pages_(pages), page_index_(0) {}
81
82 shared_ptr<Page> NextPage() override {
83 if (page_index_ == static_cast<int>(pages_.size())) {
84 // EOS to consumer
85 return shared_ptr<Page>(nullptr);
86 }
87 return pages_[page_index_++];
88 }
89
90 // No-op
91 void set_max_page_header_size(uint32_t size) override {}
92
93 private:
94 vector<shared_ptr<Page>> pages_;
95 int page_index_;
96};
97
98// TODO(wesm): this is only used for testing for now. Refactor to form part of
99// primary file write path
100template <typename Type>
101class DataPageBuilder {
102 public:
103 typedef typename Type::c_type T;
104
105 // This class writes data and metadata to the passed inputs
106 explicit DataPageBuilder(InMemoryOutputStream* sink)
107 : sink_(sink),
108 num_values_(0),
109 encoding_(Encoding::PLAIN),
110 definition_level_encoding_(Encoding::RLE),
111 repetition_level_encoding_(Encoding::RLE),
112 have_def_levels_(false),
113 have_rep_levels_(false),
114 have_values_(false) {}
115
116 void AppendDefLevels(const vector<int16_t>& levels, int16_t max_level,
117 Encoding::type encoding = Encoding::RLE) {
118 AppendLevels(levels, max_level, encoding);
119
120 num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_);
121 definition_level_encoding_ = encoding;
122 have_def_levels_ = true;
123 }
124
125 void AppendRepLevels(const vector<int16_t>& levels, int16_t max_level,
126 Encoding::type encoding = Encoding::RLE) {
127 AppendLevels(levels, max_level, encoding);
128
129 num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_);
130 repetition_level_encoding_ = encoding;
131 have_rep_levels_ = true;
132 }
133
134 void AppendValues(const ColumnDescriptor* d, const vector<T>& values,
135 Encoding::type encoding = Encoding::PLAIN) {
136 PlainEncoder<Type> encoder(d);
137 encoder.Put(&values[0], static_cast<int>(values.size()));
138 std::shared_ptr<Buffer> values_sink = encoder.FlushValues();
139 sink_->Write(values_sink->data(), values_sink->size());
140
141 num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
142 encoding_ = encoding;
143 have_values_ = true;
144 }
145
146 int32_t num_values() const { return num_values_; }
147
148 Encoding::type encoding() const { return encoding_; }
149
150 Encoding::type rep_level_encoding() const { return repetition_level_encoding_; }
151
152 Encoding::type def_level_encoding() const { return definition_level_encoding_; }
153
154 private:
155 InMemoryOutputStream* sink_;
156
157 int32_t num_values_;
158 Encoding::type encoding_;
159 Encoding::type definition_level_encoding_;
160 Encoding::type repetition_level_encoding_;
161
162 bool have_def_levels_;
163 bool have_rep_levels_;
164 bool have_values_;
165
166 // Used internally for both repetition and definition levels
167 void AppendLevels(const vector<int16_t>& levels, int16_t max_level,
168 Encoding::type encoding) {
169 if (encoding != Encoding::RLE) {
170 ParquetException::NYI("only rle encoding currently implemented");
171 }
172
173 // TODO: compute a more precise maximum size for the encoded levels
174 vector<uint8_t> encode_buffer(levels.size() * 2);
175
176 // We encode into separate memory from the output stream because the
177 // RLE-encoded bytes have to be preceded in the stream by their absolute
178 // size.
179 LevelEncoder encoder;
180 encoder.Init(encoding, max_level, static_cast<int>(levels.size()),
181 encode_buffer.data(), static_cast<int>(encode_buffer.size()));
182
183 encoder.Encode(static_cast<int>(levels.size()), levels.data());
184
185 int32_t rle_bytes = encoder.len();
186 sink_->Write(reinterpret_cast<const uint8_t*>(&rle_bytes), sizeof(int32_t));
187 sink_->Write(encode_buffer.data(), rle_bytes);
188 }
189};
190
191template <>
192void DataPageBuilder<BooleanType>::AppendValues(const ColumnDescriptor* d,
193 const vector<bool>& values,
194 Encoding::type encoding) {
195 if (encoding != Encoding::PLAIN) {
196 ParquetException::NYI("only plain encoding currently implemented");
197 }
198 PlainEncoder<BooleanType> encoder(d);
199 encoder.Put(values, static_cast<int>(values.size()));
200 std::shared_ptr<Buffer> buffer = encoder.FlushValues();
201 sink_->Write(buffer->data(), buffer->size());
202
203 num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
204 encoding_ = encoding;
205 have_values_ = true;
206}
207
208template <typename Type>
209static shared_ptr<DataPage> MakeDataPage(
210 const ColumnDescriptor* d, const vector<typename Type::c_type>& values, int num_vals,
211 Encoding::type encoding, const uint8_t* indices, int indices_size,
212 const vector<int16_t>& def_levels, int16_t max_def_level,
213 const vector<int16_t>& rep_levels, int16_t max_rep_level) {
214 int num_values = 0;
215
216 InMemoryOutputStream page_stream;
217 test::DataPageBuilder<Type> page_builder(&page_stream);
218
219 if (!rep_levels.empty()) {
220 page_builder.AppendRepLevels(rep_levels, max_rep_level);
221 }
222 if (!def_levels.empty()) {
223 page_builder.AppendDefLevels(def_levels, max_def_level);
224 }
225
226 if (encoding == Encoding::PLAIN) {
227 page_builder.AppendValues(d, values, encoding);
228 num_values = page_builder.num_values();
229 } else { // DICTIONARY PAGES
230 page_stream.Write(indices, indices_size);
231 num_values = std::max(page_builder.num_values(), num_vals);
232 }
233
234 auto buffer = page_stream.GetBuffer();
235
236 return std::make_shared<DataPage>(buffer, num_values, encoding,
237 page_builder.def_level_encoding(),
238 page_builder.rep_level_encoding());
239}
240
241template <typename TYPE>
242class DictionaryPageBuilder {
243 public:
244 typedef typename TYPE::c_type TC;
245 static constexpr int TN = TYPE::type_num;
246
247 // This class writes data and metadata to the passed inputs
248 explicit DictionaryPageBuilder(const ColumnDescriptor* d)
249 : num_dict_values_(0), have_values_(false) {
250 encoder_.reset(new DictEncoder<TYPE>(d));
251 }
252
253 ~DictionaryPageBuilder() {}
254
255 shared_ptr<Buffer> AppendValues(const vector<TC>& values) {
256 int num_values = static_cast<int>(values.size());
257 // Dictionary encoding
258 encoder_->Put(values.data(), num_values);
259 num_dict_values_ = encoder_->num_entries();
260 have_values_ = true;
261 return encoder_->FlushValues();
262 }
263
264 shared_ptr<Buffer> WriteDict() {
265 std::shared_ptr<ResizableBuffer> dict_buffer =
266 AllocateBuffer(::arrow::default_memory_pool(), encoder_->dict_encoded_size());
267 encoder_->WriteDict(dict_buffer->mutable_data());
268 return std::move(dict_buffer);
269 }
270
271 int32_t num_values() const { return num_dict_values_; }
272
273 private:
274 shared_ptr<DictEncoder<TYPE>> encoder_;
275 int32_t num_dict_values_;
276 bool have_values_;
277};
278
279template <>
280DictionaryPageBuilder<BooleanType>::DictionaryPageBuilder(const ColumnDescriptor* d) {
281 ParquetException::NYI("only plain encoding currently implemented for boolean");
282}
283
284template <>
285shared_ptr<Buffer> DictionaryPageBuilder<BooleanType>::WriteDict() {
286 ParquetException::NYI("only plain encoding currently implemented for boolean");
287 return nullptr;
288}
289
290template <>
291shared_ptr<Buffer> DictionaryPageBuilder<BooleanType>::AppendValues(
292 const vector<TC>& values) {
293 ParquetException::NYI("only plain encoding currently implemented for boolean");
294 return nullptr;
295}
296
297template <typename Type>
298static shared_ptr<DictionaryPage> MakeDictPage(
299 const ColumnDescriptor* d, const vector<typename Type::c_type>& values,
300 const vector<int>& values_per_page, Encoding::type encoding,
301 vector<shared_ptr<Buffer>>& rle_indices) {
302 InMemoryOutputStream page_stream;
303 test::DictionaryPageBuilder<Type> page_builder(d);
304 int num_pages = static_cast<int>(values_per_page.size());
305 int value_start = 0;
306
307 for (int i = 0; i < num_pages; i++) {
308 rle_indices.push_back(page_builder.AppendValues(
309 slice(values, value_start, value_start + values_per_page[i])));
310 value_start += values_per_page[i];
311 }
312
313 auto buffer = page_builder.WriteDict();
314
315 return std::make_shared<DictionaryPage>(buffer, page_builder.num_values(),
316 Encoding::PLAIN);
317}
318
319// Given def/rep levels and values create multiple dict pages
320template <typename Type>
321static void PaginateDict(const ColumnDescriptor* d,
322 const vector<typename Type::c_type>& values,
323 const vector<int16_t>& def_levels, int16_t max_def_level,
324 const vector<int16_t>& rep_levels, int16_t max_rep_level,
325 int num_levels_per_page, const vector<int>& values_per_page,
326 vector<shared_ptr<Page>>& pages,
327 Encoding::type encoding = Encoding::RLE_DICTIONARY) {
328 int num_pages = static_cast<int>(values_per_page.size());
329 vector<shared_ptr<Buffer>> rle_indices;
330 shared_ptr<DictionaryPage> dict_page =
331 MakeDictPage<Type>(d, values, values_per_page, encoding, rle_indices);
332 pages.push_back(dict_page);
333 int def_level_start = 0;
334 int def_level_end = 0;
335 int rep_level_start = 0;
336 int rep_level_end = 0;
337 for (int i = 0; i < num_pages; i++) {
338 if (max_def_level > 0) {
339 def_level_start = i * num_levels_per_page;
340 def_level_end = (i + 1) * num_levels_per_page;
341 }
342 if (max_rep_level > 0) {
343 rep_level_start = i * num_levels_per_page;
344 rep_level_end = (i + 1) * num_levels_per_page;
345 }
346 shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(
347 d, {}, values_per_page[i], encoding, rle_indices[i]->data(),
348 static_cast<int>(rle_indices[i]->size()),
349 slice(def_levels, def_level_start, def_level_end), max_def_level,
350 slice(rep_levels, rep_level_start, rep_level_end), max_rep_level);
351 pages.push_back(data_page);
352 }
353}
354
355// Given def/rep levels and values create multiple plain pages
356template <typename Type>
357static void PaginatePlain(const ColumnDescriptor* d,
358 const vector<typename Type::c_type>& values,
359 const vector<int16_t>& def_levels, int16_t max_def_level,
360 const vector<int16_t>& rep_levels, int16_t max_rep_level,
361 int num_levels_per_page, const vector<int>& values_per_page,
362 vector<shared_ptr<Page>>& pages,
363 Encoding::type encoding = Encoding::PLAIN) {
364 int num_pages = static_cast<int>(values_per_page.size());
365 int def_level_start = 0;
366 int def_level_end = 0;
367 int rep_level_start = 0;
368 int rep_level_end = 0;
369 int value_start = 0;
370 for (int i = 0; i < num_pages; i++) {
371 if (max_def_level > 0) {
372 def_level_start = i * num_levels_per_page;
373 def_level_end = (i + 1) * num_levels_per_page;
374 }
375 if (max_rep_level > 0) {
376 rep_level_start = i * num_levels_per_page;
377 rep_level_end = (i + 1) * num_levels_per_page;
378 }
379 shared_ptr<DataPage> page = MakeDataPage<Type>(
380 d, slice(values, value_start, value_start + values_per_page[i]),
381 values_per_page[i], encoding, nullptr, 0,
382 slice(def_levels, def_level_start, def_level_end), max_def_level,
383 slice(rep_levels, rep_level_start, rep_level_end), max_rep_level);
384 pages.push_back(page);
385 value_start += values_per_page[i];
386 }
387}
388
389// Generates pages from randomly generated data
390template <typename Type>
391static int MakePages(const ColumnDescriptor* d, int num_pages, int levels_per_page,
392 vector<int16_t>& def_levels, vector<int16_t>& rep_levels,
393 vector<typename Type::c_type>& values, vector<uint8_t>& buffer,
394 vector<shared_ptr<Page>>& pages,
395 Encoding::type encoding = Encoding::PLAIN) {
396 int num_levels = levels_per_page * num_pages;
397 int num_values = 0;
398 uint32_t seed = 0;
399 int16_t zero = 0;
400 int16_t max_def_level = d->max_definition_level();
401 int16_t max_rep_level = d->max_repetition_level();
402 vector<int> values_per_page(num_pages, levels_per_page);
403 // Create definition levels
404 if (max_def_level > 0) {
405 def_levels.resize(num_levels);
406 random_numbers(num_levels, seed, zero, max_def_level, def_levels.data());
407 for (int p = 0; p < num_pages; p++) {
408 int num_values_per_page = 0;
409 for (int i = 0; i < levels_per_page; i++) {
410 if (def_levels[i + p * levels_per_page] == max_def_level) {
411 num_values_per_page++;
412 num_values++;
413 }
414 }
415 values_per_page[p] = num_values_per_page;
416 }
417 } else {
418 num_values = num_levels;
419 }
420 // Create repitition levels
421 if (max_rep_level > 0) {
422 rep_levels.resize(num_levels);
423 random_numbers(num_levels, seed, zero, max_rep_level, rep_levels.data());
424 }
425 // Create values
426 values.resize(num_values);
427 if (encoding == Encoding::PLAIN) {
428 InitValues<typename Type::c_type>(num_values, values, buffer);
429 PaginatePlain<Type>(d, values, def_levels, max_def_level, rep_levels, max_rep_level,
430 levels_per_page, values_per_page, pages);
431 } else if (encoding == Encoding::RLE_DICTIONARY ||
432 encoding == Encoding::PLAIN_DICTIONARY) {
433 // Calls InitValues and repeats the data
434 InitDictValues<typename Type::c_type>(num_values, levels_per_page, values, buffer);
435 PaginateDict<Type>(d, values, def_levels, max_def_level, rep_levels, max_rep_level,
436 levels_per_page, values_per_page, pages);
437 }
438
439 return num_values;
440}
441
442} // namespace test
443
444} // namespace parquet
445