1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include <algorithm> |
19 | #include <cmath> |
20 | #include <cstring> |
21 | #include <type_traits> |
22 | |
23 | #include "arrow/array.h" |
24 | #include "arrow/type.h" |
25 | #include "arrow/util/checked_cast.h" |
26 | #include "arrow/util/logging.h" |
27 | |
28 | #include "parquet/encoding.h" |
29 | #include "parquet/exception.h" |
30 | #include "parquet/platform.h" |
31 | #include "parquet/schema.h" |
32 | #include "parquet/statistics.h" |
33 | |
34 | using arrow::default_memory_pool; |
35 | using arrow::MemoryPool; |
36 | using arrow::internal::checked_cast; |
37 | |
38 | namespace parquet { |
39 | |
40 | // ---------------------------------------------------------------------- |
41 | // Comparator implementations |
42 | |
// Generic "orders before" predicate used by the typed comparators. The primary
// template applies operator< to the physical C type; type_length is accepted
// only so that every specialization shares one signature and is ignored here.
template <typename DType, bool is_signed>
struct CompareHelper {
  using T = typename DType::c_type;

  static inline bool Compare(int /*type_length*/, const T& lhs, const T& rhs) {
    return lhs < rhs;
  }
};
48 | |
49 | template <> |
50 | struct CompareHelper<Int96Type, true> { |
51 | static inline bool Compare(int type_length, const Int96& a, const Int96& b) { |
52 | // Only the MSB bit is by Signed comparison |
53 | // For little-endian, this is the last bit of Int96 type |
54 | const int32_t amsb = static_cast<const int32_t>(a.value[2]); |
55 | const int32_t bmsb = static_cast<const int32_t>(b.value[2]); |
56 | if (amsb != bmsb) { |
57 | return (amsb < bmsb); |
58 | } else if (a.value[1] != b.value[1]) { |
59 | return (a.value[1] < b.value[1]); |
60 | } |
61 | return (a.value[0] < b.value[0]); |
62 | } |
63 | }; |
64 | |
65 | template <> |
66 | struct CompareHelper<ByteArrayType, true> { |
67 | static inline bool Compare(int type_length, const ByteArray& a, const ByteArray& b) { |
68 | const int8_t* aptr = reinterpret_cast<const int8_t*>(a.ptr); |
69 | const int8_t* bptr = reinterpret_cast<const int8_t*>(b.ptr); |
70 | return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + b.len); |
71 | } |
72 | }; |
73 | |
74 | template <> |
75 | struct CompareHelper<FLBAType, true> { |
76 | static inline bool Compare(int type_length, const FLBA& a, const FLBA& b) { |
77 | const int8_t* aptr = reinterpret_cast<const int8_t*>(a.ptr); |
78 | const int8_t* bptr = reinterpret_cast<const int8_t*>(b.ptr); |
79 | return std::lexicographical_compare(aptr, aptr + type_length, bptr, |
80 | bptr + type_length); |
81 | } |
82 | }; |
83 | |
84 | template <> |
85 | struct CompareHelper<Int32Type, false> { |
86 | static inline bool Compare(int type_length, int32_t a, int32_t b) { |
87 | const uint32_t ua = a; |
88 | const uint32_t ub = b; |
89 | return ua < ub; |
90 | } |
91 | }; |
92 | |
93 | template <> |
94 | struct CompareHelper<Int64Type, false> { |
95 | static inline bool Compare(int type_length, int64_t a, int64_t b) { |
96 | const uint64_t ua = a; |
97 | const uint64_t ub = b; |
98 | return ua < ub; |
99 | } |
100 | }; |
101 | |
102 | template <> |
103 | struct CompareHelper<Int96Type, false> { |
104 | static inline bool Compare(int type_length, const Int96& a, const Int96& b) { |
105 | if (a.value[2] != b.value[2]) { |
106 | return (a.value[2] < b.value[2]); |
107 | } else if (a.value[1] != b.value[1]) { |
108 | return (a.value[1] < b.value[1]); |
109 | } |
110 | return (a.value[0] < b.value[0]); |
111 | } |
112 | }; |
113 | |
114 | template <> |
115 | struct CompareHelper<ByteArrayType, false> { |
116 | static inline bool Compare(int type_length, const ByteArray& a, const ByteArray& b) { |
117 | const uint8_t* aptr = reinterpret_cast<const uint8_t*>(a.ptr); |
118 | const uint8_t* bptr = reinterpret_cast<const uint8_t*>(b.ptr); |
119 | return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + b.len); |
120 | } |
121 | }; |
122 | |
123 | template <> |
124 | struct CompareHelper<FLBAType, false> { |
125 | static inline bool Compare(int type_length, const FLBA& a, const FLBA& b) { |
126 | const uint8_t* aptr = reinterpret_cast<const uint8_t*>(a.ptr); |
127 | const uint8_t* bptr = reinterpret_cast<const uint8_t*>(b.ptr); |
128 | return std::lexicographical_compare(aptr, aptr + type_length, bptr, |
129 | bptr + type_length); |
130 | } |
131 | }; |
132 | |
// Normalises a min/max statistic before it is handed back to the caller.
// For non-floating-point types the value is returned unchanged.
template <typename T>
T CleanStatistic(T val) {
  return val;
}

