1#include "duckdb/main/appender.hpp"
2
3#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
4#include "duckdb/common/exception.hpp"
5#include "duckdb/main/connection.hpp"
6#include "duckdb/main/client_context.hpp"
7#include "duckdb/main/database.hpp"
8#include "duckdb/storage/data_table.hpp"
9
10#include "duckdb/common/operator/cast_operators.hpp"
11
12using namespace duckdb;
13using namespace std;
14
15Appender::Appender(Connection &con, string schema_name, string table_name) : con(con), column(0) {
16 description = con.TableInfo(schema_name, table_name);
17 if (!description) {
18 // table could not be found
19 throw CatalogException(
20 StringUtil::Format("Table \"%s.%s\" could not be found", schema_name.c_str(), table_name.c_str()));
21 } else {
22 vector<TypeId> types;
23 for (auto &column : description->columns) {
24 types.push_back(GetInternalType(column.type));
25 }
26 chunk.Initialize(types);
27 con.context->RegisterAppender(this);
28 }
29}
30
31Appender::Appender(Connection &con, string table_name) : Appender(con, DEFAULT_SCHEMA, table_name) {
32}
33
34Appender::~Appender() {
35 Close();
36}
37
38void Appender::CheckInvalidated() {
39 if (!invalidated_msg.empty()) {
40 throw Exception("Invalid appender: " + invalidated_msg);
41 }
42}
43
44void Appender::BeginRow() {
45 CheckInvalidated();
46}
47
48void Appender::EndRow() {
49 CheckInvalidated();
50 // check that all rows have been appended to
51 if (column != chunk.column_count()) {
52 InvalidateException("Call to EndRow before all rows have been appended to!");
53 }
54 column = 0;
55 chunk.SetCardinality(chunk.size() + 1);
56 if (chunk.size() >= STANDARD_VECTOR_SIZE) {
57 Flush();
58 }
59}
60
61template <class SRC, class DST> void Appender::AppendValueInternal(Vector &col, SRC input) {
62 FlatVector::GetData<DST>(col)[chunk.size()] = Cast::Operation<SRC, DST>(input);
63}
64
65void Appender::InvalidateException(string msg) {
66 Invalidate(msg);
67 throw Exception(msg);
68}
69
70template <class T> void Appender::AppendValueInternal(T input) {
71 CheckInvalidated();
72 if (column >= chunk.column_count()) {
73 InvalidateException("Too many appends for chunk!");
74 }
75 auto &col = chunk.data[column];
76 switch (col.type) {
77 case TypeId::BOOL:
78 AppendValueInternal<T, bool>(col, input);
79 break;
80 case TypeId::INT8:
81 AppendValueInternal<T, int8_t>(col, input);
82 break;
83 case TypeId::INT16:
84 AppendValueInternal<T, int16_t>(col, input);
85 break;
86 case TypeId::INT32:
87 AppendValueInternal<T, int32_t>(col, input);
88 break;
89 case TypeId::INT64:
90 AppendValueInternal<T, int64_t>(col, input);
91 break;
92 case TypeId::FLOAT:
93 AppendValueInternal<T, float>(col, input);
94 break;
95 case TypeId::DOUBLE:
96 AppendValueInternal<T, double>(col, input);
97 break;
98 default:
99 AppendValue(Value::CreateValue<T>(input));
100 return;
101 }
102 column++;
103}
104
105template <> void Appender::Append(bool value) {
106 AppendValueInternal<bool>(value);
107}
108
109template <> void Appender::Append(int8_t value) {
110 AppendValueInternal<int8_t>(value);
111}
112
113template <> void Appender::Append(int16_t value) {
114 AppendValueInternal<int16_t>(value);
115}
116
117template <> void Appender::Append(int32_t value) {
118 AppendValueInternal<int32_t>(value);
119}
120
121template <> void Appender::Append(int64_t value) {
122 AppendValueInternal<int64_t>(value);
123}
124
125template <> void Appender::Append(const char *value) {
126 AppendValueInternal<string_t>(string_t(value));
127}
128
129template <> void Appender::Append(float value) {
130 if (!Value::FloatIsValid(value)) {
131 InvalidateException("Float value is out of range!");
132 }
133 AppendValueInternal<float>(value);
134}
135
136template <> void Appender::Append(double value) {
137 if (!Value::DoubleIsValid(value)) {
138 InvalidateException("Double value is out of range!");
139 }
140 AppendValueInternal<double>(value);
141}
142
143template <> void Appender::Append(Value value) {
144 if (column >= chunk.column_count()) {
145 InvalidateException("Too many appends for chunk!");
146 }
147 AppendValue(move(value));
148}
149
150template <> void Appender::Append(nullptr_t value) {
151 if (column >= chunk.column_count()) {
152 InvalidateException("Too many appends for chunk!");
153 }
154 auto &col = chunk.data[column++];
155 FlatVector::SetNull(col, chunk.size(), true);
156}
157
158void Appender::AppendValue(Value value) {
159 chunk.SetValue(column, chunk.size(), value);
160 column++;
161}
162
163void Appender::Flush() {
164 CheckInvalidated();
165 try {
166 // check that all vectors have the same length before appending
167 if (column != 0) {
168 throw Exception("Failed to Flush appender: incomplete append to row!");
169 }
170
171 if (chunk.size() == 0) {
172 return;
173 }
174 con.Append(*description, chunk);
175 } catch (Exception &ex) {
176 Invalidate(ex.what());
177 throw ex;
178 }
179 chunk.Reset();
180 column = 0;
181}
182
183void Appender::Close() {
184 if (!invalidated_msg.empty()) {
185 return;
186 }
187 if (column == 0 || column == chunk.column_count()) {
188 Flush();
189 }
190 Invalidate("The appender has been closed!");
191}
192
193void Appender::Invalidate(string msg, bool close) {
194 if (!invalidated_msg.empty()) {
195 return;
196 }
197 if (close) {
198 con.context->RemoveAppender(this);
199 }
200 assert(!msg.empty());
201 invalidated_msg = msg;
202}
203