1#include "duckdb/transaction/local_storage.hpp"
2#include "duckdb/execution/index/art/art.hpp"
3#include "duckdb/storage/table/append_state.hpp"
4#include "duckdb/storage/write_ahead_log.hpp"
5#include "duckdb/common/vector_operations/vector_operations.hpp"
6#include "duckdb/storage/table/row_group.hpp"
7#include "duckdb/transaction/duck_transaction.hpp"
8#include "duckdb/planner/table_filter.hpp"
9#include "duckdb/storage/partial_block_manager.hpp"
10
11#include "duckdb/storage/table/column_checkpoint_state.hpp"
12#include "duckdb/storage/table_io_manager.hpp"
13#include "duckdb/storage/table/scan_state.hpp"
14
15namespace duckdb {
16
17LocalTableStorage::LocalTableStorage(DataTable &table)
18 : table_ref(table), allocator(Allocator::Get(db&: table.db)), deleted_rows(0), optimistic_writer(table),
19 merged_storage(false) {
20 auto types = table.GetTypes();
21 row_groups = make_shared<RowGroupCollection>(args&: table.info, args&: TableIOManager::Get(table).GetBlockManagerForRowData(),
22 args&: types, args: MAX_ROW_ID, args: 0);
23 row_groups->InitializeEmpty();
24
25 table.info->indexes.Scan(callback: [&](Index &index) {
26 D_ASSERT(index.type == IndexType::ART);
27 auto &art = index.Cast<ART>();
28 ;
29 if (art.constraint_type != IndexConstraintType::NONE) {
30 // unique index: create a local ART index that maintains the same unique constraint
31 vector<unique_ptr<Expression>> unbound_expressions;
32 unbound_expressions.reserve(n: art.unbound_expressions.size());
33 for (auto &expr : art.unbound_expressions) {
34 unbound_expressions.push_back(x: expr->Copy());
35 }
36 indexes.AddIndex(index: make_uniq<ART>(args&: art.column_ids, args&: art.table_io_manager, args: std::move(unbound_expressions),
37 args&: art.constraint_type, args&: art.db));
38 }
39 return false;
40 });
41}
42
43LocalTableStorage::LocalTableStorage(ClientContext &context, DataTable &new_dt, LocalTableStorage &parent,
44 idx_t changed_idx, const LogicalType &target_type,
45 const vector<column_t> &bound_columns, Expression &cast_expr)
46 : table_ref(new_dt), allocator(Allocator::Get(db&: new_dt.db)), deleted_rows(parent.deleted_rows),
47 optimistic_writer(new_dt, parent.optimistic_writer), optimistic_writers(std::move(parent.optimistic_writers)),
48 merged_storage(parent.merged_storage) {
49 row_groups = parent.row_groups->AlterType(context, changed_idx, target_type, bound_columns, cast_expr);
50 parent.row_groups.reset();
51 indexes.Move(other&: parent.indexes);
52}
53
54LocalTableStorage::LocalTableStorage(DataTable &new_dt, LocalTableStorage &parent, idx_t drop_idx)
55 : table_ref(new_dt), allocator(Allocator::Get(db&: new_dt.db)), deleted_rows(parent.deleted_rows),
56 optimistic_writer(new_dt, parent.optimistic_writer), optimistic_writers(std::move(parent.optimistic_writers)),
57 merged_storage(parent.merged_storage) {
58 row_groups = parent.row_groups->RemoveColumn(col_idx: drop_idx);
59 parent.row_groups.reset();
60 indexes.Move(other&: parent.indexes);
61}
62
63LocalTableStorage::LocalTableStorage(ClientContext &context, DataTable &new_dt, LocalTableStorage &parent,
64 ColumnDefinition &new_column, optional_ptr<Expression> default_value)
65 : table_ref(new_dt), allocator(Allocator::Get(db&: new_dt.db)), deleted_rows(parent.deleted_rows),
66 optimistic_writer(new_dt, parent.optimistic_writer), optimistic_writers(std::move(parent.optimistic_writers)),
67 merged_storage(parent.merged_storage) {
68 row_groups = parent.row_groups->AddColumn(context, new_column, default_value: default_value.get());
69 parent.row_groups.reset();
70 indexes.Move(other&: parent.indexes);
71}
72
73LocalTableStorage::~LocalTableStorage() {
74}
75
76void LocalTableStorage::InitializeScan(CollectionScanState &state, optional_ptr<TableFilterSet> table_filters) {
77 if (row_groups->GetTotalRows() == 0) {
78 // nothing to scan
79 return;
80 }
81 row_groups->InitializeScan(state, column_ids: state.GetColumnIds(), table_filters: table_filters.get());
82}
83
84idx_t LocalTableStorage::EstimatedSize() {
85 idx_t appended_rows = row_groups->GetTotalRows() - deleted_rows;
86 if (appended_rows == 0) {
87 return 0;
88 }
89 idx_t row_size = 0;
90 auto &types = row_groups->GetTypes();
91 for (auto &type : types) {
92 row_size += GetTypeIdSize(type: type.InternalType());
93 }
94 return appended_rows * row_size;
95}
96
97void LocalTableStorage::WriteNewRowGroup() {
98 if (deleted_rows != 0) {
99 // we have deletes - we cannot merge row groups
100 return;
101 }
102 optimistic_writer.WriteNewRowGroup(row_groups&: *row_groups);
103}
104
105void LocalTableStorage::FlushBlocks() {
106 if (!merged_storage && row_groups->GetTotalRows() > RowGroup::ROW_GROUP_SIZE) {
107 optimistic_writer.WriteLastRowGroup(row_groups&: *row_groups);
108 }
109 optimistic_writer.FinalFlush();
110}
111
112PreservedError LocalTableStorage::AppendToIndexes(DuckTransaction &transaction, RowGroupCollection &source,
113 TableIndexList &index_list, const vector<LogicalType> &table_types,
114 row_t &start_row) {
115 // only need to scan for index append
116 // figure out which columns we need to scan for the set of indexes
117 auto columns = index_list.GetRequiredColumns();
118 // create an empty mock chunk that contains all the correct types for the table
119 DataChunk mock_chunk;
120 mock_chunk.InitializeEmpty(types: table_types);
121 PreservedError error;
122 source.Scan(transaction, column_ids: columns, fun: [&](DataChunk &chunk) -> bool {
123 // construct the mock chunk by referencing the required columns
124 for (idx_t i = 0; i < columns.size(); i++) {
125 mock_chunk.data[columns[i]].Reference(other&: chunk.data[i]);
126 }
127 mock_chunk.SetCardinality(chunk);
128 // append this chunk to the indexes of the table
129 error = DataTable::AppendToIndexes(indexes&: index_list, chunk&: mock_chunk, row_start: start_row);
130 if (error) {
131 return false;
132 }
133 start_row += chunk.size();
134 return true;
135 });
136 return error;
137}
138
139void LocalTableStorage::AppendToIndexes(DuckTransaction &transaction, TableAppendState &append_state,
140 idx_t append_count, bool append_to_table) {
141 auto &table = table_ref.get();
142 if (append_to_table) {
143 table.InitializeAppend(transaction, state&: append_state, append_count);
144 }
145 PreservedError error;
146 if (append_to_table) {
147 // appending: need to scan entire
148 row_groups->Scan(transaction, fun: [&](DataChunk &chunk) -> bool {
149 // append this chunk to the indexes of the table
150 error = table.AppendToIndexes(chunk, row_start: append_state.current_row);
151 if (error) {
152 return false;
153 }
154 // append to base table
155 table.Append(chunk, state&: append_state);
156 return true;
157 });
158 } else {
159 error =
160 AppendToIndexes(transaction, source&: *row_groups, index_list&: table.info->indexes, table_types: table.GetTypes(), start_row&: append_state.current_row);
161 }
162 if (error) {
163 // need to revert the append
164 row_t current_row = append_state.row_start;
165 // remove the data from the indexes, if there are any indexes
166 row_groups->Scan(transaction, fun: [&](DataChunk &chunk) -> bool {
167 // append this chunk to the indexes of the table
168 try {
169 table.RemoveFromIndexes(state&: append_state, chunk, row_start: current_row);
170 } catch (Exception &ex) {
171 error = PreservedError(ex);
172 return false;
173 } catch (std::exception &ex) {
174 error = PreservedError(ex);
175 return false;
176 }
177
178 current_row += chunk.size();
179 if (current_row >= append_state.current_row) {
180 // finished deleting all rows from the index: abort now
181 return false;
182 }
183 return true;
184 });
185 if (append_to_table) {
186 table.RevertAppendInternal(start_row: append_state.row_start, count: append_count);
187 }
188 error.Throw();
189 }
190}
191
192OptimisticDataWriter &LocalTableStorage::CreateOptimisticWriter() {
193 auto writer = make_uniq<OptimisticDataWriter>(args&: table_ref.get());
194 optimistic_writers.push_back(x: std::move(writer));
195 return *optimistic_writers.back();
196}
197
198void LocalTableStorage::FinalizeOptimisticWriter(OptimisticDataWriter &writer) {
199 // remove the writer from the set of optimistic writers
200 unique_ptr<OptimisticDataWriter> owned_writer;
201 for (idx_t i = 0; i < optimistic_writers.size(); i++) {
202 if (optimistic_writers[i].get() == &writer) {
203 owned_writer = std::move(optimistic_writers[i]);
204 optimistic_writers.erase(position: optimistic_writers.begin() + i);
205 break;
206 }
207 }
208 if (!owned_writer) {
209 throw InternalException("Error in FinalizeOptimisticWriter - could not find writer");
210 }
211 optimistic_writer.Merge(other&: *owned_writer);
212}
213
214void LocalTableStorage::Rollback() {
215 for (auto &writer : optimistic_writers) {
216 writer->Rollback();
217 }
218 optimistic_writers.clear();
219 optimistic_writer.Rollback();
220}
221
222//===--------------------------------------------------------------------===//
223// LocalTableManager
224//===--------------------------------------------------------------------===//
225optional_ptr<LocalTableStorage> LocalTableManager::GetStorage(DataTable &table) {
226 lock_guard<mutex> l(table_storage_lock);
227 auto entry = table_storage.find(x: table);
228 return entry == table_storage.end() ? nullptr : entry->second.get();
229}
230
231LocalTableStorage &LocalTableManager::GetOrCreateStorage(DataTable &table) {
232 lock_guard<mutex> l(table_storage_lock);
233 auto entry = table_storage.find(x: table);
234 if (entry == table_storage.end()) {
235 auto new_storage = make_shared<LocalTableStorage>(args&: table);
236 auto storage = new_storage.get();
237 table_storage.insert(x: make_pair(x: reference<DataTable>(table), y: std::move(new_storage)));
238 return *storage;
239 } else {
240 return *entry->second.get();
241 }
242}
243
244bool LocalTableManager::IsEmpty() {
245 lock_guard<mutex> l(table_storage_lock);
246 return table_storage.empty();
247}
248
249shared_ptr<LocalTableStorage> LocalTableManager::MoveEntry(DataTable &table) {
250 lock_guard<mutex> l(table_storage_lock);
251 auto entry = table_storage.find(x: table);
252 if (entry == table_storage.end()) {
253 return nullptr;
254 }
255 auto storage_entry = std::move(entry->second);
256 table_storage.erase(position: entry);
257 return storage_entry;
258}
259
260reference_map_t<DataTable, shared_ptr<LocalTableStorage>> LocalTableManager::MoveEntries() {
261 lock_guard<mutex> l(table_storage_lock);
262 return std::move(table_storage);
263}
264
265idx_t LocalTableManager::EstimatedSize() {
266 lock_guard<mutex> l(table_storage_lock);
267 idx_t estimated_size = 0;
268 for (auto &storage : table_storage) {
269 estimated_size += storage.second->EstimatedSize();
270 }
271 return estimated_size;
272}
273
274void LocalTableManager::InsertEntry(DataTable &table, shared_ptr<LocalTableStorage> entry) {
275 lock_guard<mutex> l(table_storage_lock);
276 D_ASSERT(table_storage.find(table) == table_storage.end());
277 table_storage[table] = std::move(entry);
278}
279
280//===--------------------------------------------------------------------===//
281// LocalStorage
282//===--------------------------------------------------------------------===//
283LocalStorage::LocalStorage(ClientContext &context, DuckTransaction &transaction)
284 : context(context), transaction(transaction) {
285}
286
287LocalStorage::CommitState::CommitState() {
288}
289
290LocalStorage::CommitState::~CommitState() {
291}
292
293LocalStorage &LocalStorage::Get(DuckTransaction &transaction) {
294 return transaction.GetLocalStorage();
295}
296
297LocalStorage &LocalStorage::Get(ClientContext &context, AttachedDatabase &db) {
298 return DuckTransaction::Get(context, db).GetLocalStorage();
299}
300
301LocalStorage &LocalStorage::Get(ClientContext &context, Catalog &catalog) {
302 return LocalStorage::Get(context, db&: catalog.GetAttached());
303}
304
305void LocalStorage::InitializeScan(DataTable &table, CollectionScanState &state,
306 optional_ptr<TableFilterSet> table_filters) {
307 auto storage = table_manager.GetStorage(table);
308 if (storage == nullptr) {
309 return;
310 }
311 storage->InitializeScan(state, table_filters);
312}
313
314void LocalStorage::Scan(CollectionScanState &state, const vector<storage_t> &column_ids, DataChunk &result) {
315 state.Scan(transaction, result);
316}
317
318void LocalStorage::InitializeParallelScan(DataTable &table, ParallelCollectionScanState &state) {
319 auto storage = table_manager.GetStorage(table);
320 if (!storage) {
321 state.max_row = 0;
322 state.vector_index = 0;
323 state.current_row_group = nullptr;
324 } else {
325 storage->row_groups->InitializeParallelScan(state);
326 }
327}
328
329bool LocalStorage::NextParallelScan(ClientContext &context, DataTable &table, ParallelCollectionScanState &state,
330 CollectionScanState &scan_state) {
331 auto storage = table_manager.GetStorage(table);
332 if (!storage) {
333 return false;
334 }
335 return storage->row_groups->NextParallelScan(context, state, scan_state);
336}
337
338void LocalStorage::InitializeAppend(LocalAppendState &state, DataTable &table) {
339 state.storage = &table_manager.GetOrCreateStorage(table);
340 state.storage->row_groups->InitializeAppend(transaction: TransactionData(transaction), state&: state.append_state, append_count: 0);
341}
342
343void LocalStorage::Append(LocalAppendState &state, DataChunk &chunk) {
344 // append to unique indices (if any)
345 auto storage = state.storage;
346 idx_t base_id = MAX_ROW_ID + storage->row_groups->GetTotalRows() + state.append_state.total_append_count;
347 auto error = DataTable::AppendToIndexes(indexes&: storage->indexes, chunk, row_start: base_id);
348 if (error) {
349 error.Throw();
350 }
351
352 //! Append the chunk to the local storage
353 auto new_row_group = storage->row_groups->Append(chunk, state&: state.append_state);
354
355 //! Check if we should pre-emptively flush blocks to disk
356 if (new_row_group) {
357 storage->WriteNewRowGroup();
358 }
359}
360
361void LocalStorage::FinalizeAppend(LocalAppendState &state) {
362 state.storage->row_groups->FinalizeAppend(transaction: state.append_state.transaction, state&: state.append_state);
363}
364
365void LocalStorage::LocalMerge(DataTable &table, RowGroupCollection &collection) {
366 auto &storage = table_manager.GetOrCreateStorage(table);
367 if (!storage.indexes.Empty()) {
368 // append data to indexes if required
369 row_t base_id = MAX_ROW_ID + storage.row_groups->GetTotalRows();
370 auto error = storage.AppendToIndexes(transaction, source&: collection, index_list&: storage.indexes, table_types: table.GetTypes(), start_row&: base_id);
371 if (error) {
372 error.Throw();
373 }
374 }
375 storage.row_groups->MergeStorage(data&: collection);
376 storage.merged_storage = true;
377}
378
379OptimisticDataWriter &LocalStorage::CreateOptimisticWriter(DataTable &table) {
380 auto &storage = table_manager.GetOrCreateStorage(table);
381 return storage.CreateOptimisticWriter();
382}
383
384void LocalStorage::FinalizeOptimisticWriter(DataTable &table, OptimisticDataWriter &writer) {
385 auto &storage = table_manager.GetOrCreateStorage(table);
386 storage.FinalizeOptimisticWriter(writer);
387}
388
389bool LocalStorage::ChangesMade() noexcept {
390 return !table_manager.IsEmpty();
391}
392
393bool LocalStorage::Find(DataTable &table) {
394 return table_manager.GetStorage(table) != nullptr;
395}
396
397idx_t LocalStorage::EstimatedSize() {
398 return table_manager.EstimatedSize();
399}
400
401idx_t LocalStorage::Delete(DataTable &table, Vector &row_ids, idx_t count) {
402 auto storage = table_manager.GetStorage(table);
403 D_ASSERT(storage);
404
405 // delete from unique indices (if any)
406 if (!storage->indexes.Empty()) {
407 storage->row_groups->RemoveFromIndexes(indexes&: storage->indexes, row_identifiers&: row_ids, count);
408 }
409
410 auto ids = FlatVector::GetData<row_t>(vector&: row_ids);
411 idx_t delete_count = storage->row_groups->Delete(transaction: TransactionData(0, 0), table, ids, count);
412 storage->deleted_rows += delete_count;
413 return delete_count;
414}
415
416void LocalStorage::Update(DataTable &table, Vector &row_ids, const vector<PhysicalIndex> &column_ids,
417 DataChunk &updates) {
418 auto storage = table_manager.GetStorage(table);
419 D_ASSERT(storage);
420
421 auto ids = FlatVector::GetData<row_t>(vector&: row_ids);
422 storage->row_groups->Update(transaction: TransactionData(0, 0), ids, column_ids, updates);
423}
424
425void LocalStorage::Flush(DataTable &table, LocalTableStorage &storage) {
426 if (storage.row_groups->GetTotalRows() <= storage.deleted_rows) {
427 return;
428 }
429 idx_t append_count = storage.row_groups->GetTotalRows() - storage.deleted_rows;
430
431 TableAppendState append_state;
432 table.AppendLock(state&: append_state);
433 if ((append_state.row_start == 0 || storage.row_groups->GetTotalRows() >= MERGE_THRESHOLD) &&
434 storage.deleted_rows == 0) {
435 // table is currently empty OR we are bulk appending: move over the storage directly
436 // first flush any outstanding blocks
437 storage.FlushBlocks();
438 // now append to the indexes (if there are any)
439 // FIXME: we should be able to merge the transaction-local index directly into the main table index
440 // as long we just rewrite some row-ids
441 if (!table.info->indexes.Empty()) {
442 storage.AppendToIndexes(transaction, append_state, append_count, append_to_table: false);
443 }
444 // finally move over the row groups
445 table.MergeStorage(data&: *storage.row_groups, indexes&: storage.indexes);
446 } else {
447 // check if we have written data
448 // if we have, we cannot merge to disk after all
449 // so we need to revert the data we have already written
450 storage.Rollback();
451 // append to the indexes and append to the base table
452 storage.AppendToIndexes(transaction, append_state, append_count, append_to_table: true);
453 }
454 transaction.PushAppend(table, row_start: append_state.row_start, row_count: append_count);
455
456 // possibly vacuum any excess index data
457 table.info->indexes.Scan(callback: [&](Index &index) {
458 index.Vacuum();
459 return false;
460 });
461}
462
463void LocalStorage::Commit(LocalStorage::CommitState &commit_state, DuckTransaction &transaction) {
464 // commit local storage
465 // iterate over all entries in the table storage map and commit them
466 // after this, the local storage is no longer required and can be cleared
467 auto table_storage = table_manager.MoveEntries();
468 for (auto &entry : table_storage) {
469 auto table = entry.first;
470 auto storage = entry.second.get();
471 Flush(table, storage&: *storage);
472 entry.second.reset();
473 }
474}
475
476void LocalStorage::Rollback() {
477 // rollback local storage
478 // after this, the local storage is no longer required and can be cleared
479 auto table_storage = table_manager.MoveEntries();
480 for (auto &entry : table_storage) {
481 auto storage = entry.second.get();
482 if (!storage) {
483 continue;
484 }
485 storage->Rollback();
486
487 entry.second.reset();
488 }
489}
490
491idx_t LocalStorage::AddedRows(DataTable &table) {
492 auto storage = table_manager.GetStorage(table);
493 if (!storage) {
494 return 0;
495 }
496 return storage->row_groups->GetTotalRows() - storage->deleted_rows;
497}
498
499void LocalStorage::MoveStorage(DataTable &old_dt, DataTable &new_dt) {
500 // check if there are any pending appends for the old version of the table
501 auto new_storage = table_manager.MoveEntry(table&: old_dt);
502 if (!new_storage) {
503 return;
504 }
505 // take over the storage from the old entry
506 new_storage->table_ref = new_dt;
507 table_manager.InsertEntry(table&: new_dt, entry: std::move(new_storage));
508}
509
510void LocalStorage::AddColumn(DataTable &old_dt, DataTable &new_dt, ColumnDefinition &new_column,
511 optional_ptr<Expression> default_value) {
512 // check if there are any pending appends for the old version of the table
513 auto storage = table_manager.MoveEntry(table&: old_dt);
514 if (!storage) {
515 return;
516 }
517 auto new_storage = make_shared<LocalTableStorage>(args&: context, args&: new_dt, args&: *storage, args&: new_column, args&: default_value);
518 table_manager.InsertEntry(table&: new_dt, entry: std::move(new_storage));
519}
520
521void LocalStorage::DropColumn(DataTable &old_dt, DataTable &new_dt, idx_t removed_column) {
522 // check if there are any pending appends for the old version of the table
523 auto storage = table_manager.MoveEntry(table&: old_dt);
524 if (!storage) {
525 return;
526 }
527 auto new_storage = make_shared<LocalTableStorage>(args&: new_dt, args&: *storage, args&: removed_column);
528 table_manager.InsertEntry(table&: new_dt, entry: std::move(new_storage));
529}
530
531void LocalStorage::ChangeType(DataTable &old_dt, DataTable &new_dt, idx_t changed_idx, const LogicalType &target_type,
532 const vector<column_t> &bound_columns, Expression &cast_expr) {
533 // check if there are any pending appends for the old version of the table
534 auto storage = table_manager.MoveEntry(table&: old_dt);
535 if (!storage) {
536 return;
537 }
538 auto new_storage =
539 make_shared<LocalTableStorage>(args&: context, args&: new_dt, args&: *storage, args&: changed_idx, args: target_type, args: bound_columns, args&: cast_expr);
540 table_manager.InsertEntry(table&: new_dt, entry: std::move(new_storage));
541}
542
543void LocalStorage::FetchChunk(DataTable &table, Vector &row_ids, idx_t count, const vector<column_t> &col_ids,
544 DataChunk &chunk, ColumnFetchState &fetch_state) {
545 auto storage = table_manager.GetStorage(table);
546 if (!storage) {
547 throw InternalException("LocalStorage::FetchChunk - local storage not found");
548 }
549
550 storage->row_groups->Fetch(transaction, result&: chunk, column_ids: col_ids, row_identifiers: row_ids, fetch_count: count, state&: fetch_state);
551}
552
553TableIndexList &LocalStorage::GetIndexes(DataTable &table) {
554 auto storage = table_manager.GetStorage(table);
555 if (!storage) {
556 throw InternalException("LocalStorage::GetIndexes - local storage not found");
557 }
558 return storage->indexes;
559}
560
561void LocalStorage::VerifyNewConstraint(DataTable &parent, const BoundConstraint &constraint) {
562 auto storage = table_manager.GetStorage(table&: parent);
563 if (!storage) {
564 return;
565 }
566 storage->row_groups->VerifyNewConstraint(parent, constraint);
567}
568
569} // namespace duckdb
570