1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/table_builder.h"
19
20#include <memory>
21#include <utility>
22
23#include "arrow/array.h"
24#include "arrow/builder.h"
25#include "arrow/record_batch.h"
26#include "arrow/status.h"
27#include "arrow/type.h"
28#include "arrow/util/logging.h"
29
30namespace arrow {
31
32// ----------------------------------------------------------------------
33// RecordBatchBuilder
34
35RecordBatchBuilder::RecordBatchBuilder(const std::shared_ptr<Schema>& schema,
36 MemoryPool* pool, int64_t initial_capacity)
37 : schema_(schema), initial_capacity_(initial_capacity), pool_(pool) {}
38
39Status RecordBatchBuilder::Make(const std::shared_ptr<Schema>& schema, MemoryPool* pool,
40 std::unique_ptr<RecordBatchBuilder>* builder) {
41 return Make(schema, pool, kMinBuilderCapacity, builder);
42}
43
44Status RecordBatchBuilder::Make(const std::shared_ptr<Schema>& schema, MemoryPool* pool,
45 int64_t initial_capacity,
46 std::unique_ptr<RecordBatchBuilder>* builder) {
47 builder->reset(new RecordBatchBuilder(schema, pool, initial_capacity));
48 RETURN_NOT_OK((*builder)->CreateBuilders());
49 return (*builder)->InitBuilders();
50}
51
52Status RecordBatchBuilder::Flush(bool reset_builders,
53 std::shared_ptr<RecordBatch>* batch) {
54 std::vector<std::shared_ptr<Array>> fields;
55 fields.resize(this->num_fields());
56
57 int64_t length = 0;
58 for (int i = 0; i < this->num_fields(); ++i) {
59 RETURN_NOT_OK(raw_field_builders_[i]->Finish(&fields[i]));
60 if (i > 0 && fields[i]->length() != length) {
61 return Status::Invalid("All fields must be same length when calling Flush");
62 }
63 length = fields[i]->length();
64 }
65 *batch = RecordBatch::Make(schema_, length, std::move(fields));
66 if (reset_builders) {
67 return InitBuilders();
68 } else {
69 return Status::OK();
70 }
71}
72
73Status RecordBatchBuilder::Flush(std::shared_ptr<RecordBatch>* batch) {
74 return Flush(true, batch);
75}
76
77void RecordBatchBuilder::SetInitialCapacity(int64_t capacity) {
78 DCHECK_GT(capacity, 0) << "Initial capacity must be positive";
79 initial_capacity_ = capacity;
80}
81
82Status RecordBatchBuilder::CreateBuilders() {
83 field_builders_.resize(this->num_fields());
84 raw_field_builders_.resize(this->num_fields());
85 for (int i = 0; i < this->num_fields(); ++i) {
86 RETURN_NOT_OK(MakeBuilder(pool_, schema_->field(i)->type(), &field_builders_[i]));
87 raw_field_builders_[i] = field_builders_[i].get();
88 }
89 return Status::OK();
90}
91
92Status RecordBatchBuilder::InitBuilders() {
93 for (int i = 0; i < this->num_fields(); ++i) {
94 RETURN_NOT_OK(raw_field_builders_[i]->Reserve(initial_capacity_));
95 }
96 return Status::OK();
97}
98
99} // namespace arrow
100