1 | #include "duckdb/main/appender.hpp" |
2 | |
3 | #include "duckdb/catalog/catalog_entry/duck_table_entry.hpp" |
4 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
5 | #include "duckdb/common/exception.hpp" |
6 | #include "duckdb/common/operator/cast_operators.hpp" |
7 | #include "duckdb/common/operator/decimal_cast_operators.hpp" |
8 | #include "duckdb/common/operator/string_cast.hpp" |
9 | #include "duckdb/common/string_util.hpp" |
10 | #include "duckdb/common/types/column/column_data_collection.hpp" |
11 | #include "duckdb/main/client_context.hpp" |
12 | #include "duckdb/main/connection.hpp" |
13 | #include "duckdb/main/database.hpp" |
14 | #include "duckdb/storage/data_table.hpp" |
15 | |
16 | namespace duckdb { |
17 | |
18 | BaseAppender::BaseAppender(Allocator &allocator, AppenderType type_p) |
19 | : allocator(allocator), column(0), appender_type(type_p) { |
20 | } |
21 | |
22 | BaseAppender::BaseAppender(Allocator &allocator_p, vector<LogicalType> types_p, AppenderType type_p) |
23 | : allocator(allocator_p), types(std::move(types_p)), collection(make_uniq<ColumnDataCollection>(args&: allocator, args&: types)), |
24 | column(0), appender_type(type_p) { |
25 | InitializeChunk(); |
26 | } |
27 | |
28 | BaseAppender::~BaseAppender() { |
29 | } |
30 | |
31 | void BaseAppender::Destructor() { |
32 | if (Exception::UncaughtException()) { |
33 | return; |
34 | } |
35 | // flush any remaining chunks, but only if we are not cleaning up the appender as part of an exception stack unwind |
36 | // wrapped in a try/catch because Close() can throw if the table was dropped in the meantime |
37 | try { |
38 | Close(); |
39 | } catch (...) { |
40 | } |
41 | } |
42 | |
43 | InternalAppender::InternalAppender(ClientContext &context_p, TableCatalogEntry &table_p) |
44 | : BaseAppender(Allocator::DefaultAllocator(), table_p.GetTypes(), AppenderType::PHYSICAL), context(context_p), |
45 | table(table_p) { |
46 | } |
47 | |
48 | InternalAppender::~InternalAppender() { |
49 | Destructor(); |
50 | } |
51 | |
52 | Appender::Appender(Connection &con, const string &schema_name, const string &table_name) |
53 | : BaseAppender(Allocator::DefaultAllocator(), AppenderType::LOGICAL), context(con.context) { |
54 | description = con.TableInfo(schema_name, table_name); |
55 | if (!description) { |
56 | // table could not be found |
57 | throw CatalogException(StringUtil::Format(fmt_str: "Table \"%s.%s\" could not be found" , params: schema_name, params: table_name)); |
58 | } |
59 | for (auto &column : description->columns) { |
60 | types.push_back(x: column.Type()); |
61 | } |
62 | InitializeChunk(); |
63 | collection = make_uniq<ColumnDataCollection>(args&: allocator, args&: types); |
64 | } |
65 | |
66 | Appender::Appender(Connection &con, const string &table_name) : Appender(con, DEFAULT_SCHEMA, table_name) { |
67 | } |
68 | |
69 | Appender::~Appender() { |
70 | Destructor(); |
71 | } |
72 | |
73 | void BaseAppender::InitializeChunk() { |
74 | chunk.Initialize(allocator, types); |
75 | } |
76 | |
77 | void BaseAppender::BeginRow() { |
78 | } |
79 | |
80 | void BaseAppender::EndRow() { |
81 | // check that all rows have been appended to |
82 | if (column != chunk.ColumnCount()) { |
83 | throw InvalidInputException("Call to EndRow before all rows have been appended to!" ); |
84 | } |
85 | column = 0; |
86 | chunk.SetCardinality(chunk.size() + 1); |
87 | if (chunk.size() >= STANDARD_VECTOR_SIZE) { |
88 | FlushChunk(); |
89 | } |
90 | } |
91 | |
92 | template <class SRC, class DST> |
93 | void BaseAppender::AppendValueInternal(Vector &col, SRC input) { |
94 | FlatVector::GetData<DST>(col)[chunk.size()] = Cast::Operation<SRC, DST>(input); |
95 | } |
96 | |
97 | template <class SRC, class DST> |
98 | void BaseAppender::AppendDecimalValueInternal(Vector &col, SRC input) { |
99 | switch (appender_type) { |
100 | case AppenderType::LOGICAL: { |
101 | auto &type = col.GetType(); |
102 | D_ASSERT(type.id() == LogicalTypeId::DECIMAL); |
103 | auto width = DecimalType::GetWidth(type); |
104 | auto scale = DecimalType::GetScale(type); |
105 | TryCastToDecimal::Operation<SRC, DST>(input, FlatVector::GetData<DST>(col)[chunk.size()], nullptr, width, |
106 | scale); |
107 | return; |
108 | } |
109 | case AppenderType::PHYSICAL: { |
110 | AppendValueInternal<SRC, DST>(col, input); |
111 | return; |
112 | } |
113 | default: |
114 | throw InternalException("Type not implemented for AppenderType" ); |
115 | } |
116 | } |
117 | |
118 | template <class T> |
119 | void BaseAppender::AppendValueInternal(T input) { |
120 | if (column >= types.size()) { |
121 | throw InvalidInputException("Too many appends for chunk!" ); |
122 | } |
123 | auto &col = chunk.data[column]; |
124 | switch (col.GetType().id()) { |
125 | case LogicalTypeId::BOOLEAN: |
126 | AppendValueInternal<T, bool>(col, input); |
127 | break; |
128 | case LogicalTypeId::UTINYINT: |
129 | AppendValueInternal<T, uint8_t>(col, input); |
130 | break; |
131 | case LogicalTypeId::TINYINT: |
132 | AppendValueInternal<T, int8_t>(col, input); |
133 | break; |
134 | case LogicalTypeId::USMALLINT: |
135 | AppendValueInternal<T, uint16_t>(col, input); |
136 | break; |
137 | case LogicalTypeId::SMALLINT: |
138 | AppendValueInternal<T, int16_t>(col, input); |
139 | break; |
140 | case LogicalTypeId::UINTEGER: |
141 | AppendValueInternal<T, uint32_t>(col, input); |
142 | break; |
143 | case LogicalTypeId::INTEGER: |
144 | AppendValueInternal<T, int32_t>(col, input); |
145 | break; |
146 | case LogicalTypeId::UBIGINT: |
147 | AppendValueInternal<T, uint64_t>(col, input); |
148 | break; |
149 | case LogicalTypeId::BIGINT: |
150 | AppendValueInternal<T, int64_t>(col, input); |
151 | break; |
152 | case LogicalTypeId::HUGEINT: |
153 | AppendValueInternal<T, hugeint_t>(col, input); |
154 | break; |
155 | case LogicalTypeId::FLOAT: |
156 | AppendValueInternal<T, float>(col, input); |
157 | break; |
158 | case LogicalTypeId::DOUBLE: |
159 | AppendValueInternal<T, double>(col, input); |
160 | break; |
161 | case LogicalTypeId::DECIMAL: |
162 | switch (col.GetType().InternalType()) { |
163 | case PhysicalType::INT16: |
164 | AppendDecimalValueInternal<T, int16_t>(col, input); |
165 | break; |
166 | case PhysicalType::INT32: |
167 | AppendDecimalValueInternal<T, int32_t>(col, input); |
168 | break; |
169 | case PhysicalType::INT64: |
170 | AppendDecimalValueInternal<T, int64_t>(col, input); |
171 | break; |
172 | case PhysicalType::INT128: |
173 | AppendDecimalValueInternal<T, hugeint_t>(col, input); |
174 | break; |
175 | default: |
176 | throw InternalException("Internal type not recognized for Decimal" ); |
177 | } |
178 | break; |
179 | case LogicalTypeId::DATE: |
180 | AppendValueInternal<T, date_t>(col, input); |
181 | break; |
182 | case LogicalTypeId::TIMESTAMP: |
183 | case LogicalTypeId::TIMESTAMP_TZ: |
184 | AppendValueInternal<T, timestamp_t>(col, input); |
185 | break; |
186 | case LogicalTypeId::TIME: |
187 | case LogicalTypeId::TIME_TZ: |
188 | AppendValueInternal<T, dtime_t>(col, input); |
189 | break; |
190 | case LogicalTypeId::INTERVAL: |
191 | AppendValueInternal<T, interval_t>(col, input); |
192 | break; |
193 | case LogicalTypeId::VARCHAR: |
194 | FlatVector::GetData<string_t>(vector&: col)[chunk.size()] = StringCast::Operation<T>(input, col); |
195 | break; |
196 | default: |
197 | AppendValue(value: Value::CreateValue<T>(input)); |
198 | return; |
199 | } |
200 | column++; |
201 | } |
202 | |
203 | template <> |
204 | void BaseAppender::Append(bool value) { |
205 | AppendValueInternal<bool>(input: value); |
206 | } |
207 | |
208 | template <> |
209 | void BaseAppender::Append(int8_t value) { |
210 | AppendValueInternal<int8_t>(input: value); |
211 | } |
212 | |
213 | template <> |
214 | void BaseAppender::Append(int16_t value) { |
215 | AppendValueInternal<int16_t>(input: value); |
216 | } |
217 | |
218 | template <> |
219 | void BaseAppender::Append(int32_t value) { |
220 | AppendValueInternal<int32_t>(input: value); |
221 | } |
222 | |
223 | template <> |
224 | void BaseAppender::Append(int64_t value) { |
225 | AppendValueInternal<int64_t>(input: value); |
226 | } |
227 | |
228 | template <> |
229 | void BaseAppender::Append(hugeint_t value) { |
230 | AppendValueInternal<hugeint_t>(input: value); |
231 | } |
232 | |
233 | template <> |
234 | void BaseAppender::Append(uint8_t value) { |
235 | AppendValueInternal<uint8_t>(input: value); |
236 | } |
237 | |
238 | template <> |
239 | void BaseAppender::Append(uint16_t value) { |
240 | AppendValueInternal<uint16_t>(input: value); |
241 | } |
242 | |
243 | template <> |
244 | void BaseAppender::Append(uint32_t value) { |
245 | AppendValueInternal<uint32_t>(input: value); |
246 | } |
247 | |
248 | template <> |
249 | void BaseAppender::Append(uint64_t value) { |
250 | AppendValueInternal<uint64_t>(input: value); |
251 | } |
252 | |
253 | template <> |
254 | void BaseAppender::Append(const char *value) { |
255 | AppendValueInternal<string_t>(input: string_t(value)); |
256 | } |
257 | |
258 | void BaseAppender::Append(const char *value, uint32_t length) { |
259 | AppendValueInternal<string_t>(input: string_t(value, length)); |
260 | } |
261 | |
262 | template <> |
263 | void BaseAppender::Append(string_t value) { |
264 | AppendValueInternal<string_t>(input: value); |
265 | } |
266 | |
267 | template <> |
268 | void BaseAppender::Append(float value) { |
269 | AppendValueInternal<float>(input: value); |
270 | } |
271 | |
272 | template <> |
273 | void BaseAppender::Append(double value) { |
274 | AppendValueInternal<double>(input: value); |
275 | } |
276 | |
277 | template <> |
278 | void BaseAppender::Append(date_t value) { |
279 | AppendValueInternal<date_t>(input: value); |
280 | } |
281 | |
282 | template <> |
283 | void BaseAppender::Append(dtime_t value) { |
284 | AppendValueInternal<dtime_t>(input: value); |
285 | } |
286 | |
287 | template <> |
288 | void BaseAppender::Append(timestamp_t value) { |
289 | AppendValueInternal<timestamp_t>(input: value); |
290 | } |
291 | |
292 | template <> |
293 | void BaseAppender::Append(interval_t value) { |
294 | AppendValueInternal<interval_t>(input: value); |
295 | } |
296 | |
297 | template <> |
298 | void BaseAppender::Append(Value value) { // NOLINT: template shtuff |
299 | if (column >= chunk.ColumnCount()) { |
300 | throw InvalidInputException("Too many appends for chunk!" ); |
301 | } |
302 | AppendValue(value); |
303 | } |
304 | |
305 | template <> |
306 | void BaseAppender::Append(std::nullptr_t value) { |
307 | if (column >= chunk.ColumnCount()) { |
308 | throw InvalidInputException("Too many appends for chunk!" ); |
309 | } |
310 | auto &col = chunk.data[column++]; |
311 | FlatVector::SetNull(vector&: col, idx: chunk.size(), is_null: true); |
312 | } |
313 | |
314 | void BaseAppender::AppendValue(const Value &value) { |
315 | chunk.SetValue(col_idx: column, index: chunk.size(), val: value); |
316 | column++; |
317 | } |
318 | |
319 | void BaseAppender::AppendDataChunk(DataChunk &chunk) { |
320 | if (chunk.GetTypes() != types) { |
321 | throw InvalidInputException("Type mismatch in Append DataChunk and the types required for appender" ); |
322 | } |
323 | collection->Append(new_chunk&: chunk); |
324 | if (collection->Count() >= FLUSH_COUNT) { |
325 | Flush(); |
326 | } |
327 | } |
328 | |
329 | void BaseAppender::FlushChunk() { |
330 | if (chunk.size() == 0) { |
331 | return; |
332 | } |
333 | collection->Append(new_chunk&: chunk); |
334 | chunk.Reset(); |
335 | if (collection->Count() >= FLUSH_COUNT) { |
336 | Flush(); |
337 | } |
338 | } |
339 | |
340 | void BaseAppender::Flush() { |
341 | // check that all vectors have the same length before appending |
342 | if (column != 0) { |
343 | throw InvalidInputException("Failed to Flush appender: incomplete append to row!" ); |
344 | } |
345 | |
346 | FlushChunk(); |
347 | if (collection->Count() == 0) { |
348 | return; |
349 | } |
350 | FlushInternal(collection&: *collection); |
351 | |
352 | collection->Reset(); |
353 | column = 0; |
354 | } |
355 | |
356 | void Appender::FlushInternal(ColumnDataCollection &collection) { |
357 | context->Append(description&: *description, collection); |
358 | } |
359 | |
360 | void InternalAppender::FlushInternal(ColumnDataCollection &collection) { |
361 | table.GetStorage().LocalAppend(table, context, collection); |
362 | } |
363 | |
364 | void BaseAppender::Close() { |
365 | if (column == 0 || column == types.size()) { |
366 | Flush(); |
367 | } |
368 | } |
369 | |
370 | } // namespace duckdb |
371 | |