// ARROW-5562: return positive zero for negative zero so that the sign of a
// zero statistic is deterministic.
//
// Only exact zeros are rewritten. Rounding every value in a small interval
// around zero to 0 would raise a tiny negative minimum (or lower a tiny
// positive maximum), producing a statistic that no longer bounds the data.
// NaN compares unequal to zero and is therefore returned unchanged.
template <>
float CleanStatistic(float val) {
  return val == 0.0f ? 0.0f : val;
}

// Double-precision counterpart of the float specialization: -0.0 becomes
// +0.0, every other value (including NaN) is returned as is.
template <>
double CleanStatistic(double val) {
  return val == 0.0 ? 0.0 : val;
}
151 | |
152 | template <bool is_signed, typename DType> |
153 | class TypedComparatorImpl : virtual public TypedComparator<DType> { |
154 | public: |
155 | typedef typename DType::c_type T; |
156 | |
157 | explicit TypedComparatorImpl(int type_length = -1) : type_length_(type_length) {} |
158 | |
159 | bool CompareInline(const T& a, const T& b) const { |
160 | return CompareHelper<DType, is_signed>::Compare(type_length_, a, b); |
161 | } |
162 | |
163 | bool Compare(const T& a, const T& b) override { return CompareInline(a, b); } |
164 | |
165 | void GetMinMax(const T* values, int64_t length, T* out_min, T* out_max) override { |
166 | T min = values[0]; |
167 | T max = values[0]; |
168 | for (int64_t i = 1; i < length; i++) { |
169 | if (CompareInline(values[i], min)) { |
170 | min = values[i]; |
171 | } else if (CompareInline(max, values[i])) { |
172 | max = values[i]; |
173 | } |
174 | } |
175 | *out_min = CleanStatistic<T>(min); |
176 | *out_max = CleanStatistic<T>(max); |
177 | } |
178 | |
179 | void GetMinMaxSpaced(const T* values, int64_t length, const uint8_t* valid_bits, |
180 | int64_t valid_bits_offset, T* out_min, T* out_max) override { |
181 | ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset, |
182 | length); |
183 | |
184 | // Find the first non-null value |
185 | int64_t first_non_null = 0; |
186 | while (!valid_bits_reader.IsSet()) { |
187 | ++first_non_null; |
188 | valid_bits_reader.Next(); |
189 | } |
190 | |
191 | T min = values[first_non_null]; |
192 | T max = values[first_non_null]; |
193 | valid_bits_reader.Next(); |
194 | for (int64_t i = first_non_null + 1; i < length; i++) { |
195 | if (valid_bits_reader.IsSet()) { |
196 | if (CompareInline(values[i], min)) { |
197 | min = values[i]; |
198 | } else if (CompareInline(max, values[i])) { |
199 | max = values[i]; |
200 | } |
201 | } |
202 | valid_bits_reader.Next(); |
203 | } |
204 | *out_min = CleanStatistic<T>(min); |
205 | *out_max = CleanStatistic<T>(max); |
206 | } |
207 | |
208 | void GetMinMax(const ::arrow::Array& values, T* out_min, T* out_max) override; |
209 | |
210 | private: |
211 | int type_length_; |
212 | }; |
213 | |
214 | template <bool is_signed, typename DType> |
215 | void TypedComparatorImpl<is_signed, DType>::GetMinMax(const ::arrow::Array& values, |
216 | typename DType::c_type* out_min, |
217 | typename DType::c_type* out_max) { |
218 | ParquetException::NYI(values.type()->ToString()); |
219 | } |
220 | |
221 | template <bool is_signed> |
222 | void GetMinMaxBinaryHelper( |
223 | const TypedComparatorImpl<is_signed, ByteArrayType>& comparator, |
224 | const ::arrow::Array& values, ByteArray* out_min, ByteArray* out_max) { |
225 | const auto& data = checked_cast<const ::arrow::BinaryArray&>(values); |
226 | |
227 | ByteArray min, max; |
228 | if (data.null_count() > 0) { |
229 | ::arrow::internal::BitmapReader valid_bits_reader(data.null_bitmap_data(), |
230 | data.offset(), data.length()); |
231 | |
232 | int64_t first_non_null = 0; |
233 | while (!valid_bits_reader.IsSet()) { |
234 | ++first_non_null; |
235 | valid_bits_reader.Next(); |
236 | } |
237 | min = data.GetView(first_non_null); |
238 | max = data.GetView(first_non_null); |
239 | for (int64_t i = first_non_null; i < data.length(); i++) { |
240 | ByteArray val = data.GetView(i); |
241 | if (valid_bits_reader.IsSet()) { |
242 | if (comparator.CompareInline(val, min)) { |
243 | min = val; |
244 | } else if (comparator.CompareInline(max, val)) { |
245 | max = val; |
246 | } |
247 | } |
248 | valid_bits_reader.Next(); |
249 | } |
250 | } else { |
251 | min = data.GetView(0); |
252 | max = data.GetView(0); |
253 | for (int64_t i = 0; i < data.length(); i++) { |
254 | ByteArray val = data.GetView(i); |
255 | if (comparator.CompareInline(val, min)) { |
256 | min = val; |
257 | } else if (comparator.CompareInline(max, val)) { |
258 | max = val; |
259 | } |
260 | } |
261 | } |
262 | *out_min = min; |
263 | *out_max = max; |
264 | } |
265 | |
266 | template <> |
267 | void TypedComparatorImpl<true, ByteArrayType>::GetMinMax(const ::arrow::Array& values, |
268 | ByteArray* out_min, |
269 | ByteArray* out_max) { |
270 | GetMinMaxBinaryHelper<true>(*this, values, out_min, out_max); |
271 | } |
272 | |
273 | template <> |
274 | void TypedComparatorImpl<false, ByteArrayType>::GetMinMax(const ::arrow::Array& values, |
275 | ByteArray* out_min, |
276 | ByteArray* out_max) { |
277 | GetMinMaxBinaryHelper<false>(*this, values, out_min, out_max); |
278 | } |
279 | |
280 | std::shared_ptr<Comparator> Comparator::Make(Type::type physical_type, |
281 | SortOrder::type sort_order, |
282 | int type_length) { |
283 | if (SortOrder::SIGNED == sort_order) { |
284 | switch (physical_type) { |
285 | case Type::BOOLEAN: |
286 | return std::make_shared<TypedComparatorImpl<true, BooleanType>>(); |
287 | case Type::INT32: |
288 | return std::make_shared<TypedComparatorImpl<true, Int32Type>>(); |
289 | case Type::INT64: |
290 | return std::make_shared<TypedComparatorImpl<true, Int64Type>>(); |
291 | case Type::INT96: |
292 | return std::make_shared<TypedComparatorImpl<true, Int96Type>>(); |
293 | case Type::FLOAT: |
294 | return std::make_shared<TypedComparatorImpl<true, FloatType>>(); |
295 | case Type::DOUBLE: |
296 | return std::make_shared<TypedComparatorImpl<true, DoubleType>>(); |
297 | case Type::BYTE_ARRAY: |
298 | return std::make_shared<TypedComparatorImpl<true, ByteArrayType>>(); |
299 | case Type::FIXED_LEN_BYTE_ARRAY: |
300 | return std::make_shared<TypedComparatorImpl<true, FLBAType>>(type_length); |
301 | default: |
302 | ParquetException::NYI("Signed Compare not implemented" ); |
303 | } |
304 | } else if (SortOrder::UNSIGNED == sort_order) { |
305 | switch (physical_type) { |
306 | case Type::INT32: |
307 | return std::make_shared<TypedComparatorImpl<false, Int32Type>>(); |
308 | case Type::INT64: |
309 | return std::make_shared<TypedComparatorImpl<false, Int64Type>>(); |
310 | case Type::INT96: |
311 | return std::make_shared<TypedComparatorImpl<false, Int96Type>>(); |
312 | case Type::BYTE_ARRAY: |
313 | return std::make_shared<TypedComparatorImpl<false, ByteArrayType>>(); |
314 | case Type::FIXED_LEN_BYTE_ARRAY: |
315 | return std::make_shared<TypedComparatorImpl<false, FLBAType>>(type_length); |
316 | default: |
317 | ParquetException::NYI("Unsigned Compare not implemented" ); |
318 | } |
319 | } else { |
320 | throw ParquetException("UNKNOWN Sort Order" ); |
321 | } |
322 | return nullptr; |
323 | } |
324 | |
325 | std::shared_ptr<Comparator> Comparator::Make(const ColumnDescriptor* descr) { |
326 | return Make(descr->physical_type(), descr->sort_order(), descr->type_length()); |
327 | } |
328 | |
329 | // ---------------------------------------------------------------------- |
330 | |
// NaN-awareness helper used while accumulating statistics.
//
// Primary template: for types that are not floating point there are no NaNs,
// so the usable range is always [0, count).
template <typename T, typename Enable = void>
struct StatsHelper {
  bool CanHaveNaN() { return false; }

