1#include "duckdb/transaction/duck_transaction_manager.hpp"
2
3#include "duckdb/catalog/catalog_set.hpp"
4#include "duckdb/common/exception.hpp"
5#include "duckdb/common/helper.hpp"
6#include "duckdb/common/types/timestamp.hpp"
7#include "duckdb/catalog/catalog.hpp"
8#include "duckdb/catalog/dependency_manager.hpp"
9#include "duckdb/storage/storage_manager.hpp"
10#include "duckdb/transaction/duck_transaction.hpp"
11#include "duckdb/main/client_context.hpp"
12#include "duckdb/main/connection_manager.hpp"
13#include "duckdb/main/attached_database.hpp"
14#include "duckdb/main/database_manager.hpp"
15
16namespace duckdb {
17
18struct CheckpointLock {
19 explicit CheckpointLock(DuckTransactionManager &manager) : manager(manager), is_locked(false) {
20 }
21 ~CheckpointLock() {
22 Unlock();
23 }
24
25 DuckTransactionManager &manager;
26 bool is_locked;
27
28 void Lock() {
29 D_ASSERT(!manager.thread_is_checkpointing);
30 manager.thread_is_checkpointing = true;
31 is_locked = true;
32 }
33 void Unlock() {
34 if (!is_locked) {
35 return;
36 }
37 D_ASSERT(manager.thread_is_checkpointing);
38 manager.thread_is_checkpointing = false;
39 is_locked = false;
40 }
41};
42
43DuckTransactionManager::DuckTransactionManager(AttachedDatabase &db)
44 : TransactionManager(db), thread_is_checkpointing(false) {
45 // start timestamp starts at two
46 current_start_timestamp = 2;
47 // transaction ID starts very high:
48 // it should be much higher than the current start timestamp
49 // if transaction_id < start_timestamp for any set of active transactions
50 // uncommited data could be read by
51 current_transaction_id = TRANSACTION_ID_START;
52 lowest_active_id = TRANSACTION_ID_START;
53 lowest_active_start = MAX_TRANSACTION_ID;
54}
55
56DuckTransactionManager::~DuckTransactionManager() {
57}
58
59DuckTransactionManager &DuckTransactionManager::Get(AttachedDatabase &db) {
60 auto &transaction_manager = TransactionManager::Get(db);
61 if (!transaction_manager.IsDuckTransactionManager()) {
62 throw InternalException("Calling DuckTransactionManager::Get on non-DuckDB transaction manager");
63 }
64 return reinterpret_cast<DuckTransactionManager &>(transaction_manager);
65}
66
67Transaction *DuckTransactionManager::StartTransaction(ClientContext &context) {
68 // obtain the transaction lock during this function
69 lock_guard<mutex> lock(transaction_lock);
70 if (current_start_timestamp >= TRANSACTION_ID_START) { // LCOV_EXCL_START
71 throw InternalException("Cannot start more transactions, ran out of "
72 "transaction identifiers!");
73 } // LCOV_EXCL_STOP
74
75 // obtain the start time and transaction ID of this transaction
76 transaction_t start_time = current_start_timestamp++;
77 transaction_t transaction_id = current_transaction_id++;
78 if (active_transactions.empty()) {
79 lowest_active_start = start_time;
80 lowest_active_id = transaction_id;
81 }
82
83 // create the actual transaction
84 auto transaction = make_uniq<DuckTransaction>(args&: *this, args&: context, args&: start_time, args&: transaction_id);
85 auto transaction_ptr = transaction.get();
86
87 // store it in the set of active transactions
88 active_transactions.push_back(x: std::move(transaction));
89 return transaction_ptr;
90}
91
92struct ClientLockWrapper {
93 ClientLockWrapper(mutex &client_lock, shared_ptr<ClientContext> connection)
94 : connection(std::move(connection)), connection_lock(make_uniq<lock_guard<mutex>>(args&: client_lock)) {
95 }
96
97 shared_ptr<ClientContext> connection;
98 unique_ptr<lock_guard<mutex>> connection_lock;
99};
100
101void DuckTransactionManager::LockClients(vector<ClientLockWrapper> &client_locks, ClientContext &context) {
102 auto &connection_manager = ConnectionManager::Get(context);
103 client_locks.emplace_back(args&: connection_manager.connections_lock, args: nullptr);
104 auto connection_list = connection_manager.GetConnectionList();
105 for (auto &con : connection_list) {
106 if (con.get() == &context) {
107 continue;
108 }
109 auto &context_lock = con->context_lock;
110 client_locks.emplace_back(args&: context_lock, args: std::move(con));
111 }
112}
113
114void DuckTransactionManager::Checkpoint(ClientContext &context, bool force) {
115 auto &storage_manager = db.GetStorageManager();
116 if (storage_manager.InMemory()) {
117 return;
118 }
119
120 // first check if no other thread is checkpointing right now
121 auto lock = unique_lock<mutex>(transaction_lock);
122 if (thread_is_checkpointing) {
123 throw TransactionException("Cannot CHECKPOINT: another thread is checkpointing right now");
124 }
125 CheckpointLock checkpoint_lock(*this);
126 checkpoint_lock.Lock();
127 lock.unlock();
128
129 // lock all the clients AND the connection manager now
130 // this ensures no new queries can be started, and no new connections to the database can be made
131 // to avoid deadlock we release the transaction lock while locking the clients
132 vector<ClientLockWrapper> client_locks;
133 LockClients(client_locks, context);
134
135 auto current = &DuckTransaction::Get(context, db);
136 lock.lock();
137 if (current->ChangesMade()) {
138 throw TransactionException("Cannot CHECKPOINT: the current transaction has transaction local changes");
139 }
140 if (!force) {
141 if (!CanCheckpoint(current)) {
142 throw TransactionException("Cannot CHECKPOINT: there are other transactions. Use FORCE CHECKPOINT to abort "
143 "the other transactions and force a checkpoint");
144 }
145 } else {
146 if (!CanCheckpoint(current)) {
147 for (size_t i = 0; i < active_transactions.size(); i++) {
148 auto &transaction = active_transactions[i];
149 // rollback the transaction
150 transaction->Rollback();
151 auto transaction_context = transaction->context.lock();
152
153 // remove the transaction id from the list of active transactions
154 // potentially resulting in garbage collection
155 RemoveTransaction(transaction&: *transaction);
156 if (transaction_context) {
157 transaction_context->transaction.ClearTransaction();
158 }
159 i--;
160 }
161 D_ASSERT(CanCheckpoint(nullptr));
162 }
163 }
164 storage_manager.CreateCheckpoint();
165}
166
167bool DuckTransactionManager::CanCheckpoint(optional_ptr<DuckTransaction> current) {
168 if (db.IsSystem()) {
169 return false;
170 }
171 auto &storage_manager = db.GetStorageManager();
172 if (storage_manager.InMemory()) {
173 return false;
174 }
175 if (!recently_committed_transactions.empty() || !old_transactions.empty()) {
176 return false;
177 }
178 for (auto &transaction : active_transactions) {
179 if (transaction.get() != current.get()) {
180 return false;
181 }
182 }
183 return true;
184}
185
186string DuckTransactionManager::CommitTransaction(ClientContext &context, Transaction *transaction_p) {
187 auto &transaction = transaction_p->Cast<DuckTransaction>();
188 vector<ClientLockWrapper> client_locks;
189 auto lock = make_uniq<lock_guard<mutex>>(args&: transaction_lock);
190 CheckpointLock checkpoint_lock(*this);
191 // check if we can checkpoint
192 bool checkpoint = thread_is_checkpointing ? false : CanCheckpoint(current: &transaction);
193 if (checkpoint) {
194 if (transaction.AutomaticCheckpoint(db)) {
195 checkpoint_lock.Lock();
196 // we might be able to checkpoint: lock all clients
197 // to avoid deadlock we release the transaction lock while locking the clients
198 lock.reset();
199
200 LockClients(client_locks, context);
201
202 lock = make_uniq<lock_guard<mutex>>(args&: transaction_lock);
203 checkpoint = CanCheckpoint(current: &transaction);
204 if (!checkpoint) {
205 checkpoint_lock.Unlock();
206 client_locks.clear();
207 }
208 } else {
209 checkpoint = false;
210 }
211 }
212 // obtain a commit id for the transaction
213 transaction_t commit_id = current_start_timestamp++;
214 // commit the UndoBuffer of the transaction
215 string error = transaction.Commit(db, commit_id, checkpoint);
216 if (!error.empty()) {
217 // commit unsuccessful: rollback the transaction instead
218 checkpoint = false;
219 transaction.commit_id = 0;
220 transaction.Rollback();
221 }
222 if (!checkpoint) {
223 // we won't checkpoint after all: unlock the clients again
224 checkpoint_lock.Unlock();
225 client_locks.clear();
226 }
227
228 // commit successful: remove the transaction id from the list of active transactions
229 // potentially resulting in garbage collection
230 RemoveTransaction(transaction);
231 // now perform a checkpoint if (1) we are able to checkpoint, and (2) the WAL has reached sufficient size to
232 // checkpoint
233 if (checkpoint) {
234 // checkpoint the database to disk
235 auto &storage_manager = db.GetStorageManager();
236 storage_manager.CreateCheckpoint(delete_wal: false, force_checkpoint: true);
237 }
238 return error;
239}
240
241void DuckTransactionManager::RollbackTransaction(Transaction *transaction_p) {
242 auto &transaction = transaction_p->Cast<DuckTransaction>();
243 // obtain the transaction lock during this function
244 lock_guard<mutex> lock(transaction_lock);
245
246 // rollback the transaction
247 transaction.Rollback();
248
249 // remove the transaction id from the list of active transactions
250 // potentially resulting in garbage collection
251 RemoveTransaction(transaction);
252}
253
254void DuckTransactionManager::RemoveTransaction(DuckTransaction &transaction) noexcept {
255 // remove the transaction from the list of active transactions
256 idx_t t_index = active_transactions.size();
257 // check for the lowest and highest start time in the list of transactions
258 transaction_t lowest_start_time = TRANSACTION_ID_START;
259 transaction_t lowest_transaction_id = MAX_TRANSACTION_ID;
260 transaction_t lowest_active_query = MAXIMUM_QUERY_ID;
261 for (idx_t i = 0; i < active_transactions.size(); i++) {
262 if (active_transactions[i].get() == &transaction) {
263 t_index = i;
264 } else {
265 transaction_t active_query = active_transactions[i]->active_query;
266 lowest_start_time = MinValue(a: lowest_start_time, b: active_transactions[i]->start_time);
267 lowest_active_query = MinValue(a: lowest_active_query, b: active_query);
268 lowest_transaction_id = MinValue(a: lowest_transaction_id, b: active_transactions[i]->transaction_id);
269 }
270 }
271 lowest_active_start = lowest_start_time;
272 lowest_active_id = lowest_transaction_id;
273
274 transaction_t lowest_stored_query = lowest_start_time;
275 D_ASSERT(t_index != active_transactions.size());
276 auto current_transaction = std::move(active_transactions[t_index]);
277 auto current_query = DatabaseManager::Get(db).ActiveQueryNumber();
278 if (transaction.commit_id != 0) {
279 // the transaction was committed, add it to the list of recently
280 // committed transactions
281 recently_committed_transactions.push_back(x: std::move(current_transaction));
282 } else {
283 // the transaction was aborted, but we might still need its information
284 // add it to the set of transactions awaiting GC
285 current_transaction->highest_active_query = current_query;
286 old_transactions.push_back(x: std::move(current_transaction));
287 }
288 // remove the transaction from the set of currently active transactions
289 active_transactions.erase(position: active_transactions.begin() + t_index);
290 // traverse the recently_committed transactions to see if we can remove any
291 idx_t i = 0;
292 for (; i < recently_committed_transactions.size(); i++) {
293 D_ASSERT(recently_committed_transactions[i]);
294 lowest_stored_query = MinValue(a: recently_committed_transactions[i]->start_time, b: lowest_stored_query);
295 if (recently_committed_transactions[i]->commit_id < lowest_start_time) {
296 // changes made BEFORE this transaction are no longer relevant
297 // we can cleanup the undo buffer
298
299 // HOWEVER: any currently running QUERY can still be using
300 // the version information after the cleanup!
301
302 // if we remove the UndoBuffer immediately, we have a race
303 // condition
304
305 // we can only safely do the actual memory cleanup when all the
306 // currently active queries have finished running! (actually,
307 // when all the currently active scans have finished running...)
308 recently_committed_transactions[i]->Cleanup();
309 // store the current highest active query
310 recently_committed_transactions[i]->highest_active_query = current_query;
311 // move it to the list of transactions awaiting GC
312 old_transactions.push_back(x: std::move(recently_committed_transactions[i]));
313 } else {
314 // recently_committed_transactions is ordered on commit_id
315 // implicitly thus if the current one is bigger than
316 // lowest_start_time any subsequent ones are also bigger
317 break;
318 }
319 }
320 if (i > 0) {
321 // we garbage collected transactions: remove them from the list
322 recently_committed_transactions.erase(first: recently_committed_transactions.begin(),
323 last: recently_committed_transactions.begin() + i);
324 }
325 // check if we can free the memory of any old transactions
326 i = active_transactions.empty() ? old_transactions.size() : 0;
327 for (; i < old_transactions.size(); i++) {
328 D_ASSERT(old_transactions[i]);
329 D_ASSERT(old_transactions[i]->highest_active_query > 0);
330 if (old_transactions[i]->highest_active_query >= lowest_active_query) {
331 // there is still a query running that could be using
332 // this transactions' data
333 break;
334 }
335 }
336 if (i > 0) {
337 // we garbage collected transactions: remove them from the list
338 old_transactions.erase(first: old_transactions.begin(), last: old_transactions.begin() + i);
339 }
340}
341
342} // namespace duckdb
343