1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
#include <algorithm>
#include <cmath>
#include <cstring>
#include <functional>
#include <type_traits>
21
22#include "parquet/encoding-internal.h"
23#include "parquet/exception.h"
24#include "parquet/statistics.h"
25#include "parquet/util/memory.h"
26
27using arrow::default_memory_pool;
28using arrow::MemoryPool;
29
30namespace parquet {
31
32template <typename DType>
33TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const ColumnDescriptor* schema,
34 MemoryPool* pool)
35 : pool_(pool),
36 min_buffer_(AllocateBuffer(pool_, 0)),
37 max_buffer_(AllocateBuffer(pool_, 0)) {
38 SetDescr(schema);
39 Reset();
40}
41
42template <typename DType>
43TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const typename DType::c_type& min,
44 const typename DType::c_type& max,
45 int64_t num_values,
46 int64_t null_count,
47 int64_t distinct_count)
48 : pool_(default_memory_pool()),
49 min_buffer_(AllocateBuffer(pool_, 0)),
50 max_buffer_(AllocateBuffer(pool_, 0)) {
51 IncrementNumValues(num_values);
52 IncrementNullCount(null_count);
53 IncrementDistinctCount(distinct_count);
54
55 Copy(min, &min_, min_buffer_.get());
56 Copy(max, &max_, max_buffer_.get());
57 has_min_max_ = true;
58}
59
60template <typename DType>
61TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(
62 const ColumnDescriptor* schema, const std::string& encoded_min,
63 const std::string& encoded_max, int64_t num_values, int64_t null_count,
64 int64_t distinct_count, bool has_min_max, MemoryPool* pool)
65 : pool_(pool),
66 min_buffer_(AllocateBuffer(pool_, 0)),
67 max_buffer_(AllocateBuffer(pool_, 0)) {
68 IncrementNumValues(num_values);
69 IncrementNullCount(null_count);
70 IncrementDistinctCount(distinct_count);
71
72 SetDescr(schema);
73
74 if (!encoded_min.empty()) {
75 PlainDecode(encoded_min, &min_);
76 }
77 if (!encoded_max.empty()) {
78 PlainDecode(encoded_max, &max_);
79 }
80 has_min_max_ = has_min_max;
81}
82
83template <typename DType>
84bool TypedRowGroupStatistics<DType>::HasMinMax() const {
85 return has_min_max_;
86}
87
88template <typename DType>
89void TypedRowGroupStatistics<DType>::SetComparator() {
90 comparator_ =
91 std::static_pointer_cast<CompareDefault<DType> >(Comparator::Make(descr_));
92}
93
94template <typename DType>
95void TypedRowGroupStatistics<DType>::Reset() {
96 ResetCounts();
97 has_min_max_ = false;
98}
99
100template <typename DType>
101inline void TypedRowGroupStatistics<DType>::SetMinMax(const T& min, const T& max) {
102 if (!has_min_max_) {
103 has_min_max_ = true;
104 Copy(min, &min_, min_buffer_.get());
105 Copy(max, &max_, max_buffer_.get());
106 } else {
107 Copy(std::min(min_, min, std::ref(*(this->comparator_))), &min_, min_buffer_.get());
108 Copy(std::max(max_, max, std::ref(*(this->comparator_))), &max_, max_buffer_.get());
109 }
110}
111
/// Helper that lets the statistics code treat NaN uniformly across value types.
///
/// Primary template: used for every type that is not floating point. Such
/// types never hold NaN, so the whole [0, count) range is usable.
template <typename T, typename Enable = void>
struct StatsHelper {
  /// Index of the first usable value; always 0 here.
  int64_t GetValueBeginOffset(const T* /*values*/, int64_t /*count*/) { return 0; }

  /// One past the index of the last usable value; always `count` here.
  int64_t GetValueEndOffset(const T* /*values*/, int64_t count) { return count; }

