1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
#include <algorithm>
#include <cmath>
#include <cstring>
#include <functional>
#include <type_traits>
21 | |
22 | #include "parquet/encoding-internal.h" |
23 | #include "parquet/exception.h" |
24 | #include "parquet/statistics.h" |
25 | #include "parquet/util/memory.h" |
26 | |
27 | using arrow::default_memory_pool; |
28 | using arrow::MemoryPool; |
29 | |
30 | namespace parquet { |
31 | |
32 | template <typename DType> |
33 | TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const ColumnDescriptor* schema, |
34 | MemoryPool* pool) |
35 | : pool_(pool), |
36 | min_buffer_(AllocateBuffer(pool_, 0)), |
37 | max_buffer_(AllocateBuffer(pool_, 0)) { |
38 | SetDescr(schema); |
39 | Reset(); |
40 | } |
41 | |
42 | template <typename DType> |
43 | TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const typename DType::c_type& min, |
44 | const typename DType::c_type& max, |
45 | int64_t num_values, |
46 | int64_t null_count, |
47 | int64_t distinct_count) |
48 | : pool_(default_memory_pool()), |
49 | min_buffer_(AllocateBuffer(pool_, 0)), |
50 | max_buffer_(AllocateBuffer(pool_, 0)) { |
51 | IncrementNumValues(num_values); |
52 | IncrementNullCount(null_count); |
53 | IncrementDistinctCount(distinct_count); |
54 | |
55 | Copy(min, &min_, min_buffer_.get()); |
56 | Copy(max, &max_, max_buffer_.get()); |
57 | has_min_max_ = true; |
58 | } |
59 | |
60 | template <typename DType> |
61 | TypedRowGroupStatistics<DType>::TypedRowGroupStatistics( |
62 | const ColumnDescriptor* schema, const std::string& encoded_min, |
63 | const std::string& encoded_max, int64_t num_values, int64_t null_count, |
64 | int64_t distinct_count, bool has_min_max, MemoryPool* pool) |
65 | : pool_(pool), |
66 | min_buffer_(AllocateBuffer(pool_, 0)), |
67 | max_buffer_(AllocateBuffer(pool_, 0)) { |
68 | IncrementNumValues(num_values); |
69 | IncrementNullCount(null_count); |
70 | IncrementDistinctCount(distinct_count); |
71 | |
72 | SetDescr(schema); |
73 | |
74 | if (!encoded_min.empty()) { |
75 | PlainDecode(encoded_min, &min_); |
76 | } |
77 | if (!encoded_max.empty()) { |
78 | PlainDecode(encoded_max, &max_); |
79 | } |
80 | has_min_max_ = has_min_max; |
81 | } |
82 | |
83 | template <typename DType> |
84 | bool TypedRowGroupStatistics<DType>::HasMinMax() const { |
85 | return has_min_max_; |
86 | } |
87 | |
88 | template <typename DType> |
89 | void TypedRowGroupStatistics<DType>::SetComparator() { |
90 | comparator_ = |
91 | std::static_pointer_cast<CompareDefault<DType> >(Comparator::Make(descr_)); |
92 | } |
93 | |
94 | template <typename DType> |
95 | void TypedRowGroupStatistics<DType>::Reset() { |
96 | ResetCounts(); |
97 | has_min_max_ = false; |
98 | } |
99 | |
100 | template <typename DType> |
101 | inline void TypedRowGroupStatistics<DType>::SetMinMax(const T& min, const T& max) { |
102 | if (!has_min_max_) { |
103 | has_min_max_ = true; |
104 | Copy(min, &min_, min_buffer_.get()); |
105 | Copy(max, &max_, max_buffer_.get()); |
106 | } else { |
107 | Copy(std::min(min_, min, std::ref(*(this->comparator_))), &min_, min_buffer_.get()); |
108 | Copy(std::max(max_, max, std::ref(*(this->comparator_))), &max_, max_buffer_.get()); |
109 | } |
110 | } |
111 | |
// Decides which part of a values buffer the min/max scan should cover, and
// classifies single values. For types without NaN every value is ordered, so
// the whole buffer is scanned and nothing is ever NaN.
template <typename T, typename Enable = void>
struct StatsHelper {
  // Index of the first value to scan: always 0.
  inline int64_t GetValueBeginOffset(const T* /*values*/, int64_t /*count*/) const {
    return 0;
  }

