1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "arrow/array/builder_adaptive.h" |
19 | |
20 | #include <algorithm> |
21 | #include <cstddef> |
22 | #include <cstdint> |
23 | #include <utility> |
24 | |
25 | #include "arrow/array.h" |
26 | #include "arrow/buffer.h" |
27 | #include "arrow/status.h" |
28 | #include "arrow/type.h" |
29 | #include "arrow/type_traits.h" |
30 | #include "arrow/util/bit-util.h" |
31 | #include "arrow/util/int-util.h" |
32 | #include "arrow/util/logging.h" |
33 | |
34 | namespace arrow { |
35 | |
36 | using internal::AdaptiveIntBuilderBase; |
37 | |
// Shared state for the signed/unsigned adaptive builders: storage starts at
// the narrowest width (1 byte) and the pending scratch area starts empty.
// The int64() type passed to ArrayBuilder is provisional; FinishInternal
// picks the real output type from the final int_size_.
AdaptiveIntBuilderBase::AdaptiveIntBuilderBase(MemoryPool* pool)
    : ArrayBuilder(int64(), pool),
      data_(nullptr),
      raw_data_(nullptr),
      int_size_(1),
      pending_pos_(0),
      pending_has_nulls_(false) {}
45 | |
46 | void AdaptiveIntBuilderBase::Reset() { |
47 | ArrayBuilder::Reset(); |
48 | data_.reset(); |
49 | raw_data_ = nullptr; |
50 | pending_pos_ = 0; |
51 | pending_has_nulls_ = false; |
52 | } |
53 | |
54 | Status AdaptiveIntBuilderBase::Resize(int64_t capacity) { |
55 | RETURN_NOT_OK(CheckCapacity(capacity, capacity_)); |
56 | capacity = std::max(capacity, kMinBuilderCapacity); |
57 | |
58 | int64_t nbytes = capacity * int_size_; |
59 | if (capacity_ == 0) { |
60 | RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &data_)); |
61 | } else { |
62 | RETURN_NOT_OK(data_->Resize(nbytes)); |
63 | } |
64 | raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data()); |
65 | |
66 | return ArrayBuilder::Resize(capacity); |
67 | } |
68 | |
69 | AdaptiveIntBuilder::AdaptiveIntBuilder(MemoryPool* pool) : AdaptiveIntBuilderBase(pool) {} |
70 | |
71 | Status AdaptiveIntBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) { |
72 | RETURN_NOT_OK(CommitPendingData()); |
73 | |
74 | std::shared_ptr<DataType> output_type; |
75 | switch (int_size_) { |
76 | case 1: |
77 | output_type = int8(); |
78 | break; |
79 | case 2: |
80 | output_type = int16(); |
81 | break; |
82 | case 4: |
83 | output_type = int32(); |
84 | break; |
85 | case 8: |
86 | output_type = int64(); |
87 | break; |
88 | default: |
89 | DCHECK(false); |
90 | return Status::NotImplemented("Only ints of size 1,2,4,8 are supported" ); |
91 | } |
92 | |
93 | std::shared_ptr<Buffer> null_bitmap; |
94 | RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap)); |
95 | RETURN_NOT_OK(TrimBuffer(length_ * int_size_, data_.get())); |
96 | |
97 | *out = ArrayData::Make(output_type, length_, {null_bitmap, data_}, null_count_); |
98 | |
99 | data_ = nullptr; |
100 | capacity_ = length_ = null_count_ = 0; |
101 | return Status::OK(); |
102 | } |
103 | |
104 | Status AdaptiveIntBuilder::CommitPendingData() { |
105 | if (pending_pos_ == 0) { |
106 | return Status::OK(); |
107 | } |
108 | RETURN_NOT_OK(Reserve(pending_pos_)); |
109 | const uint8_t* valid_bytes = pending_has_nulls_ ? pending_valid_ : nullptr; |
110 | RETURN_NOT_OK(AppendValuesInternal(reinterpret_cast<const int64_t*>(pending_data_), |
111 | pending_pos_, valid_bytes)); |
112 | pending_has_nulls_ = false; |
113 | pending_pos_ = 0; |
114 | return Status::OK(); |
115 | } |
116 | |
117 | static constexpr int64_t kAdaptiveIntChunkSize = 8192; |
118 | |
// Append `length` 64-bit values, widening the storage on the fly whenever a
// chunk contains a value that does not fit the current width.  Caller must
// have Reserve()d room for `length` elements.  `valid_bytes` may be null
// (all valid) or hold one byte per value.
Status AdaptiveIntBuilder::AppendValuesInternal(const int64_t* values, int64_t length,
                                                const uint8_t* valid_bytes) {
  while (length > 0) {
    // In case `length` is very large, we don't want to trash the cache by
    // scanning it twice (first to detect int width, second to copy the data).
    // Instead, process data in L2-cacheable chunks.
    const int64_t chunk_size = std::min(length, kAdaptiveIntChunkSize);

    // Smallest signed width (bytes) able to hold every valid value of this
    // chunk; never narrower than the current width.
    uint8_t new_int_size;
    new_int_size = internal::DetectIntWidth(values, valid_bytes, chunk_size, int_size_);

    DCHECK_GE(new_int_size, int_size_);
    if (new_int_size > int_size_) {
      // This updates int_size_ (and widens previously appended data in place).
      RETURN_NOT_OK(ExpandIntSize(new_int_size));
    }

    // Narrow the 64-bit inputs into the current storage width, writing at the
    // current logical end (length_) of the data buffer.
    switch (int_size_) {
      case 1:
        internal::DowncastInts(values, reinterpret_cast<int8_t*>(raw_data_) + length_,
                               chunk_size);
        break;
      case 2:
        internal::DowncastInts(values, reinterpret_cast<int16_t*>(raw_data_) + length_,
                               chunk_size);
        break;
      case 4:
        internal::DowncastInts(values, reinterpret_cast<int32_t*>(raw_data_) + length_,
                               chunk_size);
        break;
      case 8:
        internal::DowncastInts(values, reinterpret_cast<int64_t*>(raw_data_) + length_,
                               chunk_size);
        break;
      default:
        DCHECK(false);
    }

    // This updates length_ (so the next chunk writes after this one).
    ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, chunk_size);
    values += chunk_size;
    if (valid_bytes != nullptr) {
      valid_bytes += chunk_size;
    }
    length -= chunk_size;
  }

  return Status::OK();
}
168 | |
169 | Status AdaptiveUIntBuilder::CommitPendingData() { |
170 | if (pending_pos_ == 0) { |
171 | return Status::OK(); |
172 | } |
173 | RETURN_NOT_OK(Reserve(pending_pos_)); |
174 | const uint8_t* valid_bytes = pending_has_nulls_ ? pending_valid_ : nullptr; |
175 | RETURN_NOT_OK(AppendValuesInternal(pending_data_, pending_pos_, valid_bytes)); |
176 | pending_has_nulls_ = false; |
177 | pending_pos_ = 0; |
178 | return Status::OK(); |
179 | } |
180 | |
// Bulk-append `length` signed values.  Pending scalar appends are flushed
// first so the two append paths stay correctly ordered.
Status AdaptiveIntBuilder::AppendValues(const int64_t* values, int64_t length,
                                        const uint8_t* valid_bytes) {
  RETURN_NOT_OK(CommitPendingData());
  RETURN_NOT_OK(Reserve(length));

  return AppendValuesInternal(values, length, valid_bytes);
}
188 | |
189 | template <typename new_type, typename old_type> |
190 | typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type |
191 | AdaptiveIntBuilder::ExpandIntSizeInternal() { |
192 | return Status::OK(); |
193 | } |
194 | |
195 | #define __LESS(a, b) (a) < (b) |
196 | template <typename new_type, typename old_type> |
197 | typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type |
198 | AdaptiveIntBuilder::ExpandIntSizeInternal() { |
199 | int_size_ = sizeof(new_type); |
200 | RETURN_NOT_OK(Resize(data_->size() / sizeof(old_type))); |
201 | raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data()); |
202 | const old_type* src = reinterpret_cast<old_type*>(raw_data_); |
203 | new_type* dst = reinterpret_cast<new_type*>(raw_data_); |
204 | |
205 | // By doing the backward copy, we ensure that no element is overriden during |
206 | // the copy process and the copy stays in-place. |
207 | std::copy_backward(src, src + length_, dst + length_); |
208 | |
209 | return Status::OK(); |
210 | } |
211 | #undef __LESS |
212 | |
213 | template <typename new_type> |
214 | Status AdaptiveIntBuilder::ExpandIntSizeN() { |
215 | switch (int_size_) { |
216 | case 1: |
217 | RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int8_t>())); |
218 | break; |
219 | case 2: |
220 | RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int16_t>())); |
221 | break; |
222 | case 4: |
223 | RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int32_t>())); |
224 | break; |
225 | case 8: |
226 | RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int64_t>())); |
227 | break; |
228 | default: |
229 | DCHECK(false); |
230 | } |
231 | return Status::OK(); |
232 | } |
233 | |
234 | Status AdaptiveIntBuilder::ExpandIntSize(uint8_t new_int_size) { |
235 | switch (new_int_size) { |
236 | case 1: |
237 | RETURN_NOT_OK((ExpandIntSizeN<int8_t>())); |
238 | break; |
239 | case 2: |
240 | RETURN_NOT_OK((ExpandIntSizeN<int16_t>())); |
241 | break; |
242 | case 4: |
243 | RETURN_NOT_OK((ExpandIntSizeN<int32_t>())); |
244 | break; |
245 | case 8: |
246 | RETURN_NOT_OK((ExpandIntSizeN<int64_t>())); |
247 | break; |
248 | default: |
249 | DCHECK(false); |
250 | } |
251 | return Status::OK(); |
252 | } |
253 | |
// Unsigned adaptive builder; all state lives in AdaptiveIntBuilderBase.
AdaptiveUIntBuilder::AdaptiveUIntBuilder(MemoryPool* pool)
    : AdaptiveIntBuilderBase(pool) {}
