1#include "duckdb/main/appender.hpp"
2
3#include "duckdb/catalog/catalog_entry/duck_table_entry.hpp"
4#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
5#include "duckdb/common/exception.hpp"
6#include "duckdb/common/operator/cast_operators.hpp"
7#include "duckdb/common/operator/decimal_cast_operators.hpp"
8#include "duckdb/common/operator/string_cast.hpp"
9#include "duckdb/common/string_util.hpp"
10#include "duckdb/common/types/column/column_data_collection.hpp"
11#include "duckdb/main/client_context.hpp"
12#include "duckdb/main/connection.hpp"
13#include "duckdb/main/database.hpp"
14#include "duckdb/storage/data_table.hpp"
15
16namespace duckdb {
17
18BaseAppender::BaseAppender(Allocator &allocator, AppenderType type_p)
19 : allocator(allocator), column(0), appender_type(type_p) {
20}
21
22BaseAppender::BaseAppender(Allocator &allocator_p, vector<LogicalType> types_p, AppenderType type_p)
23 : allocator(allocator_p), types(std::move(types_p)), collection(make_uniq<ColumnDataCollection>(args&: allocator, args&: types)),
24 column(0), appender_type(type_p) {
25 InitializeChunk();
26}
27
28BaseAppender::~BaseAppender() {
29}
30
31void BaseAppender::Destructor() {
32 if (Exception::UncaughtException()) {
33 return;
34 }
35 // flush any remaining chunks, but only if we are not cleaning up the appender as part of an exception stack unwind
36 // wrapped in a try/catch because Close() can throw if the table was dropped in the meantime
37 try {
38 Close();
39 } catch (...) {
40 }
41}
42
43InternalAppender::InternalAppender(ClientContext &context_p, TableCatalogEntry &table_p)
44 : BaseAppender(Allocator::DefaultAllocator(), table_p.GetTypes(), AppenderType::PHYSICAL), context(context_p),
45 table(table_p) {
46}
47
48InternalAppender::~InternalAppender() {
49 Destructor();
50}
51
52Appender::Appender(Connection &con, const string &schema_name, const string &table_name)
53 : BaseAppender(Allocator::DefaultAllocator(), AppenderType::LOGICAL), context(con.context) {
54 description = con.TableInfo(schema_name, table_name);
55 if (!description) {
56 // table could not be found
57 throw CatalogException(StringUtil::Format(fmt_str: "Table \"%s.%s\" could not be found", params: schema_name, params: table_name));
58 }
59 for (auto &column : description->columns) {
60 types.push_back(x: column.Type());
61 }
62 InitializeChunk();
63 collection = make_uniq<ColumnDataCollection>(args&: allocator, args&: types);
64}
65
66Appender::Appender(Connection &con, const string &table_name) : Appender(con, DEFAULT_SCHEMA, table_name) {
67}
68
69Appender::~Appender() {
70 Destructor();
71}
72
73void BaseAppender::InitializeChunk() {
74 chunk.Initialize(allocator, types);
75}
76
77void BaseAppender::BeginRow() {
78}
79
80void BaseAppender::EndRow() {
81 // check that all rows have been appended to
82 if (column != chunk.ColumnCount()) {
83 throw InvalidInputException("Call to EndRow before all rows have been appended to!");
84 }
85 column = 0;
86 chunk.SetCardinality(chunk.size() + 1);
87 if (chunk.size() >= STANDARD_VECTOR_SIZE) {
88 FlushChunk();
89 }
90}
91
92template <class SRC, class DST>
93void BaseAppender::AppendValueInternal(Vector &col, SRC input) {
94 FlatVector::GetData<DST>(col)[chunk.size()] = Cast::Operation<SRC, DST>(input);
95}
96
97template <class SRC, class DST>
98void BaseAppender::AppendDecimalValueInternal(Vector &col, SRC input) {
99 switch (appender_type) {
100 case AppenderType::LOGICAL: {
101 auto &type = col.GetType();
102 D_ASSERT(type.id() == LogicalTypeId::DECIMAL);
103 auto width = DecimalType::GetWidth(type);
104 auto scale = DecimalType::GetScale(type);
105 TryCastToDecimal::Operation<SRC, DST>(input, FlatVector::GetData<DST>(col)[chunk.size()], nullptr, width,
106 scale);
107 return;
108 }
109 case AppenderType::PHYSICAL: {
110 AppendValueInternal<SRC, DST>(col, input);
111 return;
112 }
113 default:
114 throw InternalException("Type not implemented for AppenderType");
115 }
116}
117
118template <class T>
119void BaseAppender::AppendValueInternal(T input) {
120 if (column >= types.size()) {
121 throw InvalidInputException("Too many appends for chunk!");
122 }
123 auto &col = chunk.data[column];
124 switch (col.GetType().id()) {
125 case LogicalTypeId::BOOLEAN:
126 AppendValueInternal<T, bool>(col, input);
127 break;
128 case LogicalTypeId::UTINYINT:
129 AppendValueInternal<T, uint8_t>(col, input);
130 break;
131 case LogicalTypeId::TINYINT:
132 AppendValueInternal<T, int8_t>(col, input);
133 break;
134 case LogicalTypeId::USMALLINT:
135 AppendValueInternal<T, uint16_t>(col, input);
136 break;
137 case LogicalTypeId::SMALLINT:
138 AppendValueInternal<T, int16_t>(col, input);
139 break;
140 case LogicalTypeId::UINTEGER:
141 AppendValueInternal<T, uint32_t>(col, input);
142 break;
143 case LogicalTypeId::INTEGER:
144 AppendValueInternal<T, int32_t>(col, input);
145 break;
146 case LogicalTypeId::UBIGINT:
147 AppendValueInternal<T, uint64_t>(col, input);
148 break;
149 case LogicalTypeId::BIGINT:
150 AppendValueInternal<T, int64_t>(col, input);
151 break;
152 case LogicalTypeId::HUGEINT:
153 AppendValueInternal<T, hugeint_t>(col, input);
154 break;
155 case LogicalTypeId::FLOAT:
156 AppendValueInternal<T, float>(col, input);
157 break;
158 case LogicalTypeId::DOUBLE:
159 AppendValueInternal<T, double>(col, input);
160 break;
161 case LogicalTypeId::DECIMAL:
162 switch (col.GetType().InternalType()) {
163 case PhysicalType::INT16:
164 AppendDecimalValueInternal<T, int16_t>(col, input);
165 break;
166 case PhysicalType::INT32:
167 AppendDecimalValueInternal<T, int32_t>(col, input);
168 break;
169 case PhysicalType::INT64:
170 AppendDecimalValueInternal<T, int64_t>(col, input);
171 break;
172 case PhysicalType::INT128:
173 AppendDecimalValueInternal<T, hugeint_t>(col, input);
174 break;
175 default:
176 throw InternalException("Internal type not recognized for Decimal");
177 }
178 break;
179 case LogicalTypeId::DATE:
180 AppendValueInternal<T, date_t>(col, input);
181 break;
182 case LogicalTypeId::TIMESTAMP:
183 case LogicalTypeId::TIMESTAMP_TZ:
184 AppendValueInternal<T, timestamp_t>(col, input);
185 break;
186 case LogicalTypeId::TIME:
187 case LogicalTypeId::TIME_TZ:
188 AppendValueInternal<T, dtime_t>(col, input);
189 break;
190 case LogicalTypeId::INTERVAL:
191 AppendValueInternal<T, interval_t>(col, input);
192 break;
193 case LogicalTypeId::VARCHAR:
194 FlatVector::GetData<string_t>(vector&: col)[chunk.size()] = StringCast::Operation<T>(input, col);
195 break;
196 default:
197 AppendValue(value: Value::CreateValue<T>(input));
198 return;
199 }
200 column++;
201}
202
203template <>
204void BaseAppender::Append(bool value) {
205 AppendValueInternal<bool>(input: value);
206}
207
208template <>
209void BaseAppender::Append(int8_t value) {
210 AppendValueInternal<int8_t>(input: value);
211}
212
213template <>
214void BaseAppender::Append(int16_t value) {
215 AppendValueInternal<int16_t>(input: value);
216}
217
218template <>
219void BaseAppender::Append(int32_t value) {
220 AppendValueInternal<int32_t>(input: value);
221}
222
223template <>
224void BaseAppender::Append(int64_t value) {
225 AppendValueInternal<int64_t>(input: value);
226}
227
228template <>
229void BaseAppender::Append(hugeint_t value) {
230 AppendValueInternal<hugeint_t>(input: value);
231}
232
233template <>
234void BaseAppender::Append(uint8_t value) {
235 AppendValueInternal<uint8_t>(input: value);
236}
237
238template <>
239void BaseAppender::Append(uint16_t value) {
240 AppendValueInternal<uint16_t>(input: value);
241}
242
243template <>
244void BaseAppender::Append(uint32_t value) {
245 AppendValueInternal<uint32_t>(input: value);
246}
247
248template <>
249void BaseAppender::Append(uint64_t value) {
250 AppendValueInternal<uint64_t>(input: value);
251}
252
253template <>
254void BaseAppender::Append(const char *value) {
255 AppendValueInternal<string_t>(input: string_t(value));
256}
257
258void BaseAppender::Append(const char *value, uint32_t length) {
259 AppendValueInternal<string_t>(input: string_t(value, length));
260}
261
262template <>
263void BaseAppender::Append(string_t value) {
264 AppendValueInternal<string_t>(input: value);
265}
266
267template <>
268void BaseAppender::Append(float value) {
269 AppendValueInternal<float>(input: value);
270}
271
272template <>
273void BaseAppender::Append(double value) {
274 AppendValueInternal<double>(input: value);
275}
276
277template <>
278void BaseAppender::Append(date_t value) {
279 AppendValueInternal<date_t>(input: value);
280}
281
282template <>
283void BaseAppender::Append(dtime_t value) {
284 AppendValueInternal<dtime_t>(input: value);
285}
286
287template <>
288void BaseAppender::Append(timestamp_t value) {
289 AppendValueInternal<timestamp_t>(input: value);
290}
291
292template <>
293void BaseAppender::Append(interval_t value) {
294 AppendValueInternal<interval_t>(input: value);
295}
296
297template <>
298void BaseAppender::Append(Value value) { // NOLINT: template shtuff
299 if (column >= chunk.ColumnCount()) {
300 throw InvalidInputException("Too many appends for chunk!");
301 }
302 AppendValue(value);
303}
304
305template <>
306void BaseAppender::Append(std::nullptr_t value) {
307 if (column >= chunk.ColumnCount()) {
308 throw InvalidInputException("Too many appends for chunk!");
309 }
310 auto &col = chunk.data[column++];
311 FlatVector::SetNull(vector&: col, idx: chunk.size(), is_null: true);
312}
313
314void BaseAppender::AppendValue(const Value &value) {
315 chunk.SetValue(col_idx: column, index: chunk.size(), val: value);
316 column++;
317}
318
319void BaseAppender::AppendDataChunk(DataChunk &chunk) {
320 if (chunk.GetTypes() != types) {
321 throw InvalidInputException("Type mismatch in Append DataChunk and the types required for appender");
322 }
323 collection->Append(new_chunk&: chunk);
324 if (collection->Count() >= FLUSH_COUNT) {
325 Flush();
326 }
327}
328
329void BaseAppender::FlushChunk() {
330 if (chunk.size() == 0) {
331 return;
332 }
333 collection->Append(new_chunk&: chunk);
334 chunk.Reset();
335 if (collection->Count() >= FLUSH_COUNT) {
336 Flush();
337 }
338}
339
340void BaseAppender::Flush() {
341 // check that all vectors have the same length before appending
342 if (column != 0) {
343 throw InvalidInputException("Failed to Flush appender: incomplete append to row!");
344 }
345
346 FlushChunk();
347 if (collection->Count() == 0) {
348 return;
349 }
350 FlushInternal(collection&: *collection);
351
352 collection->Reset();
353 column = 0;
354}
355
356void Appender::FlushInternal(ColumnDataCollection &collection) {
357 context->Append(description&: *description, collection);
358}
359
360void InternalAppender::FlushInternal(ColumnDataCollection &collection) {
361 table.GetStorage().LocalAppend(table, context, collection);
362}
363
364void BaseAppender::Close() {
365 if (column == 0 || column == types.size()) {
366 Flush();
367 }
368}
369
370} // namespace duckdb
371