| 1 | #include "duckdb/execution/operator/persistent/physical_delete.hpp" |
| 2 | |
| 3 | #include "duckdb/common/atomic.hpp" |
| 4 | #include "duckdb/common/types/column/column_data_collection.hpp" |
| 5 | #include "duckdb/execution/expression_executor.hpp" |
| 6 | #include "duckdb/storage/data_table.hpp" |
| 7 | #include "duckdb/storage/table/scan_state.hpp" |
| 8 | #include "duckdb/transaction/duck_transaction.hpp" |
| 9 | |
| 10 | namespace duckdb { |
| 11 | |
| 12 | //===--------------------------------------------------------------------===// |
| 13 | // Sink |
| 14 | //===--------------------------------------------------------------------===// |
| 15 | class DeleteGlobalState : public GlobalSinkState { |
| 16 | public: |
| 17 | explicit DeleteGlobalState(ClientContext &context, const vector<LogicalType> &return_types) |
| 18 | : deleted_count(0), return_collection(context, return_types) { |
| 19 | } |
| 20 | |
| 21 | mutex delete_lock; |
| 22 | idx_t deleted_count; |
| 23 | ColumnDataCollection return_collection; |
| 24 | }; |
| 25 | |
| 26 | class DeleteLocalState : public LocalSinkState { |
| 27 | public: |
| 28 | DeleteLocalState(Allocator &allocator, const vector<LogicalType> &table_types) { |
| 29 | delete_chunk.Initialize(allocator, types: table_types); |
| 30 | } |
| 31 | DataChunk delete_chunk; |
| 32 | }; |
| 33 | |
| 34 | SinkResultType PhysicalDelete::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const { |
| 35 | auto &gstate = input.global_state.Cast<DeleteGlobalState>(); |
| 36 | auto &ustate = input.local_state.Cast<DeleteLocalState>(); |
| 37 | |
| 38 | // get rows and |
| 39 | auto &transaction = DuckTransaction::Get(context&: context.client, db&: table.db); |
| 40 | auto &row_identifiers = chunk.data[row_id_index]; |
| 41 | |
| 42 | vector<column_t> column_ids; |
| 43 | for (idx_t i = 0; i < table.column_definitions.size(); i++) { |
| 44 | column_ids.emplace_back(args&: i); |
| 45 | }; |
| 46 | auto cfs = ColumnFetchState(); |
| 47 | |
| 48 | lock_guard<mutex> delete_guard(gstate.delete_lock); |
| 49 | if (return_chunk) { |
| 50 | row_identifiers.Flatten(count: chunk.size()); |
| 51 | table.Fetch(transaction, result&: ustate.delete_chunk, column_ids, row_ids: row_identifiers, fetch_count: chunk.size(), state&: cfs); |
| 52 | gstate.return_collection.Append(new_chunk&: ustate.delete_chunk); |
| 53 | } |
| 54 | gstate.deleted_count += table.Delete(table&: tableref, context&: context.client, row_ids&: row_identifiers, count: chunk.size()); |
| 55 | |
| 56 | return SinkResultType::NEED_MORE_INPUT; |
| 57 | } |
| 58 | |
| 59 | unique_ptr<GlobalSinkState> PhysicalDelete::GetGlobalSinkState(ClientContext &context) const { |
| 60 | return make_uniq<DeleteGlobalState>(args&: context, args: GetTypes()); |
| 61 | } |
| 62 | |
| 63 | unique_ptr<LocalSinkState> PhysicalDelete::GetLocalSinkState(ExecutionContext &context) const { |
| 64 | return make_uniq<DeleteLocalState>(args&: Allocator::Get(context&: context.client), args: table.GetTypes()); |
| 65 | } |
| 66 | |
| 67 | //===--------------------------------------------------------------------===// |
| 68 | // Source |
| 69 | //===--------------------------------------------------------------------===// |
| 70 | class DeleteSourceState : public GlobalSourceState { |
| 71 | public: |
| 72 | explicit DeleteSourceState(const PhysicalDelete &op) { |
| 73 | if (op.return_chunk) { |
| 74 | D_ASSERT(op.sink_state); |
| 75 | auto &g = op.sink_state->Cast<DeleteGlobalState>(); |
| 76 | g.return_collection.InitializeScan(state&: scan_state); |
| 77 | } |
| 78 | } |
| 79 | |
| 80 | ColumnDataScanState scan_state; |
| 81 | }; |
| 82 | |
| 83 | unique_ptr<GlobalSourceState> PhysicalDelete::GetGlobalSourceState(ClientContext &context) const { |
| 84 | return make_uniq<DeleteSourceState>(args: *this); |
| 85 | } |
| 86 | |
| 87 | SourceResultType PhysicalDelete::GetData(ExecutionContext &context, DataChunk &chunk, |
| 88 | OperatorSourceInput &input) const { |
| 89 | auto &state = input.global_state.Cast<DeleteSourceState>(); |
| 90 | auto &g = sink_state->Cast<DeleteGlobalState>(); |
| 91 | if (!return_chunk) { |
| 92 | chunk.SetCardinality(1); |
| 93 | chunk.SetValue(col_idx: 0, index: 0, val: Value::BIGINT(value: g.deleted_count)); |
| 94 | return SourceResultType::FINISHED; |
| 95 | } |
| 96 | |
| 97 | g.return_collection.Scan(state&: state.scan_state, result&: chunk); |
| 98 | |
| 99 | return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT; |
| 100 | } |
| 101 | |
| 102 | } // namespace duckdb |
| 103 | |