  /// Non-floating-point values are never NaN.
  bool IsNaN(const T /*value*/) { return false; }
};

/// Floating-point specialization: trims NaNs from both ends of a value run.
template <typename T>
struct StatsHelper<T, typename std::enable_if<std::is_floating_point<T>::value>::type> {
  /// Index of the first non-NaN entry, or `count` when there is none.
  int64_t GetValueBeginOffset(const T* values, int64_t count) {
    int64_t first_valid = 0;
    while (first_valid < count && std::isnan(values[first_valid])) {
      ++first_valid;
    }
    // Falls back to `count` when the scan ran off the end (or never started).
    return (first_valid < count) ? first_valid : count;
  }

  /// One past the index of the last non-NaN entry, or 0 when there is none.
  int64_t GetValueEndOffset(const T* values, int64_t count) {
    int64_t past_last_valid = count;
    while (past_last_valid > 0 && std::isnan(values[past_last_valid - 1])) {
      --past_last_valid;
    }
    return (past_last_valid > 0) ? past_last_valid : 0;
  }

  /// True when `value` is NaN.
  bool IsNaN(const T value) { return std::isnan(value); }
};
145
/// Overwrite `*value` with NaN when the type can represent it.
///
/// Generic case: the type has no NaN, so the target is left unchanged.
template <typename T>
void SetNaN(T* /*value*/) {
  // Intentionally empty.
}

/// float: store the NaN returned by std::nanf("").
template <>
void SetNaN<float>(float* value) {
  const float not_a_number = std::nanf("");
  *value = not_a_number;
}

