1#include "duckdb/execution/operator/persistent/physical_delete.hpp"
2
3#include "duckdb/common/atomic.hpp"
4#include "duckdb/common/types/column/column_data_collection.hpp"
5#include "duckdb/execution/expression_executor.hpp"
6#include "duckdb/storage/data_table.hpp"
7#include "duckdb/storage/table/scan_state.hpp"
8#include "duckdb/transaction/duck_transaction.hpp"
9
10namespace duckdb {
11
12//===--------------------------------------------------------------------===//
13// Sink
14//===--------------------------------------------------------------------===//
15class DeleteGlobalState : public GlobalSinkState {
16public:
17 explicit DeleteGlobalState(ClientContext &context, const vector<LogicalType> &return_types)
18 : deleted_count(0), return_collection(context, return_types) {
19 }
20
21 mutex delete_lock;
22 idx_t deleted_count;
23 ColumnDataCollection return_collection;
24};
25
26class DeleteLocalState : public LocalSinkState {
27public:
28 DeleteLocalState(Allocator &allocator, const vector<LogicalType> &table_types) {
29 delete_chunk.Initialize(allocator, types: table_types);
30 }
31 DataChunk delete_chunk;
32};
33
34SinkResultType PhysicalDelete::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
35 auto &gstate = input.global_state.Cast<DeleteGlobalState>();
36 auto &ustate = input.local_state.Cast<DeleteLocalState>();
37
38 // get rows and
39 auto &transaction = DuckTransaction::Get(context&: context.client, db&: table.db);
40 auto &row_identifiers = chunk.data[row_id_index];
41
42 vector<column_t> column_ids;
43 for (idx_t i = 0; i < table.column_definitions.size(); i++) {
44 column_ids.emplace_back(args&: i);
45 };
46 auto cfs = ColumnFetchState();
47
48 lock_guard<mutex> delete_guard(gstate.delete_lock);
49 if (return_chunk) {
50 row_identifiers.Flatten(count: chunk.size());
51 table.Fetch(transaction, result&: ustate.delete_chunk, column_ids, row_ids: row_identifiers, fetch_count: chunk.size(), state&: cfs);
52 gstate.return_collection.Append(new_chunk&: ustate.delete_chunk);
53 }
54 gstate.deleted_count += table.Delete(table&: tableref, context&: context.client, row_ids&: row_identifiers, count: chunk.size());
55
56 return SinkResultType::NEED_MORE_INPUT;
57}
58
59unique_ptr<GlobalSinkState> PhysicalDelete::GetGlobalSinkState(ClientContext &context) const {
60 return make_uniq<DeleteGlobalState>(args&: context, args: GetTypes());
61}
62
63unique_ptr<LocalSinkState> PhysicalDelete::GetLocalSinkState(ExecutionContext &context) const {
64 return make_uniq<DeleteLocalState>(args&: Allocator::Get(context&: context.client), args: table.GetTypes());
65}
66
67//===--------------------------------------------------------------------===//
68// Source
69//===--------------------------------------------------------------------===//
70class DeleteSourceState : public GlobalSourceState {
71public:
72 explicit DeleteSourceState(const PhysicalDelete &op) {
73 if (op.return_chunk) {
74 D_ASSERT(op.sink_state);
75 auto &g = op.sink_state->Cast<DeleteGlobalState>();
76 g.return_collection.InitializeScan(state&: scan_state);
77 }
78 }
79
80 ColumnDataScanState scan_state;
81};
82
83unique_ptr<GlobalSourceState> PhysicalDelete::GetGlobalSourceState(ClientContext &context) const {
84 return make_uniq<DeleteSourceState>(args: *this);
85}
86
87SourceResultType PhysicalDelete::GetData(ExecutionContext &context, DataChunk &chunk,
88 OperatorSourceInput &input) const {
89 auto &state = input.global_state.Cast<DeleteSourceState>();
90 auto &g = sink_state->Cast<DeleteGlobalState>();
91 if (!return_chunk) {
92 chunk.SetCardinality(1);
93 chunk.SetValue(col_idx: 0, index: 0, val: Value::BIGINT(value: g.deleted_count));
94 return SourceResultType::FINISHED;
95 }
96
97 g.return_collection.Scan(state&: state.scan_state, result&: chunk);
98
99 return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT;
100}
101
102} // namespace duckdb
103