1#include "duckdb/transaction/duck_transaction.hpp"
2
3#include "duckdb/main/client_context.hpp"
4#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
5#include "duckdb/common/exception.hpp"
6#include "duckdb/parser/column_definition.hpp"
7#include "duckdb/storage/data_table.hpp"
8#include "duckdb/storage/write_ahead_log.hpp"
9#include "duckdb/storage/storage_manager.hpp"
10
11#include "duckdb/transaction/append_info.hpp"
12#include "duckdb/transaction/delete_info.hpp"
13#include "duckdb/transaction/update_info.hpp"
14#include "duckdb/transaction/local_storage.hpp"
15#include "duckdb/main/config.hpp"
16#include "duckdb/storage/table/column_data.hpp"
17#include "duckdb/main/client_data.hpp"
18#include "duckdb/main/attached_database.hpp"
19
20namespace duckdb {
21
22TransactionData::TransactionData(DuckTransaction &transaction_p) // NOLINT
23 : transaction(&transaction_p), transaction_id(transaction_p.transaction_id), start_time(transaction_p.start_time) {
24}
25TransactionData::TransactionData(transaction_t transaction_id_p, transaction_t start_time_p)
26 : transaction(nullptr), transaction_id(transaction_id_p), start_time(start_time_p) {
27}
28
29DuckTransaction::DuckTransaction(TransactionManager &manager, ClientContext &context_p, transaction_t start_time,
30 transaction_t transaction_id)
31 : Transaction(manager, context_p), start_time(start_time), transaction_id(transaction_id), commit_id(0),
32 highest_active_query(0), undo_buffer(context_p), storage(make_uniq<LocalStorage>(args&: context_p, args&: *this)) {
33}
34
35DuckTransaction::~DuckTransaction() {
36}
37
38DuckTransaction &DuckTransaction::Get(ClientContext &context, AttachedDatabase &db) {
39 return DuckTransaction::Get(context, catalog&: db.GetCatalog());
40}
41
42DuckTransaction &DuckTransaction::Get(ClientContext &context, Catalog &catalog) {
43 auto &transaction = Transaction::Get(context, catalog);
44 if (!transaction.IsDuckTransaction()) {
45 throw InternalException("DuckTransaction::Get called on non-DuckDB transaction");
46 }
47 return transaction.Cast<DuckTransaction>();
48}
49
50LocalStorage &DuckTransaction::GetLocalStorage() {
51 return *storage;
52}
53
54void DuckTransaction::PushCatalogEntry(CatalogEntry &entry, data_ptr_t extra_data, idx_t extra_data_size) {
55 idx_t alloc_size = sizeof(CatalogEntry *);
56 if (extra_data_size > 0) {
57 alloc_size += extra_data_size + sizeof(idx_t);
58 }
59 auto baseptr = undo_buffer.CreateEntry(type: UndoFlags::CATALOG_ENTRY, len: alloc_size);
60 // store the pointer to the catalog entry
61 Store<CatalogEntry *>(val: &entry, ptr: baseptr);
62 if (extra_data_size > 0) {
63 // copy the extra data behind the catalog entry pointer (if any)
64 baseptr += sizeof(CatalogEntry *);
65 // first store the extra data size
66 Store<idx_t>(val: extra_data_size, ptr: baseptr);
67 baseptr += sizeof(idx_t);
68 // then copy over the actual data
69 memcpy(dest: baseptr, src: extra_data, n: extra_data_size);
70 }
71}
72
73void DuckTransaction::PushDelete(DataTable &table, ChunkVectorInfo *vinfo, row_t rows[], idx_t count, idx_t base_row) {
74 auto delete_info = reinterpret_cast<DeleteInfo *>(
75 undo_buffer.CreateEntry(type: UndoFlags::DELETE_TUPLE, len: sizeof(DeleteInfo) + sizeof(row_t) * count));
76 delete_info->vinfo = vinfo;
77 delete_info->table = &table;
78 delete_info->count = count;
79 delete_info->base_row = base_row;
80 memcpy(dest: delete_info->rows, src: rows, n: sizeof(row_t) * count);
81}
82
83void DuckTransaction::PushAppend(DataTable &table, idx_t start_row, idx_t row_count) {
84 auto append_info =
85 reinterpret_cast<AppendInfo *>(undo_buffer.CreateEntry(type: UndoFlags::INSERT_TUPLE, len: sizeof(AppendInfo)));
86 append_info->table = &table;
87 append_info->start_row = start_row;
88 append_info->count = row_count;
89}
90
91UpdateInfo *DuckTransaction::CreateUpdateInfo(idx_t type_size, idx_t entries) {
92 data_ptr_t base_info = undo_buffer.CreateEntry(
93 type: UndoFlags::UPDATE_TUPLE, len: sizeof(UpdateInfo) + (sizeof(sel_t) + type_size) * STANDARD_VECTOR_SIZE);
94 auto update_info = reinterpret_cast<UpdateInfo *>(base_info);
95 update_info->max = STANDARD_VECTOR_SIZE;
96 update_info->tuples = reinterpret_cast<sel_t *>(base_info + sizeof(UpdateInfo));
97 update_info->tuple_data = base_info + sizeof(UpdateInfo) + sizeof(sel_t) * update_info->max;
98 update_info->version_number = transaction_id;
99 return update_info;
100}
101
102bool DuckTransaction::ChangesMade() {
103 return undo_buffer.ChangesMade() || storage->ChangesMade();
104}
105
106bool DuckTransaction::AutomaticCheckpoint(AttachedDatabase &db) {
107 auto &storage_manager = db.GetStorageManager();
108 return storage_manager.AutomaticCheckpoint(estimated_wal_bytes: storage->EstimatedSize() + undo_buffer.EstimatedSize());
109}
110
111string DuckTransaction::Commit(AttachedDatabase &db, transaction_t commit_id, bool checkpoint) noexcept {
112 // "checkpoint" parameter indicates if the caller will checkpoint. If checkpoint ==
113 // true: Then this function will NOT write to the WAL or flush/persist.
114 // This method only makes commit in memory, expecting caller to checkpoint/flush.
115 // false: Then this function WILL write to the WAL and Flush/Persist it.
116 this->commit_id = commit_id;
117
118 UndoBuffer::IteratorState iterator_state;
119 LocalStorage::CommitState commit_state;
120 unique_ptr<StorageCommitState> storage_commit_state;
121 optional_ptr<WriteAheadLog> log;
122 if (!db.IsSystem()) {
123 auto &storage_manager = db.GetStorageManager();
124 log = storage_manager.GetWriteAheadLog();
125 storage_commit_state = storage_manager.GenStorageCommitState(transaction&: *this, checkpoint);
126 } else {
127 log = nullptr;
128 }
129 try {
130 storage->Commit(commit_state, transaction&: *this);
131 undo_buffer.Commit(iterator_state, log, commit_id);
132 if (log) {
133 // commit any sequences that were used to the WAL
134 for (auto &entry : sequence_usage) {
135 log->WriteSequenceValue(entry: *entry.first, val: entry.second);
136 }
137 }
138 if (storage_commit_state) {
139 storage_commit_state->FlushCommit();
140 }
141 return string();
142 } catch (std::exception &ex) {
143 return ex.what();
144 }
145}
146
147void DuckTransaction::Rollback() noexcept {
148 storage->Rollback();
149 undo_buffer.Rollback();
150}
151
152void DuckTransaction::Cleanup() {
153 undo_buffer.Cleanup();
154}
155
156} // namespace duckdb
157