1 | #include "duckdb/execution/operator/persistent/physical_insert.hpp" |
2 | #include "duckdb/parallel/thread_context.hpp" |
3 | #include "duckdb/catalog/catalog_entry/duck_table_entry.hpp" |
4 | #include "duckdb/common/types/column/column_data_collection.hpp" |
5 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
6 | #include "duckdb/execution/expression_executor.hpp" |
7 | #include "duckdb/storage/data_table.hpp" |
8 | #include "duckdb/main/client_context.hpp" |
9 | #include "duckdb/parser/parsed_data/create_table_info.hpp" |
10 | #include "duckdb/planner/expression/bound_constant_expression.hpp" |
11 | #include "duckdb/storage/table_io_manager.hpp" |
12 | #include "duckdb/transaction/local_storage.hpp" |
13 | #include "duckdb/parser/statement/insert_statement.hpp" |
14 | #include "duckdb/parser/statement/update_statement.hpp" |
15 | #include "duckdb/storage/table/scan_state.hpp" |
16 | #include "duckdb/common/types/conflict_manager.hpp" |
17 | #include "duckdb/execution/index/art/art.hpp" |
18 | #include "duckdb/transaction/duck_transaction.hpp" |
19 | #include "duckdb/storage/table/append_state.hpp" |
20 | |
21 | namespace duckdb { |
22 | |
23 | PhysicalInsert::PhysicalInsert(vector<LogicalType> types_p, TableCatalogEntry &table, |
24 | physical_index_vector_t<idx_t> column_index_map, |
25 | vector<unique_ptr<Expression>> bound_defaults, |
26 | vector<unique_ptr<Expression>> set_expressions, vector<PhysicalIndex> set_columns, |
27 | vector<LogicalType> set_types, idx_t estimated_cardinality, bool return_chunk, |
28 | bool parallel, OnConflictAction action_type, |
29 | unique_ptr<Expression> on_conflict_condition_p, |
30 | unique_ptr<Expression> do_update_condition_p, unordered_set<column_t> conflict_target_p, |
31 | vector<column_t> columns_to_fetch_p) |
32 | : PhysicalOperator(PhysicalOperatorType::INSERT, std::move(types_p), estimated_cardinality), |
33 | column_index_map(std::move(column_index_map)), insert_table(&table), insert_types(table.GetTypes()), |
34 | bound_defaults(std::move(bound_defaults)), return_chunk(return_chunk), parallel(parallel), |
35 | action_type(action_type), set_expressions(std::move(set_expressions)), set_columns(std::move(set_columns)), |
36 | set_types(std::move(set_types)), on_conflict_condition(std::move(on_conflict_condition_p)), |
37 | do_update_condition(std::move(do_update_condition_p)), conflict_target(std::move(conflict_target_p)), |
38 | columns_to_fetch(std::move(columns_to_fetch_p)) { |
39 | |
40 | if (action_type == OnConflictAction::THROW) { |
41 | return; |
42 | } |
43 | |
44 | D_ASSERT(set_expressions.size() == set_columns.size()); |
45 | |
46 | // One or more columns are referenced from the existing table, |
47 | // we use the 'insert_types' to figure out which types these columns have |
48 | types_to_fetch = vector<LogicalType>(columns_to_fetch.size(), LogicalType::SQLNULL); |
49 | for (idx_t i = 0; i < columns_to_fetch.size(); i++) { |
50 | auto &id = columns_to_fetch[i]; |
51 | D_ASSERT(id < insert_types.size()); |
52 | types_to_fetch[i] = insert_types[id]; |
53 | } |
54 | } |
55 | |
56 | PhysicalInsert::PhysicalInsert(LogicalOperator &op, SchemaCatalogEntry &schema, unique_ptr<BoundCreateTableInfo> info_p, |
57 | idx_t estimated_cardinality, bool parallel) |
58 | : PhysicalOperator(PhysicalOperatorType::CREATE_TABLE_AS, op.types, estimated_cardinality), insert_table(nullptr), |
59 | return_chunk(false), schema(&schema), info(std::move(info_p)), parallel(parallel), |
60 | action_type(OnConflictAction::THROW) { |
61 | GetInsertInfo(info: *info, insert_types, bound_defaults); |
62 | } |
63 | |
64 | void PhysicalInsert::GetInsertInfo(const BoundCreateTableInfo &info, vector<LogicalType> &insert_types, |
65 | vector<unique_ptr<Expression>> &bound_defaults) { |
66 | auto &create_info = info.base->Cast<CreateTableInfo>(); |
67 | for (auto &col : create_info.columns.Physical()) { |
68 | insert_types.push_back(x: col.GetType()); |
69 | bound_defaults.push_back(x: make_uniq<BoundConstantExpression>(args: Value(col.GetType()))); |
70 | } |
71 | } |
72 | |
73 | //===--------------------------------------------------------------------===// |
74 | // Sink |
75 | //===--------------------------------------------------------------------===// |
76 | class InsertGlobalState : public GlobalSinkState { |
77 | public: |
78 | explicit InsertGlobalState(ClientContext &context, const vector<LogicalType> &return_types, DuckTableEntry &table) |
79 | : table(table), insert_count(0), initialized(false), return_collection(context, return_types) { |
80 | } |
81 | |
82 | mutex lock; |
83 | DuckTableEntry &table; |
84 | idx_t insert_count; |
85 | bool initialized; |
86 | LocalAppendState append_state; |
87 | ColumnDataCollection return_collection; |
88 | }; |
89 | |
90 | class InsertLocalState : public LocalSinkState { |
91 | public: |
92 | InsertLocalState(ClientContext &context, const vector<LogicalType> &types, |
93 | const vector<unique_ptr<Expression>> &bound_defaults) |
94 | : default_executor(context, bound_defaults) { |
95 | insert_chunk.Initialize(allocator&: Allocator::Get(context), types); |
96 | } |
97 | |
98 | DataChunk insert_chunk; |
99 | ExpressionExecutor default_executor; |
100 | TableAppendState local_append_state; |
101 | unique_ptr<RowGroupCollection> local_collection; |
102 | optional_ptr<OptimisticDataWriter> writer; |
103 | // Rows that have been updated by a DO UPDATE conflict |
104 | unordered_set<row_t> updated_global_rows; |
105 | // Rows in the transaction-local storage that have been updated by a DO UPDATE conflict |
106 | unordered_set<row_t> updated_local_rows; |
107 | idx_t update_count = 0; |
108 | }; |
109 | |
110 | unique_ptr<GlobalSinkState> PhysicalInsert::GetGlobalSinkState(ClientContext &context) const { |
111 | optional_ptr<TableCatalogEntry> table; |
112 | if (info) { |
113 | // CREATE TABLE AS |
114 | D_ASSERT(!insert_table); |
115 | auto &catalog = schema->catalog; |
116 | table = &catalog.CreateTable(transaction: catalog.GetCatalogTransaction(context), schema&: *schema.get_mutable(), info&: *info) |
117 | ->Cast<TableCatalogEntry>(); |
118 | } else { |
119 | D_ASSERT(insert_table); |
120 | D_ASSERT(insert_table->IsDuckTable()); |
121 | table = insert_table.get_mutable(); |
122 | } |
123 | auto result = make_uniq<InsertGlobalState>(args&: context, args: GetTypes(), args&: table->Cast<DuckTableEntry>()); |
124 | return std::move(result); |
125 | } |
126 | |
127 | unique_ptr<LocalSinkState> PhysicalInsert::GetLocalSinkState(ExecutionContext &context) const { |
128 | return make_uniq<InsertLocalState>(args&: context.client, args: insert_types, args: bound_defaults); |
129 | } |
130 | |
131 | void PhysicalInsert::ResolveDefaults(const TableCatalogEntry &table, DataChunk &chunk, |
132 | const physical_index_vector_t<idx_t> &column_index_map, |
133 | ExpressionExecutor &default_executor, DataChunk &result) { |
134 | chunk.Flatten(); |
135 | default_executor.SetChunk(chunk); |
136 | |
137 | result.Reset(); |
138 | result.SetCardinality(chunk); |
139 | |
140 | if (!column_index_map.empty()) { |
141 | // columns specified by the user, use column_index_map |
142 | for (auto &col : table.GetColumns().Physical()) { |
143 | auto storage_idx = col.StorageOid(); |
144 | auto mapped_index = column_index_map[col.Physical()]; |
145 | if (mapped_index == DConstants::INVALID_INDEX) { |
146 | // insert default value |
147 | default_executor.ExecuteExpression(expr_idx: storage_idx, result&: result.data[storage_idx]); |
148 | } else { |
149 | // get value from child chunk |
150 | D_ASSERT((idx_t)mapped_index < chunk.ColumnCount()); |
151 | D_ASSERT(result.data[storage_idx].GetType() == chunk.data[mapped_index].GetType()); |
152 | result.data[storage_idx].Reference(other&: chunk.data[mapped_index]); |
153 | } |
154 | } |
155 | } else { |
156 | // no columns specified, just append directly |
157 | for (idx_t i = 0; i < result.ColumnCount(); i++) { |
158 | D_ASSERT(result.data[i].GetType() == chunk.data[i].GetType()); |
159 | result.data[i].Reference(other&: chunk.data[i]); |
160 | } |
161 | } |
162 | } |
163 | |
164 | bool AllConflictsMeetCondition(DataChunk &result) { |
165 | auto data = FlatVector::GetData<bool>(vector&: result.data[0]); |
166 | for (idx_t i = 0; i < result.size(); i++) { |
167 | if (!data[i]) { |
168 | return false; |
169 | } |
170 | } |
171 | return true; |
172 | } |
173 | |
174 | void CheckOnConflictCondition(ExecutionContext &context, DataChunk &conflicts, const unique_ptr<Expression> &condition, |
175 | DataChunk &result) { |
176 | ExpressionExecutor executor(context.client, *condition); |
177 | result.Initialize(context&: context.client, types: {LogicalType::BOOLEAN}); |
178 | executor.Execute(input&: conflicts, result); |
179 | result.SetCardinality(conflicts.size()); |
180 | } |
181 | |
182 | static void CombineExistingAndInsertTuples(DataChunk &result, DataChunk &scan_chunk, DataChunk &input_chunk, |
183 | ClientContext &client, const PhysicalInsert &op) { |
184 | auto &types_to_fetch = op.types_to_fetch; |
185 | auto &insert_types = op.insert_types; |
186 | |
187 | if (types_to_fetch.empty()) { |
188 | // We have not scanned the initial table, so we can just duplicate the initial chunk |
189 | result.Initialize(context&: client, types: input_chunk.GetTypes()); |
190 | result.Reference(chunk&: input_chunk); |
191 | result.SetCardinality(input_chunk); |
192 | return; |
193 | } |
194 | vector<LogicalType> combined_types; |
195 | combined_types.reserve(n: insert_types.size() + types_to_fetch.size()); |
196 | combined_types.insert(position: combined_types.end(), first: insert_types.begin(), last: insert_types.end()); |
197 | combined_types.insert(position: combined_types.end(), first: types_to_fetch.begin(), last: types_to_fetch.end()); |
198 | |
199 | result.Initialize(context&: client, types: combined_types); |
200 | result.Reset(); |
201 | // Add the VALUES list |
202 | for (idx_t i = 0; i < insert_types.size(); i++) { |
203 | idx_t col_idx = i; |
204 | auto &other_col = input_chunk.data[i]; |
205 | auto &this_col = result.data[col_idx]; |
206 | D_ASSERT(other_col.GetType() == this_col.GetType()); |
207 | this_col.Reference(other&: other_col); |
208 | } |
209 | // Add the columns from the original conflicting tuples |
210 | for (idx_t i = 0; i < types_to_fetch.size(); i++) { |
211 | idx_t col_idx = i + insert_types.size(); |
212 | auto &other_col = scan_chunk.data[i]; |
213 | auto &this_col = result.data[col_idx]; |
214 | D_ASSERT(other_col.GetType() == this_col.GetType()); |
215 | this_col.Reference(other&: other_col); |
216 | } |
217 | // This is guaranteed by the requirement of a conflict target to have a condition or set expressions |
218 | // Only when we have any sort of condition or SET expression that references the existing table is this possible |
219 | // to not be true. |
220 | // We can have a SET expression without a conflict target ONLY if there is only 1 Index on the table |
221 | // In which case this also can't cause a discrepancy between existing tuple count and insert tuple count |
222 | D_ASSERT(input_chunk.size() == scan_chunk.size()); |
223 | result.SetCardinality(input_chunk.size()); |
224 | } |
225 | |
226 | static void CreateUpdateChunk(ExecutionContext &context, DataChunk &chunk, TableCatalogEntry &table, Vector &row_ids, |
227 | DataChunk &update_chunk, const PhysicalInsert &op) { |
228 | |
229 | auto &do_update_condition = op.do_update_condition; |
230 | auto &set_types = op.set_types; |
231 | auto &set_expressions = op.set_expressions; |
232 | // Check the optional condition for the DO UPDATE clause, to filter which rows will be updated |
233 | if (do_update_condition) { |
234 | DataChunk do_update_filter_result; |
235 | do_update_filter_result.Initialize(context&: context.client, types: {LogicalType::BOOLEAN}); |
236 | ExpressionExecutor where_executor(context.client, *do_update_condition); |
237 | where_executor.Execute(input&: chunk, result&: do_update_filter_result); |
238 | do_update_filter_result.SetCardinality(chunk.size()); |
239 | |
240 | ManagedSelection selection(chunk.size()); |
241 | |
242 | auto where_data = FlatVector::GetData<bool>(vector&: do_update_filter_result.data[0]); |
243 | for (idx_t i = 0; i < chunk.size(); i++) { |
244 | if (where_data[i]) { |
245 | selection.Append(idx: i); |
246 | } |
247 | } |
248 | if (selection.Count() != selection.Size()) { |
249 | // Not all conflicts met the condition, need to filter out the ones that don't |
250 | chunk.Slice(sel_vector: selection.Selection(), count: selection.Count()); |
251 | chunk.SetCardinality(selection.Count()); |
252 | // Also apply this Slice to the to-update row_ids |
253 | row_ids.Slice(sel: selection.Selection(), count: selection.Count()); |
254 | } |
255 | } |
256 | |
257 | // Execute the SET expressions |
258 | update_chunk.Initialize(context&: context.client, types: set_types); |
259 | ExpressionExecutor executor(context.client, set_expressions); |
260 | executor.Execute(input&: chunk, result&: update_chunk); |
261 | update_chunk.SetCardinality(chunk); |
262 | } |
263 | |
264 | template <bool GLOBAL> |
265 | static idx_t PerformOnConflictAction(ExecutionContext &context, DataChunk &chunk, TableCatalogEntry &table, |
266 | Vector &row_ids, const PhysicalInsert &op) { |
267 | |
268 | if (op.action_type == OnConflictAction::NOTHING) { |
269 | return 0; |
270 | } |
271 | auto &set_columns = op.set_columns; |
272 | |
273 | DataChunk update_chunk; |
274 | CreateUpdateChunk(context, chunk, table, row_ids, update_chunk, op); |
275 | |
276 | auto &data_table = table.GetStorage(); |
277 | // Perform the update, using the results of the SET expressions |
278 | if (GLOBAL) { |
279 | data_table.Update(table, context&: context.client, row_ids, column_ids: set_columns, data&: update_chunk); |
280 | } else { |
281 | auto &local_storage = LocalStorage::Get(context&: context.client, db&: data_table.db); |
282 | // Perform the update, using the results of the SET expressions |
283 | local_storage.Update(table&: data_table, row_ids, column_ids: set_columns, data&: update_chunk); |
284 | } |
285 | return update_chunk.size(); |
286 | } |
287 | |
288 | // TODO: should we use a hash table to keep track of this instead? |
289 | template <bool GLOBAL> |
290 | static void RegisterUpdatedRows(InsertLocalState &lstate, const Vector &row_ids, idx_t count) { |
291 | // Insert all rows, if any of the rows has already been updated before, we throw an error |
292 | auto data = FlatVector::GetData<row_t>(vector: row_ids); |
293 | |
294 | // The rowids in the transaction-local ART aren't final yet so we have to separately keep track of the two sets of |
295 | // rowids |
296 | unordered_set<row_t> &updated_rows = GLOBAL ? lstate.updated_global_rows : lstate.updated_local_rows; |
297 | for (idx_t i = 0; i < count; i++) { |
298 | auto result = updated_rows.insert(x: data[i]); |
299 | if (result.second == false) { |
300 | throw InvalidInputException( |
301 | "ON CONFLICT DO UPDATE can not update the same row twice in the same command, Ensure that no rows " |
302 | "proposed for insertion within the same command have duplicate constrained values" ); |
303 | } |
304 | } |
305 | } |
306 | |
307 | template <bool GLOBAL> |
308 | static idx_t HandleInsertConflicts(TableCatalogEntry &table, ExecutionContext &context, InsertLocalState &lstate, |
309 | DataTable &data_table, const PhysicalInsert &op) { |
310 | auto &types_to_fetch = op.types_to_fetch; |
311 | auto &on_conflict_condition = op.on_conflict_condition; |
312 | auto &conflict_target = op.conflict_target; |
313 | auto &columns_to_fetch = op.columns_to_fetch; |
314 | |
315 | auto &local_storage = LocalStorage::Get(context&: context.client, db&: data_table.db); |
316 | |
317 | // We either want to do nothing, or perform an update when conflicts arise |
318 | ConflictInfo conflict_info(conflict_target); |
319 | ConflictManager conflict_manager(VerifyExistenceType::APPEND, lstate.insert_chunk.size(), &conflict_info); |
320 | if (GLOBAL) { |
321 | data_table.VerifyAppendConstraints(table, context&: context.client, chunk&: lstate.insert_chunk, conflict_manager: &conflict_manager); |
322 | } else { |
323 | DataTable::VerifyUniqueIndexes(indexes&: local_storage.GetIndexes(table&: data_table), context&: context.client, chunk&: lstate.insert_chunk, |
324 | conflict_manager: &conflict_manager); |
325 | } |
326 | conflict_manager.Finalize(); |
327 | if (conflict_manager.ConflictCount() == 0) { |
328 | // No conflicts found, 0 updates performed |
329 | return 0; |
330 | } |
331 | auto &conflicts = conflict_manager.Conflicts(); |
332 | auto &row_ids = conflict_manager.RowIds(); |
333 | |
334 | DataChunk conflict_chunk; // contains only the conflicting values |
335 | DataChunk scan_chunk; // contains the original values, that caused the conflict |
336 | DataChunk combined_chunk; // contains conflict_chunk + scan_chunk (wide) |
337 | |
338 | // Filter out everything but the conflicting rows |
339 | conflict_chunk.Initialize(context&: context.client, types: lstate.insert_chunk.GetTypes()); |
340 | conflict_chunk.Reference(chunk&: lstate.insert_chunk); |
341 | conflict_chunk.Slice(sel_vector: conflicts.Selection(), count: conflicts.Count()); |
342 | conflict_chunk.SetCardinality(conflicts.Count()); |
343 | |
344 | // Holds the pins for the fetched rows |
345 | unique_ptr<ColumnFetchState> fetch_state; |
346 | if (!types_to_fetch.empty()) { |
347 | D_ASSERT(scan_chunk.size() == 0); |
348 | // When these values are required for the conditions or the SET expressions, |
349 | // then we scan the existing table for the conflicting tuples, using the rowids |
350 | scan_chunk.Initialize(context&: context.client, types: types_to_fetch); |
351 | fetch_state = make_uniq<ColumnFetchState>(); |
352 | if (GLOBAL) { |
353 | auto &transaction = DuckTransaction::Get(context&: context.client, catalog&: table.catalog); |
354 | data_table.Fetch(transaction, result&: scan_chunk, column_ids: columns_to_fetch, row_ids, fetch_count: conflicts.Count(), state&: *fetch_state); |
355 | } else { |
356 | local_storage.FetchChunk(table&: data_table, row_ids, count: conflicts.Count(), col_ids: columns_to_fetch, chunk&: scan_chunk, |
357 | fetch_state&: *fetch_state); |
358 | } |
359 | } |
360 | |
361 | // Splice the Input chunk and the fetched chunk together |
362 | CombineExistingAndInsertTuples(result&: combined_chunk, scan_chunk, input_chunk&: conflict_chunk, client&: context.client, op); |
363 | |
364 | if (on_conflict_condition) { |
365 | DataChunk conflict_condition_result; |
366 | CheckOnConflictCondition(context, conflicts&: combined_chunk, condition: on_conflict_condition, result&: conflict_condition_result); |
367 | bool conditions_met = AllConflictsMeetCondition(result&: conflict_condition_result); |
368 | if (!conditions_met) { |
369 | // Filter out the tuples that did pass the filter, then run the verify again |
370 | ManagedSelection sel(combined_chunk.size()); |
371 | auto data = FlatVector::GetData<bool>(vector&: conflict_condition_result.data[0]); |
372 | for (idx_t i = 0; i < combined_chunk.size(); i++) { |
373 | if (!data[i]) { |
374 | // Only populate the selection vector with the tuples that did not meet the condition |
375 | sel.Append(idx: i); |
376 | } |
377 | } |
378 | combined_chunk.Slice(sel_vector: sel.Selection(), count: sel.Count()); |
379 | row_ids.Slice(sel: sel.Selection(), count: sel.Count()); |
380 | if (GLOBAL) { |
381 | data_table.VerifyAppendConstraints(table, context&: context.client, chunk&: combined_chunk, conflict_manager: nullptr); |
382 | } else { |
383 | DataTable::VerifyUniqueIndexes(indexes&: local_storage.GetIndexes(table&: data_table), context&: context.client, |
384 | chunk&: lstate.insert_chunk, conflict_manager: nullptr); |
385 | } |
386 | throw InternalException("The previous operation was expected to throw but didn't" ); |
387 | } |
388 | } |
389 | |
390 | RegisterUpdatedRows<GLOBAL>(lstate, row_ids, combined_chunk.size()); |
391 | |
392 | idx_t updated_tuples = PerformOnConflictAction<GLOBAL>(context, combined_chunk, table, row_ids, op); |
393 | |
394 | // Remove the conflicting tuples from the insert chunk |
395 | SelectionVector sel_vec(lstate.insert_chunk.size()); |
396 | idx_t new_size = |
397 | SelectionVector::Inverted(src: conflicts.Selection(), dst&: sel_vec, source_size: conflicts.Count(), count: lstate.insert_chunk.size()); |
398 | lstate.insert_chunk.Slice(sel_vector: sel_vec, count: new_size); |
399 | lstate.insert_chunk.SetCardinality(new_size); |
400 | return updated_tuples; |
401 | } |
402 | |
403 | idx_t PhysicalInsert::OnConflictHandling(TableCatalogEntry &table, ExecutionContext &context, |
404 | InsertLocalState &lstate) const { |
405 | auto &data_table = table.GetStorage(); |
406 | if (action_type == OnConflictAction::THROW) { |
407 | data_table.VerifyAppendConstraints(table, context&: context.client, chunk&: lstate.insert_chunk, conflict_manager: nullptr); |
408 | return 0; |
409 | } |
410 | // Check whether any conflicts arise, and if they all meet the conflict_target + condition |
411 | // If that's not the case - We throw the first error |
412 | idx_t updated_tuples = 0; |
413 | updated_tuples += HandleInsertConflicts<true>(table, context, lstate, data_table, op: *this); |
414 | // Also check the transaction-local storage+ART so we can detect conflicts within this transaction |
415 | updated_tuples += HandleInsertConflicts<false>(table, context, lstate, data_table, op: *this); |
416 | |
417 | return updated_tuples; |
418 | } |
419 | |
420 | SinkResultType PhysicalInsert::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const { |
421 | auto &gstate = input.global_state.Cast<InsertGlobalState>(); |
422 | auto &lstate = input.local_state.Cast<InsertLocalState>(); |
423 | |
424 | auto &table = gstate.table; |
425 | auto &storage = table.GetStorage(); |
426 | PhysicalInsert::ResolveDefaults(table, chunk, column_index_map, default_executor&: lstate.default_executor, result&: lstate.insert_chunk); |
427 | |
428 | if (!parallel) { |
429 | if (!gstate.initialized) { |
430 | storage.InitializeLocalAppend(state&: gstate.append_state, context&: context.client); |
431 | gstate.initialized = true; |
432 | } |
433 | |
434 | idx_t updated_tuples = OnConflictHandling(table, context, lstate); |
435 | gstate.insert_count += lstate.insert_chunk.size(); |
436 | gstate.insert_count += updated_tuples; |
437 | storage.LocalAppend(state&: gstate.append_state, table, context&: context.client, chunk&: lstate.insert_chunk, unsafe: true); |
438 | |
439 | if (return_chunk) { |
440 | gstate.return_collection.Append(new_chunk&: lstate.insert_chunk); |
441 | } |
442 | } else { |
443 | D_ASSERT(!return_chunk); |
444 | // parallel append |
445 | if (!lstate.local_collection) { |
446 | lock_guard<mutex> l(gstate.lock); |
447 | auto &table_info = storage.info; |
448 | auto &block_manager = TableIOManager::Get(table&: storage).GetBlockManagerForRowData(); |
449 | lstate.local_collection = |
450 | make_uniq<RowGroupCollection>(args&: table_info, args&: block_manager, args: insert_types, args: MAX_ROW_ID); |
451 | lstate.local_collection->InitializeEmpty(); |
452 | lstate.local_collection->InitializeAppend(state&: lstate.local_append_state); |
453 | lstate.writer = &gstate.table.GetStorage().CreateOptimisticWriter(context&: context.client); |
454 | } |
455 | OnConflictHandling(table, context, lstate); |
456 | |
457 | auto new_row_group = lstate.local_collection->Append(chunk&: lstate.insert_chunk, state&: lstate.local_append_state); |
458 | if (new_row_group) { |
459 | lstate.writer->WriteNewRowGroup(row_groups&: *lstate.local_collection); |
460 | } |
461 | } |
462 | |
463 | return SinkResultType::NEED_MORE_INPUT; |
464 | } |
465 | |
466 | void PhysicalInsert::Combine(ExecutionContext &context, GlobalSinkState &gstate_p, LocalSinkState &lstate_p) const { |
467 | auto &gstate = gstate_p.Cast<InsertGlobalState>(); |
468 | auto &lstate = lstate_p.Cast<InsertLocalState>(); |
469 | auto &client_profiler = QueryProfiler::Get(context&: context.client); |
470 | context.thread.profiler.Flush(phys_op: *this, expression_executor&: lstate.default_executor, name: "default_executor" , id: 1); |
471 | client_profiler.Flush(profiler&: context.thread.profiler); |
472 | |
473 | if (!parallel) { |
474 | return; |
475 | } |
476 | if (!lstate.local_collection) { |
477 | return; |
478 | } |
479 | // parallel append: finalize the append |
480 | TransactionData tdata(0, 0); |
481 | lstate.local_collection->FinalizeAppend(transaction: tdata, state&: lstate.local_append_state); |
482 | |
483 | auto append_count = lstate.local_collection->GetTotalRows(); |
484 | |
485 | lock_guard<mutex> lock(gstate.lock); |
486 | gstate.insert_count += append_count; |
487 | if (append_count < RowGroup::ROW_GROUP_SIZE) { |
488 | // we have few rows - append to the local storage directly |
489 | auto &table = gstate.table; |
490 | auto &storage = table.GetStorage(); |
491 | storage.InitializeLocalAppend(state&: gstate.append_state, context&: context.client); |
492 | auto &transaction = DuckTransaction::Get(context&: context.client, catalog&: table.catalog); |
493 | lstate.local_collection->Scan(transaction, fun: [&](DataChunk &insert_chunk) { |
494 | storage.LocalAppend(state&: gstate.append_state, table, context&: context.client, chunk&: insert_chunk); |
495 | return true; |
496 | }); |
497 | storage.FinalizeLocalAppend(state&: gstate.append_state); |
498 | } else { |
499 | // we have written rows to disk optimistically - merge directly into the transaction-local storage |
500 | gstate.table.GetStorage().FinalizeOptimisticWriter(context&: context.client, writer&: *lstate.writer); |
501 | gstate.table.GetStorage().LocalMerge(context&: context.client, collection&: *lstate.local_collection); |
502 | } |
503 | } |
504 | |
505 | SinkFinalizeType PhysicalInsert::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, |
506 | GlobalSinkState &state) const { |
507 | auto &gstate = state.Cast<InsertGlobalState>(); |
508 | if (!parallel && gstate.initialized) { |
509 | auto &table = gstate.table; |
510 | auto &storage = table.GetStorage(); |
511 | storage.FinalizeLocalAppend(state&: gstate.append_state); |
512 | } |
513 | return SinkFinalizeType::READY; |
514 | } |
515 | |
516 | //===--------------------------------------------------------------------===// |
517 | // Source |
518 | //===--------------------------------------------------------------------===// |
519 | class InsertSourceState : public GlobalSourceState { |
520 | public: |
521 | explicit InsertSourceState(const PhysicalInsert &op) { |
522 | if (op.return_chunk) { |
523 | D_ASSERT(op.sink_state); |
524 | auto &g = op.sink_state->Cast<InsertGlobalState>(); |
525 | g.return_collection.InitializeScan(state&: scan_state); |
526 | } |
527 | } |
528 | |
529 | ColumnDataScanState scan_state; |
530 | }; |
531 | |
532 | unique_ptr<GlobalSourceState> PhysicalInsert::GetGlobalSourceState(ClientContext &context) const { |
533 | return make_uniq<InsertSourceState>(args: *this); |
534 | } |
535 | |
536 | SourceResultType PhysicalInsert::GetData(ExecutionContext &context, DataChunk &chunk, |
537 | OperatorSourceInput &input) const { |
538 | auto &state = input.global_state.Cast<InsertSourceState>(); |
539 | auto &insert_gstate = sink_state->Cast<InsertGlobalState>(); |
540 | if (!return_chunk) { |
541 | chunk.SetCardinality(1); |
542 | chunk.SetValue(col_idx: 0, index: 0, val: Value::BIGINT(value: insert_gstate.insert_count)); |
543 | return SourceResultType::FINISHED; |
544 | } |
545 | |
546 | insert_gstate.return_collection.Scan(state&: state.scan_state, result&: chunk); |
547 | return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT; |
548 | } |
549 | |
550 | } // namespace duckdb |
551 | |