1 | #include "duckdb/execution/operator/persistent/physical_delete.hpp" |
2 | |
3 | #include "duckdb/common/atomic.hpp" |
4 | #include "duckdb/common/types/column/column_data_collection.hpp" |
5 | #include "duckdb/execution/expression_executor.hpp" |
6 | #include "duckdb/storage/data_table.hpp" |
7 | #include "duckdb/storage/table/scan_state.hpp" |
8 | #include "duckdb/transaction/duck_transaction.hpp" |
9 | |
10 | namespace duckdb { |
11 | |
12 | //===--------------------------------------------------------------------===// |
13 | // Sink |
14 | //===--------------------------------------------------------------------===// |
15 | class DeleteGlobalState : public GlobalSinkState { |
16 | public: |
17 | explicit DeleteGlobalState(ClientContext &context, const vector<LogicalType> &return_types) |
18 | : deleted_count(0), return_collection(context, return_types) { |
19 | } |
20 | |
21 | mutex delete_lock; |
22 | idx_t deleted_count; |
23 | ColumnDataCollection return_collection; |
24 | }; |
25 | |
26 | class DeleteLocalState : public LocalSinkState { |
27 | public: |
28 | DeleteLocalState(Allocator &allocator, const vector<LogicalType> &table_types) { |
29 | delete_chunk.Initialize(allocator, types: table_types); |
30 | } |
31 | DataChunk delete_chunk; |
32 | }; |
33 | |
34 | SinkResultType PhysicalDelete::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const { |
35 | auto &gstate = input.global_state.Cast<DeleteGlobalState>(); |
36 | auto &ustate = input.local_state.Cast<DeleteLocalState>(); |
37 | |
38 | // get rows and |
39 | auto &transaction = DuckTransaction::Get(context&: context.client, db&: table.db); |
40 | auto &row_identifiers = chunk.data[row_id_index]; |
41 | |
42 | vector<column_t> column_ids; |
43 | for (idx_t i = 0; i < table.column_definitions.size(); i++) { |
44 | column_ids.emplace_back(args&: i); |
45 | }; |
46 | auto cfs = ColumnFetchState(); |
47 | |
48 | lock_guard<mutex> delete_guard(gstate.delete_lock); |
49 | if (return_chunk) { |
50 | row_identifiers.Flatten(count: chunk.size()); |
51 | table.Fetch(transaction, result&: ustate.delete_chunk, column_ids, row_ids: row_identifiers, fetch_count: chunk.size(), state&: cfs); |
52 | gstate.return_collection.Append(new_chunk&: ustate.delete_chunk); |
53 | } |
54 | gstate.deleted_count += table.Delete(table&: tableref, context&: context.client, row_ids&: row_identifiers, count: chunk.size()); |
55 | |
56 | return SinkResultType::NEED_MORE_INPUT; |
57 | } |
58 | |
59 | unique_ptr<GlobalSinkState> PhysicalDelete::GetGlobalSinkState(ClientContext &context) const { |
60 | return make_uniq<DeleteGlobalState>(args&: context, args: GetTypes()); |
61 | } |
62 | |
63 | unique_ptr<LocalSinkState> PhysicalDelete::GetLocalSinkState(ExecutionContext &context) const { |
64 | return make_uniq<DeleteLocalState>(args&: Allocator::Get(context&: context.client), args: table.GetTypes()); |
65 | } |
66 | |
67 | //===--------------------------------------------------------------------===// |
68 | // Source |
69 | //===--------------------------------------------------------------------===// |
70 | class DeleteSourceState : public GlobalSourceState { |
71 | public: |
72 | explicit DeleteSourceState(const PhysicalDelete &op) { |
73 | if (op.return_chunk) { |
74 | D_ASSERT(op.sink_state); |
75 | auto &g = op.sink_state->Cast<DeleteGlobalState>(); |
76 | g.return_collection.InitializeScan(state&: scan_state); |
77 | } |
78 | } |
79 | |
80 | ColumnDataScanState scan_state; |
81 | }; |
82 | |
83 | unique_ptr<GlobalSourceState> PhysicalDelete::GetGlobalSourceState(ClientContext &context) const { |
84 | return make_uniq<DeleteSourceState>(args: *this); |
85 | } |
86 | |
87 | SourceResultType PhysicalDelete::GetData(ExecutionContext &context, DataChunk &chunk, |
88 | OperatorSourceInput &input) const { |
89 | auto &state = input.global_state.Cast<DeleteSourceState>(); |
90 | auto &g = sink_state->Cast<DeleteGlobalState>(); |
91 | if (!return_chunk) { |
92 | chunk.SetCardinality(1); |
93 | chunk.SetValue(col_idx: 0, index: 0, val: Value::BIGINT(value: g.deleted_count)); |
94 | return SourceResultType::FINISHED; |
95 | } |
96 | |
97 | g.return_collection.Scan(state&: state.scan_state, result&: chunk); |
98 | |
99 | return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT; |
100 | } |
101 | |
102 | } // namespace duckdb |
103 | |