  // One past the index of the last value to scan: always `count`.
  inline int64_t GetValueEndOffset(const T* /*values*/, int64_t count) const {
    return count;
  }

  inline bool IsNaN(const T /*value*/) const { return false; }
};

// Floating-point specialization (PARQUET-1225). NaN compares false against
// everything, so leading and trailing NaNs are trimmed from the scanned range.
// Interior NaNs are the caller's responsibility; use IsNaN().
template <typename T>
struct StatsHelper<T, typename std::enable_if<std::is_floating_point<T>::value>::type> {
  // Index of the first non-NaN value, or `count` if every value is NaN.
  inline int64_t GetValueBeginOffset(const T* values, int64_t count) const {
    const T* first_number =
        std::find_if(values, values + count, [](T v) { return !std::isnan(v); });
    return static_cast<int64_t>(first_number - values);
  }

  // One past the index of the last non-NaN value, or 0 if every value is NaN.
  inline int64_t GetValueEndOffset(const T* values, int64_t count) const {
    int64_t end = count;
    while (end > 0 && std::isnan(values[end - 1])) {
      --end;
    }
    return end;
  }

  inline bool IsNaN(const T value) const { return std::isnan(value); }
};
145 | |
// Overwrites *value with a quiet NaN when T is float or double. For any other
// type there is no NaN to store, so the value is left untouched.
template <typename T>
void SetNaN(T* /*value*/) {}

template <>
void SetNaN<float>(float* value) {
  const float quiet_nan = std::nanf("");
  *value = quiet_nan;
}

