1#include "duckdb/execution/operator/persistent/physical_update.hpp"
2
3#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
4#include "duckdb/common/vector_operations/vector_operations.hpp"
5#include "duckdb/execution/expression_executor.hpp"
6#include "duckdb/planner/expression/bound_reference_expression.hpp"
7#include "duckdb/storage/data_table.hpp"
8
9using namespace duckdb;
10using namespace std;
11
12void PhysicalUpdate::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state) {
13 vector<TypeId> update_types;
14 for (auto &expr : expressions) {
15 update_types.push_back(expr->return_type);
16 }
17 DataChunk update_chunk;
18 update_chunk.Initialize(update_types);
19
20 // initialize states for bound default expressions
21 ExpressionExecutor default_executor(bound_defaults);
22
23 int64_t updated_count = 0;
24
25 DataChunk mock_chunk;
26 if (is_index_update) {
27 mock_chunk.Initialize(table.types);
28 }
29
30 while (true) {
31 children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
32 if (state->child_chunk.size() == 0) {
33 break;
34 }
35 state->child_chunk.Normalify();
36 default_executor.SetChunk(state->child_chunk);
37
38 // update data in the base table
39 // the row ids are given to us as the last column of the child chunk
40 auto &row_ids = state->child_chunk.data[state->child_chunk.column_count() - 1];
41 update_chunk.SetCardinality(state->child_chunk);
42 for (idx_t i = 0; i < expressions.size(); i++) {
43 if (expressions[i]->type == ExpressionType::VALUE_DEFAULT) {
44 // default expression, set to the default value of the column
45 default_executor.ExecuteExpression(columns[i], update_chunk.data[i]);
46 } else {
47 assert(expressions[i]->type == ExpressionType::BOUND_REF);
48 // index into child chunk
49 auto &binding = (BoundReferenceExpression &)*expressions[i];
50 update_chunk.data[i].Reference(state->child_chunk.data[binding.index]);
51 }
52 }
53
54 if (is_index_update) {
55 // index update, perform a delete and an append instead
56 table.Delete(tableref, context, row_ids, update_chunk.size());
57 mock_chunk.SetCardinality(update_chunk);
58 for (idx_t i = 0; i < columns.size(); i++) {
59 mock_chunk.data[columns[i]].Reference(update_chunk.data[i]);
60 }
61 table.Append(tableref, context, mock_chunk);
62 } else {
63 table.Update(tableref, context, row_ids, columns, update_chunk);
64 }
65 updated_count += state->child_chunk.size();
66 }
67
68 chunk.SetCardinality(1);
69 chunk.SetValue(0, 0, Value::BIGINT(updated_count));
70
71 state->finished = true;
72
73 chunk.Verify();
74}
75