1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include <gtest/gtest.h> |
19 | |
20 | #include <algorithm> |
21 | #include <array> |
22 | #include <cstdint> |
23 | #include <cstring> |
24 | #include <memory> |
25 | #include <vector> |
26 | |
27 | #include "parquet/column_reader.h" |
28 | #include "parquet/column_writer.h" |
29 | #include "parquet/file_reader.h" |
30 | #include "parquet/file_writer.h" |
31 | #include "parquet/schema.h" |
32 | #include "parquet/statistics.h" |
33 | #include "parquet/test-specialization.h" |
34 | #include "parquet/test-util.h" |
35 | #include "parquet/thrift.h" |
36 | #include "parquet/types.h" |
37 | #include "parquet/util/memory.h" |
38 | |
39 | using arrow::default_memory_pool; |
40 | using arrow::MemoryPool; |
41 | |
42 | namespace parquet { |
43 | |
44 | using schema::GroupNode; |
45 | using schema::NodePtr; |
46 | using schema::PrimitiveNode; |
47 | |
48 | namespace test { |
49 | |
50 | template <typename TestType> |
51 | class TestRowGroupStatistics : public PrimitiveTypedTest<TestType> { |
52 | public: |
53 | using T = typename TestType::c_type; |
54 | using TypedStats = TypedRowGroupStatistics<TestType>; |
55 | |
56 | std::vector<T> GetDeepCopy( |
57 | const std::vector<T>&); // allocates new memory for FLBA/ByteArray |
58 | |
59 | T* GetValuesPointer(std::vector<T>&); |
60 | void DeepFree(std::vector<T>&); |
61 | |
62 | void TestMinMaxEncode() { |
63 | this->GenerateData(1000); |
64 | |
65 | TypedStats statistics1(this->schema_.Column(0)); |
66 | statistics1.Update(this->values_ptr_, this->values_.size(), 0); |
67 | std::string encoded_min = statistics1.EncodeMin(); |
68 | std::string encoded_max = statistics1.EncodeMax(); |
69 | |
70 | TypedStats statistics2(this->schema_.Column(0), encoded_min, encoded_max, |
71 | this->values_.size(), 0, 0, true); |
72 | |
73 | TypedStats statistics3(this->schema_.Column(0)); |
74 | std::vector<uint8_t> valid_bits( |
75 | BitUtil::BytesForBits(static_cast<uint32_t>(this->values_.size())) + 1, 255); |
76 | statistics3.UpdateSpaced(this->values_ptr_, valid_bits.data(), 0, |
77 | this->values_.size(), 0); |
78 | std::string encoded_min_spaced = statistics3.EncodeMin(); |
79 | std::string encoded_max_spaced = statistics3.EncodeMax(); |
80 | |
81 | ASSERT_EQ(encoded_min, statistics2.EncodeMin()); |
82 | ASSERT_EQ(encoded_max, statistics2.EncodeMax()); |
83 | ASSERT_EQ(statistics1.min(), statistics2.min()); |
84 | ASSERT_EQ(statistics1.max(), statistics2.max()); |
85 | ASSERT_EQ(encoded_min_spaced, statistics2.EncodeMin()); |
86 | ASSERT_EQ(encoded_max_spaced, statistics2.EncodeMax()); |
87 | ASSERT_EQ(statistics3.min(), statistics2.min()); |
88 | ASSERT_EQ(statistics3.max(), statistics2.max()); |
89 | } |
90 | |
91 | void TestReset() { |
92 | this->GenerateData(1000); |
93 | |
94 | TypedStats statistics(this->schema_.Column(0)); |
95 | statistics.Update(this->values_ptr_, this->values_.size(), 0); |
96 | ASSERT_EQ(this->values_.size(), statistics.num_values()); |
97 | |
98 | statistics.Reset(); |
99 | ASSERT_EQ(0, statistics.null_count()); |
100 | ASSERT_EQ(0, statistics.num_values()); |
101 | ASSERT_EQ("" , statistics.EncodeMin()); |
102 | ASSERT_EQ("" , statistics.EncodeMax()); |
103 | } |
104 | |
105 | void TestMerge() { |
106 | int num_null[2]; |
107 | random_numbers(2, 42, 0, 100, num_null); |
108 | |
109 | TypedStats statistics1(this->schema_.Column(0)); |
110 | this->GenerateData(1000); |
111 | statistics1.Update(this->values_ptr_, this->values_.size() - num_null[0], |
112 | num_null[0]); |
113 | |
114 | TypedStats statistics2(this->schema_.Column(0)); |
115 | this->GenerateData(1000); |
116 | statistics2.Update(this->values_ptr_, this->values_.size() - num_null[1], |
117 | num_null[1]); |
118 | |
119 | TypedStats total(this->schema_.Column(0)); |
120 | total.Merge(statistics1); |
121 | total.Merge(statistics2); |
122 | |
123 | ASSERT_EQ(num_null[0] + num_null[1], total.null_count()); |
124 | ASSERT_EQ(this->values_.size() * 2 - num_null[0] - num_null[1], total.num_values()); |
125 | ASSERT_EQ(total.min(), std::min(statistics1.min(), statistics2.min())); |
126 | ASSERT_EQ(total.max(), std::max(statistics1.max(), statistics2.max())); |
127 | } |
128 | |
129 | void TestFullRoundtrip(int64_t num_values, int64_t null_count) { |
130 | this->GenerateData(num_values); |
131 | |
132 | // compute statistics for the whole batch |
133 | TypedStats expected_stats(this->schema_.Column(0)); |
134 | expected_stats.Update(this->values_ptr_, num_values - null_count, null_count); |
135 | |
136 | auto sink = std::make_shared<InMemoryOutputStream>(); |
137 | auto gnode = std::static_pointer_cast<GroupNode>(this->node_); |
138 | std::shared_ptr<WriterProperties> writer_properties = |
139 | WriterProperties::Builder().enable_statistics("column" )->build(); |
140 | auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties); |
141 | auto row_group_writer = file_writer->AppendRowGroup(); |
142 | auto column_writer = |
143 | static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn()); |
144 | |
145 | // simulate the case when data comes from multiple buffers, |
146 | // in which case special care is necessary for FLBA/ByteArray types |
147 | for (int i = 0; i < 2; i++) { |
148 | int64_t batch_num_values = i ? num_values - num_values / 2 : num_values / 2; |
149 | int64_t batch_null_count = i ? null_count : 0; |
150 | DCHECK(null_count <= num_values); // avoid too much headache |
151 | std::vector<int16_t> definition_levels(batch_null_count, 0); |
152 | definition_levels.insert(definition_levels.end(), |
153 | batch_num_values - batch_null_count, 1); |
154 | auto beg = this->values_.begin() + i * num_values / 2; |
155 | auto end = beg + batch_num_values; |
156 | std::vector<T> batch = GetDeepCopy(std::vector<T>(beg, end)); |
157 | T* batch_values_ptr = GetValuesPointer(batch); |
158 | column_writer->WriteBatch(batch_num_values, definition_levels.data(), nullptr, |
159 | batch_values_ptr); |
160 | DeepFree(batch); |
161 | } |
162 | column_writer->Close(); |
163 | row_group_writer->Close(); |
164 | file_writer->Close(); |
165 | |
166 | auto buffer = sink->GetBuffer(); |
167 | auto source = std::make_shared<::arrow::io::BufferReader>(buffer); |
168 | auto file_reader = ParquetFileReader::Open(source); |
169 | auto rg_reader = file_reader->RowGroup(0); |
170 | auto column_chunk = rg_reader->metadata()->ColumnChunk(0); |
171 | if (!column_chunk->is_stats_set()) return; |
172 | std::shared_ptr<RowGroupStatistics> stats = column_chunk->statistics(); |
173 | // check values after serialization + deserialization |
174 | ASSERT_EQ(null_count, stats->null_count()); |
175 | ASSERT_EQ(num_values - null_count, stats->num_values()); |
176 | ASSERT_EQ(expected_stats.EncodeMin(), stats->EncodeMin()); |
177 | ASSERT_EQ(expected_stats.EncodeMax(), stats->EncodeMax()); |
178 | } |
179 | }; |
180 | |
181 | template <typename TestType> |
182 | typename TestType::c_type* TestRowGroupStatistics<TestType>::GetValuesPointer( |
183 | std::vector<typename TestType::c_type>& values) { |
184 | return values.data(); |
185 | } |
186 | |
187 | template <> |
188 | bool* TestRowGroupStatistics<BooleanType>::GetValuesPointer(std::vector<bool>& values) { |
189 | static std::vector<uint8_t> bool_buffer; |
190 | bool_buffer.clear(); |
191 | bool_buffer.resize(values.size()); |
192 | std::copy(values.begin(), values.end(), bool_buffer.begin()); |
193 | return reinterpret_cast<bool*>(bool_buffer.data()); |
194 | } |
195 | |
196 | template <typename TestType> |
197 | typename std::vector<typename TestType::c_type> |
198 | TestRowGroupStatistics<TestType>::GetDeepCopy( |
199 | const std::vector<typename TestType::c_type>& values) { |
200 | return values; |
201 | } |
202 | |
203 | template <> |
204 | std::vector<FLBA> TestRowGroupStatistics<FLBAType>::GetDeepCopy( |
205 | const std::vector<FLBA>& values) { |
206 | std::vector<FLBA> copy; |
207 | MemoryPool* pool = ::arrow::default_memory_pool(); |
208 | for (const FLBA& flba : values) { |
209 | uint8_t* ptr; |
210 | PARQUET_THROW_NOT_OK(pool->Allocate(FLBA_LENGTH, &ptr)); |
211 | memcpy(ptr, flba.ptr, FLBA_LENGTH); |
212 | copy.emplace_back(ptr); |
213 | } |
214 | return copy; |
215 | } |
216 | |
217 | template <> |
218 | std::vector<ByteArray> TestRowGroupStatistics<ByteArrayType>::GetDeepCopy( |
219 | const std::vector<ByteArray>& values) { |
220 | std::vector<ByteArray> copy; |
221 | MemoryPool* pool = default_memory_pool(); |
222 | for (const ByteArray& ba : values) { |
223 | uint8_t* ptr; |
224 | PARQUET_THROW_NOT_OK(pool->Allocate(ba.len, &ptr)); |
225 | memcpy(ptr, ba.ptr, ba.len); |
226 | copy.emplace_back(ba.len, ptr); |
227 | } |
228 | return copy; |
229 | } |
230 | |
231 | template <typename TestType> |
232 | void TestRowGroupStatistics<TestType>::DeepFree( |
233 | std::vector<typename TestType::c_type>& values) {} |
234 | |
235 | template <> |
236 | void TestRowGroupStatistics<FLBAType>::DeepFree(std::vector<FLBA>& values) { |
237 | MemoryPool* pool = default_memory_pool(); |
238 | for (FLBA& flba : values) { |
239 | auto ptr = const_cast<uint8_t*>(flba.ptr); |
240 | memset(ptr, 0, FLBA_LENGTH); |
241 | pool->Free(ptr, FLBA_LENGTH); |
242 | } |
243 | } |
244 | |
245 | template <> |
246 | void TestRowGroupStatistics<ByteArrayType>::DeepFree(std::vector<ByteArray>& values) { |
247 | MemoryPool* pool = default_memory_pool(); |
248 | for (ByteArray& ba : values) { |
249 | auto ptr = const_cast<uint8_t*>(ba.ptr); |
250 | memset(ptr, 0, ba.len); |
251 | pool->Free(ptr, ba.len); |
252 | } |
253 | } |
254 | |
255 | template <> |
256 | void TestRowGroupStatistics<ByteArrayType>::TestMinMaxEncode() { |
257 | this->GenerateData(1000); |
258 | // Test that we encode min max strings correctly |
259 | TypedRowGroupStatistics<ByteArrayType> statistics1(this->schema_.Column(0)); |
260 | statistics1.Update(this->values_ptr_, this->values_.size(), 0); |
261 | std::string encoded_min = statistics1.EncodeMin(); |
262 | std::string encoded_max = statistics1.EncodeMax(); |
263 | |
264 | // encoded is same as unencoded |
265 | ASSERT_EQ(encoded_min, |
266 | std::string((const char*)statistics1.min().ptr, statistics1.min().len)); |
267 | ASSERT_EQ(encoded_max, |
268 | std::string((const char*)statistics1.max().ptr, statistics1.max().len)); |
269 | |
270 | TypedRowGroupStatistics<ByteArrayType> statistics2(this->schema_.Column(0), encoded_min, |
271 | encoded_max, this->values_.size(), 0, |
272 | 0, true); |
273 | |
274 | ASSERT_EQ(encoded_min, statistics2.EncodeMin()); |
275 | ASSERT_EQ(encoded_max, statistics2.EncodeMax()); |
276 | ASSERT_EQ(statistics1.min(), statistics2.min()); |
277 | ASSERT_EQ(statistics1.max(), statistics2.max()); |
278 | } |
279 | |
280 | using TestTypes = ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType, |
281 | ByteArrayType, FLBAType, BooleanType>; |
282 | |
283 | TYPED_TEST_CASE(TestRowGroupStatistics, TestTypes); |
284 | |
285 | TYPED_TEST(TestRowGroupStatistics, MinMaxEncode) { |
286 | this->SetUpSchema(Repetition::REQUIRED); |
287 | ASSERT_NO_FATAL_FAILURE(this->TestMinMaxEncode()); |
288 | } |
289 | |
290 | TYPED_TEST(TestRowGroupStatistics, Reset) { |
291 | this->SetUpSchema(Repetition::OPTIONAL); |
292 | ASSERT_NO_FATAL_FAILURE(this->TestReset()); |
293 | } |
294 | |
295 | TYPED_TEST(TestRowGroupStatistics, FullRoundtrip) { |
296 | this->SetUpSchema(Repetition::OPTIONAL); |
297 | ASSERT_NO_FATAL_FAILURE(this->TestFullRoundtrip(100, 31)); |
298 | ASSERT_NO_FATAL_FAILURE(this->TestFullRoundtrip(1000, 415)); |
299 | ASSERT_NO_FATAL_FAILURE(this->TestFullRoundtrip(10000, 926)); |
300 | } |
301 | |
302 | template <typename TestType> |
303 | class TestNumericRowGroupStatistics : public TestRowGroupStatistics<TestType> {}; |
304 | |
305 | using NumericTypes = ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType>; |
306 | |
307 | TYPED_TEST_CASE(TestNumericRowGroupStatistics, NumericTypes); |
308 | |
309 | TYPED_TEST(TestNumericRowGroupStatistics, Merge) { |
310 | this->SetUpSchema(Repetition::OPTIONAL); |
311 | ASSERT_NO_FATAL_FAILURE(this->TestMerge()); |
312 | } |
313 | |
314 | // Helper for basic statistics tests below |
315 | void AssertStatsSet(const ApplicationVersion& version, |
316 | std::shared_ptr<parquet::WriterProperties> props, |
317 | const ColumnDescriptor* column, bool expected_is_set) { |
318 | auto metadata_builder = ColumnChunkMetaDataBuilder::Make(props, column); |
319 | auto column_chunk = |
320 | ColumnChunkMetaData::Make(metadata_builder->contents(), column, &version); |
321 | EncodedStatistics stats; |
322 | metadata_builder->SetStatistics(false /* is_signed */, stats); |
323 | ASSERT_EQ(column_chunk->is_stats_set(), expected_is_set); |
324 | } |
325 | |
326 | // Statistics are restricted for few types in older parquet version |
327 | TEST(CorruptStatistics, Basics) { |
328 | std::string created_by = "parquet-mr version 1.8.0" ; |
329 | ApplicationVersion version(created_by); |
330 | SchemaDescriptor schema; |
331 | schema::NodePtr node; |
332 | std::vector<schema::NodePtr> fields; |
333 | // Test Physical Types |
334 | fields.push_back(schema::PrimitiveNode::Make("col1" , Repetition::OPTIONAL, Type::INT32, |
335 | LogicalType::NONE)); |
336 | fields.push_back(schema::PrimitiveNode::Make("col2" , Repetition::OPTIONAL, |
337 | Type::BYTE_ARRAY, LogicalType::NONE)); |
338 | // Test Logical Types |
339 | fields.push_back(schema::PrimitiveNode::Make("col3" , Repetition::OPTIONAL, Type::INT32, |
340 | LogicalType::DATE)); |
341 | fields.push_back(schema::PrimitiveNode::Make("col4" , Repetition::OPTIONAL, Type::INT32, |
342 | LogicalType::UINT_32)); |
343 | fields.push_back(schema::PrimitiveNode::Make("col5" , Repetition::OPTIONAL, |
344 | Type::FIXED_LEN_BYTE_ARRAY, |
345 | LogicalType::INTERVAL, 12)); |
346 | fields.push_back(schema::PrimitiveNode::Make("col6" , Repetition::OPTIONAL, |
347 | Type::BYTE_ARRAY, LogicalType::UTF8)); |
348 | node = schema::GroupNode::Make("schema" , Repetition::REQUIRED, fields); |
349 | schema.Init(node); |
350 | |
351 | parquet::WriterProperties::Builder builder; |
352 | builder.created_by(created_by); |
353 | std::shared_ptr<parquet::WriterProperties> props = builder.build(); |
354 | |
355 | AssertStatsSet(version, props, schema.Column(0), true); |
356 | AssertStatsSet(version, props, schema.Column(1), false); |
357 | AssertStatsSet(version, props, schema.Column(2), true); |
358 | AssertStatsSet(version, props, schema.Column(3), false); |
359 | AssertStatsSet(version, props, schema.Column(4), false); |
360 | AssertStatsSet(version, props, schema.Column(5), false); |
361 | } |
362 | |
363 | // Statistics for all types have no restrictions in newer parquet version |
364 | TEST(CorrectStatistics, Basics) { |
365 | std::string created_by = "parquet-cpp version 1.3.0" ; |
366 | ApplicationVersion version(created_by); |
367 | SchemaDescriptor schema; |
368 | schema::NodePtr node; |
369 | std::vector<schema::NodePtr> fields; |
370 | // Test Physical Types |
371 | fields.push_back(schema::PrimitiveNode::Make("col1" , Repetition::OPTIONAL, Type::INT32, |
372 | LogicalType::NONE)); |
373 | fields.push_back(schema::PrimitiveNode::Make("col2" , Repetition::OPTIONAL, |
374 | Type::BYTE_ARRAY, LogicalType::NONE)); |
375 | // Test Logical Types |
376 | fields.push_back(schema::PrimitiveNode::Make("col3" , Repetition::OPTIONAL, Type::INT32, |
377 | LogicalType::DATE)); |
378 | fields.push_back(schema::PrimitiveNode::Make("col4" , Repetition::OPTIONAL, Type::INT32, |
379 | LogicalType::UINT_32)); |
380 | fields.push_back(schema::PrimitiveNode::Make("col5" , Repetition::OPTIONAL, |
381 | Type::FIXED_LEN_BYTE_ARRAY, |
382 | LogicalType::INTERVAL, 12)); |
383 | fields.push_back(schema::PrimitiveNode::Make("col6" , Repetition::OPTIONAL, |
384 | Type::BYTE_ARRAY, LogicalType::UTF8)); |
385 | node = schema::GroupNode::Make("schema" , Repetition::REQUIRED, fields); |
386 | schema.Init(node); |
387 | |
388 | parquet::WriterProperties::Builder builder; |
389 | builder.created_by(created_by); |
390 | std::shared_ptr<parquet::WriterProperties> props = builder.build(); |
391 | |
392 | AssertStatsSet(version, props, schema.Column(0), true); |
393 | AssertStatsSet(version, props, schema.Column(1), true); |
394 | AssertStatsSet(version, props, schema.Column(2), true); |
395 | AssertStatsSet(version, props, schema.Column(3), true); |
396 | AssertStatsSet(version, props, schema.Column(4), false); |
397 | AssertStatsSet(version, props, schema.Column(5), true); |
398 | } |
399 | |
400 | // Test SortOrder class |
401 | static const int NUM_VALUES = 10; |
402 | |
403 | template <typename TestType> |
404 | class TestStatistics : public ::testing::Test { |
405 | public: |
406 | typedef typename TestType::c_type T; |
407 | |
408 | void AddNodes(std::string name) { |
409 | fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, |
410 | TestType::type_num, LogicalType::NONE)); |
411 | } |
412 | |
413 | void SetUpSchema() { |
414 | stats_.resize(fields_.size()); |
415 | values_.resize(NUM_VALUES); |
416 | schema_ = std::static_pointer_cast<GroupNode>( |
417 | GroupNode::Make("Schema" , Repetition::REQUIRED, fields_)); |
418 | |
419 | parquet_sink_ = std::make_shared<InMemoryOutputStream>(); |
420 | } |
421 | |
422 | void SetValues(); |
423 | |
424 | void WriteParquet() { |
425 | // Add writer properties |
426 | parquet::WriterProperties::Builder builder; |
427 | builder.compression(parquet::Compression::SNAPPY); |
428 | builder.created_by("parquet-cpp version 1.3.0" ); |
429 | std::shared_ptr<parquet::WriterProperties> props = builder.build(); |
430 | |
431 | // Create a ParquetFileWriter instance |
432 | auto file_writer = parquet::ParquetFileWriter::Open(parquet_sink_, schema_, props); |
433 | |
434 | // Append a RowGroup with a specific number of rows. |
435 | auto rg_writer = file_writer->AppendRowGroup(); |
436 | |
437 | this->SetValues(); |
438 | |
439 | // Insert Values |
440 | for (int i = 0; i < static_cast<int>(fields_.size()); i++) { |
441 | auto column_writer = |
442 | static_cast<parquet::TypedColumnWriter<TestType>*>(rg_writer->NextColumn()); |
443 | column_writer->WriteBatch(NUM_VALUES, nullptr, nullptr, values_.data()); |
444 | } |
445 | } |
446 | |
447 | void VerifyParquetStats() { |
448 | auto pbuffer = parquet_sink_->GetBuffer(); |
449 | |
450 | // Create a ParquetReader instance |
451 | std::unique_ptr<parquet::ParquetFileReader> parquet_reader = |
452 | parquet::ParquetFileReader::Open( |
453 | std::make_shared<arrow::io::BufferReader>(pbuffer)); |
454 | |
455 | // Get the File MetaData |
456 | std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata(); |
457 | std::shared_ptr<parquet::RowGroupMetaData> rg_metadata = file_metadata->RowGroup(0); |
458 | for (int i = 0; i < static_cast<int>(fields_.size()); i++) { |
459 | std::shared_ptr<parquet::ColumnChunkMetaData> cc_metadata = |
460 | rg_metadata->ColumnChunk(i); |
461 | ASSERT_EQ(stats_[i].min(), cc_metadata->statistics()->EncodeMin()); |
462 | ASSERT_EQ(stats_[i].max(), cc_metadata->statistics()->EncodeMax()); |
463 | } |
464 | } |
465 | |
466 | protected: |
467 | std::vector<T> values_; |
468 | std::vector<uint8_t> values_buf_; |
469 | std::vector<schema::NodePtr> fields_; |
470 | std::shared_ptr<schema::GroupNode> schema_; |
471 | std::shared_ptr<InMemoryOutputStream> parquet_sink_; |
472 | std::vector<EncodedStatistics> stats_; |
473 | }; |
474 | |
475 | using CompareTestTypes = ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType, |
476 | ByteArrayType, FLBAType>; |
477 | |
478 | // TYPE::INT32 |
479 | template <> |
480 | void TestStatistics<Int32Type>::AddNodes(std::string name) { |
481 | // UINT_32 logical type to set Unsigned Statistics |
482 | fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT32, |
483 | LogicalType::UINT_32)); |
484 | // INT_32 logical type to set Signed Statistics |
485 | fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT32, |
486 | LogicalType::INT_32)); |
487 | } |
488 | |
489 | template <> |
490 | void TestStatistics<Int32Type>::SetValues() { |
491 | for (int i = 0; i < NUM_VALUES; i++) { |
492 | values_[i] = i - 5; // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4}; |
493 | } |
494 | |
495 | // Write UINT32 min/max values |
496 | stats_[0] |
497 | .set_min(std::string(reinterpret_cast<const char*>(&values_[5]), sizeof(T))) |
498 | .set_max(std::string(reinterpret_cast<const char*>(&values_[4]), sizeof(T))); |
499 | |
500 | // Write INT32 min/max values |
501 | stats_[1] |
502 | .set_min(std::string(reinterpret_cast<const char*>(&values_[0]), sizeof(T))) |
503 | .set_max(std::string(reinterpret_cast<const char*>(&values_[9]), sizeof(T))); |
504 | } |
505 | |
506 | // TYPE::INT64 |
507 | template <> |
508 | void TestStatistics<Int64Type>::AddNodes(std::string name) { |
509 | // UINT_64 logical type to set Unsigned Statistics |
510 | fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT64, |
511 | LogicalType::UINT_64)); |
512 | // INT_64 logical type to set Signed Statistics |
513 | fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT64, |
514 | LogicalType::INT_64)); |
515 | } |
516 | |
517 | template <> |
518 | void TestStatistics<Int64Type>::SetValues() { |
519 | for (int i = 0; i < NUM_VALUES; i++) { |
520 | values_[i] = i - 5; // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4}; |
521 | } |
522 | |
523 | // Write UINT64 min/max values |
524 | stats_[0] |
525 | .set_min(std::string(reinterpret_cast<const char*>(&values_[5]), sizeof(T))) |
526 | .set_max(std::string(reinterpret_cast<const char*>(&values_[4]), sizeof(T))); |
527 | |
528 | // Write INT64 min/max values |
529 | stats_[1] |
530 | .set_min(std::string(reinterpret_cast<const char*>(&values_[0]), sizeof(T))) |
531 | .set_max(std::string(reinterpret_cast<const char*>(&values_[9]), sizeof(T))); |
532 | } |
533 | |
534 | // TYPE::FLOAT |
535 | template <> |
536 | void TestStatistics<FloatType>::SetValues() { |
537 | for (int i = 0; i < NUM_VALUES; i++) { |
538 | values_[i] = static_cast<float>(i) - |
539 | 5; // {-5.0, -4.0, -3.0, -2.0, -1.0, 0.0, 1.0, 2.0, 3.0, 4.0}; |
540 | } |
541 | |
542 | // Write Float min/max values |
543 | stats_[0] |
544 | .set_min(std::string(reinterpret_cast<const char*>(&values_[0]), sizeof(T))) |
545 | .set_max(std::string(reinterpret_cast<const char*>(&values_[9]), sizeof(T))); |
546 | } |
547 | |
548 | // TYPE::DOUBLE |
549 | template <> |
550 | void TestStatistics<DoubleType>::SetValues() { |
551 | for (int i = 0; i < NUM_VALUES; i++) { |
552 | values_[i] = static_cast<float>(i) - |
553 | 5; // {-5.0, -4.0, -3.0, -2.0, -1.0, 0.0, 1.0, 2.0, 3.0, 4.0}; |
554 | } |
555 | |
556 | // Write Double min/max values |
557 | stats_[0] |
558 | .set_min(std::string(reinterpret_cast<const char*>(&values_[0]), sizeof(T))) |
559 | .set_max(std::string(reinterpret_cast<const char*>(&values_[9]), sizeof(T))); |
560 | } |
561 | |
562 | // TYPE::ByteArray |
563 | template <> |
564 | void TestStatistics<ByteArrayType>::AddNodes(std::string name) { |
565 | // UTF8 logical type to set Unsigned Statistics |
566 | fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, |
567 | Type::BYTE_ARRAY, LogicalType::UTF8)); |
568 | } |
569 | |
570 | template <> |
571 | void TestStatistics<ByteArrayType>::SetValues() { |
572 | int max_byte_array_len = 10; |
573 | size_t nbytes = NUM_VALUES * max_byte_array_len; |
574 | values_buf_.resize(nbytes); |
575 | std::vector<std::string> vals = {u8"c123" , u8"b123" , u8"a123" , u8"d123" , u8"e123" , |
576 | u8"f123" , u8"g123" , u8"h123" , u8"i123" , u8"ü123" }; |
577 | |
578 | uint8_t* base = &values_buf_.data()[0]; |
579 | for (int i = 0; i < NUM_VALUES; i++) { |
580 | memcpy(base, vals[i].c_str(), vals[i].length()); |
581 | values_[i].ptr = base; |
582 | values_[i].len = static_cast<uint32_t>(vals[i].length()); |
583 | base += vals[i].length(); |
584 | } |
585 | |
586 | // Write String min/max values |
587 | stats_[0] |
588 | .set_min( |
589 | std::string(reinterpret_cast<const char*>(vals[2].c_str()), vals[2].length())) |
590 | .set_max( |
591 | std::string(reinterpret_cast<const char*>(vals[9].c_str()), vals[9].length())); |
592 | } |
593 | |
594 | // TYPE::FLBAArray |
595 | template <> |
596 | void TestStatistics<FLBAType>::AddNodes(std::string name) { |
597 | // FLBA has only Unsigned Statistics |
598 | fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, |
599 | Type::FIXED_LEN_BYTE_ARRAY, |
600 | LogicalType::NONE, FLBA_LENGTH)); |
601 | } |
602 | |
603 | template <> |
604 | void TestStatistics<FLBAType>::SetValues() { |
605 | size_t nbytes = NUM_VALUES * FLBA_LENGTH; |
606 | values_buf_.resize(nbytes); |
607 | char vals[NUM_VALUES][FLBA_LENGTH] = {"b12345" , "a12345" , "c12345" , "d12345" , "e12345" , |
608 | "f12345" , "g12345" , "h12345" , "z12345" , "a12345" }; |
609 | |
610 | uint8_t* base = &values_buf_.data()[0]; |
611 | for (int i = 0; i < NUM_VALUES; i++) { |
612 | memcpy(base, &vals[i][0], FLBA_LENGTH); |
613 | values_[i].ptr = base; |
614 | base += FLBA_LENGTH; |
615 | } |
616 | |
617 | // Write FLBA min,max values |
618 | stats_[0] |
619 | .set_min(std::string(reinterpret_cast<const char*>(&vals[1][0]), FLBA_LENGTH)) |
620 | .set_max(std::string(reinterpret_cast<const char*>(&vals[8][0]), FLBA_LENGTH)); |
621 | } |
622 | |
623 | TYPED_TEST_CASE(TestStatistics, CompareTestTypes); |
624 | |
625 | TYPED_TEST(TestStatistics, MinMax) { |
626 | this->AddNodes("Column " ); |
627 | this->SetUpSchema(); |
628 | this->WriteParquet(); |
629 | ASSERT_NO_FATAL_FAILURE(this->VerifyParquetStats()); |
630 | } |
631 | |
632 | // Ensure UNKNOWN sort order is handled properly |
633 | using TestStatisticsFLBA = TestStatistics<FLBAType>; |
634 | |
635 | TEST_F(TestStatisticsFLBA, UnknownSortOrder) { |
636 | this->fields_.push_back(schema::PrimitiveNode::Make( |
637 | "Column 0" , Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY, LogicalType::INTERVAL, |
638 | FLBA_LENGTH)); |
639 | this->SetUpSchema(); |
640 | this->WriteParquet(); |
641 | |
642 | auto pbuffer = parquet_sink_->GetBuffer(); |
643 | // Create a ParquetReader instance |
644 | std::unique_ptr<parquet::ParquetFileReader> parquet_reader = |
645 | parquet::ParquetFileReader::Open( |
646 | std::make_shared<arrow::io::BufferReader>(pbuffer)); |
647 | // Get the File MetaData |
648 | std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata(); |
649 | std::shared_ptr<parquet::RowGroupMetaData> rg_metadata = file_metadata->RowGroup(0); |
650 | std::shared_ptr<parquet::ColumnChunkMetaData> cc_metadata = rg_metadata->ColumnChunk(0); |
651 | |
652 | // stats should not be set for UNKNOWN sort order |
653 | ASSERT_FALSE(cc_metadata->is_stats_set()); |
654 | } |
655 | |
656 | // PARQUET-1225: Float NaN values may lead to incorrect filtering under certain |
657 | // circumstances |
658 | TEST(TestStatisticsFloatNaN, NaNValues) { |
659 | constexpr int NUM_VALUES = 10; |
660 | NodePtr node = PrimitiveNode::Make("nan_float" , Repetition::OPTIONAL, Type::FLOAT); |
661 | ColumnDescriptor descr(node, 1, 1); |
662 | float values[NUM_VALUES] = {std::nanf("" ), -4.0f, -3.0f, -2.0f, -1.0f, |
663 | std::nanf("" ), 1.0f, 2.0f, 3.0f, std::nanf("" )}; |
664 | float nan_values[NUM_VALUES]; |
665 | for (int i = 0; i < NUM_VALUES; i++) { |
666 | nan_values[i] = std::nanf("" ); |
667 | } |
668 | |
669 | // Test values |
670 | TypedRowGroupStatistics<FloatType> nan_stats(&descr); |
671 | nan_stats.Update(&values[0], NUM_VALUES, 0); |
672 | float min = nan_stats.min(); |
673 | float max = nan_stats.max(); |
674 | ASSERT_EQ(min, -4.0f); |
675 | ASSERT_EQ(max, 3.0f); |
676 | |
677 | // Test all NaNs |
678 | TypedRowGroupStatistics<FloatType> all_nan_stats(&descr); |
679 | all_nan_stats.Update(&nan_values[0], NUM_VALUES, 0); |
680 | min = all_nan_stats.min(); |
681 | max = all_nan_stats.max(); |
682 | ASSERT_TRUE(std::isnan(min)); |
683 | ASSERT_TRUE(std::isnan(max)); |
684 | |
685 | // Test values followed by all NaNs |
686 | nan_stats.Update(&nan_values[0], NUM_VALUES, 0); |
687 | min = nan_stats.min(); |
688 | max = nan_stats.max(); |
689 | ASSERT_EQ(min, -4.0f); |
690 | ASSERT_EQ(max, 3.0f); |
691 | |
692 | // Test all NaNs followed by values |
693 | all_nan_stats.Update(&values[0], NUM_VALUES, 0); |
694 | min = all_nan_stats.min(); |
695 | max = all_nan_stats.max(); |
696 | ASSERT_EQ(min, -4.0f); |
697 | ASSERT_EQ(max, 3.0f); |
698 | |
699 | // Test values followed by all NaNs followed by values |
700 | nan_stats.Update(&values[0], NUM_VALUES, 0); |
701 | min = nan_stats.min(); |
702 | max = nan_stats.max(); |
703 | ASSERT_EQ(min, -4.0f); |
704 | ASSERT_EQ(max, 3.0f); |
705 | } |
706 | |
707 | // PARQUET-1225: Float NaN values may lead to incorrect filtering under certain |
708 | // circumstances |
709 | TEST(TestStatisticsFloatNaN, NaNValuesSpaced) { |
710 | constexpr int NUM_VALUES = 10; |
711 | NodePtr node = PrimitiveNode::Make("nan_float" , Repetition::OPTIONAL, Type::FLOAT); |
712 | ColumnDescriptor descr(node, 1, 1); |
713 | float values[NUM_VALUES] = {std::nanf("" ), -4.0f, -3.0f, -2.0f, -1.0f, |
714 | std::nanf("" ), 1.0f, 2.0f, 3.0f, std::nanf("" )}; |
715 | float nan_values[NUM_VALUES]; |
716 | for (int i = 0; i < NUM_VALUES; i++) { |
717 | nan_values[i] = std::nanf("" ); |
718 | } |
719 | std::vector<uint8_t> valid_bits(BitUtil::BytesForBits(NUM_VALUES) + 1, 255); |
720 | |
721 | // Test values |
722 | TypedRowGroupStatistics<FloatType> nan_stats(&descr); |
723 | nan_stats.UpdateSpaced(&values[0], valid_bits.data(), 0, NUM_VALUES, 0); |
724 | float min = nan_stats.min(); |
725 | float max = nan_stats.max(); |
726 | ASSERT_EQ(min, -4.0f); |
727 | ASSERT_EQ(max, 3.0f); |
728 | |
729 | // Test all NaNs |
730 | TypedRowGroupStatistics<FloatType> all_nan_stats(&descr); |
731 | all_nan_stats.UpdateSpaced(&nan_values[0], valid_bits.data(), 0, NUM_VALUES, 0); |
732 | min = all_nan_stats.min(); |
733 | max = all_nan_stats.max(); |
734 | ASSERT_TRUE(std::isnan(min)); |
735 | ASSERT_TRUE(std::isnan(max)); |
736 | |
737 | // Test values followed by all NaNs |
738 | nan_stats.UpdateSpaced(&nan_values[0], valid_bits.data(), 0, NUM_VALUES, 0); |
739 | min = nan_stats.min(); |
740 | max = nan_stats.max(); |
741 | ASSERT_EQ(min, -4.0f); |
742 | ASSERT_EQ(max, 3.0f); |
743 | |
744 | // Test all NaNs followed by values |
745 | all_nan_stats.UpdateSpaced(&values[0], valid_bits.data(), 0, NUM_VALUES, 0); |
746 | min = all_nan_stats.min(); |
747 | max = all_nan_stats.max(); |
748 | ASSERT_EQ(min, -4.0f); |
749 | ASSERT_EQ(max, 3.0f); |
750 | |
751 | // Test values followed by all NaNs followed by values |
752 | nan_stats.UpdateSpaced(&values[0], valid_bits.data(), 0, NUM_VALUES, 0); |
753 | min = nan_stats.min(); |
754 | max = nan_stats.max(); |
755 | ASSERT_EQ(min, -4.0f); |
756 | ASSERT_EQ(max, 3.0f); |
757 | } |
758 | |
759 | // NaN double values may lead to incorrect filtering under certain circumstances |
760 | TEST(TestStatisticsDoubleNaN, NaNValues) { |
761 | constexpr int NUM_VALUES = 10; |
762 | NodePtr node = PrimitiveNode::Make("nan_double" , Repetition::OPTIONAL, Type::DOUBLE); |
763 | ColumnDescriptor descr(node, 1, 1); |
764 | TypedRowGroupStatistics<DoubleType> nan_stats(&descr); |
765 | double values[NUM_VALUES] = {std::nan("" ), std::nan("" ), -3.0, -2.0, -1.0, |
766 | 0.0, 1.0, 2.0, 3.0, 4.0}; |
767 | double* values_ptr = &values[0]; |
768 | nan_stats.Update(values_ptr, NUM_VALUES, 0); |
769 | double min = nan_stats.min(); |
770 | double max = nan_stats.max(); |
771 | |
772 | ASSERT_EQ(min, -3.0); |
773 | ASSERT_EQ(max, 4.0); |
774 | } |
775 | |
776 | // Test statistics for binary column with UNSIGNED sort order |
777 | TEST(TestStatisticsMinMax, Unsigned) { |
778 | std::string dir_string(test::get_data_dir()); |
779 | std::stringstream ss; |
780 | ss << dir_string << "/binary.parquet" ; |
781 | auto path = ss.str(); |
782 | |
783 | // The file is generated by parquet-mr 1.10.0, the first version that |
784 | // supports correct statistics for binary data (see PARQUET-1025). It |
785 | // contains a single column of binary type. Data is just single byte values |
786 | // from 0x00 to 0x0B. |
787 | auto file_reader = ParquetFileReader::OpenFile(path); |
788 | auto rg_reader = file_reader->RowGroup(0); |
789 | auto metadata = rg_reader->metadata(); |
790 | auto column_schema = metadata->schema()->Column(0); |
791 | ASSERT_EQ(SortOrder::UNSIGNED, column_schema->sort_order()); |
792 | |
793 | auto column_chunk = metadata->ColumnChunk(0); |
794 | ASSERT_TRUE(column_chunk->is_stats_set()); |
795 | |
796 | std::shared_ptr<RowGroupStatistics> stats = column_chunk->statistics(); |
797 | ASSERT_TRUE(stats != NULL); |
798 | ASSERT_EQ(0, stats->null_count()); |
799 | ASSERT_EQ(12, stats->num_values()); |
800 | ASSERT_EQ(0x00, stats->EncodeMin()[0]); |
801 | ASSERT_EQ(0x0b, stats->EncodeMax()[0]); |
802 | } |
803 | } // namespace test |
804 | } // namespace parquet |
805 | |