1#include "duckdb/execution/operator/persistent/physical_insert.hpp"
2#include "duckdb/parallel/thread_context.hpp"
3#include "duckdb/catalog/catalog_entry/duck_table_entry.hpp"
4#include "duckdb/common/types/column/column_data_collection.hpp"
5#include "duckdb/common/vector_operations/vector_operations.hpp"
6#include "duckdb/execution/expression_executor.hpp"
7#include "duckdb/storage/data_table.hpp"
8#include "duckdb/main/client_context.hpp"
9#include "duckdb/parser/parsed_data/create_table_info.hpp"
10#include "duckdb/planner/expression/bound_constant_expression.hpp"
11#include "duckdb/storage/table_io_manager.hpp"
12#include "duckdb/transaction/local_storage.hpp"
13#include "duckdb/parser/statement/insert_statement.hpp"
14#include "duckdb/parser/statement/update_statement.hpp"
15#include "duckdb/storage/table/scan_state.hpp"
16#include "duckdb/common/types/conflict_manager.hpp"
17#include "duckdb/execution/index/art/art.hpp"
18#include "duckdb/transaction/duck_transaction.hpp"
19#include "duckdb/storage/table/append_state.hpp"
20
21namespace duckdb {
22
23PhysicalInsert::PhysicalInsert(vector<LogicalType> types_p, TableCatalogEntry &table,
24 physical_index_vector_t<idx_t> column_index_map,
25 vector<unique_ptr<Expression>> bound_defaults,
26 vector<unique_ptr<Expression>> set_expressions, vector<PhysicalIndex> set_columns,
27 vector<LogicalType> set_types, idx_t estimated_cardinality, bool return_chunk,
28 bool parallel, OnConflictAction action_type,
29 unique_ptr<Expression> on_conflict_condition_p,
30 unique_ptr<Expression> do_update_condition_p, unordered_set<column_t> conflict_target_p,
31 vector<column_t> columns_to_fetch_p)
32 : PhysicalOperator(PhysicalOperatorType::INSERT, std::move(types_p), estimated_cardinality),
33 column_index_map(std::move(column_index_map)), insert_table(&table), insert_types(table.GetTypes()),
34 bound_defaults(std::move(bound_defaults)), return_chunk(return_chunk), parallel(parallel),
35 action_type(action_type), set_expressions(std::move(set_expressions)), set_columns(std::move(set_columns)),
36 set_types(std::move(set_types)), on_conflict_condition(std::move(on_conflict_condition_p)),
37 do_update_condition(std::move(do_update_condition_p)), conflict_target(std::move(conflict_target_p)),
38 columns_to_fetch(std::move(columns_to_fetch_p)) {
39
40 if (action_type == OnConflictAction::THROW) {
41 return;
42 }
43
44 D_ASSERT(set_expressions.size() == set_columns.size());
45
46 // One or more columns are referenced from the existing table,
47 // we use the 'insert_types' to figure out which types these columns have
48 types_to_fetch = vector<LogicalType>(columns_to_fetch.size(), LogicalType::SQLNULL);
49 for (idx_t i = 0; i < columns_to_fetch.size(); i++) {
50 auto &id = columns_to_fetch[i];
51 D_ASSERT(id < insert_types.size());
52 types_to_fetch[i] = insert_types[id];
53 }
54}
55
56PhysicalInsert::PhysicalInsert(LogicalOperator &op, SchemaCatalogEntry &schema, unique_ptr<BoundCreateTableInfo> info_p,
57 idx_t estimated_cardinality, bool parallel)
58 : PhysicalOperator(PhysicalOperatorType::CREATE_TABLE_AS, op.types, estimated_cardinality), insert_table(nullptr),
59 return_chunk(false), schema(&schema), info(std::move(info_p)), parallel(parallel),
60 action_type(OnConflictAction::THROW) {
61 GetInsertInfo(info: *info, insert_types, bound_defaults);
62}
63
64void PhysicalInsert::GetInsertInfo(const BoundCreateTableInfo &info, vector<LogicalType> &insert_types,
65 vector<unique_ptr<Expression>> &bound_defaults) {
66 auto &create_info = info.base->Cast<CreateTableInfo>();
67 for (auto &col : create_info.columns.Physical()) {
68 insert_types.push_back(x: col.GetType());
69 bound_defaults.push_back(x: make_uniq<BoundConstantExpression>(args: Value(col.GetType())));
70 }
71}
72
73//===--------------------------------------------------------------------===//
74// Sink
75//===--------------------------------------------------------------------===//
76class InsertGlobalState : public GlobalSinkState {
77public:
78 explicit InsertGlobalState(ClientContext &context, const vector<LogicalType> &return_types, DuckTableEntry &table)
79 : table(table), insert_count(0), initialized(false), return_collection(context, return_types) {
80 }
81
82 mutex lock;
83 DuckTableEntry &table;
84 idx_t insert_count;
85 bool initialized;
86 LocalAppendState append_state;
87 ColumnDataCollection return_collection;
88};
89
90class InsertLocalState : public LocalSinkState {
91public:
92 InsertLocalState(ClientContext &context, const vector<LogicalType> &types,
93 const vector<unique_ptr<Expression>> &bound_defaults)
94 : default_executor(context, bound_defaults) {
95 insert_chunk.Initialize(allocator&: Allocator::Get(context), types);
96 }
97
98 DataChunk insert_chunk;
99 ExpressionExecutor default_executor;
100 TableAppendState local_append_state;
101 unique_ptr<RowGroupCollection> local_collection;
102 optional_ptr<OptimisticDataWriter> writer;
103 // Rows that have been updated by a DO UPDATE conflict
104 unordered_set<row_t> updated_global_rows;
105 // Rows in the transaction-local storage that have been updated by a DO UPDATE conflict
106 unordered_set<row_t> updated_local_rows;
107 idx_t update_count = 0;
108};
109
110unique_ptr<GlobalSinkState> PhysicalInsert::GetGlobalSinkState(ClientContext &context) const {
111 optional_ptr<TableCatalogEntry> table;
112 if (info) {
113 // CREATE TABLE AS
114 D_ASSERT(!insert_table);
115 auto &catalog = schema->catalog;
116 table = &catalog.CreateTable(transaction: catalog.GetCatalogTransaction(context), schema&: *schema.get_mutable(), info&: *info)
117 ->Cast<TableCatalogEntry>();
118 } else {
119 D_ASSERT(insert_table);
120 D_ASSERT(insert_table->IsDuckTable());
121 table = insert_table.get_mutable();
122 }
123 auto result = make_uniq<InsertGlobalState>(args&: context, args: GetTypes(), args&: table->Cast<DuckTableEntry>());
124 return std::move(result);
125}
126
127unique_ptr<LocalSinkState> PhysicalInsert::GetLocalSinkState(ExecutionContext &context) const {
128 return make_uniq<InsertLocalState>(args&: context.client, args: insert_types, args: bound_defaults);
129}
130
131void PhysicalInsert::ResolveDefaults(const TableCatalogEntry &table, DataChunk &chunk,
132 const physical_index_vector_t<idx_t> &column_index_map,
133 ExpressionExecutor &default_executor, DataChunk &result) {
134 chunk.Flatten();
135 default_executor.SetChunk(chunk);
136
137 result.Reset();
138 result.SetCardinality(chunk);
139
140 if (!column_index_map.empty()) {
141 // columns specified by the user, use column_index_map
142 for (auto &col : table.GetColumns().Physical()) {
143 auto storage_idx = col.StorageOid();
144 auto mapped_index = column_index_map[col.Physical()];
145 if (mapped_index == DConstants::INVALID_INDEX) {
146 // insert default value
147 default_executor.ExecuteExpression(expr_idx: storage_idx, result&: result.data[storage_idx]);
148 } else {
149 // get value from child chunk
150 D_ASSERT((idx_t)mapped_index < chunk.ColumnCount());
151 D_ASSERT(result.data[storage_idx].GetType() == chunk.data[mapped_index].GetType());
152 result.data[storage_idx].Reference(other&: chunk.data[mapped_index]);
153 }
154 }
155 } else {
156 // no columns specified, just append directly
157 for (idx_t i = 0; i < result.ColumnCount(); i++) {
158 D_ASSERT(result.data[i].GetType() == chunk.data[i].GetType());
159 result.data[i].Reference(other&: chunk.data[i]);
160 }
161 }
162}
163
164bool AllConflictsMeetCondition(DataChunk &result) {
165 auto data = FlatVector::GetData<bool>(vector&: result.data[0]);
166 for (idx_t i = 0; i < result.size(); i++) {
167 if (!data[i]) {
168 return false;
169 }
170 }
171 return true;
172}
173
174void CheckOnConflictCondition(ExecutionContext &context, DataChunk &conflicts, const unique_ptr<Expression> &condition,
175 DataChunk &result) {
176 ExpressionExecutor executor(context.client, *condition);
177 result.Initialize(context&: context.client, types: {LogicalType::BOOLEAN});
178 executor.Execute(input&: conflicts, result);
179 result.SetCardinality(conflicts.size());
180}
181
182static void CombineExistingAndInsertTuples(DataChunk &result, DataChunk &scan_chunk, DataChunk &input_chunk,
183 ClientContext &client, const PhysicalInsert &op) {
184 auto &types_to_fetch = op.types_to_fetch;
185 auto &insert_types = op.insert_types;
186
187 if (types_to_fetch.empty()) {
188 // We have not scanned the initial table, so we can just duplicate the initial chunk
189 result.Initialize(context&: client, types: input_chunk.GetTypes());
190 result.Reference(chunk&: input_chunk);
191 result.SetCardinality(input_chunk);
192 return;
193 }
194 vector<LogicalType> combined_types;
195 combined_types.reserve(n: insert_types.size() + types_to_fetch.size());
196 combined_types.insert(position: combined_types.end(), first: insert_types.begin(), last: insert_types.end());
197 combined_types.insert(position: combined_types.end(), first: types_to_fetch.begin(), last: types_to_fetch.end());
198
199 result.Initialize(context&: client, types: combined_types);
200 result.Reset();
201 // Add the VALUES list
202 for (idx_t i = 0; i < insert_types.size(); i++) {
203 idx_t col_idx = i;
204 auto &other_col = input_chunk.data[i];
205 auto &this_col = result.data[col_idx];
206 D_ASSERT(other_col.GetType() == this_col.GetType());
207 this_col.Reference(other&: other_col);
208 }
209 // Add the columns from the original conflicting tuples
210 for (idx_t i = 0; i < types_to_fetch.size(); i++) {
211 idx_t col_idx = i + insert_types.size();
212 auto &other_col = scan_chunk.data[i];
213 auto &this_col = result.data[col_idx];
214 D_ASSERT(other_col.GetType() == this_col.GetType());
215 this_col.Reference(other&: other_col);
216 }
217 // This is guaranteed by the requirement of a conflict target to have a condition or set expressions
218 // Only when we have any sort of condition or SET expression that references the existing table is this possible
219 // to not be true.
220 // We can have a SET expression without a conflict target ONLY if there is only 1 Index on the table
221 // In which case this also can't cause a discrepancy between existing tuple count and insert tuple count
222 D_ASSERT(input_chunk.size() == scan_chunk.size());
223 result.SetCardinality(input_chunk.size());
224}
225
226static void CreateUpdateChunk(ExecutionContext &context, DataChunk &chunk, TableCatalogEntry &table, Vector &row_ids,
227 DataChunk &update_chunk, const PhysicalInsert &op) {
228
229 auto &do_update_condition = op.do_update_condition;
230 auto &set_types = op.set_types;
231 auto &set_expressions = op.set_expressions;
232 // Check the optional condition for the DO UPDATE clause, to filter which rows will be updated
233 if (do_update_condition) {
234 DataChunk do_update_filter_result;
235 do_update_filter_result.Initialize(context&: context.client, types: {LogicalType::BOOLEAN});
236 ExpressionExecutor where_executor(context.client, *do_update_condition);
237 where_executor.Execute(input&: chunk, result&: do_update_filter_result);
238 do_update_filter_result.SetCardinality(chunk.size());
239
240 ManagedSelection selection(chunk.size());
241
242 auto where_data = FlatVector::GetData<bool>(vector&: do_update_filter_result.data[0]);
243 for (idx_t i = 0; i < chunk.size(); i++) {
244 if (where_data[i]) {
245 selection.Append(idx: i);
246 }
247 }
248 if (selection.Count() != selection.Size()) {
249 // Not all conflicts met the condition, need to filter out the ones that don't
250 chunk.Slice(sel_vector: selection.Selection(), count: selection.Count());
251 chunk.SetCardinality(selection.Count());
252 // Also apply this Slice to the to-update row_ids
253 row_ids.Slice(sel: selection.Selection(), count: selection.Count());
254 }
255 }
256
257 // Execute the SET expressions
258 update_chunk.Initialize(context&: context.client, types: set_types);
259 ExpressionExecutor executor(context.client, set_expressions);
260 executor.Execute(input&: chunk, result&: update_chunk);
261 update_chunk.SetCardinality(chunk);
262}
263
264template <bool GLOBAL>
265static idx_t PerformOnConflictAction(ExecutionContext &context, DataChunk &chunk, TableCatalogEntry &table,
266 Vector &row_ids, const PhysicalInsert &op) {
267
268 if (op.action_type == OnConflictAction::NOTHING) {
269 return 0;
270 }
271 auto &set_columns = op.set_columns;
272
273 DataChunk update_chunk;
274 CreateUpdateChunk(context, chunk, table, row_ids, update_chunk, op);
275
276 auto &data_table = table.GetStorage();
277 // Perform the update, using the results of the SET expressions
278 if (GLOBAL) {
279 data_table.Update(table, context&: context.client, row_ids, column_ids: set_columns, data&: update_chunk);
280 } else {
281 auto &local_storage = LocalStorage::Get(context&: context.client, db&: data_table.db);
282 // Perform the update, using the results of the SET expressions
283 local_storage.Update(table&: data_table, row_ids, column_ids: set_columns, data&: update_chunk);
284 }
285 return update_chunk.size();
286}
287
288// TODO: should we use a hash table to keep track of this instead?
289template <bool GLOBAL>
290static void RegisterUpdatedRows(InsertLocalState &lstate, const Vector &row_ids, idx_t count) {
291 // Insert all rows, if any of the rows has already been updated before, we throw an error
292 auto data = FlatVector::GetData<row_t>(vector: row_ids);
293
294 // The rowids in the transaction-local ART aren't final yet so we have to separately keep track of the two sets of
295 // rowids
296 unordered_set<row_t> &updated_rows = GLOBAL ? lstate.updated_global_rows : lstate.updated_local_rows;
297 for (idx_t i = 0; i < count; i++) {
298 auto result = updated_rows.insert(x: data[i]);
299 if (result.second == false) {
300 throw InvalidInputException(
301 "ON CONFLICT DO UPDATE can not update the same row twice in the same command, Ensure that no rows "
302 "proposed for insertion within the same command have duplicate constrained values");
303 }
304 }
305}
306
307template <bool GLOBAL>
308static idx_t HandleInsertConflicts(TableCatalogEntry &table, ExecutionContext &context, InsertLocalState &lstate,
309 DataTable &data_table, const PhysicalInsert &op) {
310 auto &types_to_fetch = op.types_to_fetch;
311 auto &on_conflict_condition = op.on_conflict_condition;
312 auto &conflict_target = op.conflict_target;
313 auto &columns_to_fetch = op.columns_to_fetch;
314
315 auto &local_storage = LocalStorage::Get(context&: context.client, db&: data_table.db);
316
317 // We either want to do nothing, or perform an update when conflicts arise
318 ConflictInfo conflict_info(conflict_target);
319 ConflictManager conflict_manager(VerifyExistenceType::APPEND, lstate.insert_chunk.size(), &conflict_info);
320 if (GLOBAL) {
321 data_table.VerifyAppendConstraints(table, context&: context.client, chunk&: lstate.insert_chunk, conflict_manager: &conflict_manager);
322 } else {
323 DataTable::VerifyUniqueIndexes(indexes&: local_storage.GetIndexes(table&: data_table), context&: context.client, chunk&: lstate.insert_chunk,
324 conflict_manager: &conflict_manager);
325 }
326 conflict_manager.Finalize();
327 if (conflict_manager.ConflictCount() == 0) {
328 // No conflicts found, 0 updates performed
329 return 0;
330 }
331 auto &conflicts = conflict_manager.Conflicts();
332 auto &row_ids = conflict_manager.RowIds();
333
334 DataChunk conflict_chunk; // contains only the conflicting values
335 DataChunk scan_chunk; // contains the original values, that caused the conflict
336 DataChunk combined_chunk; // contains conflict_chunk + scan_chunk (wide)
337
338 // Filter out everything but the conflicting rows
339 conflict_chunk.Initialize(context&: context.client, types: lstate.insert_chunk.GetTypes());
340 conflict_chunk.Reference(chunk&: lstate.insert_chunk);
341 conflict_chunk.Slice(sel_vector: conflicts.Selection(), count: conflicts.Count());
342 conflict_chunk.SetCardinality(conflicts.Count());
343
344 // Holds the pins for the fetched rows
345 unique_ptr<ColumnFetchState> fetch_state;
346 if (!types_to_fetch.empty()) {
347 D_ASSERT(scan_chunk.size() == 0);
348 // When these values are required for the conditions or the SET expressions,
349 // then we scan the existing table for the conflicting tuples, using the rowids
350 scan_chunk.Initialize(context&: context.client, types: types_to_fetch);
351 fetch_state = make_uniq<ColumnFetchState>();
352 if (GLOBAL) {
353 auto &transaction = DuckTransaction::Get(context&: context.client, catalog&: table.catalog);
354 data_table.Fetch(transaction, result&: scan_chunk, column_ids: columns_to_fetch, row_ids, fetch_count: conflicts.Count(), state&: *fetch_state);
355 } else {
356 local_storage.FetchChunk(table&: data_table, row_ids, count: conflicts.Count(), col_ids: columns_to_fetch, chunk&: scan_chunk,
357 fetch_state&: *fetch_state);
358 }
359 }
360
361 // Splice the Input chunk and the fetched chunk together
362 CombineExistingAndInsertTuples(result&: combined_chunk, scan_chunk, input_chunk&: conflict_chunk, client&: context.client, op);
363
364 if (on_conflict_condition) {
365 DataChunk conflict_condition_result;
366 CheckOnConflictCondition(context, conflicts&: combined_chunk, condition: on_conflict_condition, result&: conflict_condition_result);
367 bool conditions_met = AllConflictsMeetCondition(result&: conflict_condition_result);
368 if (!conditions_met) {
369 // Filter out the tuples that did pass the filter, then run the verify again
370 ManagedSelection sel(combined_chunk.size());
371 auto data = FlatVector::GetData<bool>(vector&: conflict_condition_result.data[0]);
372 for (idx_t i = 0; i < combined_chunk.size(); i++) {
373 if (!data[i]) {
374 // Only populate the selection vector with the tuples that did not meet the condition
375 sel.Append(idx: i);
376 }
377 }
378 combined_chunk.Slice(sel_vector: sel.Selection(), count: sel.Count());
379 row_ids.Slice(sel: sel.Selection(), count: sel.Count());
380 if (GLOBAL) {
381 data_table.VerifyAppendConstraints(table, context&: context.client, chunk&: combined_chunk, conflict_manager: nullptr);
382 } else {
383 DataTable::VerifyUniqueIndexes(indexes&: local_storage.GetIndexes(table&: data_table), context&: context.client,
384 chunk&: lstate.insert_chunk, conflict_manager: nullptr);
385 }
386 throw InternalException("The previous operation was expected to throw but didn't");
387 }
388 }
389
390 RegisterUpdatedRows<GLOBAL>(lstate, row_ids, combined_chunk.size());
391
392 idx_t updated_tuples = PerformOnConflictAction<GLOBAL>(context, combined_chunk, table, row_ids, op);
393
394 // Remove the conflicting tuples from the insert chunk
395 SelectionVector sel_vec(lstate.insert_chunk.size());
396 idx_t new_size =
397 SelectionVector::Inverted(src: conflicts.Selection(), dst&: sel_vec, source_size: conflicts.Count(), count: lstate.insert_chunk.size());
398 lstate.insert_chunk.Slice(sel_vector: sel_vec, count: new_size);
399 lstate.insert_chunk.SetCardinality(new_size);
400 return updated_tuples;
401}
402
403idx_t PhysicalInsert::OnConflictHandling(TableCatalogEntry &table, ExecutionContext &context,
404 InsertLocalState &lstate) const {
405 auto &data_table = table.GetStorage();
406 if (action_type == OnConflictAction::THROW) {
407 data_table.VerifyAppendConstraints(table, context&: context.client, chunk&: lstate.insert_chunk, conflict_manager: nullptr);
408 return 0;
409 }
410 // Check whether any conflicts arise, and if they all meet the conflict_target + condition
411 // If that's not the case - We throw the first error
412 idx_t updated_tuples = 0;
413 updated_tuples += HandleInsertConflicts<true>(table, context, lstate, data_table, op: *this);
414 // Also check the transaction-local storage+ART so we can detect conflicts within this transaction
415 updated_tuples += HandleInsertConflicts<false>(table, context, lstate, data_table, op: *this);
416
417 return updated_tuples;
418}
419
420SinkResultType PhysicalInsert::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
421 auto &gstate = input.global_state.Cast<InsertGlobalState>();
422 auto &lstate = input.local_state.Cast<InsertLocalState>();
423
424 auto &table = gstate.table;
425 auto &storage = table.GetStorage();
426 PhysicalInsert::ResolveDefaults(table, chunk, column_index_map, default_executor&: lstate.default_executor, result&: lstate.insert_chunk);
427
428 if (!parallel) {
429 if (!gstate.initialized) {
430 storage.InitializeLocalAppend(state&: gstate.append_state, context&: context.client);
431 gstate.initialized = true;
432 }
433
434 idx_t updated_tuples = OnConflictHandling(table, context, lstate);
435 gstate.insert_count += lstate.insert_chunk.size();
436 gstate.insert_count += updated_tuples;
437 storage.LocalAppend(state&: gstate.append_state, table, context&: context.client, chunk&: lstate.insert_chunk, unsafe: true);
438
439 if (return_chunk) {
440 gstate.return_collection.Append(new_chunk&: lstate.insert_chunk);
441 }
442 } else {
443 D_ASSERT(!return_chunk);
444 // parallel append
445 if (!lstate.local_collection) {
446 lock_guard<mutex> l(gstate.lock);
447 auto &table_info = storage.info;
448 auto &block_manager = TableIOManager::Get(table&: storage).GetBlockManagerForRowData();
449 lstate.local_collection =
450 make_uniq<RowGroupCollection>(args&: table_info, args&: block_manager, args: insert_types, args: MAX_ROW_ID);
451 lstate.local_collection->InitializeEmpty();
452 lstate.local_collection->InitializeAppend(state&: lstate.local_append_state);
453 lstate.writer = &gstate.table.GetStorage().CreateOptimisticWriter(context&: context.client);
454 }
455 OnConflictHandling(table, context, lstate);
456
457 auto new_row_group = lstate.local_collection->Append(chunk&: lstate.insert_chunk, state&: lstate.local_append_state);
458 if (new_row_group) {
459 lstate.writer->WriteNewRowGroup(row_groups&: *lstate.local_collection);
460 }
461 }
462
463 return SinkResultType::NEED_MORE_INPUT;
464}
465
466void PhysicalInsert::Combine(ExecutionContext &context, GlobalSinkState &gstate_p, LocalSinkState &lstate_p) const {
467 auto &gstate = gstate_p.Cast<InsertGlobalState>();
468 auto &lstate = lstate_p.Cast<InsertLocalState>();
469 auto &client_profiler = QueryProfiler::Get(context&: context.client);
470 context.thread.profiler.Flush(phys_op: *this, expression_executor&: lstate.default_executor, name: "default_executor", id: 1);
471 client_profiler.Flush(profiler&: context.thread.profiler);
472
473 if (!parallel) {
474 return;
475 }
476 if (!lstate.local_collection) {
477 return;
478 }
479 // parallel append: finalize the append
480 TransactionData tdata(0, 0);
481 lstate.local_collection->FinalizeAppend(transaction: tdata, state&: lstate.local_append_state);
482
483 auto append_count = lstate.local_collection->GetTotalRows();
484
485 lock_guard<mutex> lock(gstate.lock);
486 gstate.insert_count += append_count;
487 if (append_count < RowGroup::ROW_GROUP_SIZE) {
488 // we have few rows - append to the local storage directly
489 auto &table = gstate.table;
490 auto &storage = table.GetStorage();
491 storage.InitializeLocalAppend(state&: gstate.append_state, context&: context.client);
492 auto &transaction = DuckTransaction::Get(context&: context.client, catalog&: table.catalog);
493 lstate.local_collection->Scan(transaction, fun: [&](DataChunk &insert_chunk) {
494 storage.LocalAppend(state&: gstate.append_state, table, context&: context.client, chunk&: insert_chunk);
495 return true;
496 });
497 storage.FinalizeLocalAppend(state&: gstate.append_state);
498 } else {
499 // we have written rows to disk optimistically - merge directly into the transaction-local storage
500 gstate.table.GetStorage().FinalizeOptimisticWriter(context&: context.client, writer&: *lstate.writer);
501 gstate.table.GetStorage().LocalMerge(context&: context.client, collection&: *lstate.local_collection);
502 }
503}
504
505SinkFinalizeType PhysicalInsert::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
506 GlobalSinkState &state) const {
507 auto &gstate = state.Cast<InsertGlobalState>();
508 if (!parallel && gstate.initialized) {
509 auto &table = gstate.table;
510 auto &storage = table.GetStorage();
511 storage.FinalizeLocalAppend(state&: gstate.append_state);
512 }
513 return SinkFinalizeType::READY;
514}
515
516//===--------------------------------------------------------------------===//
517// Source
518//===--------------------------------------------------------------------===//
519class InsertSourceState : public GlobalSourceState {
520public:
521 explicit InsertSourceState(const PhysicalInsert &op) {
522 if (op.return_chunk) {
523 D_ASSERT(op.sink_state);
524 auto &g = op.sink_state->Cast<InsertGlobalState>();
525 g.return_collection.InitializeScan(state&: scan_state);
526 }
527 }
528
529 ColumnDataScanState scan_state;
530};
531
532unique_ptr<GlobalSourceState> PhysicalInsert::GetGlobalSourceState(ClientContext &context) const {
533 return make_uniq<InsertSourceState>(args: *this);
534}
535
536SourceResultType PhysicalInsert::GetData(ExecutionContext &context, DataChunk &chunk,
537 OperatorSourceInput &input) const {
538 auto &state = input.global_state.Cast<InsertSourceState>();
539 auto &insert_gstate = sink_state->Cast<InsertGlobalState>();
540 if (!return_chunk) {
541 chunk.SetCardinality(1);
542 chunk.SetValue(col_idx: 0, index: 0, val: Value::BIGINT(value: insert_gstate.insert_count));
543 return SourceResultType::FINISHED;
544 }
545
546 insert_gstate.return_collection.Scan(state&: state.scan_state, result&: chunk);
547 return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT;
548}
549
550} // namespace duckdb
551