1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
// This module defines test utilities for Parquet column readers and writers:
// helpers that generate values and definition/repetition levels, builders that
// encode them into in-memory data and dictionary pages, and a PageReader
// implementation that serves a prepared list of pages back in order.
21 | |
22 | #pragma once |
23 | |
#include <algorithm>
#include <cstring>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>
30 | |
31 | #include <gtest/gtest.h> |
32 | |
33 | #include "parquet/column_page.h" |
34 | #include "parquet/column_reader.h" |
35 | #include "parquet/column_writer.h" |
36 | #include "parquet/encoding-internal.h" |
37 | #include "parquet/util/memory.h" |
38 | #include "parquet/util/test-common.h" |
39 | |
40 | using std::shared_ptr; |
41 | using std::vector; |
42 | |
43 | namespace parquet { |
44 | |
45 | static constexpr int FLBA_LENGTH = 12; |
46 | |
47 | bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) { |
48 | return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH); |
49 | } |
50 | |
51 | namespace test { |
52 | |
53 | template <typename T> |
54 | static void InitValues(int num_values, vector<T>& values, vector<uint8_t>& buffer) { |
55 | random_numbers(num_values, 0, std::numeric_limits<T>::min(), |
56 | std::numeric_limits<T>::max(), values.data()); |
57 | } |
58 | |
59 | template <typename T> |
60 | static void InitDictValues(int num_values, int num_dicts, vector<T>& values, |
61 | vector<uint8_t>& buffer) { |
62 | int repeat_factor = num_values / num_dicts; |
63 | InitValues<T>(num_dicts, values, buffer); |
64 | // add some repeated values |
65 | for (int j = 1; j < repeat_factor; ++j) { |
66 | for (int i = 0; i < num_dicts; ++i) { |
67 | std::memcpy(&values[num_dicts * j + i], &values[i], sizeof(T)); |
68 | } |
69 | } |
70 | // computed only dict_per_page * repeat_factor - 1 values < num_values |
71 | // compute remaining |
72 | for (int i = num_dicts * repeat_factor; i < num_values; ++i) { |
73 | std::memcpy(&values[i], &values[i - num_dicts * repeat_factor], sizeof(T)); |
74 | } |
75 | } |
76 | |
77 | class : public PageReader { |
78 | public: |
79 | explicit (const vector<shared_ptr<Page>>& pages) |
80 | : pages_(pages), page_index_(0) {} |
81 | |
82 | shared_ptr<Page> () override { |
83 | if (page_index_ == static_cast<int>(pages_.size())) { |
84 | // EOS to consumer |
85 | return shared_ptr<Page>(nullptr); |
86 | } |
87 | return pages_[page_index_++]; |
88 | } |
89 | |
90 | // No-op |
91 | void (uint32_t size) override {} |
92 | |
93 | private: |
94 | vector<shared_ptr<Page>> ; |
95 | int ; |
96 | }; |
97 | |
98 | // TODO(wesm): this is only used for testing for now. Refactor to form part of |
99 | // primary file write path |
100 | template <typename Type> |
101 | class DataPageBuilder { |
102 | public: |
103 | typedef typename Type::c_type T; |
104 | |
105 | // This class writes data and metadata to the passed inputs |
106 | explicit DataPageBuilder(InMemoryOutputStream* sink) |
107 | : sink_(sink), |
108 | num_values_(0), |
109 | encoding_(Encoding::PLAIN), |
110 | definition_level_encoding_(Encoding::RLE), |
111 | repetition_level_encoding_(Encoding::RLE), |
112 | have_def_levels_(false), |
113 | have_rep_levels_(false), |
114 | have_values_(false) {} |
115 | |
116 | void AppendDefLevels(const vector<int16_t>& levels, int16_t max_level, |
117 | Encoding::type encoding = Encoding::RLE) { |
118 | AppendLevels(levels, max_level, encoding); |
119 | |
120 | num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_); |
121 | definition_level_encoding_ = encoding; |
122 | have_def_levels_ = true; |
123 | } |
124 | |
125 | void AppendRepLevels(const vector<int16_t>& levels, int16_t max_level, |
126 | Encoding::type encoding = Encoding::RLE) { |
127 | AppendLevels(levels, max_level, encoding); |
128 | |
129 | num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_); |
130 | repetition_level_encoding_ = encoding; |
131 | have_rep_levels_ = true; |
132 | } |
133 | |
134 | void AppendValues(const ColumnDescriptor* d, const vector<T>& values, |
135 | Encoding::type encoding = Encoding::PLAIN) { |
136 | PlainEncoder<Type> encoder(d); |
137 | encoder.Put(&values[0], static_cast<int>(values.size())); |
138 | std::shared_ptr<Buffer> values_sink = encoder.FlushValues(); |
139 | sink_->Write(values_sink->data(), values_sink->size()); |
140 | |
141 | num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_); |
142 | encoding_ = encoding; |
143 | have_values_ = true; |
144 | } |
145 | |
146 | int32_t num_values() const { return num_values_; } |
147 | |
148 | Encoding::type encoding() const { return encoding_; } |
149 | |
150 | Encoding::type rep_level_encoding() const { return repetition_level_encoding_; } |
151 | |
152 | Encoding::type def_level_encoding() const { return definition_level_encoding_; } |
153 | |
154 | private: |
155 | InMemoryOutputStream* sink_; |
156 | |
157 | int32_t num_values_; |
158 | Encoding::type encoding_; |
159 | Encoding::type definition_level_encoding_; |
160 | Encoding::type repetition_level_encoding_; |
161 | |
162 | bool have_def_levels_; |
163 | bool have_rep_levels_; |
164 | bool have_values_; |
165 | |
166 | // Used internally for both repetition and definition levels |
167 | void AppendLevels(const vector<int16_t>& levels, int16_t max_level, |
168 | Encoding::type encoding) { |
169 | if (encoding != Encoding::RLE) { |
170 | ParquetException::NYI("only rle encoding currently implemented" ); |
171 | } |
172 | |
173 | // TODO: compute a more precise maximum size for the encoded levels |
174 | vector<uint8_t> encode_buffer(levels.size() * 2); |
175 | |
176 | // We encode into separate memory from the output stream because the |
177 | // RLE-encoded bytes have to be preceded in the stream by their absolute |
178 | // size. |
179 | LevelEncoder encoder; |
180 | encoder.Init(encoding, max_level, static_cast<int>(levels.size()), |
181 | encode_buffer.data(), static_cast<int>(encode_buffer.size())); |
182 | |
183 | encoder.Encode(static_cast<int>(levels.size()), levels.data()); |
184 | |
185 | int32_t rle_bytes = encoder.len(); |
186 | sink_->Write(reinterpret_cast<const uint8_t*>(&rle_bytes), sizeof(int32_t)); |
187 | sink_->Write(encode_buffer.data(), rle_bytes); |
188 | } |
189 | }; |
190 | |
191 | template <> |
192 | void DataPageBuilder<BooleanType>::AppendValues(const ColumnDescriptor* d, |
193 | const vector<bool>& values, |
194 | Encoding::type encoding) { |
195 | if (encoding != Encoding::PLAIN) { |
196 | ParquetException::NYI("only plain encoding currently implemented" ); |
197 | } |
198 | PlainEncoder<BooleanType> encoder(d); |
199 | encoder.Put(values, static_cast<int>(values.size())); |
200 | std::shared_ptr<Buffer> buffer = encoder.FlushValues(); |
201 | sink_->Write(buffer->data(), buffer->size()); |
202 | |
203 | num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_); |
204 | encoding_ = encoding; |
205 | have_values_ = true; |
206 | } |
207 | |
208 | template <typename Type> |
209 | static shared_ptr<DataPage> MakeDataPage( |
210 | const ColumnDescriptor* d, const vector<typename Type::c_type>& values, int num_vals, |
211 | Encoding::type encoding, const uint8_t* indices, int indices_size, |
212 | const vector<int16_t>& def_levels, int16_t max_def_level, |
213 | const vector<int16_t>& rep_levels, int16_t max_rep_level) { |
214 | int num_values = 0; |
215 | |
216 | InMemoryOutputStream page_stream; |
217 | test::DataPageBuilder<Type> page_builder(&page_stream); |
218 | |
219 | if (!rep_levels.empty()) { |
220 | page_builder.AppendRepLevels(rep_levels, max_rep_level); |
221 | } |
222 | if (!def_levels.empty()) { |
223 | page_builder.AppendDefLevels(def_levels, max_def_level); |
224 | } |
225 | |
226 | if (encoding == Encoding::PLAIN) { |
227 | page_builder.AppendValues(d, values, encoding); |
228 | num_values = page_builder.num_values(); |
229 | } else { // DICTIONARY PAGES |
230 | page_stream.Write(indices, indices_size); |
231 | num_values = std::max(page_builder.num_values(), num_vals); |
232 | } |
233 | |
234 | auto buffer = page_stream.GetBuffer(); |
235 | |
236 | return std::make_shared<DataPage>(buffer, num_values, encoding, |
237 | page_builder.def_level_encoding(), |
238 | page_builder.rep_level_encoding()); |
239 | } |
240 | |
241 | template <typename TYPE> |
242 | class DictionaryPageBuilder { |
243 | public: |
244 | typedef typename TYPE::c_type TC; |
245 | static constexpr int TN = TYPE::type_num; |
246 | |
247 | // This class writes data and metadata to the passed inputs |
248 | explicit DictionaryPageBuilder(const ColumnDescriptor* d) |
249 | : num_dict_values_(0), have_values_(false) { |
250 | encoder_.reset(new DictEncoder<TYPE>(d)); |
251 | } |
252 | |
253 | ~DictionaryPageBuilder() {} |
254 | |
255 | shared_ptr<Buffer> AppendValues(const vector<TC>& values) { |
256 | int num_values = static_cast<int>(values.size()); |
257 | // Dictionary encoding |
258 | encoder_->Put(values.data(), num_values); |
259 | num_dict_values_ = encoder_->num_entries(); |
260 | have_values_ = true; |
261 | return encoder_->FlushValues(); |
262 | } |
263 | |
264 | shared_ptr<Buffer> WriteDict() { |
265 | std::shared_ptr<ResizableBuffer> dict_buffer = |
266 | AllocateBuffer(::arrow::default_memory_pool(), encoder_->dict_encoded_size()); |
267 | encoder_->WriteDict(dict_buffer->mutable_data()); |
268 | return std::move(dict_buffer); |
269 | } |
270 | |
271 | int32_t num_values() const { return num_dict_values_; } |
272 | |
273 | private: |
274 | shared_ptr<DictEncoder<TYPE>> encoder_; |
275 | int32_t num_dict_values_; |
276 | bool have_values_; |
277 | }; |
278 | |
279 | template <> |
280 | DictionaryPageBuilder<BooleanType>::DictionaryPageBuilder(const ColumnDescriptor* d) { |
281 | ParquetException::NYI("only plain encoding currently implemented for boolean" ); |
282 | } |
283 | |
284 | template <> |
285 | shared_ptr<Buffer> DictionaryPageBuilder<BooleanType>::WriteDict() { |
286 | ParquetException::NYI("only plain encoding currently implemented for boolean" ); |
287 | return nullptr; |
288 | } |
289 | |
290 | template <> |
291 | shared_ptr<Buffer> DictionaryPageBuilder<BooleanType>::AppendValues( |
292 | const vector<TC>& values) { |
293 | ParquetException::NYI("only plain encoding currently implemented for boolean" ); |
294 | return nullptr; |
295 | } |
296 | |
297 | template <typename Type> |
298 | static shared_ptr<DictionaryPage> MakeDictPage( |
299 | const ColumnDescriptor* d, const vector<typename Type::c_type>& values, |
300 | const vector<int>& values_per_page, Encoding::type encoding, |
301 | vector<shared_ptr<Buffer>>& rle_indices) { |
302 | InMemoryOutputStream page_stream; |
303 | test::DictionaryPageBuilder<Type> page_builder(d); |
304 | int num_pages = static_cast<int>(values_per_page.size()); |
305 | int value_start = 0; |
306 | |
307 | for (int i = 0; i < num_pages; i++) { |
308 | rle_indices.push_back(page_builder.AppendValues( |
309 | slice(values, value_start, value_start + values_per_page[i]))); |
310 | value_start += values_per_page[i]; |
311 | } |
312 | |
313 | auto buffer = page_builder.WriteDict(); |
314 | |
315 | return std::make_shared<DictionaryPage>(buffer, page_builder.num_values(), |
316 | Encoding::PLAIN); |
317 | } |
318 | |
319 | // Given def/rep levels and values create multiple dict pages |
320 | template <typename Type> |
321 | static void PaginateDict(const ColumnDescriptor* d, |
322 | const vector<typename Type::c_type>& values, |
323 | const vector<int16_t>& def_levels, int16_t max_def_level, |
324 | const vector<int16_t>& rep_levels, int16_t max_rep_level, |
325 | int num_levels_per_page, const vector<int>& values_per_page, |
326 | vector<shared_ptr<Page>>& pages, |
327 | Encoding::type encoding = Encoding::RLE_DICTIONARY) { |
328 | int num_pages = static_cast<int>(values_per_page.size()); |
329 | vector<shared_ptr<Buffer>> rle_indices; |
330 | shared_ptr<DictionaryPage> dict_page = |
331 | MakeDictPage<Type>(d, values, values_per_page, encoding, rle_indices); |
332 | pages.push_back(dict_page); |
333 | int def_level_start = 0; |
334 | int def_level_end = 0; |
335 | int rep_level_start = 0; |
336 | int rep_level_end = 0; |
337 | for (int i = 0; i < num_pages; i++) { |
338 | if (max_def_level > 0) { |
339 | def_level_start = i * num_levels_per_page; |
340 | def_level_end = (i + 1) * num_levels_per_page; |
341 | } |
342 | if (max_rep_level > 0) { |
343 | rep_level_start = i * num_levels_per_page; |
344 | rep_level_end = (i + 1) * num_levels_per_page; |
345 | } |
346 | shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>( |
347 | d, {}, values_per_page[i], encoding, rle_indices[i]->data(), |
348 | static_cast<int>(rle_indices[i]->size()), |
349 | slice(def_levels, def_level_start, def_level_end), max_def_level, |
350 | slice(rep_levels, rep_level_start, rep_level_end), max_rep_level); |
351 | pages.push_back(data_page); |
352 | } |
353 | } |
354 | |
355 | // Given def/rep levels and values create multiple plain pages |
356 | template <typename Type> |
357 | static void PaginatePlain(const ColumnDescriptor* d, |
358 | const vector<typename Type::c_type>& values, |
359 | const vector<int16_t>& def_levels, int16_t max_def_level, |
360 | const vector<int16_t>& rep_levels, int16_t max_rep_level, |
361 | int num_levels_per_page, const vector<int>& values_per_page, |
362 | vector<shared_ptr<Page>>& pages, |
363 | Encoding::type encoding = Encoding::PLAIN) { |
364 | int num_pages = static_cast<int>(values_per_page.size()); |
365 | int def_level_start = 0; |
366 | int def_level_end = 0; |
367 | int rep_level_start = 0; |
368 | int rep_level_end = 0; |
369 | int value_start = 0; |
370 | for (int i = 0; i < num_pages; i++) { |
371 | if (max_def_level > 0) { |
372 | def_level_start = i * num_levels_per_page; |
373 | def_level_end = (i + 1) * num_levels_per_page; |
374 | } |
375 | if (max_rep_level > 0) { |
376 | rep_level_start = i * num_levels_per_page; |
377 | rep_level_end = (i + 1) * num_levels_per_page; |
378 | } |
379 | shared_ptr<DataPage> page = MakeDataPage<Type>( |
380 | d, slice(values, value_start, value_start + values_per_page[i]), |
381 | values_per_page[i], encoding, nullptr, 0, |
382 | slice(def_levels, def_level_start, def_level_end), max_def_level, |
383 | slice(rep_levels, rep_level_start, rep_level_end), max_rep_level); |
384 | pages.push_back(page); |
385 | value_start += values_per_page[i]; |
386 | } |
387 | } |
388 | |
389 | // Generates pages from randomly generated data |
390 | template <typename Type> |
391 | static int MakePages(const ColumnDescriptor* d, int num_pages, int levels_per_page, |
392 | vector<int16_t>& def_levels, vector<int16_t>& rep_levels, |
393 | vector<typename Type::c_type>& values, vector<uint8_t>& buffer, |
394 | vector<shared_ptr<Page>>& pages, |
395 | Encoding::type encoding = Encoding::PLAIN) { |
396 | int num_levels = levels_per_page * num_pages; |
397 | int num_values = 0; |
398 | uint32_t seed = 0; |
399 | int16_t zero = 0; |
400 | int16_t max_def_level = d->max_definition_level(); |
401 | int16_t max_rep_level = d->max_repetition_level(); |
402 | vector<int> values_per_page(num_pages, levels_per_page); |
403 | // Create definition levels |
404 | if (max_def_level > 0) { |
405 | def_levels.resize(num_levels); |
406 | random_numbers(num_levels, seed, zero, max_def_level, def_levels.data()); |
407 | for (int p = 0; p < num_pages; p++) { |
408 | int num_values_per_page = 0; |
409 | for (int i = 0; i < levels_per_page; i++) { |
410 | if (def_levels[i + p * levels_per_page] == max_def_level) { |
411 | num_values_per_page++; |
412 | num_values++; |
413 | } |
414 | } |
415 | values_per_page[p] = num_values_per_page; |
416 | } |
417 | } else { |
418 | num_values = num_levels; |
419 | } |
420 | // Create repitition levels |
421 | if (max_rep_level > 0) { |
422 | rep_levels.resize(num_levels); |
423 | random_numbers(num_levels, seed, zero, max_rep_level, rep_levels.data()); |
424 | } |
425 | // Create values |
426 | values.resize(num_values); |
427 | if (encoding == Encoding::PLAIN) { |
428 | InitValues<typename Type::c_type>(num_values, values, buffer); |
429 | PaginatePlain<Type>(d, values, def_levels, max_def_level, rep_levels, max_rep_level, |
430 | levels_per_page, values_per_page, pages); |
431 | } else if (encoding == Encoding::RLE_DICTIONARY || |
432 | encoding == Encoding::PLAIN_DICTIONARY) { |
433 | // Calls InitValues and repeats the data |
434 | InitDictValues<typename Type::c_type>(num_values, levels_per_page, values, buffer); |
435 | PaginateDict<Type>(d, values, def_levels, max_def_level, rep_levels, max_rep_level, |
436 | levels_per_page, values_per_page, pages); |
437 | } |
438 | |
439 | return num_values; |
440 | } |
441 | |
442 | } // namespace test |
443 | |
444 | } // namespace parquet |
445 | |