1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#ifndef PARQUET_COLUMN_PROPERTIES_H
19#define PARQUET_COLUMN_PROPERTIES_H
20
21#include <memory>
22#include <string>
23#include <unordered_map>
24
25#include "parquet/exception.h"
26#include "parquet/parquet_version.h"
27#include "parquet/schema.h"
28#include "parquet/types.h"
29#include "parquet/util/macros.h"
30#include "parquet/util/memory.h"
31#include "parquet/util/visibility.h"
32
33namespace parquet {
34
// Scoping wrapper for the format-version enumeration, so the enumerators are
// spelled ParquetVersion::PARQUET_1_0 / ParquetVersion::PARQUET_2_0 and the
// enumeration type itself is ParquetVersion::type.
struct ParquetVersion {
  enum type {
    PARQUET_1_0 = 0,
    PARQUET_2_0 = 1
  };
};
38
// Defaults picked up by the ReaderProperties constructor.
//
// These are compile-time constants on purpose: a non-const `static` variable in
// a header gives every including translation unit its own writable copy, and
// reading such a variable from an inline function defined in the header is not
// allowed by the one-definition rule. `static constexpr` also matches how the
// writer defaults in this header are declared.
static constexpr int64_t DEFAULT_BUFFER_SIZE = 0;
static constexpr bool DEFAULT_USE_BUFFERED_STREAM = false;
41
42class PARQUET_EXPORT ReaderProperties {
43 public:
44 explicit ReaderProperties(::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
45 : pool_(pool) {
46 buffered_stream_enabled_ = DEFAULT_USE_BUFFERED_STREAM;
47 buffer_size_ = DEFAULT_BUFFER_SIZE;
48 }
49
50 ::arrow::MemoryPool* memory_pool() const { return pool_; }
51
52 std::unique_ptr<InputStream> GetStream(RandomAccessSource* source, int64_t start,
53 int64_t num_bytes) {
54 std::unique_ptr<InputStream> stream;
55 if (buffered_stream_enabled_) {
56 stream.reset(
57 new BufferedInputStream(pool_, buffer_size_, source, start, num_bytes));
58 } else {
59 stream.reset(new InMemoryInputStream(source, start, num_bytes));
60 }
61 return stream;
62 }
63
64 bool is_buffered_stream_enabled() const { return buffered_stream_enabled_; }
65
66 void enable_buffered_stream() { buffered_stream_enabled_ = true; }
67
68 void disable_buffered_stream() { buffered_stream_enabled_ = false; }
69
70 void set_buffer_size(int64_t buf_size) { buffer_size_ = buf_size; }
71
72 int64_t buffer_size() const { return buffer_size_; }
73
74 private:
75 ::arrow::MemoryPool* pool_;
76 int64_t buffer_size_;
77 bool buffered_stream_enabled_;
78};
79
// Declaration only; the definition is not part of this header. Returns a
// ReaderProperties by value.
// NOTE(review): presumably a default-constructed instance -- confirm against
// the definition.
ReaderProperties PARQUET_EXPORT default_reader_properties();
81
82static constexpr int64_t DEFAULT_PAGE_SIZE = 1024 * 1024;
83static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
84static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = DEFAULT_PAGE_SIZE;
85static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
86static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 64 * 1024 * 1024;
87static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
88static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096;
89static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
90static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
91 ParquetVersion::PARQUET_1_0;
92static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
93static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED;
94
95class PARQUET_EXPORT ColumnProperties {
96 public:
97 ColumnProperties(Encoding::type encoding = DEFAULT_ENCODING,
98 Compression::type codec = DEFAULT_COMPRESSION_TYPE,
99 bool dictionary_enabled = DEFAULT_IS_DICTIONARY_ENABLED,
100 bool statistics_enabled = DEFAULT_ARE_STATISTICS_ENABLED,
101 size_t max_stats_size = DEFAULT_MAX_STATISTICS_SIZE)
102 : encoding_(encoding),
103 codec_(codec),
104 dictionary_enabled_(dictionary_enabled),
105 statistics_enabled_(statistics_enabled),
106 max_stats_size_(max_stats_size) {}
107
108 void set_encoding(Encoding::type encoding) { encoding_ = encoding; }
109
110 void set_compression(Compression::type codec) { codec_ = codec; }
111
112 void set_dictionary_enabled(bool dictionary_enabled) {
113 dictionary_enabled_ = dictionary_enabled;
114 }
115
116 void set_statistics_enabled(bool statistics_enabled) {
117 statistics_enabled_ = statistics_enabled;
118 }
119
120 void set_max_statistics_size(size_t max_stats_size) {
121 max_stats_size_ = max_stats_size;
122 }
123
124 Encoding::type encoding() const { return encoding_; }
125
126 Compression::type compression() const { return codec_; }
127
128 bool dictionary_enabled() const { return dictionary_enabled_; }
129
130 bool statistics_enabled() const { return statistics_enabled_; }
131
132 size_t max_statistics_size() const { return max_stats_size_; }
133
134 private:
135 Encoding::type encoding_;
136 Compression::type codec_;
137 bool dictionary_enabled_;
138 bool statistics_enabled_;
139 size_t max_stats_size_;
140};
141
142class PARQUET_EXPORT WriterProperties {
143 public:
144 class Builder {
145 public:
146 Builder()
147 : pool_(::arrow::default_memory_pool()),
148 dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
149 write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
150 max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH),
151 pagesize_(DEFAULT_PAGE_SIZE),
152 version_(DEFAULT_WRITER_VERSION),
153 created_by_(DEFAULT_CREATED_BY) {}
154 virtual ~Builder() {}
155
156 Builder* memory_pool(::arrow::MemoryPool* pool) {
157 pool_ = pool;
158 return this;
159 }
160
161 Builder* enable_dictionary() {
162 default_column_properties_.set_dictionary_enabled(true);
163 return this;
164 }
165
166 Builder* disable_dictionary() {
167 default_column_properties_.set_dictionary_enabled(false);
168 return this;
169 }
170
171 Builder* enable_dictionary(const std::string& path) {
172 dictionary_enabled_[path] = true;
173 return this;
174 }
175
176 Builder* enable_dictionary(const std::shared_ptr<schema::ColumnPath>& path) {
177 return this->enable_dictionary(path->ToDotString());
178 }
179
180 Builder* disable_dictionary(const std::string& path) {
181 dictionary_enabled_[path] = false;
182 return this;
183 }
184
185 Builder* disable_dictionary(const std::shared_ptr<schema::ColumnPath>& path) {
186 return this->disable_dictionary(path->ToDotString());
187 }
188
189 Builder* dictionary_pagesize_limit(int64_t dictionary_psize_limit) {
190 dictionary_pagesize_limit_ = dictionary_psize_limit;
191 return this;
192 }
193
194 Builder* write_batch_size(int64_t write_batch_size) {
195 write_batch_size_ = write_batch_size;
196 return this;
197 }
198
199 Builder* max_row_group_length(int64_t max_row_group_length) {
200 max_row_group_length_ = max_row_group_length;
201 return this;
202 }
203
204 Builder* data_pagesize(int64_t pg_size) {
205 pagesize_ = pg_size;
206 return this;
207 }
208
209 Builder* version(ParquetVersion::type version) {
210 version_ = version;
211 return this;
212 }
213
214 Builder* created_by(const std::string& created_by) {
215 created_by_ = created_by;
216 return this;
217 }
218
219 /**
220 * Define the encoding that is used when we don't utilise dictionary encoding.
221 *
222 * This either apply if dictionary encoding is disabled or if we fallback
223 * as the dictionary grew too large.
224 */
225 Builder* encoding(Encoding::type encoding_type) {
226 if (encoding_type == Encoding::PLAIN_DICTIONARY ||
227 encoding_type == Encoding::RLE_DICTIONARY) {
228 throw ParquetException("Can't use dictionary encoding as fallback encoding");
229 }
230
231 default_column_properties_.set_encoding(encoding_type);
232 return this;
233 }
234
235 /**
236 * Define the encoding that is used when we don't utilise dictionary encoding.
237 *
238 * This either apply if dictionary encoding is disabled or if we fallback
239 * as the dictionary grew too large.
240 */
241 Builder* encoding(const std::string& path, Encoding::type encoding_type) {
242 if (encoding_type == Encoding::PLAIN_DICTIONARY ||
243 encoding_type == Encoding::RLE_DICTIONARY) {
244 throw ParquetException("Can't use dictionary encoding as fallback encoding");
245 }
246
247 encodings_[path] = encoding_type;
248 return this;
249 }
250
251 /**
252 * Define the encoding that is used when we don't utilise dictionary encoding.
253 *
254 * This either apply if dictionary encoding is disabled or if we fallback
255 * as the dictionary grew too large.
256 */
257 Builder* encoding(const std::shared_ptr<schema::ColumnPath>& path,
258 Encoding::type encoding_type) {
259 return this->encoding(path->ToDotString(), encoding_type);
260 }
261
262 Builder* compression(Compression::type codec) {
263 default_column_properties_.set_compression(codec);
264 return this;
265 }
266
267 Builder* max_statistics_size(size_t max_stats_sz) {
268 default_column_properties_.set_max_statistics_size(max_stats_sz);
269 return this;
270 }
271
272 Builder* compression(const std::string& path, Compression::type codec) {
273 codecs_[path] = codec;
274 return this;
275 }
276
277 Builder* compression(const std::shared_ptr<schema::ColumnPath>& path,
278 Compression::type codec) {
279 return this->compression(path->ToDotString(), codec);
280 }
281
282 Builder* enable_statistics() {
283 default_column_properties_.set_statistics_enabled(true);
284 return this;
285 }
286
287 Builder* disable_statistics() {
288 default_column_properties_.set_statistics_enabled(false);
289 return this;
290 }
291
292 Builder* enable_statistics(const std::string& path) {
293 statistics_enabled_[path] = true;
294 return this;
295 }
296
297 Builder* enable_statistics(const std::shared_ptr<schema::ColumnPath>& path) {
298 return this->enable_statistics(path->ToDotString());
299 }
300
301 Builder* disable_statistics(const std::string& path) {
302 statistics_enabled_[path] = false;
303 return this;
304 }
305
306 Builder* disable_statistics(const std::shared_ptr<schema::ColumnPath>& path) {
307 return this->disable_statistics(path->ToDotString());
308 }
309
310 std::shared_ptr<WriterProperties> build() {
311 std::unordered_map<std::string, ColumnProperties> column_properties;
312 auto get = [&](const std::string& key) -> ColumnProperties& {
313 auto it = column_properties.find(key);
314 if (it == column_properties.end())
315 return column_properties[key] = default_column_properties_;
316 else
317 return it->second;
318 };
319
320 for (const auto& item : encodings_) get(item.first).set_encoding(item.second);
321 for (const auto& item : codecs_) get(item.first).set_compression(item.second);
322 for (const auto& item : dictionary_enabled_)
323 get(item.first).set_dictionary_enabled(item.second);
324 for (const auto& item : statistics_enabled_)
325 get(item.first).set_statistics_enabled(item.second);
326
327 return std::shared_ptr<WriterProperties>(
328 new WriterProperties(pool_, dictionary_pagesize_limit_, write_batch_size_,
329 max_row_group_length_, pagesize_, version_, created_by_,
330 default_column_properties_, column_properties));
331 }
332
333 private:
334 ::arrow::MemoryPool* pool_;
335 int64_t dictionary_pagesize_limit_;
336 int64_t write_batch_size_;
337 int64_t max_row_group_length_;
338 int64_t pagesize_;
339 ParquetVersion::type version_;
340 std::string created_by_;
341
342 // Settings used for each column unless overridden in any of the maps below
343 ColumnProperties default_column_properties_;
344 std::unordered_map<std::string, Encoding::type> encodings_;
345 std::unordered_map<std::string, Compression::type> codecs_;
346 std::unordered_map<std::string, bool> dictionary_enabled_;
347 std::unordered_map<std::string, bool> statistics_enabled_;
348 };
349
350 inline ::arrow::MemoryPool* memory_pool() const { return pool_; }
351
352 inline int64_t dictionary_pagesize_limit() const { return dictionary_pagesize_limit_; }
353
354 inline int64_t write_batch_size() const { return write_batch_size_; }
355
356 inline int64_t max_row_group_length() const { return max_row_group_length_; }
357
358 inline int64_t data_pagesize() const { return pagesize_; }
359
360 inline ParquetVersion::type version() const { return parquet_version_; }
361
362 inline std::string created_by() const { return parquet_created_by_; }
363
364 inline Encoding::type dictionary_index_encoding() const {
365 if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
366 return Encoding::PLAIN_DICTIONARY;
367 } else {
368 return Encoding::RLE_DICTIONARY;
369 }
370 }
371
372 inline Encoding::type dictionary_page_encoding() const {
373 if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
374 return Encoding::PLAIN_DICTIONARY;
375 } else {
376 return Encoding::PLAIN;
377 }
378 }
379
380 const ColumnProperties& column_properties(
381 const std::shared_ptr<schema::ColumnPath>& path) const {
382 auto it = column_properties_.find(path->ToDotString());
383 if (it != column_properties_.end()) return it->second;
384 return default_column_properties_;
385 }
386
387 Encoding::type encoding(const std::shared_ptr<schema::ColumnPath>& path) const {
388 return column_properties(path).encoding();
389 }
390
391 Compression::type compression(const std::shared_ptr<schema::ColumnPath>& path) const {
392 return column_properties(path).compression();
393 }
394
395 bool dictionary_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
396 return column_properties(path).dictionary_enabled();
397 }
398
399 bool statistics_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
400 return column_properties(path).statistics_enabled();
401 }
402
403 size_t max_statistics_size(const std::shared_ptr<schema::ColumnPath>& path) const {
404 return column_properties(path).max_statistics_size();
405 }
406
407 private:
408 explicit WriterProperties(
409 ::arrow::MemoryPool* pool, int64_t dictionary_pagesize_limit,
410 int64_t write_batch_size, int64_t max_row_group_length, int64_t pagesize,
411 ParquetVersion::type version, const std::string& created_by,
412 const ColumnProperties& default_column_properties,
413 const std::unordered_map<std::string, ColumnProperties>& column_properties)
414 : pool_(pool),
415 dictionary_pagesize_limit_(dictionary_pagesize_limit),
416 write_batch_size_(write_batch_size),
417 max_row_group_length_(max_row_group_length),
418 pagesize_(pagesize),
419 parquet_version_(version),
420 parquet_created_by_(created_by),
421 default_column_properties_(default_column_properties),
422 column_properties_(column_properties) {}
423
424 ::arrow::MemoryPool* pool_;
425 int64_t dictionary_pagesize_limit_;
426 int64_t write_batch_size_;
427 int64_t max_row_group_length_;
428 int64_t pagesize_;
429 ParquetVersion::type parquet_version_;
430 std::string parquet_created_by_;
431 ColumnProperties default_column_properties_;
432 std::unordered_map<std::string, ColumnProperties> column_properties_;
433};
434
// Declaration only; the definition is not part of this header. Returns a
// shared_ptr to a WriterProperties.
// NOTE(review): presumably the result of a Builder left at its defaults --
// confirm against the definition.
std::shared_ptr<WriterProperties> PARQUET_EXPORT default_writer_properties();
436
437} // namespace parquet
438
439#endif // PARQUET_COLUMN_PROPERTIES_H
440