256 | |
257 | Status AdaptiveUIntBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) { |
258 | RETURN_NOT_OK(CommitPendingData()); |
259 | |
260 | std::shared_ptr<DataType> output_type; |
261 | switch (int_size_) { |
262 | case 1: |
263 | output_type = uint8(); |
264 | break; |
265 | case 2: |
266 | output_type = uint16(); |
267 | break; |
268 | case 4: |
269 | output_type = uint32(); |
270 | break; |
271 | case 8: |
272 | output_type = uint64(); |
273 | break; |
274 | default: |
275 | DCHECK(false); |
276 | return Status::NotImplemented("Only ints of size 1,2,4,8 are supported" ); |
277 | } |
278 | |
279 | std::shared_ptr<Buffer> null_bitmap; |
280 | RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap)); |
281 | RETURN_NOT_OK(TrimBuffer(length_ * int_size_, data_.get())); |
282 | |
283 | *out = ArrayData::Make(output_type, length_, {null_bitmap, data_}, null_count_); |
284 | |
285 | data_ = nullptr; |
286 | capacity_ = length_ = null_count_ = 0; |
287 | return Status::OK(); |
288 | } |
289 | |
// Append `length` 64-bit unsigned values, widening the storage on the fly
// whenever a chunk contains a value that does not fit the current width.
// Caller must have Reserve()d room for `length` elements.  `valid_bytes`
// may be null (all valid) or hold one byte per value.
Status AdaptiveUIntBuilder::AppendValuesInternal(const uint64_t* values, int64_t length,
                                                 const uint8_t* valid_bytes) {
  while (length > 0) {
    // See AdaptiveIntBuilder::AppendValuesInternal
    const int64_t chunk_size = std::min(length, kAdaptiveIntChunkSize);

    // Smallest unsigned width (bytes) able to hold every valid value of this
    // chunk; never narrower than the current width.
    uint8_t new_int_size;
    new_int_size = internal::DetectUIntWidth(values, valid_bytes, chunk_size, int_size_);

    DCHECK_GE(new_int_size, int_size_);
    if (new_int_size > int_size_) {
      // This updates int_size_ (and widens previously appended data in place).
      RETURN_NOT_OK(ExpandIntSize(new_int_size));
    }

    // Narrow the 64-bit inputs into the current storage width, writing at the
    // current logical end (length_) of the data buffer.
    switch (int_size_) {
      case 1:
        internal::DowncastUInts(values, reinterpret_cast<uint8_t*>(raw_data_) + length_,
                                chunk_size);
        break;
      case 2:
        internal::DowncastUInts(values, reinterpret_cast<uint16_t*>(raw_data_) + length_,
                                chunk_size);
        break;
      case 4:
        internal::DowncastUInts(values, reinterpret_cast<uint32_t*>(raw_data_) + length_,
                                chunk_size);
        break;
      case 8:
        internal::DowncastUInts(values, reinterpret_cast<uint64_t*>(raw_data_) + length_,
                                chunk_size);
        break;
      default:
        DCHECK(false);
    }

    // This updates length_ (so the next chunk writes after this one).
    ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, chunk_size);
    values += chunk_size;
    if (valid_bytes != nullptr) {
      valid_bytes += chunk_size;
    }
    length -= chunk_size;
  }

  return Status::OK();
}
337 | |
338 | Status AdaptiveUIntBuilder::AppendValues(const uint64_t* values, int64_t length, |
339 | const uint8_t* valid_bytes) { |
340 | RETURN_NOT_OK(Reserve(length)); |
341 | |
342 | return AppendValuesInternal(values, length, valid_bytes); |
343 | } |
344 | |
345 | template <typename new_type, typename old_type> |
346 | typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type |
347 | AdaptiveUIntBuilder::ExpandIntSizeInternal() { |
348 | return Status::OK(); |
349 | } |
350 | |
351 | #define __LESS(a, b) (a) < (b) |
352 | template <typename new_type, typename old_type> |
353 | typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type |
354 | AdaptiveUIntBuilder::ExpandIntSizeInternal() { |
355 | int_size_ = sizeof(new_type); |
356 | RETURN_NOT_OK(Resize(data_->size() / sizeof(old_type))); |
357 | |
358 | old_type* src = reinterpret_cast<old_type*>(raw_data_); |
359 | new_type* dst = reinterpret_cast<new_type*>(raw_data_); |
360 | // By doing the backward copy, we ensure that no element is overriden during |
361 | // the copy process and the copy stays in-place. |
362 | std::copy_backward(src, src + length_, dst + length_); |
363 | |
364 | return Status::OK(); |
365 | } |
366 | #undef __LESS |
367 | |
368 | template <typename new_type> |
369 | Status AdaptiveUIntBuilder::ExpandIntSizeN() { |
370 | switch (int_size_) { |
371 | case 1: |
372 | RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint8_t>())); |
373 | break; |
374 | case 2: |
375 | RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint16_t>())); |
376 | break; |
377 | case 4: |
378 | RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint32_t>())); |
379 | break; |
380 | case 8: |
381 | RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint64_t>())); |
382 | break; |
383 | default: |
384 | DCHECK(false); |
385 | } |
386 | return Status::OK(); |
387 | } |
388 | |
389 | Status AdaptiveUIntBuilder::ExpandIntSize(uint8_t new_int_size) { |
390 | switch (new_int_size) { |
391 | case 1: |
392 | RETURN_NOT_OK((ExpandIntSizeN<uint8_t>())); |
393 | break; |
394 | case 2: |
395 | RETURN_NOT_OK((ExpandIntSizeN<uint16_t>())); |
396 | break; |
397 | case 4: |
398 | RETURN_NOT_OK((ExpandIntSizeN<uint32_t>())); |
399 | break; |
400 | case 8: |
401 | RETURN_NOT_OK((ExpandIntSizeN<uint64_t>())); |
402 | break; |
403 | default: |
404 | DCHECK(false); |
405 | } |
406 | return Status::OK(); |
407 | } |
408 | |
409 | } // namespace arrow |
410 | |