1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "arrow/table_builder.h" |
19 | |
20 | #include <memory> |
21 | #include <utility> |
22 | |
23 | #include "arrow/array.h" |
24 | #include "arrow/builder.h" |
25 | #include "arrow/record_batch.h" |
26 | #include "arrow/status.h" |
27 | #include "arrow/type.h" |
28 | #include "arrow/util/logging.h" |
29 | |
30 | namespace arrow { |
31 | |
32 | // ---------------------------------------------------------------------- |
33 | // RecordBatchBuilder |
34 | |
35 | RecordBatchBuilder::RecordBatchBuilder(const std::shared_ptr<Schema>& schema, |
36 | MemoryPool* pool, int64_t initial_capacity) |
37 | : schema_(schema), initial_capacity_(initial_capacity), pool_(pool) {} |
38 | |
39 | Status RecordBatchBuilder::Make(const std::shared_ptr<Schema>& schema, MemoryPool* pool, |
40 | std::unique_ptr<RecordBatchBuilder>* builder) { |
41 | return Make(schema, pool, kMinBuilderCapacity, builder); |
42 | } |
43 | |
44 | Status RecordBatchBuilder::Make(const std::shared_ptr<Schema>& schema, MemoryPool* pool, |
45 | int64_t initial_capacity, |
46 | std::unique_ptr<RecordBatchBuilder>* builder) { |
47 | builder->reset(new RecordBatchBuilder(schema, pool, initial_capacity)); |
48 | RETURN_NOT_OK((*builder)->CreateBuilders()); |
49 | return (*builder)->InitBuilders(); |
50 | } |
51 | |
52 | Status RecordBatchBuilder::Flush(bool reset_builders, |
53 | std::shared_ptr<RecordBatch>* batch) { |
54 | std::vector<std::shared_ptr<Array>> fields; |
55 | fields.resize(this->num_fields()); |
56 | |
57 | int64_t length = 0; |
58 | for (int i = 0; i < this->num_fields(); ++i) { |
59 | RETURN_NOT_OK(raw_field_builders_[i]->Finish(&fields[i])); |
60 | if (i > 0 && fields[i]->length() != length) { |
61 | return Status::Invalid("All fields must be same length when calling Flush" ); |
62 | } |
63 | length = fields[i]->length(); |
64 | } |
65 | *batch = RecordBatch::Make(schema_, length, std::move(fields)); |
66 | if (reset_builders) { |
67 | return InitBuilders(); |
68 | } else { |
69 | return Status::OK(); |
70 | } |
71 | } |
72 | |
73 | Status RecordBatchBuilder::Flush(std::shared_ptr<RecordBatch>* batch) { |
74 | return Flush(true, batch); |
75 | } |
76 | |
77 | void RecordBatchBuilder::SetInitialCapacity(int64_t capacity) { |
78 | DCHECK_GT(capacity, 0) << "Initial capacity must be positive" ; |
79 | initial_capacity_ = capacity; |
80 | } |
81 | |
82 | Status RecordBatchBuilder::CreateBuilders() { |
83 | field_builders_.resize(this->num_fields()); |
84 | raw_field_builders_.resize(this->num_fields()); |
85 | for (int i = 0; i < this->num_fields(); ++i) { |
86 | RETURN_NOT_OK(MakeBuilder(pool_, schema_->field(i)->type(), &field_builders_[i])); |
87 | raw_field_builders_[i] = field_builders_[i].get(); |
88 | } |
89 | return Status::OK(); |
90 | } |
91 | |
92 | Status RecordBatchBuilder::InitBuilders() { |
93 | for (int i = 0; i < this->num_fields(); ++i) { |
94 | RETURN_NOT_OK(raw_field_builders_[i]->Reserve(initial_capacity_)); |
95 | } |
96 | return Status::OK(); |
97 | } |
98 | |
99 | } // namespace arrow |
100 | |