| 1 | #include "duckdb/catalog/catalog_set.hpp" |
| 2 | |
| 3 | #include "duckdb/catalog/catalog.hpp" |
| 4 | #include "duckdb/common/exception.hpp" |
| 5 | #include "duckdb/transaction/transaction_manager.hpp" |
| 6 | #include "duckdb/transaction/transaction.hpp" |
| 7 | #include "duckdb/common/serializer/buffered_serializer.hpp" |
| 8 | #include "duckdb/parser/parsed_data/alter_table_info.hpp" |
| 9 | |
| 10 | using namespace duckdb; |
| 11 | using namespace std; |
| 12 | |
| 13 | CatalogSet::CatalogSet(Catalog &catalog) : catalog(catalog) { |
| 14 | } |
| 15 | |
| 16 | bool CatalogSet::CreateEntry(Transaction &transaction, const string &name, unique_ptr<CatalogEntry> value, |
| 17 | unordered_set<CatalogEntry *> &dependencies) { |
| 18 | // lock the catalog for writing |
| 19 | lock_guard<mutex> write_lock(catalog.write_lock); |
| 20 | // lock this catalog set to disallow reading |
| 21 | lock_guard<mutex> read_lock(catalog_lock); |
| 22 | |
| 23 | // first check if the entry exists in the unordered set |
| 24 | auto entry = data.find(name); |
| 25 | if (entry == data.end()) { |
| 26 | // if it does not: entry has never been created |
| 27 | |
| 28 | // first create a dummy deleted entry for this entry |
| 29 | // so transactions started before the commit of this transaction don't |
| 30 | // see it yet |
| 31 | auto dummy_node = make_unique<CatalogEntry>(CatalogType::INVALID, value->catalog, name); |
| 32 | dummy_node->timestamp = 0; |
| 33 | dummy_node->deleted = true; |
| 34 | dummy_node->set = this; |
| 35 | data[name] = move(dummy_node); |
| 36 | } else { |
| 37 | // if it does, we have to check version numbers |
| 38 | CatalogEntry ¤t = *entry->second; |
| 39 | if (HasConflict(transaction, current)) { |
| 40 | // current version has been written to by a currently active |
| 41 | // transaction |
| 42 | throw TransactionException("Catalog write-write conflict on create with \"%s\"" , current.name.c_str()); |
| 43 | } |
| 44 | // there is a current version that has been committed |
| 45 | // if it has not been deleted there is a conflict |
| 46 | if (!current.deleted) { |
| 47 | return false; |
| 48 | } |
| 49 | } |
| 50 | // create a new entry and replace the currently stored one |
| 51 | // set the timestamp to the timestamp of the current transaction |
| 52 | // and point it at the dummy node |
| 53 | value->timestamp = transaction.transaction_id; |
| 54 | value->set = this; |
| 55 | |
| 56 | // now add the dependency set of this object to the dependency manager |
| 57 | catalog.dependency_manager.AddObject(transaction, value.get(), dependencies); |
| 58 | |
| 59 | value->child = move(data[name]); |
| 60 | value->child->parent = value.get(); |
| 61 | // push the old entry in the undo buffer for this transaction |
| 62 | transaction.PushCatalogEntry(value->child.get()); |
| 63 | data[name] = move(value); |
| 64 | return true; |
| 65 | } |
| 66 | |
| 67 | bool CatalogSet::AlterEntry(ClientContext &context, const string &name, AlterInfo *alter_info) { |
| 68 | auto &transaction = Transaction::GetTransaction(context); |
| 69 | // lock the catalog for writing |
| 70 | lock_guard<mutex> write_lock(catalog.write_lock); |
| 71 | |
| 72 | // first check if the entry exists in the unordered set |
| 73 | auto entry = data.find(name); |
| 74 | if (entry == data.end()) { |
| 75 | // if it does not: entry has never been created and cannot be altered |
| 76 | return false; |
| 77 | } |
| 78 | // if it does: we have to retrieve the entry and to check version numbers |
| 79 | CatalogEntry ¤t = *entry->second; |
| 80 | if (HasConflict(transaction, current)) { |
| 81 | // current version has been written to by a currently active |
| 82 | // transaction |
| 83 | throw TransactionException("Catalog write-write conflict on alter with \"%s\"" , current.name.c_str()); |
| 84 | } |
| 85 | |
| 86 | // lock this catalog set to disallow reading |
| 87 | lock_guard<mutex> read_lock(catalog_lock); |
| 88 | |
| 89 | // create a new entry and replace the currently stored one |
| 90 | // set the timestamp to the timestamp of the current transaction |
| 91 | // and point it to the updated table node |
| 92 | auto value = current.AlterEntry(context, alter_info); |
| 93 | if (!value) { |
| 94 | // alter failed, but did not result in an error |
| 95 | return true; |
| 96 | } |
| 97 | |
| 98 | // now transfer all dependencies from the old table to the new table |
| 99 | catalog.dependency_manager.AlterObject(transaction, data[name].get(), value.get()); |
| 100 | |
| 101 | value->timestamp = transaction.transaction_id; |
| 102 | value->child = move(data[name]); |
| 103 | value->child->parent = value.get(); |
| 104 | value->set = this; |
| 105 | |
| 106 | // serialize the AlterInfo into a temporary buffer |
| 107 | BufferedSerializer serializer; |
| 108 | alter_info->Serialize(serializer); |
| 109 | BinaryData serialized_alter = serializer.GetData(); |
| 110 | |
| 111 | // push the old entry in the undo buffer for this transaction |
| 112 | transaction.PushCatalogEntry(value->child.get(), serialized_alter.data.get(), serialized_alter.size); |
| 113 | data[name] = move(value); |
| 114 | |
| 115 | return true; |
| 116 | } |
| 117 | |
| 118 | bool CatalogSet::DropEntry(Transaction &transaction, const string &name, bool cascade) { |
| 119 | // lock the catalog for writing |
| 120 | lock_guard<mutex> write_lock(catalog.write_lock); |
| 121 | // we can only delete an entry that exists |
| 122 | auto entry = data.find(name); |
| 123 | if (entry == data.end()) { |
| 124 | return false; |
| 125 | } |
| 126 | if (HasConflict(transaction, *entry->second)) { |
| 127 | // current version has been written to by a currently active transaction |
| 128 | throw TransactionException("Catalog write-write conflict on drop with \"%s\"" , name.c_str()); |
| 129 | } |
| 130 | // there is a current version that has been committed by this transaction |
| 131 | if (entry->second->deleted) { |
| 132 | // if the entry was already deleted, it now does not exist anymore |
| 133 | // so we return that we could not find it |
| 134 | return false; |
| 135 | } |
| 136 | |
| 137 | // lock this catalog for reading |
| 138 | // create the lock set for this delete operation |
| 139 | set_lock_map_t lock_set; |
| 140 | // now drop the entry |
| 141 | DropEntryInternal(transaction, *entry->second, cascade, lock_set); |
| 142 | |
| 143 | return true; |
| 144 | } |
| 145 | |
| 146 | void CatalogSet::DropEntryInternal(Transaction &transaction, CatalogEntry ¤t, bool cascade, |
| 147 | set_lock_map_t &lock_set) { |
| 148 | assert(data.find(current.name) != data.end()); |
| 149 | // first check any dependencies of this object |
| 150 | current.catalog->dependency_manager.DropObject(transaction, ¤t, cascade, lock_set); |
| 151 | |
| 152 | // add this catalog to the lock set, if it is not there yet |
| 153 | if (lock_set.find(this) == lock_set.end()) { |
| 154 | lock_set.insert(make_pair(this, unique_lock<mutex>(catalog_lock))); |
| 155 | } |
| 156 | |
| 157 | // create a new entry and replace the currently stored one |
| 158 | // set the timestamp to the timestamp of the current transaction |
| 159 | // and point it at the dummy node |
| 160 | auto value = make_unique<CatalogEntry>(CatalogType::DELETED_ENTRY, current.catalog, current.name); |
| 161 | value->timestamp = transaction.transaction_id; |
| 162 | value->child = move(data[current.name]); |
| 163 | value->child->parent = value.get(); |
| 164 | value->set = this; |
| 165 | value->deleted = true; |
| 166 | |
| 167 | // push the old entry in the undo buffer for this transaction |
| 168 | transaction.PushCatalogEntry(value->child.get()); |
| 169 | |
| 170 | data[current.name] = move(value); |
| 171 | } |
| 172 | |
| 173 | bool CatalogSet::HasConflict(Transaction &transaction, CatalogEntry ¤t) { |
| 174 | return (current.timestamp >= TRANSACTION_ID_START && current.timestamp != transaction.transaction_id) || |
| 175 | (current.timestamp < TRANSACTION_ID_START && current.timestamp > transaction.start_time); |
| 176 | } |
| 177 | |
| 178 | CatalogEntry *CatalogSet::GetEntryForTransaction(Transaction &transaction, CatalogEntry *current) { |
| 179 | while (current->child) { |
| 180 | if (current->timestamp == transaction.transaction_id) { |
| 181 | // we created this version |
| 182 | break; |
| 183 | } |
| 184 | if (current->timestamp < transaction.start_time) { |
| 185 | // this version was commited before we started the transaction |
| 186 | break; |
| 187 | } |
| 188 | current = current->child.get(); |
| 189 | assert(current); |
| 190 | } |
| 191 | return current; |
| 192 | } |
| 193 | |
| 194 | CatalogEntry *CatalogSet::GetEntry(Transaction &transaction, const string &name) { |
| 195 | lock_guard<mutex> lock(catalog_lock); |
| 196 | |
| 197 | auto entry = data.find(name); |
| 198 | if (entry == data.end()) { |
| 199 | return nullptr; |
| 200 | } |
| 201 | // if it does, we have to check version numbers |
| 202 | CatalogEntry *current = GetEntryForTransaction(transaction, entry->second.get()); |
| 203 | if (current->deleted) { |
| 204 | return nullptr; |
| 205 | } |
| 206 | return current; |
| 207 | } |
| 208 | |
| 209 | CatalogEntry *CatalogSet::GetRootEntry(const string &name) { |
| 210 | lock_guard<mutex> lock(catalog_lock); |
| 211 | auto entry = data.find(name); |
| 212 | return entry == data.end() ? nullptr : entry->second.get(); |
| 213 | } |
| 214 | |
| 215 | void CatalogSet::Undo(CatalogEntry *entry) { |
| 216 | lock_guard<mutex> lock(catalog_lock); |
| 217 | |
| 218 | // entry has to be restored |
| 219 | // and entry->parent has to be removed ("rolled back") |
| 220 | |
| 221 | // i.e. we have to place (entry) as (entry->parent) again |
| 222 | auto &to_be_removed_node = entry->parent; |
| 223 | if (!to_be_removed_node->deleted) { |
| 224 | // delete the entry from the dependency manager as well |
| 225 | catalog.dependency_manager.EraseObject(to_be_removed_node); |
| 226 | } |
| 227 | if (to_be_removed_node->parent) { |
| 228 | // if the to be removed node has a parent, set the child pointer to the |
| 229 | // to be restored node |
| 230 | to_be_removed_node->parent->child = move(to_be_removed_node->child); |
| 231 | entry->parent = to_be_removed_node->parent; |
| 232 | } else { |
| 233 | // otherwise we need to update the base entry tables |
| 234 | auto &name = entry->name; |
| 235 | to_be_removed_node->child->SetAsRoot(); |
| 236 | data[name] = move(to_be_removed_node->child); |
| 237 | entry->parent = nullptr; |
| 238 | } |
| 239 | } |
| 240 | |