1 | #include "duckdb/execution/operator/persistent/physical_update.hpp" |
2 | |
3 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
4 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
5 | #include "duckdb/execution/expression_executor.hpp" |
6 | #include "duckdb/planner/expression/bound_reference_expression.hpp" |
7 | #include "duckdb/storage/data_table.hpp" |
8 | |
9 | using namespace duckdb; |
10 | using namespace std; |
11 | |
12 | void PhysicalUpdate::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state) { |
13 | vector<TypeId> update_types; |
14 | for (auto &expr : expressions) { |
15 | update_types.push_back(expr->return_type); |
16 | } |
17 | DataChunk update_chunk; |
18 | update_chunk.Initialize(update_types); |
19 | |
20 | // initialize states for bound default expressions |
21 | ExpressionExecutor default_executor(bound_defaults); |
22 | |
23 | int64_t updated_count = 0; |
24 | |
25 | DataChunk mock_chunk; |
26 | if (is_index_update) { |
27 | mock_chunk.Initialize(table.types); |
28 | } |
29 | |
30 | while (true) { |
31 | children[0]->GetChunk(context, state->child_chunk, state->child_state.get()); |
32 | if (state->child_chunk.size() == 0) { |
33 | break; |
34 | } |
35 | state->child_chunk.Normalify(); |
36 | default_executor.SetChunk(state->child_chunk); |
37 | |
38 | // update data in the base table |
39 | // the row ids are given to us as the last column of the child chunk |
40 | auto &row_ids = state->child_chunk.data[state->child_chunk.column_count() - 1]; |
41 | update_chunk.SetCardinality(state->child_chunk); |
42 | for (idx_t i = 0; i < expressions.size(); i++) { |
43 | if (expressions[i]->type == ExpressionType::VALUE_DEFAULT) { |
44 | // default expression, set to the default value of the column |
45 | default_executor.ExecuteExpression(columns[i], update_chunk.data[i]); |
46 | } else { |
47 | assert(expressions[i]->type == ExpressionType::BOUND_REF); |
48 | // index into child chunk |
49 | auto &binding = (BoundReferenceExpression &)*expressions[i]; |
50 | update_chunk.data[i].Reference(state->child_chunk.data[binding.index]); |
51 | } |
52 | } |
53 | |
54 | if (is_index_update) { |
55 | // index update, perform a delete and an append instead |
56 | table.Delete(tableref, context, row_ids, update_chunk.size()); |
57 | mock_chunk.SetCardinality(update_chunk); |
58 | for (idx_t i = 0; i < columns.size(); i++) { |
59 | mock_chunk.data[columns[i]].Reference(update_chunk.data[i]); |
60 | } |
61 | table.Append(tableref, context, mock_chunk); |
62 | } else { |
63 | table.Update(tableref, context, row_ids, columns, update_chunk); |
64 | } |
65 | updated_count += state->child_chunk.size(); |
66 | } |
67 | |
68 | chunk.SetCardinality(1); |
69 | chunk.SetValue(0, 0, Value::BIGINT(updated_count)); |
70 | |
71 | state->finished = true; |
72 | |
73 | chunk.Verify(); |
74 | } |
75 | |