  inline int64_t GetValueBeginOffset(const T* /*values*/, int64_t /*count*/) {
    return 0;
  }

  inline int64_t GetValueEndOffset(const T* /*values*/, int64_t count) { return count; }

  inline bool IsNaN(const T /*value*/) { return false; }
};

// Floating-point specialization: leading and trailing NaNs are trimmed from
// the usable range.
template <typename T>
struct StatsHelper<T, typename std::enable_if<std::is_floating_point<T>::value>::type> {
  bool CanHaveNaN() { return true; }

  // Index of the first non-NaN element, or `count` when there is none.
  inline int64_t GetValueBeginOffset(const T* values, int64_t count) {
    int64_t head = 0;
    while (head < count) {
      if (!std::isnan(values[head])) {
        return head;
      }
      ++head;
    }
    return count;
  }

  // One past the index of the last non-NaN element, or 0 when there is none.
  inline int64_t GetValueEndOffset(const T* values, int64_t count) {
    int64_t tail = count;
    while (tail > 0) {
      if (!std::isnan(values[tail - 1])) {
        return tail;
      }
      --tail;
    }
    return 0;
  }

  inline bool IsNaN(const T value) { return std::isnan(value); }
};
368 | |
// Writes a NaN into *value for floating-point types. For every other type the
// primary template leaves *value untouched.
template <typename T>
void SetNaN(T* value) {
  static_cast<void>(value);  // nothing to do for non-floating-point types
}

