1#include "duckdb/transaction/transaction_manager.hpp"
2
3#include "duckdb/catalog/catalog_set.hpp"
4#include "duckdb/common/exception.hpp"
5#include "duckdb/common/helper.hpp"
6#include "duckdb/common/types/timestamp.hpp"
7#include "duckdb/catalog/catalog.hpp"
8#include "duckdb/storage/storage_manager.hpp"
9#include "duckdb/transaction/transaction.hpp"
10
11using namespace duckdb;
12using namespace std;
13
14TransactionManager::TransactionManager(StorageManager &storage) : storage(storage) {
15 // start timestamp starts at zero
16 current_start_timestamp = 0;
17 // transaction ID starts very high:
18 // it should be much higher than the current start timestamp
19 // if transaction_id < start_timestamp for any set of active transactions
20 // uncommited data could be read by
21 current_transaction_id = TRANSACTION_ID_START;
22 // the current active query id
23 current_query_number = 1;
24}
25
26TransactionManager::~TransactionManager() {
27}
28
29Transaction *TransactionManager::StartTransaction() {
30 // obtain the transaction lock during this function
31 lock_guard<mutex> lock(transaction_lock);
32
33 if (current_start_timestamp >= TRANSACTION_ID_START) {
34 throw Exception("Cannot start more transactions, ran out of "
35 "transaction identifiers!");
36 }
37
38 // obtain the start time and transaction ID of this transaction
39 transaction_t start_time = current_start_timestamp++;
40 transaction_t transaction_id = current_transaction_id++;
41 timestamp_t start_timestamp = Timestamp::GetCurrentTimestamp();
42
43 // create the actual transaction
44 auto transaction = make_unique<Transaction>(start_time, transaction_id, start_timestamp);
45 auto transaction_ptr = transaction.get();
46
47 // store it in the set of active transactions
48 active_transactions.push_back(move(transaction));
49 return transaction_ptr;
50}
51
52string TransactionManager::CommitTransaction(Transaction *transaction) {
53 // obtain the transaction lock during this function
54 lock_guard<mutex> lock(transaction_lock);
55
56 // obtain a commit id for the transaction
57 transaction_t commit_id = current_start_timestamp++;
58 // commit the UndoBuffer of the transaction
59 string error = transaction->Commit(storage.GetWriteAheadLog(), commit_id);
60 if (!error.empty()) {
61 // commit unsuccessful: rollback the transaction instead
62 transaction->commit_id = 0;
63 transaction->Rollback();
64 }
65
66 // commit successful: remove the transaction id from the list of active transactions
67 // potentially resulting in garbage collection
68 RemoveTransaction(transaction);
69 return error;
70}
71
72void TransactionManager::RollbackTransaction(Transaction *transaction) {
73 // obtain the transaction lock during this function
74 lock_guard<mutex> lock(transaction_lock);
75
76 // rollback the transaction
77 transaction->Rollback();
78
79 // remove the transaction id from the list of active transactions
80 // potentially resulting in garbage collection
81 RemoveTransaction(transaction);
82}
83
84void TransactionManager::RemoveTransaction(Transaction *transaction) noexcept {
85 // remove the transaction from the list of active transactions
86 idx_t t_index = active_transactions.size();
87 // check for the lowest and highest start time in the list of transactions
88 transaction_t lowest_start_time = TRANSACTION_ID_START;
89 transaction_t lowest_active_query = MAXIMUM_QUERY_ID;
90 for (idx_t i = 0; i < active_transactions.size(); i++) {
91 if (active_transactions[i].get() == transaction) {
92 t_index = i;
93 } else {
94 lowest_start_time = std::min(lowest_start_time, active_transactions[i]->start_time);
95 lowest_active_query = std::min(lowest_active_query, active_transactions[i]->active_query);
96 }
97 }
98 transaction_t lowest_stored_query = lowest_start_time;
99 assert(t_index != active_transactions.size());
100 auto current_transaction = move(active_transactions[t_index]);
101 if (transaction->commit_id != 0) {
102 // the transaction was committed, add it to the list of recently
103 // committed transactions
104 recently_committed_transactions.push_back(move(current_transaction));
105 } else {
106 // the transaction was aborted, but we might still need its information
107 // add it to the set of transactions awaiting GC
108 current_transaction->highest_active_query = current_query_number;
109 old_transactions.push_back(move(current_transaction));
110 }
111 // remove the transaction from the set of currently active transactions
112 active_transactions.erase(active_transactions.begin() + t_index);
113 // traverse the recently_committed transactions to see if we can remove any
114 idx_t i = 0;
115 for (; i < recently_committed_transactions.size(); i++) {
116 assert(recently_committed_transactions[i]);
117 lowest_stored_query = std::min(recently_committed_transactions[i]->start_time, lowest_stored_query);
118 if (recently_committed_transactions[i]->commit_id < lowest_start_time) {
119 // changes made BEFORE this transaction are no longer relevant
120 // we can cleanup the undo buffer
121
122 // HOWEVER: any currently running QUERY can still be using
123 // the version information after the cleanup!
124
125 // if we remove the UndoBuffer immediately, we have a race
126 // condition
127
128 // we can only safely do the actual memory cleanup when all the
129 // currently active queries have finished running! (actually,
130 // when all the currently active scans have finished running...)
131 recently_committed_transactions[i]->Cleanup();
132 // store the current highest active query
133 recently_committed_transactions[i]->highest_active_query = current_query_number;
134 // move it to the list of transactions awaiting GC
135 old_transactions.push_back(move(recently_committed_transactions[i]));
136 } else {
137 // recently_committed_transactions is ordered on commit_id
138 // implicitly thus if the current one is bigger than
139 // lowest_start_time any subsequent ones are also bigger
140 break;
141 }
142 }
143 if (i > 0) {
144 // we garbage collected transactions: remove them from the list
145 recently_committed_transactions.erase(recently_committed_transactions.begin(),
146 recently_committed_transactions.begin() + i);
147 }
148 // check if we can free the memory of any old transactions
149 i = active_transactions.size() == 0 ? old_transactions.size() : 0;
150 for (; i < old_transactions.size(); i++) {
151 assert(old_transactions[i]);
152 assert(old_transactions[i]->highest_active_query > 0);
153 if (old_transactions[i]->highest_active_query >= lowest_active_query) {
154 // there is still a query running that could be using
155 // this transactions' data
156 break;
157 }
158 }
159 if (i > 0) {
160 // we garbage collected transactions: remove them from the list
161 old_transactions.erase(old_transactions.begin(), old_transactions.begin() + i);
162 }
163 // check if we can free the memory of any old catalog sets
164 for (i = 0; i < old_catalog_sets.size(); i++) {
165 assert(old_catalog_sets[i].highest_active_query > 0);
166 if (old_catalog_sets[i].highest_active_query >= lowest_stored_query) {
167 // there is still a query running that could be using
168 // this catalog sets' data
169 break;
170 }
171 }
172 if (i > 0) {
173 // we garbage collected catalog sets: remove them from the list
174 old_catalog_sets.erase(old_catalog_sets.begin(), old_catalog_sets.begin() + i);
175 }
176}
177
178void TransactionManager::AddCatalogSet(ClientContext &context, unique_ptr<CatalogSet> catalog_set) {
179 // remove the dependencies from all entries of the CatalogSet
180 Catalog::GetCatalog(context).dependency_manager.ClearDependencies(*catalog_set);
181
182 lock_guard<mutex> lock(transaction_lock);
183 if (active_transactions.size() > 0) {
184 // if there are active transactions we wait with deleting the objects
185 StoredCatalogSet set;
186 set.stored_set = move(catalog_set);
187 set.highest_active_query = current_start_timestamp;
188
189 old_catalog_sets.push_back(move(set));
190 }
191}
192