template <>
void SetNaN<double>(double* value) {
  const double quiet_nan = std::nan("");
  *value = quiet_nan;
}
160 | |
161 | template <typename DType> |
162 | void TypedRowGroupStatistics<DType>::Update(const T* values, int64_t num_not_null, |
163 | int64_t num_null) { |
164 | DCHECK_GE(num_not_null, 0); |
165 | DCHECK_GE(num_null, 0); |
166 | |
167 | IncrementNullCount(num_null); |
168 | IncrementNumValues(num_not_null); |
169 | // TODO: support distinct count? |
170 | if (num_not_null == 0) return; |
171 | |
172 | // PARQUET-1225: Handle NaNs |
173 | // The problem arises only if the starting/ending value(s) |
174 | // of the values-buffer contain NaN |
175 | StatsHelper<T> helper; |
176 | int64_t begin_offset = helper.GetValueBeginOffset(values, num_not_null); |
177 | int64_t end_offset = helper.GetValueEndOffset(values, num_not_null); |
178 | |
179 | // All values are NaN |
180 | if (end_offset < begin_offset) { |
181 | // Set min/max to NaNs in this case. |
182 | // Don't set has_min_max flag since |
183 | // these values must be over-written by valid stats later |
184 | if (!has_min_max_) { |
185 | SetNaN(&min_); |
186 | SetNaN(&max_); |
187 | } |
188 | return; |
189 | } |
190 | |
191 | auto batch_minmax = std::minmax_element(values + begin_offset, values + end_offset, |
192 | std::ref(*(this->comparator_))); |
193 | |
194 | SetMinMax(*batch_minmax.first, *batch_minmax.second); |
195 | } |
196 | |
197 | template <typename DType> |
198 | void TypedRowGroupStatistics<DType>::UpdateSpaced(const T* values, |
199 | const uint8_t* valid_bits, |
200 | int64_t valid_bits_offset, |
201 | int64_t num_not_null, |
202 | int64_t num_null) { |
203 | DCHECK_GE(num_not_null, 0); |
204 | DCHECK_GE(num_null, 0); |
205 | |
206 | IncrementNullCount(num_null); |
207 | IncrementNumValues(num_not_null); |
208 | // TODO: support distinct count? |
209 | if (num_not_null == 0) return; |
210 | |
211 | // Find first valid entry and use that for min/max |
212 | // As (num_not_null != 0) there must be one |
213 | int64_t length = num_null + num_not_null; |
214 | int64_t i = 0; |
215 | ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset, |
216 | length); |
217 | StatsHelper<T> helper; |
218 | for (; i < length; i++) { |
219 | // PARQUET-1225: Handle NaNs |
220 | if (valid_bits_reader.IsSet() && !helper.IsNaN(values[i])) { |
221 | break; |
222 | } |
223 | valid_bits_reader.Next(); |
224 | } |
225 | |
226 | // All are NaNs and stats are not set yet |
227 | if ((i == length) && helper.IsNaN(values[i - 1])) { |
228 | // Don't set has_min_max flag since |
229 | // these values must be over-written by valid stats later |
230 | if (!has_min_max_) { |
231 | SetNaN(&min_); |
232 | SetNaN(&max_); |
233 | } |
234 | return; |
235 | } |
236 | |
237 | T min = values[i]; |
238 | T max = values[i]; |
239 | for (; i < length; i++) { |
240 | if (valid_bits_reader.IsSet()) { |
241 | if ((std::ref(*(this->comparator_)))(values[i], min)) { |
242 | min = values[i]; |
243 | } else if ((std::ref(*(this->comparator_)))(max, values[i])) { |
244 | max = values[i]; |
245 | } |
246 | } |
247 | valid_bits_reader.Next(); |
248 | } |
249 | |
250 | SetMinMax(min, max); |
251 | } |
252 | |
253 | template <typename DType> |
254 | const typename DType::c_type& TypedRowGroupStatistics<DType>::min() const { |
255 | return min_; |
256 | } |
257 | |
258 | template <typename DType> |
259 | const typename DType::c_type& TypedRowGroupStatistics<DType>::max() const { |
260 | return max_; |
261 | } |
262 | |
263 | template <typename DType> |
264 | void TypedRowGroupStatistics<DType>::Merge(const TypedRowGroupStatistics<DType>& other) { |
265 | this->MergeCounts(other); |
266 | |
267 | if (!other.HasMinMax()) return; |
268 | |
269 | SetMinMax(other.min_, other.max_); |
270 | } |
271 | |
272 | template <typename DType> |
273 | std::string TypedRowGroupStatistics<DType>::EncodeMin() { |
274 | std::string s; |
275 | if (HasMinMax()) this->PlainEncode(min_, &s); |
276 | return s; |
277 | } |
278 | |
279 | template <typename DType> |
280 | std::string TypedRowGroupStatistics<DType>::EncodeMax() { |
281 | std::string s; |
282 | if (HasMinMax()) this->PlainEncode(max_, &s); |
283 | return s; |
284 | } |
285 | |
286 | template <typename DType> |
287 | EncodedStatistics TypedRowGroupStatistics<DType>::Encode() { |
288 | EncodedStatistics s; |
289 | if (HasMinMax()) { |
290 | s.set_min(this->EncodeMin()); |
291 | s.set_max(this->EncodeMax()); |
292 | } |
293 | s.set_null_count(this->null_count()); |
294 | return s; |
295 | } |
296 | |
297 | template <typename DType> |
298 | void TypedRowGroupStatistics<DType>::PlainEncode(const T& src, std::string* dst) { |
299 | PlainEncoder<DType> encoder(descr(), pool_); |
300 | encoder.Put(&src, 1); |
301 | auto buffer = encoder.FlushValues(); |
302 | auto ptr = reinterpret_cast<const char*>(buffer->data()); |
303 | dst->assign(ptr, buffer->size()); |
304 | } |
305 | |
306 | template <typename DType> |
307 | void TypedRowGroupStatistics<DType>::PlainDecode(const std::string& src, T* dst) { |
308 | PlainDecoder<DType> decoder(descr()); |
309 | decoder.SetData(1, reinterpret_cast<const uint8_t*>(src.c_str()), |
310 | static_cast<int>(src.size())); |
311 | decoder.Decode(dst, 1); |
312 | } |
313 | |
314 | template <> |
315 | void TypedRowGroupStatistics<ByteArrayType>::PlainEncode(const T& src, std::string* dst) { |
316 | dst->assign(reinterpret_cast<const char*>(src.ptr), src.len); |
317 | } |
318 | |
319 | template <> |
320 | void TypedRowGroupStatistics<ByteArrayType>::PlainDecode(const std::string& src, T* dst) { |
321 | dst->len = static_cast<uint32_t>(src.size()); |
322 | dst->ptr = reinterpret_cast<const uint8_t*>(src.c_str()); |
323 | } |
324 | |
325 | template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<BooleanType>; |
326 | template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<Int32Type>; |
327 | template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<Int64Type>; |
328 | template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<Int96Type>; |
329 | template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<FloatType>; |
330 | template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<DoubleType>; |
331 | template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<ByteArrayType>; |
332 | template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<FLBAType>; |
333 | |
334 | } // namespace parquet |
335 | |