1#include "duckdb/catalog/catalog_set.hpp"
2
3#include "duckdb/catalog/catalog.hpp"
4#include "duckdb/common/exception.hpp"
5#include "duckdb/transaction/transaction_manager.hpp"
6#include "duckdb/transaction/transaction.hpp"
7#include "duckdb/common/serializer/buffered_serializer.hpp"
8#include "duckdb/parser/parsed_data/alter_table_info.hpp"
9
10using namespace duckdb;
11using namespace std;
12
13CatalogSet::CatalogSet(Catalog &catalog) : catalog(catalog) {
14}
15
16bool CatalogSet::CreateEntry(Transaction &transaction, const string &name, unique_ptr<CatalogEntry> value,
17 unordered_set<CatalogEntry *> &dependencies) {
18 // lock the catalog for writing
19 lock_guard<mutex> write_lock(catalog.write_lock);
20 // lock this catalog set to disallow reading
21 lock_guard<mutex> read_lock(catalog_lock);
22
23 // first check if the entry exists in the unordered set
24 auto entry = data.find(name);
25 if (entry == data.end()) {
26 // if it does not: entry has never been created
27
28 // first create a dummy deleted entry for this entry
29 // so transactions started before the commit of this transaction don't
30 // see it yet
31 auto dummy_node = make_unique<CatalogEntry>(CatalogType::INVALID, value->catalog, name);
32 dummy_node->timestamp = 0;
33 dummy_node->deleted = true;
34 dummy_node->set = this;
35 data[name] = move(dummy_node);
36 } else {
37 // if it does, we have to check version numbers
38 CatalogEntry &current = *entry->second;
39 if (HasConflict(transaction, current)) {
40 // current version has been written to by a currently active
41 // transaction
42 throw TransactionException("Catalog write-write conflict on create with \"%s\"", current.name.c_str());
43 }
44 // there is a current version that has been committed
45 // if it has not been deleted there is a conflict
46 if (!current.deleted) {
47 return false;
48 }
49 }
50 // create a new entry and replace the currently stored one
51 // set the timestamp to the timestamp of the current transaction
52 // and point it at the dummy node
53 value->timestamp = transaction.transaction_id;
54 value->set = this;
55
56 // now add the dependency set of this object to the dependency manager
57 catalog.dependency_manager.AddObject(transaction, value.get(), dependencies);
58
59 value->child = move(data[name]);
60 value->child->parent = value.get();
61 // push the old entry in the undo buffer for this transaction
62 transaction.PushCatalogEntry(value->child.get());
63 data[name] = move(value);
64 return true;
65}
66
67bool CatalogSet::AlterEntry(ClientContext &context, const string &name, AlterInfo *alter_info) {
68 auto &transaction = Transaction::GetTransaction(context);
69 // lock the catalog for writing
70 lock_guard<mutex> write_lock(catalog.write_lock);
71
72 // first check if the entry exists in the unordered set
73 auto entry = data.find(name);
74 if (entry == data.end()) {
75 // if it does not: entry has never been created and cannot be altered
76 return false;
77 }
78 // if it does: we have to retrieve the entry and to check version numbers
79 CatalogEntry &current = *entry->second;
80 if (HasConflict(transaction, current)) {
81 // current version has been written to by a currently active
82 // transaction
83 throw TransactionException("Catalog write-write conflict on alter with \"%s\"", current.name.c_str());
84 }
85
86 // lock this catalog set to disallow reading
87 lock_guard<mutex> read_lock(catalog_lock);
88
89 // create a new entry and replace the currently stored one
90 // set the timestamp to the timestamp of the current transaction
91 // and point it to the updated table node
92 auto value = current.AlterEntry(context, alter_info);
93 if (!value) {
94 // alter failed, but did not result in an error
95 return true;
96 }
97
98 // now transfer all dependencies from the old table to the new table
99 catalog.dependency_manager.AlterObject(transaction, data[name].get(), value.get());
100
101 value->timestamp = transaction.transaction_id;
102 value->child = move(data[name]);
103 value->child->parent = value.get();
104 value->set = this;
105
106 // serialize the AlterInfo into a temporary buffer
107 BufferedSerializer serializer;
108 alter_info->Serialize(serializer);
109 BinaryData serialized_alter = serializer.GetData();
110
111 // push the old entry in the undo buffer for this transaction
112 transaction.PushCatalogEntry(value->child.get(), serialized_alter.data.get(), serialized_alter.size);
113 data[name] = move(value);
114
115 return true;
116}
117
118bool CatalogSet::DropEntry(Transaction &transaction, const string &name, bool cascade) {
119 // lock the catalog for writing
120 lock_guard<mutex> write_lock(catalog.write_lock);
121 // we can only delete an entry that exists
122 auto entry = data.find(name);
123 if (entry == data.end()) {
124 return false;
125 }
126 if (HasConflict(transaction, *entry->second)) {
127 // current version has been written to by a currently active transaction
128 throw TransactionException("Catalog write-write conflict on drop with \"%s\"", name.c_str());
129 }
130 // there is a current version that has been committed by this transaction
131 if (entry->second->deleted) {
132 // if the entry was already deleted, it now does not exist anymore
133 // so we return that we could not find it
134 return false;
135 }
136
137 // lock this catalog for reading
138 // create the lock set for this delete operation
139 set_lock_map_t lock_set;
140 // now drop the entry
141 DropEntryInternal(transaction, *entry->second, cascade, lock_set);
142
143 return true;
144}
145
146void CatalogSet::DropEntryInternal(Transaction &transaction, CatalogEntry &current, bool cascade,
147 set_lock_map_t &lock_set) {
148 assert(data.find(current.name) != data.end());
149 // first check any dependencies of this object
150 current.catalog->dependency_manager.DropObject(transaction, &current, cascade, lock_set);
151
152 // add this catalog to the lock set, if it is not there yet
153 if (lock_set.find(this) == lock_set.end()) {
154 lock_set.insert(make_pair(this, unique_lock<mutex>(catalog_lock)));
155 }
156
157 // create a new entry and replace the currently stored one
158 // set the timestamp to the timestamp of the current transaction
159 // and point it at the dummy node
160 auto value = make_unique<CatalogEntry>(CatalogType::DELETED_ENTRY, current.catalog, current.name);
161 value->timestamp = transaction.transaction_id;
162 value->child = move(data[current.name]);
163 value->child->parent = value.get();
164 value->set = this;
165 value->deleted = true;
166
167 // push the old entry in the undo buffer for this transaction
168 transaction.PushCatalogEntry(value->child.get());
169
170 data[current.name] = move(value);
171}
172
173bool CatalogSet::HasConflict(Transaction &transaction, CatalogEntry &current) {
174 return (current.timestamp >= TRANSACTION_ID_START && current.timestamp != transaction.transaction_id) ||
175 (current.timestamp < TRANSACTION_ID_START && current.timestamp > transaction.start_time);
176}
177
178CatalogEntry *CatalogSet::GetEntryForTransaction(Transaction &transaction, CatalogEntry *current) {
179 while (current->child) {
180 if (current->timestamp == transaction.transaction_id) {
181 // we created this version
182 break;
183 }
184 if (current->timestamp < transaction.start_time) {
185 // this version was commited before we started the transaction
186 break;
187 }
188 current = current->child.get();
189 assert(current);
190 }
191 return current;
192}
193
194CatalogEntry *CatalogSet::GetEntry(Transaction &transaction, const string &name) {
195 lock_guard<mutex> lock(catalog_lock);
196
197 auto entry = data.find(name);
198 if (entry == data.end()) {
199 return nullptr;
200 }
201 // if it does, we have to check version numbers
202 CatalogEntry *current = GetEntryForTransaction(transaction, entry->second.get());
203 if (current->deleted) {
204 return nullptr;
205 }
206 return current;
207}
208
209CatalogEntry *CatalogSet::GetRootEntry(const string &name) {
210 lock_guard<mutex> lock(catalog_lock);
211 auto entry = data.find(name);
212 return entry == data.end() ? nullptr : entry->second.get();
213}
214
215void CatalogSet::Undo(CatalogEntry *entry) {
216 lock_guard<mutex> lock(catalog_lock);
217
218 // entry has to be restored
219 // and entry->parent has to be removed ("rolled back")
220
221 // i.e. we have to place (entry) as (entry->parent) again
222 auto &to_be_removed_node = entry->parent;
223 if (!to_be_removed_node->deleted) {
224 // delete the entry from the dependency manager as well
225 catalog.dependency_manager.EraseObject(to_be_removed_node);
226 }
227 if (to_be_removed_node->parent) {
228 // if the to be removed node has a parent, set the child pointer to the
229 // to be restored node
230 to_be_removed_node->parent->child = move(to_be_removed_node->child);
231 entry->parent = to_be_removed_node->parent;
232 } else {
233 // otherwise we need to update the base entry tables
234 auto &name = entry->name;
235 to_be_removed_node->child->SetAsRoot();
236 data[name] = move(to_be_removed_node->child);
237 entry->parent = nullptr;
238 }
239}
240