// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef PARQUET_COLUMN_PROPERTIES_H
#define PARQUET_COLUMN_PROPERTIES_H

#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>

#include "arrow/type.h"
#include "arrow/util/compression.h"

#include "parquet/exception.h"
#include "parquet/parquet_version.h"
#include "parquet/platform.h"
#include "parquet/schema.h"
#include "parquet/types.h"

namespace parquet {

struct ParquetVersion {
  enum type { PARQUET_1_0, PARQUET_2_0 };
};

static constexpr int64_t DEFAULT_BUFFER_SIZE = 1024;
static constexpr bool DEFAULT_USE_BUFFERED_STREAM = false;

class PARQUET_EXPORT ReaderProperties {
 public:
  explicit ReaderProperties(MemoryPool* pool = ::arrow::default_memory_pool())
      : pool_(pool) {
    buffered_stream_enabled_ = DEFAULT_USE_BUFFERED_STREAM;
    buffer_size_ = DEFAULT_BUFFER_SIZE;
  }

  MemoryPool* memory_pool() const { return pool_; }

  std::shared_ptr<ArrowInputStream> GetStream(std::shared_ptr<ArrowInputFile> source,
                                              int64_t start, int64_t num_bytes);

  bool is_buffered_stream_enabled() const { return buffered_stream_enabled_; }

  void enable_buffered_stream() { buffered_stream_enabled_ = true; }

  void disable_buffered_stream() { buffered_stream_enabled_ = false; }

  void set_buffer_size(int64_t buf_size) { buffer_size_ = buf_size; }

  int64_t buffer_size() const { return buffer_size_; }

 private:
  MemoryPool* pool_;
  int64_t buffer_size_;
  bool buffered_stream_enabled_;
};

ReaderProperties PARQUET_EXPORT default_reader_properties();
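
// Example (illustrative): a minimal sketch of tuning ReaderProperties with
// the accessors declared above. The 64 KiB buffer size is an arbitrary
// example value, not a recommendation.
//
//   ReaderProperties props = default_reader_properties();
//   props.enable_buffered_stream();
//   props.set_buffer_size(64 * 1024);  // read through a 64 KiB buffered stream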

static constexpr int64_t kDefaultDataPageSize = 1024 * 1024;
static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize;
static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 64 * 1024 * 1024;
static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096;
static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
    ParquetVersion::PARQUET_1_0;
static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED;

class PARQUET_EXPORT ColumnProperties {
 public:
  ColumnProperties(Encoding::type encoding = DEFAULT_ENCODING,
                   Compression::type codec = DEFAULT_COMPRESSION_TYPE,
                   bool dictionary_enabled = DEFAULT_IS_DICTIONARY_ENABLED,
                   bool statistics_enabled = DEFAULT_ARE_STATISTICS_ENABLED,
                   size_t max_stats_size = DEFAULT_MAX_STATISTICS_SIZE)
      : encoding_(encoding),
        codec_(codec),
        dictionary_enabled_(dictionary_enabled),
        statistics_enabled_(statistics_enabled),
        max_stats_size_(max_stats_size),
        compression_level_(Codec::UseDefaultCompressionLevel()) {}

  void set_encoding(Encoding::type encoding) { encoding_ = encoding; }

  void set_compression(Compression::type codec) { codec_ = codec; }

  void set_dictionary_enabled(bool dictionary_enabled) {
    dictionary_enabled_ = dictionary_enabled;
  }

  void set_statistics_enabled(bool statistics_enabled) {
    statistics_enabled_ = statistics_enabled;
  }

  void set_max_statistics_size(size_t max_stats_size) {
    max_stats_size_ = max_stats_size;
  }

  void set_compression_level(int compression_level) {
    compression_level_ = compression_level;
  }

  Encoding::type encoding() const { return encoding_; }

  Compression::type compression() const { return codec_; }

  bool dictionary_enabled() const { return dictionary_enabled_; }

  bool statistics_enabled() const { return statistics_enabled_; }

  size_t max_statistics_size() const { return max_stats_size_; }

  int compression_level() const { return compression_level_; }

 private:
  Encoding::type encoding_;
  Compression::type codec_;
  bool dictionary_enabled_;
  bool statistics_enabled_;
  size_t max_stats_size_;
  int compression_level_;
};
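
// Example (illustrative): per-column settings are usually configured through
// WriterProperties::Builder below, but a ColumnProperties can also be built
// directly. The codec and flags here are arbitrary example choices.
//
//   ColumnProperties props(Encoding::PLAIN, Compression::GZIP,
//                          /*dictionary_enabled=*/false);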

class PARQUET_EXPORT WriterProperties {
 public:
  class Builder {
   public:
    Builder()
        : pool_(::arrow::default_memory_pool()),
          dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
          write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
          max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH),
          pagesize_(kDefaultDataPageSize),
          version_(DEFAULT_WRITER_VERSION),
          created_by_(DEFAULT_CREATED_BY) {}
    virtual ~Builder() {}

    Builder* memory_pool(MemoryPool* pool) {
      pool_ = pool;
      return this;
    }

    Builder* enable_dictionary() {
      default_column_properties_.set_dictionary_enabled(true);
      return this;
    }

    Builder* disable_dictionary() {
      default_column_properties_.set_dictionary_enabled(false);
      return this;
    }

    Builder* enable_dictionary(const std::string& path) {
      dictionary_enabled_[path] = true;
      return this;
    }

    Builder* enable_dictionary(const std::shared_ptr<schema::ColumnPath>& path) {
      return this->enable_dictionary(path->ToDotString());
    }

    Builder* disable_dictionary(const std::string& path) {
      dictionary_enabled_[path] = false;
      return this;
    }

    Builder* disable_dictionary(const std::shared_ptr<schema::ColumnPath>& path) {
      return this->disable_dictionary(path->ToDotString());
    }

    Builder* dictionary_pagesize_limit(int64_t dictionary_psize_limit) {
      dictionary_pagesize_limit_ = dictionary_psize_limit;
      return this;
    }

    Builder* write_batch_size(int64_t write_batch_size) {
      write_batch_size_ = write_batch_size;
      return this;
    }

    Builder* max_row_group_length(int64_t max_row_group_length) {
      max_row_group_length_ = max_row_group_length;
      return this;
    }

    Builder* data_pagesize(int64_t pg_size) {
      pagesize_ = pg_size;
      return this;
    }

    Builder* version(ParquetVersion::type version) {
      version_ = version;
      return this;
    }

    Builder* created_by(const std::string& created_by) {
      created_by_ = created_by;
      return this;
    }

    /**
     * Define the encoding that is used when we do not use dictionary encoding.
     *
     * This applies if dictionary encoding is disabled, or if we fall back
     * because the dictionary grew too large.
     */
    Builder* encoding(Encoding::type encoding_type) {
      if (encoding_type == Encoding::PLAIN_DICTIONARY ||
          encoding_type == Encoding::RLE_DICTIONARY) {
        throw ParquetException("Can't use dictionary encoding as fallback encoding");
      }

      default_column_properties_.set_encoding(encoding_type);
      return this;
    }

    /**
     * Define the encoding that is used for the column described by path when
     * we do not use dictionary encoding.
     *
     * This applies if dictionary encoding is disabled, or if we fall back
     * because the dictionary grew too large.
     */
    Builder* encoding(const std::string& path, Encoding::type encoding_type) {
      if (encoding_type == Encoding::PLAIN_DICTIONARY ||
          encoding_type == Encoding::RLE_DICTIONARY) {
        throw ParquetException("Can't use dictionary encoding as fallback encoding");
      }

      encodings_[path] = encoding_type;
      return this;
    }

    /**
     * Define the encoding that is used for the column described by path when
     * we do not use dictionary encoding.
     *
     * This applies if dictionary encoding is disabled, or if we fall back
     * because the dictionary grew too large.
     */
    Builder* encoding(const std::shared_ptr<schema::ColumnPath>& path,
                      Encoding::type encoding_type) {
      return this->encoding(path->ToDotString(), encoding_type);
    }

    Builder* compression(Compression::type codec) {
      default_column_properties_.set_compression(codec);
      return this;
    }

    Builder* max_statistics_size(size_t max_stats_sz) {
      default_column_properties_.set_max_statistics_size(max_stats_sz);
      return this;
    }

    Builder* compression(const std::string& path, Compression::type codec) {
      codecs_[path] = codec;
      return this;
    }

    Builder* compression(const std::shared_ptr<schema::ColumnPath>& path,
                         Compression::type codec) {
      return this->compression(path->ToDotString(), codec);
    }

    /// \brief Specify the default compression level for the compressor in
    /// every column. Columns without an explicitly specified compression
    /// level fall back to this default.
    ///
    /// The provided compression level is compressor specific. Users should
    /// consult the documentation of the selected compressor for the levels it
    /// supports. If the compressor does not support multiple compression
    /// levels, calling this function has no effect. Parquet and Arrow do not
    /// validate the passed compression level. If no level is selected by the
    /// user, or if the special std::numeric_limits<int>::min() value is
    /// passed, then Arrow selects the compression level.
    Builder* compression_level(int compression_level) {
      default_column_properties_.set_compression_level(compression_level);
      return this;
    }

    /// \brief Specify a compression level for the compressor of the column
    /// described by path.
    ///
    /// The provided compression level is compressor specific. Users should
    /// consult the documentation of the selected compressor for the levels it
    /// supports. If the compressor does not support multiple compression
    /// levels, calling this function has no effect. Parquet and Arrow do not
    /// validate the passed compression level. If no level is selected by the
    /// user, or if the special std::numeric_limits<int>::min() value is
    /// passed, then Arrow selects the compression level.
    Builder* compression_level(const std::string& path, int compression_level) {
      codecs_compression_level_[path] = compression_level;
      return this;
    }

    /// \brief Specify a compression level for the compressor of the column
    /// described by path.
    ///
    /// The provided compression level is compressor specific. Users should
    /// consult the documentation of the selected compressor for the levels it
    /// supports. If the compressor does not support multiple compression
    /// levels, calling this function has no effect. Parquet and Arrow do not
    /// validate the passed compression level. If no level is selected by the
    /// user, or if the special std::numeric_limits<int>::min() value is
    /// passed, then Arrow selects the compression level.
    Builder* compression_level(const std::shared_ptr<schema::ColumnPath>& path,
                               int compression_level) {
      return this->compression_level(path->ToDotString(), compression_level);
    }

    Builder* enable_statistics() {
      default_column_properties_.set_statistics_enabled(true);
      return this;
    }

    Builder* disable_statistics() {
      default_column_properties_.set_statistics_enabled(false);
      return this;
    }

    Builder* enable_statistics(const std::string& path) {
      statistics_enabled_[path] = true;
      return this;
    }

    Builder* enable_statistics(const std::shared_ptr<schema::ColumnPath>& path) {
      return this->enable_statistics(path->ToDotString());
    }

    Builder* disable_statistics(const std::string& path) {
      statistics_enabled_[path] = false;
      return this;
    }

    Builder* disable_statistics(const std::shared_ptr<schema::ColumnPath>& path) {
      return this->disable_statistics(path->ToDotString());
    }

    std::shared_ptr<WriterProperties> build() {
      std::unordered_map<std::string, ColumnProperties> column_properties;
      auto get = [&](const std::string& key) -> ColumnProperties& {
        auto it = column_properties.find(key);
        if (it == column_properties.end())
          return column_properties[key] = default_column_properties_;
        else
          return it->second;
      };

      for (const auto& item : encodings_) get(item.first).set_encoding(item.second);
      for (const auto& item : codecs_) get(item.first).set_compression(item.second);
      for (const auto& item : codecs_compression_level_)
        get(item.first).set_compression_level(item.second);
      for (const auto& item : dictionary_enabled_)
        get(item.first).set_dictionary_enabled(item.second);
      for (const auto& item : statistics_enabled_)
        get(item.first).set_statistics_enabled(item.second);

      return std::shared_ptr<WriterProperties>(
          new WriterProperties(pool_, dictionary_pagesize_limit_, write_batch_size_,
                               max_row_group_length_, pagesize_, version_, created_by_,
                               default_column_properties_, column_properties));
    }

   private:
    MemoryPool* pool_;
    int64_t dictionary_pagesize_limit_;
    int64_t write_batch_size_;
    int64_t max_row_group_length_;
    int64_t pagesize_;
    ParquetVersion::type version_;
    std::string created_by_;

    // Settings used for each column unless overridden in any of the maps below
    ColumnProperties default_column_properties_;
    std::unordered_map<std::string, Encoding::type> encodings_;
    std::unordered_map<std::string, Compression::type> codecs_;
    std::unordered_map<std::string, int32_t> codecs_compression_level_;
    std::unordered_map<std::string, bool> dictionary_enabled_;
    std::unordered_map<std::string, bool> statistics_enabled_;
  };

  inline MemoryPool* memory_pool() const { return pool_; }

  inline int64_t dictionary_pagesize_limit() const { return dictionary_pagesize_limit_; }

  inline int64_t write_batch_size() const { return write_batch_size_; }

  inline int64_t max_row_group_length() const { return max_row_group_length_; }

  inline int64_t data_pagesize() const { return pagesize_; }

  inline ParquetVersion::type version() const { return parquet_version_; }

  inline std::string created_by() const { return parquet_created_by_; }

  inline Encoding::type dictionary_index_encoding() const {
    if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
      return Encoding::PLAIN_DICTIONARY;
    } else {
      return Encoding::RLE_DICTIONARY;
    }
  }

  inline Encoding::type dictionary_page_encoding() const {
    if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
      return Encoding::PLAIN_DICTIONARY;
    } else {
      return Encoding::PLAIN;
    }
  }

  const ColumnProperties& column_properties(
      const std::shared_ptr<schema::ColumnPath>& path) const {
    auto it = column_properties_.find(path->ToDotString());
    if (it != column_properties_.end()) return it->second;
    return default_column_properties_;
  }

  Encoding::type encoding(const std::shared_ptr<schema::ColumnPath>& path) const {
    return column_properties(path).encoding();
  }

  Compression::type compression(const std::shared_ptr<schema::ColumnPath>& path) const {
    return column_properties(path).compression();
  }

  int compression_level(const std::shared_ptr<schema::ColumnPath>& path) const {
    return column_properties(path).compression_level();
  }

  bool dictionary_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
    return column_properties(path).dictionary_enabled();
  }

  bool statistics_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
    return column_properties(path).statistics_enabled();
  }

  size_t max_statistics_size(const std::shared_ptr<schema::ColumnPath>& path) const {
    return column_properties(path).max_statistics_size();
  }

 private:
  explicit WriterProperties(
      MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size,
      int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type version,
      const std::string& created_by, const ColumnProperties& default_column_properties,
      const std::unordered_map<std::string, ColumnProperties>& column_properties)
      : pool_(pool),
        dictionary_pagesize_limit_(dictionary_pagesize_limit),
        write_batch_size_(write_batch_size),
        max_row_group_length_(max_row_group_length),
        pagesize_(pagesize),
        parquet_version_(version),
        parquet_created_by_(created_by),
        default_column_properties_(default_column_properties),
        column_properties_(column_properties) {}

  MemoryPool* pool_;
  int64_t dictionary_pagesize_limit_;
  int64_t write_batch_size_;
  int64_t max_row_group_length_;
  int64_t pagesize_;
  ParquetVersion::type parquet_version_;
  std::string parquet_created_by_;
  ColumnProperties default_column_properties_;
  std::unordered_map<std::string, ColumnProperties> column_properties_;
};

std::shared_ptr<WriterProperties> PARQUET_EXPORT default_writer_properties();
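
// Example (illustrative): building WriterProperties with a file-wide default
// codec plus per-column overrides. The column names "col1" and "col2" are
// hypothetical. Builder methods return Builder*, so chained calls after the
// first use the arrow operator.
//
//   std::shared_ptr<WriterProperties> props =
//       WriterProperties::Builder()
//           .compression(Compression::SNAPPY)         // default for all columns
//           ->compression("col1", Compression::GZIP)  // override one column
//           ->disable_dictionary("col2")
//           ->build();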

// ----------------------------------------------------------------------
// Properties specific to Apache Arrow columnar read and write

static constexpr bool kArrowDefaultUseThreads = false;

// Default number of rows to read when using ::arrow::RecordBatchReader
static constexpr int64_t kArrowDefaultBatchSize = 64 * 1024;

/// EXPERIMENTAL: Properties for configuring FileReader behavior.
class PARQUET_EXPORT ArrowReaderProperties {
 public:
  explicit ArrowReaderProperties(bool use_threads = kArrowDefaultUseThreads)
      : use_threads_(use_threads),
        read_dict_indices_(),
        batch_size_(kArrowDefaultBatchSize) {}

  void set_use_threads(bool use_threads) { use_threads_ = use_threads; }

  bool use_threads() const { return use_threads_; }

  void set_read_dictionary(int column_index, bool read_dict) {
    if (read_dict) {
      read_dict_indices_.insert(column_index);
    } else {
      read_dict_indices_.erase(column_index);
    }
  }
  bool read_dictionary(int column_index) const {
    return read_dict_indices_.find(column_index) != read_dict_indices_.end();
  }

  void set_batch_size(int64_t batch_size) { batch_size_ = batch_size; }

  int64_t batch_size() const { return batch_size_; }

 private:
  bool use_threads_;
  std::unordered_set<int> read_dict_indices_;
  int64_t batch_size_;
};

/// EXPERIMENTAL: Constructs the default ArrowReaderProperties
PARQUET_EXPORT
ArrowReaderProperties default_arrow_reader_properties();
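
// Example (illustrative): request dictionary-decoded output for column 0 and
// a smaller record batch size; both values are arbitrary example choices.
//
//   ArrowReaderProperties props = default_arrow_reader_properties();
//   props.set_read_dictionary(0, true);
//   props.set_batch_size(8 * 1024);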

class PARQUET_EXPORT ArrowWriterProperties {
 public:
  class Builder {
   public:
    Builder()
        : write_timestamps_as_int96_(false),
          coerce_timestamps_enabled_(false),
          coerce_timestamps_unit_(::arrow::TimeUnit::SECOND),
          truncated_timestamps_allowed_(false),
          store_schema_(false) {}
    virtual ~Builder() {}

    Builder* disable_deprecated_int96_timestamps() {
      write_timestamps_as_int96_ = false;
      return this;
    }

    Builder* enable_deprecated_int96_timestamps() {
      write_timestamps_as_int96_ = true;
      return this;
    }

    Builder* coerce_timestamps(::arrow::TimeUnit::type unit) {
      coerce_timestamps_enabled_ = true;
      coerce_timestamps_unit_ = unit;
      return this;
    }

    Builder* allow_truncated_timestamps() {
      truncated_timestamps_allowed_ = true;
      return this;
    }

    Builder* disallow_truncated_timestamps() {
      truncated_timestamps_allowed_ = false;
      return this;
    }

    /// \brief EXPERIMENTAL: Write binary serialized Arrow schema to the file,
    /// to enable certain read options (like "read_dictionary") to be set
    /// automatically
    Builder* store_schema() {
      store_schema_ = true;
      return this;
    }

    std::shared_ptr<ArrowWriterProperties> build() {
      return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties(
          write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_,
          truncated_timestamps_allowed_, store_schema_));
    }

   private:
    bool write_timestamps_as_int96_;

    bool coerce_timestamps_enabled_;
    ::arrow::TimeUnit::type coerce_timestamps_unit_;
    bool truncated_timestamps_allowed_;

    bool store_schema_;
  };

  bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; }

  bool coerce_timestamps_enabled() const { return coerce_timestamps_enabled_; }
  ::arrow::TimeUnit::type coerce_timestamps_unit() const {
    return coerce_timestamps_unit_;
  }

  bool truncated_timestamps_allowed() const { return truncated_timestamps_allowed_; }

  bool store_schema() const { return store_schema_; }

 private:
  explicit ArrowWriterProperties(bool write_nanos_as_int96,
                                 bool coerce_timestamps_enabled,
                                 ::arrow::TimeUnit::type coerce_timestamps_unit,
                                 bool truncated_timestamps_allowed, bool store_schema)
      : write_timestamps_as_int96_(write_nanos_as_int96),
        coerce_timestamps_enabled_(coerce_timestamps_enabled),
        coerce_timestamps_unit_(coerce_timestamps_unit),
        truncated_timestamps_allowed_(truncated_timestamps_allowed),
        store_schema_(store_schema) {}

  const bool write_timestamps_as_int96_;
  const bool coerce_timestamps_enabled_;
  const ::arrow::TimeUnit::type coerce_timestamps_unit_;
  const bool truncated_timestamps_allowed_;
  const bool store_schema_;
};
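
// Example (illustrative): coerce timestamps to millisecond resolution,
// tolerate the resulting truncation, and embed the Arrow schema in the file
// metadata. The chosen unit is an arbitrary example.
//
//   std::shared_ptr<ArrowWriterProperties> props =
//       ArrowWriterProperties::Builder()
//           .coerce_timestamps(::arrow::TimeUnit::MILLI)
//           ->allow_truncated_timestamps()
//           ->store_schema()
//           ->build();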

/// \brief State object used for writing Arrow data directly to a Parquet
/// column chunk. API possibly not stable
struct ArrowWriteContext {
  ArrowWriteContext(MemoryPool* memory_pool, ArrowWriterProperties* properties)
      : memory_pool(memory_pool),
        properties(properties),
        data_buffer(AllocateBuffer(memory_pool)),
        def_levels_buffer(AllocateBuffer(memory_pool)) {}

  template <typename T>
  ::arrow::Status GetScratchData(const int64_t num_values, T** out) {
    ARROW_RETURN_NOT_OK(this->data_buffer->Resize(num_values * sizeof(T), false));
    *out = reinterpret_cast<T*>(this->data_buffer->mutable_data());
    return ::arrow::Status::OK();
  }

  MemoryPool* memory_pool;
  const ArrowWriterProperties* properties;

  // Buffer used for storing the data of an array converted to the physical type
  // as expected by parquet-cpp.
  std::shared_ptr<ResizableBuffer> data_buffer;

  // Ownership of this buffer is shared so that it can be reused.
  std::shared_ptr<ResizableBuffer> def_levels_buffer;
};
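
// Example (illustrative): borrowing typed scratch space from the context.
// Here ctx and num_values are assumed to be in scope; the surrounding writer
// code is elided.
//
//   int32_t* scratch = nullptr;
//   ARROW_RETURN_NOT_OK(ctx.GetScratchData<int32_t>(num_values, &scratch));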

PARQUET_EXPORT
std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties();

}  // namespace parquet

#endif  // PARQUET_COLUMN_PROPERTIES_H