// float: store the NaN produced by std::nanf.
template <>
void SetNaN<float>(float* value) {
  const float not_a_number = std::nanf("");
  *value = not_a_number;
}

// double: store the NaN produced by std::nan.
template <>
void SetNaN<double>(double* value) {
  const double not_a_number = std::nan("");
  *value = not_a_number;
}
383 | |
384 | template <typename DType> |
385 | class TypedStatisticsImpl : public TypedStatistics<DType> { |
386 | public: |
387 | using T = typename DType::c_type; |
388 | |
389 | TypedStatisticsImpl(const ColumnDescriptor* descr, MemoryPool* pool) |
390 | : descr_(descr), |
391 | pool_(pool), |
392 | min_buffer_(AllocateBuffer(pool_, 0)), |
393 | max_buffer_(AllocateBuffer(pool_, 0)) { |
394 | auto comp = Comparator::Make(descr); |
395 | comparator_ = std::static_pointer_cast<TypedComparator<DType>>(comp); |
396 | Reset(); |
397 | } |
398 | |
399 | TypedStatisticsImpl(const T& min, const T& max, int64_t num_values, int64_t null_count, |
400 | int64_t distinct_count) |
401 | : pool_(default_memory_pool()), |
402 | min_buffer_(AllocateBuffer(pool_, 0)), |
403 | max_buffer_(AllocateBuffer(pool_, 0)) { |
404 | IncrementNumValues(num_values); |
405 | IncrementNullCount(null_count); |
406 | IncrementDistinctCount(distinct_count); |
407 | |
408 | Copy(min, &min_, min_buffer_.get()); |
409 | Copy(max, &max_, max_buffer_.get()); |
410 | has_min_max_ = true; |
411 | } |
412 | |
413 | TypedStatisticsImpl(const ColumnDescriptor* descr, const std::string& encoded_min, |
414 | const std::string& encoded_max, int64_t num_values, |
415 | int64_t null_count, int64_t distinct_count, bool has_min_max, |
416 | MemoryPool* pool) |
417 | : TypedStatisticsImpl(descr, pool) { |
418 | IncrementNumValues(num_values); |
419 | IncrementNullCount(null_count); |
420 | IncrementDistinctCount(distinct_count); |
421 | |
422 | if (!encoded_min.empty()) { |
423 | PlainDecode(encoded_min, &min_); |
424 | } |
425 | if (!encoded_max.empty()) { |
426 | PlainDecode(encoded_max, &max_); |
427 | } |
428 | has_min_max_ = has_min_max; |
429 | } |
430 | |
431 | bool HasMinMax() const override { return has_min_max_; } |
432 | |
433 | void Reset() override { |
434 | ResetCounts(); |
435 | has_min_max_ = false; |
436 | } |
437 | |
438 | void SetMinMax(const T& arg_min, const T& arg_max) override { |
439 | if (!has_min_max_) { |
440 | has_min_max_ = true; |
441 | Copy(arg_min, &min_, min_buffer_.get()); |
442 | Copy(arg_max, &max_, max_buffer_.get()); |
443 | } else { |
444 | Copy(comparator_->Compare(min_, arg_min) ? min_ : arg_min, &min_, |
445 | min_buffer_.get()); |
446 | Copy(comparator_->Compare(max_, arg_max) ? arg_max : max_, &max_, |
447 | max_buffer_.get()); |
448 | } |
449 | } |
450 | |
451 | void Merge(const TypedStatistics<DType>& other) override { |
452 | this->MergeCounts(other); |
453 | if (!other.HasMinMax()) return; |
454 | SetMinMax(other.min(), other.max()); |
455 | } |
456 | |
457 | void Update(const T* values, int64_t num_not_null, int64_t num_null) override; |
458 | void UpdateSpaced(const T* values, const uint8_t* valid_bits, int64_t valid_bits_spaced, |
459 | int64_t num_not_null, int64_t num_null) override; |
460 | |
461 | void Update(const ::arrow::Array& values) override { |
462 | IncrementNullCount(values.null_count()); |
463 | IncrementNumValues(values.length() - values.null_count()); |
464 | |
465 | // TODO: support distinct count? |
466 | if (values.null_count() == values.length()) { |
467 | return; |
468 | } |
469 | |
470 | StatsHelper<T> helper; |
471 | if (helper.CanHaveNaN()) { |
472 | ParquetException::NYI("No NaN handling for Arrow arrays yet" ); |
473 | } |
474 | |
475 | T batch_min, batch_max; |
476 | comparator_->GetMinMax(values, &batch_min, &batch_max); |
477 | SetMinMax(batch_min, batch_max); |
478 | } |
479 | |
480 | const T& min() const override { return min_; } |
481 | |
482 | const T& max() const override { return max_; } |
483 | |
484 | Type::type physical_type() const override { return descr_->physical_type(); } |
485 | |
486 | const ColumnDescriptor* descr() const override { return descr_; } |
487 | |
488 | std::string EncodeMin() override { |
489 | std::string s; |
490 | if (HasMinMax()) this->PlainEncode(min_, &s); |
491 | return s; |
492 | } |
493 | |
494 | std::string EncodeMax() override { |
495 | std::string s; |
496 | if (HasMinMax()) this->PlainEncode(max_, &s); |
497 | return s; |
498 | } |
499 | |
500 | EncodedStatistics Encode() override { |
501 | EncodedStatistics s; |
502 | if (HasMinMax()) { |
503 | s.set_min(this->EncodeMin()); |
504 | s.set_max(this->EncodeMax()); |
505 | } |
506 | s.set_null_count(this->null_count()); |
507 | return s; |
508 | } |
509 | |
510 | int64_t null_count() const override { return statistics_.null_count; } |
511 | int64_t distinct_count() const override { return statistics_.distinct_count; } |
512 | int64_t num_values() const override { return num_values_; } |
513 | |
514 | private: |
515 | const ColumnDescriptor* descr_; |
516 | bool has_min_max_ = false; |
517 | T min_; |
518 | T max_; |
519 | ::arrow::MemoryPool* pool_; |
520 | int64_t num_values_ = 0; |
521 | EncodedStatistics statistics_; |
522 | std::shared_ptr<TypedComparator<DType>> comparator_; |
523 | std::shared_ptr<ResizableBuffer> min_buffer_, max_buffer_; |
524 | |
525 | void PlainEncode(const T& src, std::string* dst); |
526 | void PlainDecode(const std::string& src, T* dst); |
527 | |
528 | void Copy(const T& src, T* dst, ResizableBuffer*) { *dst = src; } |
529 | |
530 | void IncrementNullCount(int64_t n) { statistics_.null_count += n; } |
531 | |
532 | void IncrementNumValues(int64_t n) { num_values_ += n; } |
533 | |
534 | void IncrementDistinctCount(int64_t n) { statistics_.distinct_count += n; } |
535 | |
536 | void MergeCounts(const Statistics& other) { |
537 | this->statistics_.null_count += other.null_count(); |
538 | this->statistics_.distinct_count += other.distinct_count(); |
539 | this->num_values_ += other.num_values(); |
540 | } |
541 | |
542 | void ResetCounts() { |
543 | this->statistics_.null_count = 0; |
544 | this->statistics_.distinct_count = 0; |
545 | this->num_values_ = 0; |
546 | } |
547 | }; |
548 | |
549 | template <> |
550 | inline void TypedStatisticsImpl<FLBAType>::Copy(const FLBA& src, FLBA* dst, |
551 | ResizableBuffer* buffer) { |
552 | if (dst->ptr == src.ptr) return; |
553 | uint32_t len = descr_->type_length(); |
554 | PARQUET_THROW_NOT_OK(buffer->Resize(len, false)); |
555 | std::memcpy(buffer->mutable_data(), src.ptr, len); |
556 | *dst = FLBA(buffer->data()); |
557 | } |
558 | |
559 | template <> |
560 | inline void TypedStatisticsImpl<ByteArrayType>::Copy(const ByteArray& src, ByteArray* dst, |
561 | ResizableBuffer* buffer) { |
562 | if (dst->ptr == src.ptr) return; |
563 | PARQUET_THROW_NOT_OK(buffer->Resize(src.len, false)); |
564 | std::memcpy(buffer->mutable_data(), src.ptr, src.len); |
565 | *dst = ByteArray(src.len, buffer->data()); |
566 | } |
567 | |
568 | template <typename DType> |
569 | void TypedStatisticsImpl<DType>::Update(const T* values, int64_t num_not_null, |
570 | int64_t num_null) { |
571 | DCHECK_GE(num_not_null, 0); |
572 | DCHECK_GE(num_null, 0); |
573 | |
574 | IncrementNullCount(num_null); |
575 | IncrementNumValues(num_not_null); |
576 | // TODO: support distinct count? |
577 | if (num_not_null == 0) return; |
578 | |
579 | // PARQUET-1225: Handle NaNs |
580 | // The problem arises only if the starting/ending value(s) |
581 | // of the values-buffer contain NaN |
582 | StatsHelper<T> helper; |
583 | int64_t begin_offset = helper.GetValueBeginOffset(values, num_not_null); |
584 | int64_t end_offset = helper.GetValueEndOffset(values, num_not_null); |
585 | |
586 | // All values are NaN |
587 | if (helper.CanHaveNaN() && end_offset < begin_offset) { |
588 | // Set min/max to NaNs in this case. |
589 | // Don't set has_min_max flag since |
590 | // these values must be over-written by valid stats later |
591 | if (!has_min_max_) { |
592 | SetNaN(&min_); |
593 | SetNaN(&max_); |
594 | } |
595 | return; |
596 | } |
597 | |
598 | T batch_min, batch_max; |
599 | comparator_->GetMinMax(values + begin_offset, end_offset - begin_offset, &batch_min, |
600 | &batch_max); |
601 | SetMinMax(batch_min, batch_max); |
602 | } |
603 | |
604 | template <typename DType> |
605 | void TypedStatisticsImpl<DType>::UpdateSpaced(const T* values, const uint8_t* valid_bits, |
606 | int64_t valid_bits_offset, |
607 | int64_t num_not_null, int64_t num_null) { |
608 | DCHECK_GE(num_not_null, 0); |
609 | DCHECK_GE(num_null, 0); |
610 | |
611 | IncrementNullCount(num_null); |
612 | IncrementNumValues(num_not_null); |
613 | // TODO: support distinct count? |
614 | if (num_not_null == 0) return; |
615 | |
616 | // Find first valid entry and use that for min/max |
617 | // As (num_not_null != 0) there must be one |
618 | int64_t length = num_null + num_not_null; |
619 | int64_t i = 0; |
620 | StatsHelper<T> helper; |
621 | if (helper.CanHaveNaN()) { |
622 | ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset, |
623 | length); |
624 | for (; i < length; i++) { |
625 | // PARQUET-1225: Handle NaNs |
626 | if (valid_bits_reader.IsSet() && !helper.IsNaN(values[i])) { |
627 | break; |
628 | } |
629 | valid_bits_reader.Next(); |
630 | } |
631 | |
632 | // All are NaNs and stats are not set yet |
633 | if ((i == length) && helper.IsNaN(values[i - 1])) { |
634 | // Don't set has_min_max flag since |
635 | // these values must be over-written by valid stats later |
636 | if (!has_min_max_) { |
637 | SetNaN(&min_); |
638 | SetNaN(&max_); |
639 | } |
640 | return; |
641 | } |
642 | } |
643 | |
644 | // Find min and max values from remaining non-NaN values |
645 | T batch_min, batch_max; |
646 | comparator_->GetMinMaxSpaced(values + i, length - i, valid_bits, valid_bits_offset + i, |
647 | &batch_min, &batch_max); |
648 | SetMinMax(batch_min, batch_max); |
649 | } |
650 | |
651 | template <typename DType> |
652 | void TypedStatisticsImpl<DType>::PlainEncode(const T& src, std::string* dst) { |
653 | auto encoder = MakeTypedEncoder<DType>(Encoding::PLAIN, false, descr_, pool_); |
654 | encoder->Put(&src, 1); |
655 | auto buffer = encoder->FlushValues(); |
656 | auto ptr = reinterpret_cast<const char*>(buffer->data()); |
657 | dst->assign(ptr, buffer->size()); |
658 | } |
659 | |
660 | template <typename DType> |
661 | void TypedStatisticsImpl<DType>::PlainDecode(const std::string& src, T* dst) { |
662 | auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_); |
663 | decoder->SetData(1, reinterpret_cast<const uint8_t*>(src.c_str()), |
664 | static_cast<int>(src.size())); |
665 | decoder->Decode(dst, 1); |
666 | } |
667 | |
668 | template <> |
669 | void TypedStatisticsImpl<ByteArrayType>::PlainEncode(const T& src, std::string* dst) { |
670 | dst->assign(reinterpret_cast<const char*>(src.ptr), src.len); |
671 | } |
672 | |
673 | template <> |
674 | void TypedStatisticsImpl<ByteArrayType>::PlainDecode(const std::string& src, T* dst) { |
675 | dst->len = static_cast<uint32_t>(src.size()); |
676 | dst->ptr = reinterpret_cast<const uint8_t*>(src.c_str()); |
677 | } |
678 | |
679 | // ---------------------------------------------------------------------- |
680 | // Public factory functions |
681 | |
682 | std::shared_ptr<Statistics> Statistics::Make(const ColumnDescriptor* descr, |
683 | ::arrow::MemoryPool* pool) { |
684 | switch (descr->physical_type()) { |
685 | case Type::BOOLEAN: |
686 | return std::make_shared<TypedStatisticsImpl<BooleanType>>(descr, pool); |
687 | case Type::INT32: |
688 | return std::make_shared<TypedStatisticsImpl<Int32Type>>(descr, pool); |
689 | case Type::INT64: |
690 | return std::make_shared<TypedStatisticsImpl<Int64Type>>(descr, pool); |
691 | case Type::FLOAT: |
692 | return std::make_shared<TypedStatisticsImpl<FloatType>>(descr, pool); |
693 | case Type::DOUBLE: |
694 | return std::make_shared<TypedStatisticsImpl<DoubleType>>(descr, pool); |
695 | case Type::BYTE_ARRAY: |
696 | return std::make_shared<TypedStatisticsImpl<ByteArrayType>>(descr, pool); |
697 | case Type::FIXED_LEN_BYTE_ARRAY: |
698 | return std::make_shared<TypedStatisticsImpl<FLBAType>>(descr, pool); |
699 | default: |
700 | ParquetException::NYI("Statistics not implemented" ); |
701 | } |
702 | } |
703 | |
704 | std::shared_ptr<Statistics> Statistics::Make(Type::type physical_type, const void* min, |
705 | const void* max, int64_t num_values, |
706 | int64_t null_count, int64_t distinct_count) { |
707 | #define MAKE_STATS(CAP_TYPE, KLASS) \ |
708 | case Type::CAP_TYPE: \ |
709 | return std::make_shared<TypedStatisticsImpl<KLASS>>( \ |
710 | *reinterpret_cast<const typename KLASS::c_type*>(min), \ |
711 | *reinterpret_cast<const typename KLASS::c_type*>(max), num_values, null_count, \ |
712 | distinct_count) |
713 | |
714 | switch (physical_type) { |
715 | MAKE_STATS(BOOLEAN, BooleanType); |
716 | MAKE_STATS(INT32, Int32Type); |
717 | MAKE_STATS(INT64, Int64Type); |
718 | MAKE_STATS(FLOAT, FloatType); |
719 | MAKE_STATS(DOUBLE, DoubleType); |
720 | MAKE_STATS(BYTE_ARRAY, ByteArrayType); |
721 | MAKE_STATS(FIXED_LEN_BYTE_ARRAY, FLBAType); |
722 | default: |
723 | break; |
724 | } |
725 | #undef MAKE_STATS |
726 | DCHECK(false) << "Cannot reach here" ; |
727 | return nullptr; |
728 | } |
729 | |
730 | std::shared_ptr<Statistics> Statistics::Make(const ColumnDescriptor* descr, |
731 | const std::string& encoded_min, |
732 | const std::string& encoded_max, |
733 | int64_t num_values, int64_t null_count, |
734 | int64_t distinct_count, bool has_min_max, |
735 | ::arrow::MemoryPool* pool) { |
736 | #define MAKE_STATS(CAP_TYPE, KLASS) \ |
737 | case Type::CAP_TYPE: \ |
738 | return std::make_shared<TypedStatisticsImpl<KLASS>>( \ |
739 | descr, encoded_min, encoded_max, num_values, null_count, distinct_count, \ |
740 | has_min_max, pool) |
741 | |
742 | switch (descr->physical_type()) { |
743 | MAKE_STATS(BOOLEAN, BooleanType); |
744 | MAKE_STATS(INT32, Int32Type); |
745 | MAKE_STATS(INT64, Int64Type); |
746 | MAKE_STATS(FLOAT, FloatType); |
747 | MAKE_STATS(DOUBLE, DoubleType); |
748 | MAKE_STATS(BYTE_ARRAY, ByteArrayType); |
749 | MAKE_STATS(FIXED_LEN_BYTE_ARRAY, FLBAType); |
750 | default: |
751 | break; |
752 | } |
753 | #undef MAKE_STATS |
754 | DCHECK(false) << "Cannot reach here" ; |
755 | return nullptr; |
756 | } |
757 | |
758 | } // namespace parquet |
759 | |