| 1 | #include "duckdb/transaction/duck_transaction.hpp" |
| 2 | |
| 3 | #include "duckdb/main/client_context.hpp" |
| 4 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
| 5 | #include "duckdb/common/exception.hpp" |
| 6 | #include "duckdb/parser/column_definition.hpp" |
| 7 | #include "duckdb/storage/data_table.hpp" |
| 8 | #include "duckdb/storage/write_ahead_log.hpp" |
| 9 | #include "duckdb/storage/storage_manager.hpp" |
| 10 | |
| 11 | #include "duckdb/transaction/append_info.hpp" |
| 12 | #include "duckdb/transaction/delete_info.hpp" |
| 13 | #include "duckdb/transaction/update_info.hpp" |
| 14 | #include "duckdb/transaction/local_storage.hpp" |
| 15 | #include "duckdb/main/config.hpp" |
| 16 | #include "duckdb/storage/table/column_data.hpp" |
| 17 | #include "duckdb/main/client_data.hpp" |
| 18 | #include "duckdb/main/attached_database.hpp" |
| 19 | |
| 20 | namespace duckdb { |
| 21 | |
| 22 | TransactionData::TransactionData(DuckTransaction &transaction_p) // NOLINT |
| 23 | : transaction(&transaction_p), transaction_id(transaction_p.transaction_id), start_time(transaction_p.start_time) { |
| 24 | } |
| 25 | TransactionData::TransactionData(transaction_t transaction_id_p, transaction_t start_time_p) |
| 26 | : transaction(nullptr), transaction_id(transaction_id_p), start_time(start_time_p) { |
| 27 | } |
| 28 | |
| 29 | DuckTransaction::DuckTransaction(TransactionManager &manager, ClientContext &context_p, transaction_t start_time, |
| 30 | transaction_t transaction_id) |
| 31 | : Transaction(manager, context_p), start_time(start_time), transaction_id(transaction_id), commit_id(0), |
| 32 | highest_active_query(0), undo_buffer(context_p), storage(make_uniq<LocalStorage>(args&: context_p, args&: *this)) { |
| 33 | } |
| 34 | |
| 35 | DuckTransaction::~DuckTransaction() { |
| 36 | } |
| 37 | |
| 38 | DuckTransaction &DuckTransaction::Get(ClientContext &context, AttachedDatabase &db) { |
| 39 | return DuckTransaction::Get(context, catalog&: db.GetCatalog()); |
| 40 | } |
| 41 | |
| 42 | DuckTransaction &DuckTransaction::Get(ClientContext &context, Catalog &catalog) { |
| 43 | auto &transaction = Transaction::Get(context, catalog); |
| 44 | if (!transaction.IsDuckTransaction()) { |
| 45 | throw InternalException("DuckTransaction::Get called on non-DuckDB transaction" ); |
| 46 | } |
| 47 | return transaction.Cast<DuckTransaction>(); |
| 48 | } |
| 49 | |
| 50 | LocalStorage &DuckTransaction::GetLocalStorage() { |
| 51 | return *storage; |
| 52 | } |
| 53 | |
| 54 | void DuckTransaction::PushCatalogEntry(CatalogEntry &entry, data_ptr_t , idx_t ) { |
| 55 | idx_t alloc_size = sizeof(CatalogEntry *); |
| 56 | if (extra_data_size > 0) { |
| 57 | alloc_size += extra_data_size + sizeof(idx_t); |
| 58 | } |
| 59 | auto baseptr = undo_buffer.CreateEntry(type: UndoFlags::CATALOG_ENTRY, len: alloc_size); |
| 60 | // store the pointer to the catalog entry |
| 61 | Store<CatalogEntry *>(val: &entry, ptr: baseptr); |
| 62 | if (extra_data_size > 0) { |
| 63 | // copy the extra data behind the catalog entry pointer (if any) |
| 64 | baseptr += sizeof(CatalogEntry *); |
| 65 | // first store the extra data size |
| 66 | Store<idx_t>(val: extra_data_size, ptr: baseptr); |
| 67 | baseptr += sizeof(idx_t); |
| 68 | // then copy over the actual data |
| 69 | memcpy(dest: baseptr, src: extra_data, n: extra_data_size); |
| 70 | } |
| 71 | } |
| 72 | |
| 73 | void DuckTransaction::PushDelete(DataTable &table, ChunkVectorInfo *vinfo, row_t rows[], idx_t count, idx_t base_row) { |
| 74 | auto delete_info = reinterpret_cast<DeleteInfo *>( |
| 75 | undo_buffer.CreateEntry(type: UndoFlags::DELETE_TUPLE, len: sizeof(DeleteInfo) + sizeof(row_t) * count)); |
| 76 | delete_info->vinfo = vinfo; |
| 77 | delete_info->table = &table; |
| 78 | delete_info->count = count; |
| 79 | delete_info->base_row = base_row; |
| 80 | memcpy(dest: delete_info->rows, src: rows, n: sizeof(row_t) * count); |
| 81 | } |
| 82 | |
| 83 | void DuckTransaction::PushAppend(DataTable &table, idx_t start_row, idx_t row_count) { |
| 84 | auto append_info = |
| 85 | reinterpret_cast<AppendInfo *>(undo_buffer.CreateEntry(type: UndoFlags::INSERT_TUPLE, len: sizeof(AppendInfo))); |
| 86 | append_info->table = &table; |
| 87 | append_info->start_row = start_row; |
| 88 | append_info->count = row_count; |
| 89 | } |
| 90 | |
| 91 | UpdateInfo *DuckTransaction::CreateUpdateInfo(idx_t type_size, idx_t entries) { |
| 92 | data_ptr_t base_info = undo_buffer.CreateEntry( |
| 93 | type: UndoFlags::UPDATE_TUPLE, len: sizeof(UpdateInfo) + (sizeof(sel_t) + type_size) * STANDARD_VECTOR_SIZE); |
| 94 | auto update_info = reinterpret_cast<UpdateInfo *>(base_info); |
| 95 | update_info->max = STANDARD_VECTOR_SIZE; |
| 96 | update_info->tuples = reinterpret_cast<sel_t *>(base_info + sizeof(UpdateInfo)); |
| 97 | update_info->tuple_data = base_info + sizeof(UpdateInfo) + sizeof(sel_t) * update_info->max; |
| 98 | update_info->version_number = transaction_id; |
| 99 | return update_info; |
| 100 | } |
| 101 | |
| 102 | bool DuckTransaction::ChangesMade() { |
| 103 | return undo_buffer.ChangesMade() || storage->ChangesMade(); |
| 104 | } |
| 105 | |
| 106 | bool DuckTransaction::AutomaticCheckpoint(AttachedDatabase &db) { |
| 107 | auto &storage_manager = db.GetStorageManager(); |
| 108 | return storage_manager.AutomaticCheckpoint(estimated_wal_bytes: storage->EstimatedSize() + undo_buffer.EstimatedSize()); |
| 109 | } |
| 110 | |
| 111 | string DuckTransaction::Commit(AttachedDatabase &db, transaction_t commit_id, bool checkpoint) noexcept { |
| 112 | // "checkpoint" parameter indicates if the caller will checkpoint. If checkpoint == |
| 113 | // true: Then this function will NOT write to the WAL or flush/persist. |
| 114 | // This method only makes commit in memory, expecting caller to checkpoint/flush. |
| 115 | // false: Then this function WILL write to the WAL and Flush/Persist it. |
| 116 | this->commit_id = commit_id; |
| 117 | |
| 118 | UndoBuffer::IteratorState iterator_state; |
| 119 | LocalStorage::CommitState commit_state; |
| 120 | unique_ptr<StorageCommitState> storage_commit_state; |
| 121 | optional_ptr<WriteAheadLog> log; |
| 122 | if (!db.IsSystem()) { |
| 123 | auto &storage_manager = db.GetStorageManager(); |
| 124 | log = storage_manager.GetWriteAheadLog(); |
| 125 | storage_commit_state = storage_manager.GenStorageCommitState(transaction&: *this, checkpoint); |
| 126 | } else { |
| 127 | log = nullptr; |
| 128 | } |
| 129 | try { |
| 130 | storage->Commit(commit_state, transaction&: *this); |
| 131 | undo_buffer.Commit(iterator_state, log, commit_id); |
| 132 | if (log) { |
| 133 | // commit any sequences that were used to the WAL |
| 134 | for (auto &entry : sequence_usage) { |
| 135 | log->WriteSequenceValue(entry: *entry.first, val: entry.second); |
| 136 | } |
| 137 | } |
| 138 | if (storage_commit_state) { |
| 139 | storage_commit_state->FlushCommit(); |
| 140 | } |
| 141 | return string(); |
| 142 | } catch (std::exception &ex) { |
| 143 | return ex.what(); |
| 144 | } |
| 145 | } |
| 146 | |
| 147 | void DuckTransaction::Rollback() noexcept { |
| 148 | storage->Rollback(); |
| 149 | undo_buffer.Rollback(); |
| 150 | } |
| 151 | |
| 152 | void DuckTransaction::Cleanup() { |
| 153 | undo_buffer.Cleanup(); |
| 154 | } |
| 155 | |
| 156 | } // namespace duckdb |
| 157 | |