/// double: store the NaN returned by std::nan("").
template <>
void SetNaN<double>(double* value) {
  const double not_a_number = std::nan("");
  *value = not_a_number;
}
160
161template <typename DType>
162void TypedRowGroupStatistics<DType>::Update(const T* values, int64_t num_not_null,
163 int64_t num_null) {
164 DCHECK_GE(num_not_null, 0);
165 DCHECK_GE(num_null, 0);
166
167 IncrementNullCount(num_null);
168 IncrementNumValues(num_not_null);
169 // TODO: support distinct count?
170 if (num_not_null == 0) return;
171
172 // PARQUET-1225: Handle NaNs
173 // The problem arises only if the starting/ending value(s)
174 // of the values-buffer contain NaN
175 StatsHelper<T> helper;
176 int64_t begin_offset = helper.GetValueBeginOffset(values, num_not_null);
177 int64_t end_offset = helper.GetValueEndOffset(values, num_not_null);
178
179 // All values are NaN
180 if (end_offset < begin_offset) {
181 // Set min/max to NaNs in this case.
182 // Don't set has_min_max flag since
183 // these values must be over-written by valid stats later
184 if (!has_min_max_) {
185 SetNaN(&min_);
186 SetNaN(&max_);
187 }
188 return;
189 }
190
191 auto batch_minmax = std::minmax_element(values + begin_offset, values + end_offset,
192 std::ref(*(this->comparator_)));
193
194 SetMinMax(*batch_minmax.first, *batch_minmax.second);
195}
196
197template <typename DType>
198void TypedRowGroupStatistics<DType>::UpdateSpaced(const T* values,
199 const uint8_t* valid_bits,
200 int64_t valid_bits_offset,
201 int64_t num_not_null,
202 int64_t num_null) {
203 DCHECK_GE(num_not_null, 0);
204 DCHECK_GE(num_null, 0);
205
206 IncrementNullCount(num_null);
207 IncrementNumValues(num_not_null);
208 // TODO: support distinct count?
209 if (num_not_null == 0) return;
210
211 // Find first valid entry and use that for min/max
212 // As (num_not_null != 0) there must be one
213 int64_t length = num_null + num_not_null;
214 int64_t i = 0;
215 ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
216 length);
217 StatsHelper<T> helper;
218 for (; i < length; i++) {
219 // PARQUET-1225: Handle NaNs
220 if (valid_bits_reader.IsSet() && !helper.IsNaN(values[i])) {
221 break;
222 }
223 valid_bits_reader.Next();
224 }
225
226 // All are NaNs and stats are not set yet
227 if ((i == length) && helper.IsNaN(values[i - 1])) {
228 // Don't set has_min_max flag since
229 // these values must be over-written by valid stats later
230 if (!has_min_max_) {
231 SetNaN(&min_);
232 SetNaN(&max_);
233 }
234 return;
235 }
236
237 T min = values[i];
238 T max = values[i];
239 for (; i < length; i++) {
240 if (valid_bits_reader.IsSet()) {
241 if ((std::ref(*(this->comparator_)))(values[i], min)) {
242 min = values[i];
243 } else if ((std::ref(*(this->comparator_)))(max, values[i])) {
244 max = values[i];
245 }
246 }
247 valid_bits_reader.Next();
248 }
249
250 SetMinMax(min, max);
251}
252
253template <typename DType>
254const typename DType::c_type& TypedRowGroupStatistics<DType>::min() const {
255 return min_;
256}
257
258template <typename DType>
259const typename DType::c_type& TypedRowGroupStatistics<DType>::max() const {
260 return max_;
261}
262
263template <typename DType>
264void TypedRowGroupStatistics<DType>::Merge(const TypedRowGroupStatistics<DType>& other) {
265 this->MergeCounts(other);
266
267 if (!other.HasMinMax()) return;
268
269 SetMinMax(other.min_, other.max_);
270}
271
272template <typename DType>
273std::string TypedRowGroupStatistics<DType>::EncodeMin() {
274 std::string s;
275 if (HasMinMax()) this->PlainEncode(min_, &s);
276 return s;
277}
278
279template <typename DType>
280std::string TypedRowGroupStatistics<DType>::EncodeMax() {
281 std::string s;
282 if (HasMinMax()) this->PlainEncode(max_, &s);
283 return s;
284}
285
286template <typename DType>
287EncodedStatistics TypedRowGroupStatistics<DType>::Encode() {
288 EncodedStatistics s;
289 if (HasMinMax()) {
290 s.set_min(this->EncodeMin());
291 s.set_max(this->EncodeMax());
292 }
293 s.set_null_count(this->null_count());
294 return s;
295}
296
297template <typename DType>
298void TypedRowGroupStatistics<DType>::PlainEncode(const T& src, std::string* dst) {
299 PlainEncoder<DType> encoder(descr(), pool_);
300 encoder.Put(&src, 1);
301 auto buffer = encoder.FlushValues();
302 auto ptr = reinterpret_cast<const char*>(buffer->data());
303 dst->assign(ptr, buffer->size());
304}
305
306template <typename DType>
307void TypedRowGroupStatistics<DType>::PlainDecode(const std::string& src, T* dst) {
308 PlainDecoder<DType> decoder(descr());
309 decoder.SetData(1, reinterpret_cast<const uint8_t*>(src.c_str()),
310 static_cast<int>(src.size()));
311 decoder.Decode(dst, 1);
312}
313
314template <>
315void TypedRowGroupStatistics<ByteArrayType>::PlainEncode(const T& src, std::string* dst) {
316 dst->assign(reinterpret_cast<const char*>(src.ptr), src.len);
317}
318
319template <>
320void TypedRowGroupStatistics<ByteArrayType>::PlainDecode(const std::string& src, T* dst) {
321 dst->len = static_cast<uint32_t>(src.size());
322 dst->ptr = reinterpret_cast<const uint8_t*>(src.c_str());
323}
324
// Explicit instantiations: the member templates above are defined in this
// translation unit, so every supported physical type is instantiated here.
template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<BooleanType>;
template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<Int32Type>;
template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<Int64Type>;
template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<Int96Type>;
template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<FloatType>;
template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<DoubleType>;
template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<ByteArrayType>;
template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<FLBAType>;
333
334} // namespace parquet
335