1 | #include "duckdb/catalog/catalog_set.hpp" |
2 | |
3 | #include "duckdb/catalog/catalog.hpp" |
4 | #include "duckdb/common/exception.hpp" |
5 | #include "duckdb/transaction/transaction_manager.hpp" |
6 | #include "duckdb/transaction/transaction.hpp" |
7 | #include "duckdb/common/serializer/buffered_serializer.hpp" |
8 | #include "duckdb/parser/parsed_data/alter_table_info.hpp" |
9 | |
10 | using namespace duckdb; |
11 | using namespace std; |
12 | |
13 | CatalogSet::CatalogSet(Catalog &catalog) : catalog(catalog) { |
14 | } |
15 | |
16 | bool CatalogSet::CreateEntry(Transaction &transaction, const string &name, unique_ptr<CatalogEntry> value, |
17 | unordered_set<CatalogEntry *> &dependencies) { |
18 | // lock the catalog for writing |
19 | lock_guard<mutex> write_lock(catalog.write_lock); |
20 | // lock this catalog set to disallow reading |
21 | lock_guard<mutex> read_lock(catalog_lock); |
22 | |
23 | // first check if the entry exists in the unordered set |
24 | auto entry = data.find(name); |
25 | if (entry == data.end()) { |
26 | // if it does not: entry has never been created |
27 | |
28 | // first create a dummy deleted entry for this entry |
29 | // so transactions started before the commit of this transaction don't |
30 | // see it yet |
31 | auto dummy_node = make_unique<CatalogEntry>(CatalogType::INVALID, value->catalog, name); |
32 | dummy_node->timestamp = 0; |
33 | dummy_node->deleted = true; |
34 | dummy_node->set = this; |
35 | data[name] = move(dummy_node); |
36 | } else { |
37 | // if it does, we have to check version numbers |
38 | CatalogEntry ¤t = *entry->second; |
39 | if (HasConflict(transaction, current)) { |
40 | // current version has been written to by a currently active |
41 | // transaction |
42 | throw TransactionException("Catalog write-write conflict on create with \"%s\"" , current.name.c_str()); |
43 | } |
44 | // there is a current version that has been committed |
45 | // if it has not been deleted there is a conflict |
46 | if (!current.deleted) { |
47 | return false; |
48 | } |
49 | } |
50 | // create a new entry and replace the currently stored one |
51 | // set the timestamp to the timestamp of the current transaction |
52 | // and point it at the dummy node |
53 | value->timestamp = transaction.transaction_id; |
54 | value->set = this; |
55 | |
56 | // now add the dependency set of this object to the dependency manager |
57 | catalog.dependency_manager.AddObject(transaction, value.get(), dependencies); |
58 | |
59 | value->child = move(data[name]); |
60 | value->child->parent = value.get(); |
61 | // push the old entry in the undo buffer for this transaction |
62 | transaction.PushCatalogEntry(value->child.get()); |
63 | data[name] = move(value); |
64 | return true; |
65 | } |
66 | |
67 | bool CatalogSet::AlterEntry(ClientContext &context, const string &name, AlterInfo *alter_info) { |
68 | auto &transaction = Transaction::GetTransaction(context); |
69 | // lock the catalog for writing |
70 | lock_guard<mutex> write_lock(catalog.write_lock); |
71 | |
72 | // first check if the entry exists in the unordered set |
73 | auto entry = data.find(name); |
74 | if (entry == data.end()) { |
75 | // if it does not: entry has never been created and cannot be altered |
76 | return false; |
77 | } |
78 | // if it does: we have to retrieve the entry and to check version numbers |
79 | CatalogEntry ¤t = *entry->second; |
80 | if (HasConflict(transaction, current)) { |
81 | // current version has been written to by a currently active |
82 | // transaction |
83 | throw TransactionException("Catalog write-write conflict on alter with \"%s\"" , current.name.c_str()); |
84 | } |
85 | |
86 | // lock this catalog set to disallow reading |
87 | lock_guard<mutex> read_lock(catalog_lock); |
88 | |
89 | // create a new entry and replace the currently stored one |
90 | // set the timestamp to the timestamp of the current transaction |
91 | // and point it to the updated table node |
92 | auto value = current.AlterEntry(context, alter_info); |
93 | if (!value) { |
94 | // alter failed, but did not result in an error |
95 | return true; |
96 | } |
97 | |
98 | // now transfer all dependencies from the old table to the new table |
99 | catalog.dependency_manager.AlterObject(transaction, data[name].get(), value.get()); |
100 | |
101 | value->timestamp = transaction.transaction_id; |
102 | value->child = move(data[name]); |
103 | value->child->parent = value.get(); |
104 | value->set = this; |
105 | |
106 | // serialize the AlterInfo into a temporary buffer |
107 | BufferedSerializer serializer; |
108 | alter_info->Serialize(serializer); |
109 | BinaryData serialized_alter = serializer.GetData(); |
110 | |
111 | // push the old entry in the undo buffer for this transaction |
112 | transaction.PushCatalogEntry(value->child.get(), serialized_alter.data.get(), serialized_alter.size); |
113 | data[name] = move(value); |
114 | |
115 | return true; |
116 | } |
117 | |
118 | bool CatalogSet::DropEntry(Transaction &transaction, const string &name, bool cascade) { |
119 | // lock the catalog for writing |
120 | lock_guard<mutex> write_lock(catalog.write_lock); |
121 | // we can only delete an entry that exists |
122 | auto entry = data.find(name); |
123 | if (entry == data.end()) { |
124 | return false; |
125 | } |
126 | if (HasConflict(transaction, *entry->second)) { |
127 | // current version has been written to by a currently active transaction |
128 | throw TransactionException("Catalog write-write conflict on drop with \"%s\"" , name.c_str()); |
129 | } |
130 | // there is a current version that has been committed by this transaction |
131 | if (entry->second->deleted) { |
132 | // if the entry was already deleted, it now does not exist anymore |
133 | // so we return that we could not find it |
134 | return false; |
135 | } |
136 | |
137 | // lock this catalog for reading |
138 | // create the lock set for this delete operation |
139 | set_lock_map_t lock_set; |
140 | // now drop the entry |
141 | DropEntryInternal(transaction, *entry->second, cascade, lock_set); |
142 | |
143 | return true; |
144 | } |
145 | |
146 | void CatalogSet::DropEntryInternal(Transaction &transaction, CatalogEntry ¤t, bool cascade, |
147 | set_lock_map_t &lock_set) { |
148 | assert(data.find(current.name) != data.end()); |
149 | // first check any dependencies of this object |
150 | current.catalog->dependency_manager.DropObject(transaction, ¤t, cascade, lock_set); |
151 | |
152 | // add this catalog to the lock set, if it is not there yet |
153 | if (lock_set.find(this) == lock_set.end()) { |
154 | lock_set.insert(make_pair(this, unique_lock<mutex>(catalog_lock))); |
155 | } |
156 | |
157 | // create a new entry and replace the currently stored one |
158 | // set the timestamp to the timestamp of the current transaction |
159 | // and point it at the dummy node |
160 | auto value = make_unique<CatalogEntry>(CatalogType::DELETED_ENTRY, current.catalog, current.name); |
161 | value->timestamp = transaction.transaction_id; |
162 | value->child = move(data[current.name]); |
163 | value->child->parent = value.get(); |
164 | value->set = this; |
165 | value->deleted = true; |
166 | |
167 | // push the old entry in the undo buffer for this transaction |
168 | transaction.PushCatalogEntry(value->child.get()); |
169 | |
170 | data[current.name] = move(value); |
171 | } |
172 | |
173 | bool CatalogSet::HasConflict(Transaction &transaction, CatalogEntry ¤t) { |
174 | return (current.timestamp >= TRANSACTION_ID_START && current.timestamp != transaction.transaction_id) || |
175 | (current.timestamp < TRANSACTION_ID_START && current.timestamp > transaction.start_time); |
176 | } |
177 | |
178 | CatalogEntry *CatalogSet::GetEntryForTransaction(Transaction &transaction, CatalogEntry *current) { |
179 | while (current->child) { |
180 | if (current->timestamp == transaction.transaction_id) { |
181 | // we created this version |
182 | break; |
183 | } |
184 | if (current->timestamp < transaction.start_time) { |
185 | // this version was commited before we started the transaction |
186 | break; |
187 | } |
188 | current = current->child.get(); |
189 | assert(current); |
190 | } |
191 | return current; |
192 | } |
193 | |
194 | CatalogEntry *CatalogSet::GetEntry(Transaction &transaction, const string &name) { |
195 | lock_guard<mutex> lock(catalog_lock); |
196 | |
197 | auto entry = data.find(name); |
198 | if (entry == data.end()) { |
199 | return nullptr; |
200 | } |
201 | // if it does, we have to check version numbers |
202 | CatalogEntry *current = GetEntryForTransaction(transaction, entry->second.get()); |
203 | if (current->deleted) { |
204 | return nullptr; |
205 | } |
206 | return current; |
207 | } |
208 | |
209 | CatalogEntry *CatalogSet::GetRootEntry(const string &name) { |
210 | lock_guard<mutex> lock(catalog_lock); |
211 | auto entry = data.find(name); |
212 | return entry == data.end() ? nullptr : entry->second.get(); |
213 | } |
214 | |
215 | void CatalogSet::Undo(CatalogEntry *entry) { |
216 | lock_guard<mutex> lock(catalog_lock); |
217 | |
218 | // entry has to be restored |
219 | // and entry->parent has to be removed ("rolled back") |
220 | |
221 | // i.e. we have to place (entry) as (entry->parent) again |
222 | auto &to_be_removed_node = entry->parent; |
223 | if (!to_be_removed_node->deleted) { |
224 | // delete the entry from the dependency manager as well |
225 | catalog.dependency_manager.EraseObject(to_be_removed_node); |
226 | } |
227 | if (to_be_removed_node->parent) { |
228 | // if the to be removed node has a parent, set the child pointer to the |
229 | // to be restored node |
230 | to_be_removed_node->parent->child = move(to_be_removed_node->child); |
231 | entry->parent = to_be_removed_node->parent; |
232 | } else { |
233 | // otherwise we need to update the base entry tables |
234 | auto &name = entry->name; |
235 | to_be_removed_node->child->SetAsRoot(); |
236 | data[name] = move(to_be_removed_node->child); |
237 | entry->parent = nullptr; |
238 | } |
239 | } |
240 | |