1 | #include "duckdb/transaction/transaction_manager.hpp" |
2 | |
3 | #include "duckdb/catalog/catalog_set.hpp" |
4 | #include "duckdb/common/exception.hpp" |
5 | #include "duckdb/common/helper.hpp" |
6 | #include "duckdb/common/types/timestamp.hpp" |
7 | #include "duckdb/catalog/catalog.hpp" |
8 | #include "duckdb/storage/storage_manager.hpp" |
9 | #include "duckdb/transaction/transaction.hpp" |
10 | |
11 | using namespace duckdb; |
12 | using namespace std; |
13 | |
14 | TransactionManager::TransactionManager(StorageManager &storage) : storage(storage) { |
15 | // start timestamp starts at zero |
16 | current_start_timestamp = 0; |
17 | // transaction ID starts very high: |
18 | // it should be much higher than the current start timestamp |
19 | // if transaction_id < start_timestamp for any set of active transactions |
20 | // uncommited data could be read by |
21 | current_transaction_id = TRANSACTION_ID_START; |
22 | // the current active query id |
23 | current_query_number = 1; |
24 | } |
25 | |
26 | TransactionManager::~TransactionManager() { |
27 | } |
28 | |
29 | Transaction *TransactionManager::StartTransaction() { |
30 | // obtain the transaction lock during this function |
31 | lock_guard<mutex> lock(transaction_lock); |
32 | |
33 | if (current_start_timestamp >= TRANSACTION_ID_START) { |
34 | throw Exception("Cannot start more transactions, ran out of " |
35 | "transaction identifiers!" ); |
36 | } |
37 | |
38 | // obtain the start time and transaction ID of this transaction |
39 | transaction_t start_time = current_start_timestamp++; |
40 | transaction_t transaction_id = current_transaction_id++; |
41 | timestamp_t start_timestamp = Timestamp::GetCurrentTimestamp(); |
42 | |
43 | // create the actual transaction |
44 | auto transaction = make_unique<Transaction>(start_time, transaction_id, start_timestamp); |
45 | auto transaction_ptr = transaction.get(); |
46 | |
47 | // store it in the set of active transactions |
48 | active_transactions.push_back(move(transaction)); |
49 | return transaction_ptr; |
50 | } |
51 | |
52 | string TransactionManager::CommitTransaction(Transaction *transaction) { |
53 | // obtain the transaction lock during this function |
54 | lock_guard<mutex> lock(transaction_lock); |
55 | |
56 | // obtain a commit id for the transaction |
57 | transaction_t commit_id = current_start_timestamp++; |
58 | // commit the UndoBuffer of the transaction |
59 | string error = transaction->Commit(storage.GetWriteAheadLog(), commit_id); |
60 | if (!error.empty()) { |
61 | // commit unsuccessful: rollback the transaction instead |
62 | transaction->commit_id = 0; |
63 | transaction->Rollback(); |
64 | } |
65 | |
66 | // commit successful: remove the transaction id from the list of active transactions |
67 | // potentially resulting in garbage collection |
68 | RemoveTransaction(transaction); |
69 | return error; |
70 | } |
71 | |
72 | void TransactionManager::RollbackTransaction(Transaction *transaction) { |
73 | // obtain the transaction lock during this function |
74 | lock_guard<mutex> lock(transaction_lock); |
75 | |
76 | // rollback the transaction |
77 | transaction->Rollback(); |
78 | |
79 | // remove the transaction id from the list of active transactions |
80 | // potentially resulting in garbage collection |
81 | RemoveTransaction(transaction); |
82 | } |
83 | |
84 | void TransactionManager::RemoveTransaction(Transaction *transaction) noexcept { |
85 | // remove the transaction from the list of active transactions |
86 | idx_t t_index = active_transactions.size(); |
87 | // check for the lowest and highest start time in the list of transactions |
88 | transaction_t lowest_start_time = TRANSACTION_ID_START; |
89 | transaction_t lowest_active_query = MAXIMUM_QUERY_ID; |
90 | for (idx_t i = 0; i < active_transactions.size(); i++) { |
91 | if (active_transactions[i].get() == transaction) { |
92 | t_index = i; |
93 | } else { |
94 | lowest_start_time = std::min(lowest_start_time, active_transactions[i]->start_time); |
95 | lowest_active_query = std::min(lowest_active_query, active_transactions[i]->active_query); |
96 | } |
97 | } |
98 | transaction_t lowest_stored_query = lowest_start_time; |
99 | assert(t_index != active_transactions.size()); |
100 | auto current_transaction = move(active_transactions[t_index]); |
101 | if (transaction->commit_id != 0) { |
102 | // the transaction was committed, add it to the list of recently |
103 | // committed transactions |
104 | recently_committed_transactions.push_back(move(current_transaction)); |
105 | } else { |
106 | // the transaction was aborted, but we might still need its information |
107 | // add it to the set of transactions awaiting GC |
108 | current_transaction->highest_active_query = current_query_number; |
109 | old_transactions.push_back(move(current_transaction)); |
110 | } |
111 | // remove the transaction from the set of currently active transactions |
112 | active_transactions.erase(active_transactions.begin() + t_index); |
113 | // traverse the recently_committed transactions to see if we can remove any |
114 | idx_t i = 0; |
115 | for (; i < recently_committed_transactions.size(); i++) { |
116 | assert(recently_committed_transactions[i]); |
117 | lowest_stored_query = std::min(recently_committed_transactions[i]->start_time, lowest_stored_query); |
118 | if (recently_committed_transactions[i]->commit_id < lowest_start_time) { |
119 | // changes made BEFORE this transaction are no longer relevant |
120 | // we can cleanup the undo buffer |
121 | |
122 | // HOWEVER: any currently running QUERY can still be using |
123 | // the version information after the cleanup! |
124 | |
125 | // if we remove the UndoBuffer immediately, we have a race |
126 | // condition |
127 | |
128 | // we can only safely do the actual memory cleanup when all the |
129 | // currently active queries have finished running! (actually, |
130 | // when all the currently active scans have finished running...) |
131 | recently_committed_transactions[i]->Cleanup(); |
132 | // store the current highest active query |
133 | recently_committed_transactions[i]->highest_active_query = current_query_number; |
134 | // move it to the list of transactions awaiting GC |
135 | old_transactions.push_back(move(recently_committed_transactions[i])); |
136 | } else { |
137 | // recently_committed_transactions is ordered on commit_id |
138 | // implicitly thus if the current one is bigger than |
139 | // lowest_start_time any subsequent ones are also bigger |
140 | break; |
141 | } |
142 | } |
143 | if (i > 0) { |
144 | // we garbage collected transactions: remove them from the list |
145 | recently_committed_transactions.erase(recently_committed_transactions.begin(), |
146 | recently_committed_transactions.begin() + i); |
147 | } |
148 | // check if we can free the memory of any old transactions |
149 | i = active_transactions.size() == 0 ? old_transactions.size() : 0; |
150 | for (; i < old_transactions.size(); i++) { |
151 | assert(old_transactions[i]); |
152 | assert(old_transactions[i]->highest_active_query > 0); |
153 | if (old_transactions[i]->highest_active_query >= lowest_active_query) { |
154 | // there is still a query running that could be using |
155 | // this transactions' data |
156 | break; |
157 | } |
158 | } |
159 | if (i > 0) { |
160 | // we garbage collected transactions: remove them from the list |
161 | old_transactions.erase(old_transactions.begin(), old_transactions.begin() + i); |
162 | } |
163 | // check if we can free the memory of any old catalog sets |
164 | for (i = 0; i < old_catalog_sets.size(); i++) { |
165 | assert(old_catalog_sets[i].highest_active_query > 0); |
166 | if (old_catalog_sets[i].highest_active_query >= lowest_stored_query) { |
167 | // there is still a query running that could be using |
168 | // this catalog sets' data |
169 | break; |
170 | } |
171 | } |
172 | if (i > 0) { |
173 | // we garbage collected catalog sets: remove them from the list |
174 | old_catalog_sets.erase(old_catalog_sets.begin(), old_catalog_sets.begin() + i); |
175 | } |
176 | } |
177 | |
178 | void TransactionManager::AddCatalogSet(ClientContext &context, unique_ptr<CatalogSet> catalog_set) { |
179 | // remove the dependencies from all entries of the CatalogSet |
180 | Catalog::GetCatalog(context).dependency_manager.ClearDependencies(*catalog_set); |
181 | |
182 | lock_guard<mutex> lock(transaction_lock); |
183 | if (active_transactions.size() > 0) { |
184 | // if there are active transactions we wait with deleting the objects |
185 | StoredCatalogSet set; |
186 | set.stored_set = move(catalog_set); |
187 | set.highest_active_query = current_start_timestamp; |
188 | |
189 | old_catalog_sets.push_back(move(set)); |
190 | } |
191 | } |
192 | |