1 | #include "duckdb/transaction/duck_transaction.hpp" |
2 | |
3 | #include "duckdb/main/client_context.hpp" |
4 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
5 | #include "duckdb/common/exception.hpp" |
6 | #include "duckdb/parser/column_definition.hpp" |
7 | #include "duckdb/storage/data_table.hpp" |
8 | #include "duckdb/storage/write_ahead_log.hpp" |
9 | #include "duckdb/storage/storage_manager.hpp" |
10 | |
11 | #include "duckdb/transaction/append_info.hpp" |
12 | #include "duckdb/transaction/delete_info.hpp" |
13 | #include "duckdb/transaction/update_info.hpp" |
14 | #include "duckdb/transaction/local_storage.hpp" |
15 | #include "duckdb/main/config.hpp" |
16 | #include "duckdb/storage/table/column_data.hpp" |
17 | #include "duckdb/main/client_data.hpp" |
18 | #include "duckdb/main/attached_database.hpp" |
19 | |
20 | namespace duckdb { |
21 | |
22 | TransactionData::TransactionData(DuckTransaction &transaction_p) // NOLINT |
23 | : transaction(&transaction_p), transaction_id(transaction_p.transaction_id), start_time(transaction_p.start_time) { |
24 | } |
25 | TransactionData::TransactionData(transaction_t transaction_id_p, transaction_t start_time_p) |
26 | : transaction(nullptr), transaction_id(transaction_id_p), start_time(start_time_p) { |
27 | } |
28 | |
29 | DuckTransaction::DuckTransaction(TransactionManager &manager, ClientContext &context_p, transaction_t start_time, |
30 | transaction_t transaction_id) |
31 | : Transaction(manager, context_p), start_time(start_time), transaction_id(transaction_id), commit_id(0), |
32 | highest_active_query(0), undo_buffer(context_p), storage(make_uniq<LocalStorage>(args&: context_p, args&: *this)) { |
33 | } |
34 | |
35 | DuckTransaction::~DuckTransaction() { |
36 | } |
37 | |
38 | DuckTransaction &DuckTransaction::Get(ClientContext &context, AttachedDatabase &db) { |
39 | return DuckTransaction::Get(context, catalog&: db.GetCatalog()); |
40 | } |
41 | |
42 | DuckTransaction &DuckTransaction::Get(ClientContext &context, Catalog &catalog) { |
43 | auto &transaction = Transaction::Get(context, catalog); |
44 | if (!transaction.IsDuckTransaction()) { |
45 | throw InternalException("DuckTransaction::Get called on non-DuckDB transaction" ); |
46 | } |
47 | return transaction.Cast<DuckTransaction>(); |
48 | } |
49 | |
50 | LocalStorage &DuckTransaction::GetLocalStorage() { |
51 | return *storage; |
52 | } |
53 | |
54 | void DuckTransaction::PushCatalogEntry(CatalogEntry &entry, data_ptr_t , idx_t ) { |
55 | idx_t alloc_size = sizeof(CatalogEntry *); |
56 | if (extra_data_size > 0) { |
57 | alloc_size += extra_data_size + sizeof(idx_t); |
58 | } |
59 | auto baseptr = undo_buffer.CreateEntry(type: UndoFlags::CATALOG_ENTRY, len: alloc_size); |
60 | // store the pointer to the catalog entry |
61 | Store<CatalogEntry *>(val: &entry, ptr: baseptr); |
62 | if (extra_data_size > 0) { |
63 | // copy the extra data behind the catalog entry pointer (if any) |
64 | baseptr += sizeof(CatalogEntry *); |
65 | // first store the extra data size |
66 | Store<idx_t>(val: extra_data_size, ptr: baseptr); |
67 | baseptr += sizeof(idx_t); |
68 | // then copy over the actual data |
69 | memcpy(dest: baseptr, src: extra_data, n: extra_data_size); |
70 | } |
71 | } |
72 | |
73 | void DuckTransaction::PushDelete(DataTable &table, ChunkVectorInfo *vinfo, row_t rows[], idx_t count, idx_t base_row) { |
74 | auto delete_info = reinterpret_cast<DeleteInfo *>( |
75 | undo_buffer.CreateEntry(type: UndoFlags::DELETE_TUPLE, len: sizeof(DeleteInfo) + sizeof(row_t) * count)); |
76 | delete_info->vinfo = vinfo; |
77 | delete_info->table = &table; |
78 | delete_info->count = count; |
79 | delete_info->base_row = base_row; |
80 | memcpy(dest: delete_info->rows, src: rows, n: sizeof(row_t) * count); |
81 | } |
82 | |
83 | void DuckTransaction::PushAppend(DataTable &table, idx_t start_row, idx_t row_count) { |
84 | auto append_info = |
85 | reinterpret_cast<AppendInfo *>(undo_buffer.CreateEntry(type: UndoFlags::INSERT_TUPLE, len: sizeof(AppendInfo))); |
86 | append_info->table = &table; |
87 | append_info->start_row = start_row; |
88 | append_info->count = row_count; |
89 | } |
90 | |
91 | UpdateInfo *DuckTransaction::CreateUpdateInfo(idx_t type_size, idx_t entries) { |
92 | data_ptr_t base_info = undo_buffer.CreateEntry( |
93 | type: UndoFlags::UPDATE_TUPLE, len: sizeof(UpdateInfo) + (sizeof(sel_t) + type_size) * STANDARD_VECTOR_SIZE); |
94 | auto update_info = reinterpret_cast<UpdateInfo *>(base_info); |
95 | update_info->max = STANDARD_VECTOR_SIZE; |
96 | update_info->tuples = reinterpret_cast<sel_t *>(base_info + sizeof(UpdateInfo)); |
97 | update_info->tuple_data = base_info + sizeof(UpdateInfo) + sizeof(sel_t) * update_info->max; |
98 | update_info->version_number = transaction_id; |
99 | return update_info; |
100 | } |
101 | |
102 | bool DuckTransaction::ChangesMade() { |
103 | return undo_buffer.ChangesMade() || storage->ChangesMade(); |
104 | } |
105 | |
106 | bool DuckTransaction::AutomaticCheckpoint(AttachedDatabase &db) { |
107 | auto &storage_manager = db.GetStorageManager(); |
108 | return storage_manager.AutomaticCheckpoint(estimated_wal_bytes: storage->EstimatedSize() + undo_buffer.EstimatedSize()); |
109 | } |
110 | |
111 | string DuckTransaction::Commit(AttachedDatabase &db, transaction_t commit_id, bool checkpoint) noexcept { |
112 | // "checkpoint" parameter indicates if the caller will checkpoint. If checkpoint == |
113 | // true: Then this function will NOT write to the WAL or flush/persist. |
114 | // This method only makes commit in memory, expecting caller to checkpoint/flush. |
115 | // false: Then this function WILL write to the WAL and Flush/Persist it. |
116 | this->commit_id = commit_id; |
117 | |
118 | UndoBuffer::IteratorState iterator_state; |
119 | LocalStorage::CommitState commit_state; |
120 | unique_ptr<StorageCommitState> storage_commit_state; |
121 | optional_ptr<WriteAheadLog> log; |
122 | if (!db.IsSystem()) { |
123 | auto &storage_manager = db.GetStorageManager(); |
124 | log = storage_manager.GetWriteAheadLog(); |
125 | storage_commit_state = storage_manager.GenStorageCommitState(transaction&: *this, checkpoint); |
126 | } else { |
127 | log = nullptr; |
128 | } |
129 | try { |
130 | storage->Commit(commit_state, transaction&: *this); |
131 | undo_buffer.Commit(iterator_state, log, commit_id); |
132 | if (log) { |
133 | // commit any sequences that were used to the WAL |
134 | for (auto &entry : sequence_usage) { |
135 | log->WriteSequenceValue(entry: *entry.first, val: entry.second); |
136 | } |
137 | } |
138 | if (storage_commit_state) { |
139 | storage_commit_state->FlushCommit(); |
140 | } |
141 | return string(); |
142 | } catch (std::exception &ex) { |
143 | return ex.what(); |
144 | } |
145 | } |
146 | |
147 | void DuckTransaction::Rollback() noexcept { |
148 | storage->Rollback(); |
149 | undo_buffer.Rollback(); |
150 | } |
151 | |
152 | void DuckTransaction::Cleanup() { |
153 | undo_buffer.Cleanup(); |
154 | } |
155 | |
156 | } // namespace duckdb |
157 | |