// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/array/builder_adaptive.h"

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <utility>

#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit-util.h"
#include "arrow/util/int-util.h"
#include "arrow/util/logging.h"

namespace arrow {

using internal::AdaptiveIntBuilderBase;

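// The adaptive builders keep appended values at the narrowest integer width
// that can represent every value seen so far: int_size_ starts at one byte
// and is only ever widened (see ExpandIntSize below). The final width
// determines the concrete output type chosen in FinishInternal.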
AdaptiveIntBuilderBase::AdaptiveIntBuilderBase(MemoryPool* pool)
    : ArrayBuilder(int64(), pool),
      data_(nullptr),
      raw_data_(nullptr),
      int_size_(1),
      pending_pos_(0),
      pending_has_nulls_(false) {}

void AdaptiveIntBuilderBase::Reset() {
  ArrayBuilder::Reset();
  data_.reset();
  raw_data_ = nullptr;
  pending_pos_ = 0;
  pending_has_nulls_ = false;
}

Status AdaptiveIntBuilderBase::Resize(int64_t capacity) {
  RETURN_NOT_OK(CheckCapacity(capacity, capacity_));
  capacity = std::max(capacity, kMinBuilderCapacity);

  int64_t nbytes = capacity * int_size_;
  if (capacity_ == 0) {
    RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &data_));
  } else {
    RETURN_NOT_OK(data_->Resize(nbytes));
  }
  raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data());

  return ArrayBuilder::Resize(capacity);
}

AdaptiveIntBuilder::AdaptiveIntBuilder(MemoryPool* pool) : AdaptiveIntBuilderBase(pool) {}
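
// A minimal usage sketch (assuming the Append()/Finish() declarations from
// builder_adaptive.h and builder_base.h). The builder accepts int64_t input
// but emits the narrowest signed type that fits all appended values:
//
//   AdaptiveIntBuilder builder(default_memory_pool());
//   RETURN_NOT_OK(builder.Append(1));
//   RETURN_NOT_OK(builder.Append(1 << 20));  // needs at least 4-byte storage
//   std::shared_ptr<Array> out;
//   RETURN_NOT_OK(builder.Finish(&out));     // an Int32Array in this case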

Status AdaptiveIntBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
  RETURN_NOT_OK(CommitPendingData());

  std::shared_ptr<DataType> output_type;
  switch (int_size_) {
    case 1:
      output_type = int8();
      break;
    case 2:
      output_type = int16();
      break;
    case 4:
      output_type = int32();
      break;
    case 8:
      output_type = int64();
      break;
    default:
      DCHECK(false);
      return Status::NotImplemented("Only ints of size 1,2,4,8 are supported");
  }

  std::shared_ptr<Buffer> null_bitmap;
  RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap));
  RETURN_NOT_OK(TrimBuffer(length_ * int_size_, data_.get()));

  *out = ArrayData::Make(output_type, length_, {null_bitmap, data_}, null_count_);

  data_ = nullptr;
  capacity_ = length_ = null_count_ = 0;
  return Status::OK();
}

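// Flush values staged by the single-value Append() path (see
// builder_adaptive.h): they accumulate in the pending_data_ / pending_valid_
// buffers and are appended in bulk here, going through the same width
// detection as AppendValues().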
Status AdaptiveIntBuilder::CommitPendingData() {
  if (pending_pos_ == 0) {
    return Status::OK();
  }
  RETURN_NOT_OK(Reserve(pending_pos_));
  const uint8_t* valid_bytes = pending_has_nulls_ ? pending_valid_ : nullptr;
  RETURN_NOT_OK(AppendValuesInternal(reinterpret_cast<const int64_t*>(pending_data_),
                                     pending_pos_, valid_bytes));
  pending_has_nulls_ = false;
  pending_pos_ = 0;
  return Status::OK();
}

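// Number of elements handled per pass of AppendValuesInternal; bulk input is
// split into chunks of this size so that the width-detection scan and the
// copy that follows touch data that is still cache-resident.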
static constexpr int64_t kAdaptiveIntChunkSize = 8192;

Status AdaptiveIntBuilder::AppendValuesInternal(const int64_t* values, int64_t length,
                                                const uint8_t* valid_bytes) {
  while (length > 0) {
    // If `length` is very large, we do not want to thrash the cache by
    // scanning the input twice (once to detect the required int width, then
    // again to copy the data). Instead, process the data in L2-cacheable chunks.
    const int64_t chunk_size = std::min(length, kAdaptiveIntChunkSize);

    uint8_t new_int_size =
        internal::DetectIntWidth(values, valid_bytes, chunk_size, int_size_);

    DCHECK_GE(new_int_size, int_size_);
    if (new_int_size > int_size_) {
      // This updates int_size_
      RETURN_NOT_OK(ExpandIntSize(new_int_size));
    }

    switch (int_size_) {
      case 1:
        internal::DowncastInts(values, reinterpret_cast<int8_t*>(raw_data_) + length_,
                               chunk_size);
        break;
      case 2:
        internal::DowncastInts(values, reinterpret_cast<int16_t*>(raw_data_) + length_,
                               chunk_size);
        break;
      case 4:
        internal::DowncastInts(values, reinterpret_cast<int32_t*>(raw_data_) + length_,
                               chunk_size);
        break;
      case 8:
        internal::DowncastInts(values, reinterpret_cast<int64_t*>(raw_data_) + length_,
                               chunk_size);
        break;
      default:
        DCHECK(false);
    }

    // This updates length_
    ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, chunk_size);
    values += chunk_size;
    if (valid_bytes != nullptr) {
      valid_bytes += chunk_size;
    }
    length -= chunk_size;
  }

  return Status::OK();
}

Status AdaptiveUIntBuilder::CommitPendingData() {
  if (pending_pos_ == 0) {
    return Status::OK();
  }
  RETURN_NOT_OK(Reserve(pending_pos_));
  const uint8_t* valid_bytes = pending_has_nulls_ ? pending_valid_ : nullptr;
  RETURN_NOT_OK(AppendValuesInternal(pending_data_, pending_pos_, valid_bytes));
  pending_has_nulls_ = false;
  pending_pos_ = 0;
  return Status::OK();
}

Status AdaptiveIntBuilder::AppendValues(const int64_t* values, int64_t length,
                                        const uint8_t* valid_bytes) {
  RETURN_NOT_OK(CommitPendingData());
  RETURN_NOT_OK(Reserve(length));

  return AppendValuesInternal(values, length, valid_bytes);
}

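// ExpandIntSizeInternal has two SFINAE-selected overloads: the first is a
// no-op when the requested width is not larger than the current one, while
// the second grows the data buffer and widens the existing values in place.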
template <typename new_type, typename old_type>
typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
AdaptiveIntBuilder::ExpandIntSizeInternal() {
  return Status::OK();
}

#define __LESS(a, b) (a) < (b)
template <typename new_type, typename old_type>
typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type
AdaptiveIntBuilder::ExpandIntSizeInternal() {
  int_size_ = sizeof(new_type);
  RETURN_NOT_OK(Resize(data_->size() / sizeof(old_type)));
  raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data());
  const old_type* src = reinterpret_cast<old_type*>(raw_data_);
  new_type* dst = reinterpret_cast<new_type*>(raw_data_);

  // Copying backwards ensures that no element is overwritten before it has
  // been copied, so the widening can be done in place.
  std::copy_backward(src, src + length_, dst + length_);

  return Status::OK();
}
#undef __LESS

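// Dispatch on the current width to choose the source type for the in-place
// widening above.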
template <typename new_type>
Status AdaptiveIntBuilder::ExpandIntSizeN() {
  switch (int_size_) {
    case 1:
      RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int8_t>()));
      break;
    case 2:
      RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int16_t>()));
      break;
    case 4:
      RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int32_t>()));
      break;
    case 8:
      RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int64_t>()));
      break;
    default:
      DCHECK(false);
  }
  return Status::OK();
}

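// Dispatch on the requested width to choose the destination type.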
Status AdaptiveIntBuilder::ExpandIntSize(uint8_t new_int_size) {
  switch (new_int_size) {
    case 1:
      RETURN_NOT_OK((ExpandIntSizeN<int8_t>()));
      break;
    case 2:
      RETURN_NOT_OK((ExpandIntSizeN<int16_t>()));
      break;
    case 4:
      RETURN_NOT_OK((ExpandIntSizeN<int32_t>()));
      break;
    case 8:
      RETURN_NOT_OK((ExpandIntSizeN<int64_t>()));
      break;
    default:
      DCHECK(false);
  }
  return Status::OK();
}

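// AdaptiveUIntBuilder mirrors AdaptiveIntBuilder above, producing
// uint8/uint16/uint32/uint64 output and using the unsigned width-detection
// and downcast helpers.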
AdaptiveUIntBuilder::AdaptiveUIntBuilder(MemoryPool* pool)
    : AdaptiveIntBuilderBase(pool) {}

Status AdaptiveUIntBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
  RETURN_NOT_OK(CommitPendingData());

  std::shared_ptr<DataType> output_type;
  switch (int_size_) {
    case 1:
      output_type = uint8();
      break;
    case 2:
      output_type = uint16();
      break;
    case 4:
      output_type = uint32();
      break;
    case 8:
      output_type = uint64();
      break;
    default:
      DCHECK(false);
      return Status::NotImplemented("Only ints of size 1,2,4,8 are supported");
  }

  std::shared_ptr<Buffer> null_bitmap;
  RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap));
  RETURN_NOT_OK(TrimBuffer(length_ * int_size_, data_.get()));

  *out = ArrayData::Make(output_type, length_, {null_bitmap, data_}, null_count_);

  data_ = nullptr;
  capacity_ = length_ = null_count_ = 0;
  return Status::OK();
}

Status AdaptiveUIntBuilder::AppendValuesInternal(const uint64_t* values, int64_t length,
                                                 const uint8_t* valid_bytes) {
  while (length > 0) {
    // See AdaptiveIntBuilder::AppendValuesInternal
    const int64_t chunk_size = std::min(length, kAdaptiveIntChunkSize);

    uint8_t new_int_size =
        internal::DetectUIntWidth(values, valid_bytes, chunk_size, int_size_);

    DCHECK_GE(new_int_size, int_size_);
    if (new_int_size > int_size_) {
      // This updates int_size_
      RETURN_NOT_OK(ExpandIntSize(new_int_size));
    }

    switch (int_size_) {
      case 1:
        internal::DowncastUInts(values, reinterpret_cast<uint8_t*>(raw_data_) + length_,
                                chunk_size);
        break;
      case 2:
        internal::DowncastUInts(values, reinterpret_cast<uint16_t*>(raw_data_) + length_,
                                chunk_size);
        break;
      case 4:
        internal::DowncastUInts(values, reinterpret_cast<uint32_t*>(raw_data_) + length_,
                                chunk_size);
        break;
      case 8:
        internal::DowncastUInts(values, reinterpret_cast<uint64_t*>(raw_data_) + length_,
                                chunk_size);
        break;
      default:
        DCHECK(false);
    }

    // This updates length_
    ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, chunk_size);
    values += chunk_size;
    if (valid_bytes != nullptr) {
      valid_bytes += chunk_size;
    }
    length -= chunk_size;
  }

  return Status::OK();
}

Status AdaptiveUIntBuilder::AppendValues(const uint64_t* values, int64_t length,
                                         const uint8_t* valid_bytes) {
  // Flush pending scalar appends first so that values land in append order,
  // matching AdaptiveIntBuilder::AppendValues.
  RETURN_NOT_OK(CommitPendingData());
  RETURN_NOT_OK(Reserve(length));

  return AppendValuesInternal(values, length, valid_bytes);
}

template <typename new_type, typename old_type>
typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
AdaptiveUIntBuilder::ExpandIntSizeInternal() {
  return Status::OK();
}

#define __LESS(a, b) (a) < (b)
template <typename new_type, typename old_type>
typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type
AdaptiveUIntBuilder::ExpandIntSizeInternal() {
  int_size_ = sizeof(new_type);
  RETURN_NOT_OK(Resize(data_->size() / sizeof(old_type)));

  old_type* src = reinterpret_cast<old_type*>(raw_data_);
  new_type* dst = reinterpret_cast<new_type*>(raw_data_);
  // Copying backwards ensures that no element is overwritten before it has
  // been copied, so the widening can be done in place.
  std::copy_backward(src, src + length_, dst + length_);

  return Status::OK();
}
#undef __LESS

template <typename new_type>
Status AdaptiveUIntBuilder::ExpandIntSizeN() {
  switch (int_size_) {
    case 1:
      RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint8_t>()));
      break;
    case 2:
      RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint16_t>()));
      break;
    case 4:
      RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint32_t>()));
      break;
    case 8:
      RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint64_t>()));
      break;
    default:
      DCHECK(false);
  }
  return Status::OK();
}

Status AdaptiveUIntBuilder::ExpandIntSize(uint8_t new_int_size) {
  switch (new_int_size) {
    case 1:
      RETURN_NOT_OK((ExpandIntSizeN<uint8_t>()));
      break;
    case 2:
      RETURN_NOT_OK((ExpandIntSizeN<uint16_t>()));
      break;
    case 4:
      RETURN_NOT_OK((ExpandIntSizeN<uint32_t>()));
      break;
    case 8:
      RETURN_NOT_OK((ExpandIntSizeN<uint64_t>()));
      break;
    default:
      DCHECK(false);
  }
  return Status::OK();
}

}  // namespace arrow