1 | #include "duckdb/transaction/local_storage.hpp" |
2 | #include "duckdb/execution/index/art/art.hpp" |
3 | #include "duckdb/storage/table/append_state.hpp" |
4 | #include "duckdb/storage/write_ahead_log.hpp" |
5 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
6 | #include "duckdb/storage/table/row_group.hpp" |
7 | #include "duckdb/transaction/duck_transaction.hpp" |
8 | #include "duckdb/planner/table_filter.hpp" |
9 | #include "duckdb/storage/partial_block_manager.hpp" |
10 | |
11 | #include "duckdb/storage/table/column_checkpoint_state.hpp" |
12 | #include "duckdb/storage/table_io_manager.hpp" |
13 | #include "duckdb/storage/table/scan_state.hpp" |
14 | |
15 | namespace duckdb { |
16 | |
17 | LocalTableStorage::LocalTableStorage(DataTable &table) |
18 | : table_ref(table), allocator(Allocator::Get(db&: table.db)), deleted_rows(0), optimistic_writer(table), |
19 | merged_storage(false) { |
20 | auto types = table.GetTypes(); |
21 | row_groups = make_shared<RowGroupCollection>(args&: table.info, args&: TableIOManager::Get(table).GetBlockManagerForRowData(), |
22 | args&: types, args: MAX_ROW_ID, args: 0); |
23 | row_groups->InitializeEmpty(); |
24 | |
25 | table.info->indexes.Scan(callback: [&](Index &index) { |
26 | D_ASSERT(index.type == IndexType::ART); |
27 | auto &art = index.Cast<ART>(); |
28 | ; |
29 | if (art.constraint_type != IndexConstraintType::NONE) { |
30 | // unique index: create a local ART index that maintains the same unique constraint |
31 | vector<unique_ptr<Expression>> unbound_expressions; |
32 | unbound_expressions.reserve(n: art.unbound_expressions.size()); |
33 | for (auto &expr : art.unbound_expressions) { |
34 | unbound_expressions.push_back(x: expr->Copy()); |
35 | } |
36 | indexes.AddIndex(index: make_uniq<ART>(args&: art.column_ids, args&: art.table_io_manager, args: std::move(unbound_expressions), |
37 | args&: art.constraint_type, args&: art.db)); |
38 | } |
39 | return false; |
40 | }); |
41 | } |
42 | |
43 | LocalTableStorage::LocalTableStorage(ClientContext &context, DataTable &new_dt, LocalTableStorage &parent, |
44 | idx_t changed_idx, const LogicalType &target_type, |
45 | const vector<column_t> &bound_columns, Expression &cast_expr) |
46 | : table_ref(new_dt), allocator(Allocator::Get(db&: new_dt.db)), deleted_rows(parent.deleted_rows), |
47 | optimistic_writer(new_dt, parent.optimistic_writer), optimistic_writers(std::move(parent.optimistic_writers)), |
48 | merged_storage(parent.merged_storage) { |
49 | row_groups = parent.row_groups->AlterType(context, changed_idx, target_type, bound_columns, cast_expr); |
50 | parent.row_groups.reset(); |
51 | indexes.Move(other&: parent.indexes); |
52 | } |
53 | |
54 | LocalTableStorage::LocalTableStorage(DataTable &new_dt, LocalTableStorage &parent, idx_t drop_idx) |
55 | : table_ref(new_dt), allocator(Allocator::Get(db&: new_dt.db)), deleted_rows(parent.deleted_rows), |
56 | optimistic_writer(new_dt, parent.optimistic_writer), optimistic_writers(std::move(parent.optimistic_writers)), |
57 | merged_storage(parent.merged_storage) { |
58 | row_groups = parent.row_groups->RemoveColumn(col_idx: drop_idx); |
59 | parent.row_groups.reset(); |
60 | indexes.Move(other&: parent.indexes); |
61 | } |
62 | |
63 | LocalTableStorage::LocalTableStorage(ClientContext &context, DataTable &new_dt, LocalTableStorage &parent, |
64 | ColumnDefinition &new_column, optional_ptr<Expression> default_value) |
65 | : table_ref(new_dt), allocator(Allocator::Get(db&: new_dt.db)), deleted_rows(parent.deleted_rows), |
66 | optimistic_writer(new_dt, parent.optimistic_writer), optimistic_writers(std::move(parent.optimistic_writers)), |
67 | merged_storage(parent.merged_storage) { |
68 | row_groups = parent.row_groups->AddColumn(context, new_column, default_value: default_value.get()); |
69 | parent.row_groups.reset(); |
70 | indexes.Move(other&: parent.indexes); |
71 | } |
72 | |
73 | LocalTableStorage::~LocalTableStorage() { |
74 | } |
75 | |
76 | void LocalTableStorage::InitializeScan(CollectionScanState &state, optional_ptr<TableFilterSet> table_filters) { |
77 | if (row_groups->GetTotalRows() == 0) { |
78 | // nothing to scan |
79 | return; |
80 | } |
81 | row_groups->InitializeScan(state, column_ids: state.GetColumnIds(), table_filters: table_filters.get()); |
82 | } |
83 | |
84 | idx_t LocalTableStorage::EstimatedSize() { |
85 | idx_t appended_rows = row_groups->GetTotalRows() - deleted_rows; |
86 | if (appended_rows == 0) { |
87 | return 0; |
88 | } |
89 | idx_t row_size = 0; |
90 | auto &types = row_groups->GetTypes(); |
91 | for (auto &type : types) { |
92 | row_size += GetTypeIdSize(type: type.InternalType()); |
93 | } |
94 | return appended_rows * row_size; |
95 | } |
96 | |
97 | void LocalTableStorage::WriteNewRowGroup() { |
98 | if (deleted_rows != 0) { |
99 | // we have deletes - we cannot merge row groups |
100 | return; |
101 | } |
102 | optimistic_writer.WriteNewRowGroup(row_groups&: *row_groups); |
103 | } |
104 | |
105 | void LocalTableStorage::FlushBlocks() { |
106 | if (!merged_storage && row_groups->GetTotalRows() > RowGroup::ROW_GROUP_SIZE) { |
107 | optimistic_writer.WriteLastRowGroup(row_groups&: *row_groups); |
108 | } |
109 | optimistic_writer.FinalFlush(); |
110 | } |
111 | |
112 | PreservedError LocalTableStorage::AppendToIndexes(DuckTransaction &transaction, RowGroupCollection &source, |
113 | TableIndexList &index_list, const vector<LogicalType> &table_types, |
114 | row_t &start_row) { |
115 | // only need to scan for index append |
116 | // figure out which columns we need to scan for the set of indexes |
117 | auto columns = index_list.GetRequiredColumns(); |
118 | // create an empty mock chunk that contains all the correct types for the table |
119 | DataChunk mock_chunk; |
120 | mock_chunk.InitializeEmpty(types: table_types); |
121 | PreservedError error; |
122 | source.Scan(transaction, column_ids: columns, fun: [&](DataChunk &chunk) -> bool { |
123 | // construct the mock chunk by referencing the required columns |
124 | for (idx_t i = 0; i < columns.size(); i++) { |
125 | mock_chunk.data[columns[i]].Reference(other&: chunk.data[i]); |
126 | } |
127 | mock_chunk.SetCardinality(chunk); |
128 | // append this chunk to the indexes of the table |
129 | error = DataTable::AppendToIndexes(indexes&: index_list, chunk&: mock_chunk, row_start: start_row); |
130 | if (error) { |
131 | return false; |
132 | } |
133 | start_row += chunk.size(); |
134 | return true; |
135 | }); |
136 | return error; |
137 | } |
138 | |
139 | void LocalTableStorage::AppendToIndexes(DuckTransaction &transaction, TableAppendState &append_state, |
140 | idx_t append_count, bool append_to_table) { |
141 | auto &table = table_ref.get(); |
142 | if (append_to_table) { |
143 | table.InitializeAppend(transaction, state&: append_state, append_count); |
144 | } |
145 | PreservedError error; |
146 | if (append_to_table) { |
147 | // appending: need to scan entire |
148 | row_groups->Scan(transaction, fun: [&](DataChunk &chunk) -> bool { |
149 | // append this chunk to the indexes of the table |
150 | error = table.AppendToIndexes(chunk, row_start: append_state.current_row); |
151 | if (error) { |
152 | return false; |
153 | } |
154 | // append to base table |
155 | table.Append(chunk, state&: append_state); |
156 | return true; |
157 | }); |
158 | } else { |
159 | error = |
160 | AppendToIndexes(transaction, source&: *row_groups, index_list&: table.info->indexes, table_types: table.GetTypes(), start_row&: append_state.current_row); |
161 | } |
162 | if (error) { |
163 | // need to revert the append |
164 | row_t current_row = append_state.row_start; |
165 | // remove the data from the indexes, if there are any indexes |
166 | row_groups->Scan(transaction, fun: [&](DataChunk &chunk) -> bool { |
167 | // append this chunk to the indexes of the table |
168 | try { |
169 | table.RemoveFromIndexes(state&: append_state, chunk, row_start: current_row); |
170 | } catch (Exception &ex) { |
171 | error = PreservedError(ex); |
172 | return false; |
173 | } catch (std::exception &ex) { |
174 | error = PreservedError(ex); |
175 | return false; |
176 | } |
177 | |
178 | current_row += chunk.size(); |
179 | if (current_row >= append_state.current_row) { |
180 | // finished deleting all rows from the index: abort now |
181 | return false; |
182 | } |
183 | return true; |
184 | }); |
185 | if (append_to_table) { |
186 | table.RevertAppendInternal(start_row: append_state.row_start, count: append_count); |
187 | } |
188 | error.Throw(); |
189 | } |
190 | } |
191 | |
192 | OptimisticDataWriter &LocalTableStorage::CreateOptimisticWriter() { |
193 | auto writer = make_uniq<OptimisticDataWriter>(args&: table_ref.get()); |
194 | optimistic_writers.push_back(x: std::move(writer)); |
195 | return *optimistic_writers.back(); |
196 | } |
197 | |
198 | void LocalTableStorage::FinalizeOptimisticWriter(OptimisticDataWriter &writer) { |
199 | // remove the writer from the set of optimistic writers |
200 | unique_ptr<OptimisticDataWriter> owned_writer; |
201 | for (idx_t i = 0; i < optimistic_writers.size(); i++) { |
202 | if (optimistic_writers[i].get() == &writer) { |
203 | owned_writer = std::move(optimistic_writers[i]); |
204 | optimistic_writers.erase(position: optimistic_writers.begin() + i); |
205 | break; |
206 | } |
207 | } |
208 | if (!owned_writer) { |
209 | throw InternalException("Error in FinalizeOptimisticWriter - could not find writer" ); |
210 | } |
211 | optimistic_writer.Merge(other&: *owned_writer); |
212 | } |
213 | |
214 | void LocalTableStorage::Rollback() { |
215 | for (auto &writer : optimistic_writers) { |
216 | writer->Rollback(); |
217 | } |
218 | optimistic_writers.clear(); |
219 | optimistic_writer.Rollback(); |
220 | } |
221 | |
222 | //===--------------------------------------------------------------------===// |
223 | // LocalTableManager |
224 | //===--------------------------------------------------------------------===// |
225 | optional_ptr<LocalTableStorage> LocalTableManager::GetStorage(DataTable &table) { |
226 | lock_guard<mutex> l(table_storage_lock); |
227 | auto entry = table_storage.find(x: table); |
228 | return entry == table_storage.end() ? nullptr : entry->second.get(); |
229 | } |
230 | |
231 | LocalTableStorage &LocalTableManager::GetOrCreateStorage(DataTable &table) { |
232 | lock_guard<mutex> l(table_storage_lock); |
233 | auto entry = table_storage.find(x: table); |
234 | if (entry == table_storage.end()) { |
235 | auto new_storage = make_shared<LocalTableStorage>(args&: table); |
236 | auto storage = new_storage.get(); |
237 | table_storage.insert(x: make_pair(x: reference<DataTable>(table), y: std::move(new_storage))); |
238 | return *storage; |
239 | } else { |
240 | return *entry->second.get(); |
241 | } |
242 | } |
243 | |
244 | bool LocalTableManager::IsEmpty() { |
245 | lock_guard<mutex> l(table_storage_lock); |
246 | return table_storage.empty(); |
247 | } |
248 | |
249 | shared_ptr<LocalTableStorage> LocalTableManager::MoveEntry(DataTable &table) { |
250 | lock_guard<mutex> l(table_storage_lock); |
251 | auto entry = table_storage.find(x: table); |
252 | if (entry == table_storage.end()) { |
253 | return nullptr; |
254 | } |
255 | auto storage_entry = std::move(entry->second); |
256 | table_storage.erase(position: entry); |
257 | return storage_entry; |
258 | } |
259 | |
260 | reference_map_t<DataTable, shared_ptr<LocalTableStorage>> LocalTableManager::MoveEntries() { |
261 | lock_guard<mutex> l(table_storage_lock); |
262 | return std::move(table_storage); |
263 | } |
264 | |
265 | idx_t LocalTableManager::EstimatedSize() { |
266 | lock_guard<mutex> l(table_storage_lock); |
267 | idx_t estimated_size = 0; |
268 | for (auto &storage : table_storage) { |
269 | estimated_size += storage.second->EstimatedSize(); |
270 | } |
271 | return estimated_size; |
272 | } |
273 | |
274 | void LocalTableManager::InsertEntry(DataTable &table, shared_ptr<LocalTableStorage> entry) { |
275 | lock_guard<mutex> l(table_storage_lock); |
276 | D_ASSERT(table_storage.find(table) == table_storage.end()); |
277 | table_storage[table] = std::move(entry); |
278 | } |
279 | |
280 | //===--------------------------------------------------------------------===// |
281 | // LocalStorage |
282 | //===--------------------------------------------------------------------===// |
283 | LocalStorage::LocalStorage(ClientContext &context, DuckTransaction &transaction) |
284 | : context(context), transaction(transaction) { |
285 | } |
286 | |
287 | LocalStorage::CommitState::CommitState() { |
288 | } |
289 | |
290 | LocalStorage::CommitState::~CommitState() { |
291 | } |
292 | |
293 | LocalStorage &LocalStorage::Get(DuckTransaction &transaction) { |
294 | return transaction.GetLocalStorage(); |
295 | } |
296 | |
297 | LocalStorage &LocalStorage::Get(ClientContext &context, AttachedDatabase &db) { |
298 | return DuckTransaction::Get(context, db).GetLocalStorage(); |
299 | } |
300 | |
301 | LocalStorage &LocalStorage::Get(ClientContext &context, Catalog &catalog) { |
302 | return LocalStorage::Get(context, db&: catalog.GetAttached()); |
303 | } |
304 | |
305 | void LocalStorage::InitializeScan(DataTable &table, CollectionScanState &state, |
306 | optional_ptr<TableFilterSet> table_filters) { |
307 | auto storage = table_manager.GetStorage(table); |
308 | if (storage == nullptr) { |
309 | return; |
310 | } |
311 | storage->InitializeScan(state, table_filters); |
312 | } |
313 | |
314 | void LocalStorage::Scan(CollectionScanState &state, const vector<storage_t> &column_ids, DataChunk &result) { |
315 | state.Scan(transaction, result); |
316 | } |
317 | |
318 | void LocalStorage::InitializeParallelScan(DataTable &table, ParallelCollectionScanState &state) { |
319 | auto storage = table_manager.GetStorage(table); |
320 | if (!storage) { |
321 | state.max_row = 0; |
322 | state.vector_index = 0; |
323 | state.current_row_group = nullptr; |
324 | } else { |
325 | storage->row_groups->InitializeParallelScan(state); |
326 | } |
327 | } |
328 | |
329 | bool LocalStorage::NextParallelScan(ClientContext &context, DataTable &table, ParallelCollectionScanState &state, |
330 | CollectionScanState &scan_state) { |
331 | auto storage = table_manager.GetStorage(table); |
332 | if (!storage) { |
333 | return false; |
334 | } |
335 | return storage->row_groups->NextParallelScan(context, state, scan_state); |
336 | } |
337 | |
338 | void LocalStorage::InitializeAppend(LocalAppendState &state, DataTable &table) { |
339 | state.storage = &table_manager.GetOrCreateStorage(table); |
340 | state.storage->row_groups->InitializeAppend(transaction: TransactionData(transaction), state&: state.append_state, append_count: 0); |
341 | } |
342 | |
343 | void LocalStorage::Append(LocalAppendState &state, DataChunk &chunk) { |
344 | // append to unique indices (if any) |
345 | auto storage = state.storage; |
346 | idx_t base_id = MAX_ROW_ID + storage->row_groups->GetTotalRows() + state.append_state.total_append_count; |
347 | auto error = DataTable::AppendToIndexes(indexes&: storage->indexes, chunk, row_start: base_id); |
348 | if (error) { |
349 | error.Throw(); |
350 | } |
351 | |
352 | //! Append the chunk to the local storage |
353 | auto new_row_group = storage->row_groups->Append(chunk, state&: state.append_state); |
354 | |
355 | //! Check if we should pre-emptively flush blocks to disk |
356 | if (new_row_group) { |
357 | storage->WriteNewRowGroup(); |
358 | } |
359 | } |
360 | |
361 | void LocalStorage::FinalizeAppend(LocalAppendState &state) { |
362 | state.storage->row_groups->FinalizeAppend(transaction: state.append_state.transaction, state&: state.append_state); |
363 | } |
364 | |
365 | void LocalStorage::LocalMerge(DataTable &table, RowGroupCollection &collection) { |
366 | auto &storage = table_manager.GetOrCreateStorage(table); |
367 | if (!storage.indexes.Empty()) { |
368 | // append data to indexes if required |
369 | row_t base_id = MAX_ROW_ID + storage.row_groups->GetTotalRows(); |
370 | auto error = storage.AppendToIndexes(transaction, source&: collection, index_list&: storage.indexes, table_types: table.GetTypes(), start_row&: base_id); |
371 | if (error) { |
372 | error.Throw(); |
373 | } |
374 | } |
375 | storage.row_groups->MergeStorage(data&: collection); |
376 | storage.merged_storage = true; |
377 | } |
378 | |
379 | OptimisticDataWriter &LocalStorage::CreateOptimisticWriter(DataTable &table) { |
380 | auto &storage = table_manager.GetOrCreateStorage(table); |
381 | return storage.CreateOptimisticWriter(); |
382 | } |
383 | |
384 | void LocalStorage::FinalizeOptimisticWriter(DataTable &table, OptimisticDataWriter &writer) { |
385 | auto &storage = table_manager.GetOrCreateStorage(table); |
386 | storage.FinalizeOptimisticWriter(writer); |
387 | } |
388 | |
389 | bool LocalStorage::ChangesMade() noexcept { |
390 | return !table_manager.IsEmpty(); |
391 | } |
392 | |
393 | bool LocalStorage::Find(DataTable &table) { |
394 | return table_manager.GetStorage(table) != nullptr; |
395 | } |
396 | |
397 | idx_t LocalStorage::EstimatedSize() { |
398 | return table_manager.EstimatedSize(); |
399 | } |
400 | |
401 | idx_t LocalStorage::Delete(DataTable &table, Vector &row_ids, idx_t count) { |
402 | auto storage = table_manager.GetStorage(table); |
403 | D_ASSERT(storage); |
404 | |
405 | // delete from unique indices (if any) |
406 | if (!storage->indexes.Empty()) { |
407 | storage->row_groups->RemoveFromIndexes(indexes&: storage->indexes, row_identifiers&: row_ids, count); |
408 | } |
409 | |
410 | auto ids = FlatVector::GetData<row_t>(vector&: row_ids); |
411 | idx_t delete_count = storage->row_groups->Delete(transaction: TransactionData(0, 0), table, ids, count); |
412 | storage->deleted_rows += delete_count; |
413 | return delete_count; |
414 | } |
415 | |
416 | void LocalStorage::Update(DataTable &table, Vector &row_ids, const vector<PhysicalIndex> &column_ids, |
417 | DataChunk &updates) { |
418 | auto storage = table_manager.GetStorage(table); |
419 | D_ASSERT(storage); |
420 | |
421 | auto ids = FlatVector::GetData<row_t>(vector&: row_ids); |
422 | storage->row_groups->Update(transaction: TransactionData(0, 0), ids, column_ids, updates); |
423 | } |
424 | |
425 | void LocalStorage::Flush(DataTable &table, LocalTableStorage &storage) { |
426 | if (storage.row_groups->GetTotalRows() <= storage.deleted_rows) { |
427 | return; |
428 | } |
429 | idx_t append_count = storage.row_groups->GetTotalRows() - storage.deleted_rows; |
430 | |
431 | TableAppendState append_state; |
432 | table.AppendLock(state&: append_state); |
433 | if ((append_state.row_start == 0 || storage.row_groups->GetTotalRows() >= MERGE_THRESHOLD) && |
434 | storage.deleted_rows == 0) { |
435 | // table is currently empty OR we are bulk appending: move over the storage directly |
436 | // first flush any outstanding blocks |
437 | storage.FlushBlocks(); |
438 | // now append to the indexes (if there are any) |
439 | // FIXME: we should be able to merge the transaction-local index directly into the main table index |
440 | // as long we just rewrite some row-ids |
441 | if (!table.info->indexes.Empty()) { |
442 | storage.AppendToIndexes(transaction, append_state, append_count, append_to_table: false); |
443 | } |
444 | // finally move over the row groups |
445 | table.MergeStorage(data&: *storage.row_groups, indexes&: storage.indexes); |
446 | } else { |
447 | // check if we have written data |
448 | // if we have, we cannot merge to disk after all |
449 | // so we need to revert the data we have already written |
450 | storage.Rollback(); |
451 | // append to the indexes and append to the base table |
452 | storage.AppendToIndexes(transaction, append_state, append_count, append_to_table: true); |
453 | } |
454 | transaction.PushAppend(table, row_start: append_state.row_start, row_count: append_count); |
455 | |
456 | // possibly vacuum any excess index data |
457 | table.info->indexes.Scan(callback: [&](Index &index) { |
458 | index.Vacuum(); |
459 | return false; |
460 | }); |
461 | } |
462 | |
463 | void LocalStorage::Commit(LocalStorage::CommitState &commit_state, DuckTransaction &transaction) { |
464 | // commit local storage |
465 | // iterate over all entries in the table storage map and commit them |
466 | // after this, the local storage is no longer required and can be cleared |
467 | auto table_storage = table_manager.MoveEntries(); |
468 | for (auto &entry : table_storage) { |
469 | auto table = entry.first; |
470 | auto storage = entry.second.get(); |
471 | Flush(table, storage&: *storage); |
472 | entry.second.reset(); |
473 | } |
474 | } |
475 | |
476 | void LocalStorage::Rollback() { |
477 | // rollback local storage |
478 | // after this, the local storage is no longer required and can be cleared |
479 | auto table_storage = table_manager.MoveEntries(); |
480 | for (auto &entry : table_storage) { |
481 | auto storage = entry.second.get(); |
482 | if (!storage) { |
483 | continue; |
484 | } |
485 | storage->Rollback(); |
486 | |
487 | entry.second.reset(); |
488 | } |
489 | } |
490 | |
491 | idx_t LocalStorage::AddedRows(DataTable &table) { |
492 | auto storage = table_manager.GetStorage(table); |
493 | if (!storage) { |
494 | return 0; |
495 | } |
496 | return storage->row_groups->GetTotalRows() - storage->deleted_rows; |
497 | } |
498 | |
499 | void LocalStorage::MoveStorage(DataTable &old_dt, DataTable &new_dt) { |
500 | // check if there are any pending appends for the old version of the table |
501 | auto new_storage = table_manager.MoveEntry(table&: old_dt); |
502 | if (!new_storage) { |
503 | return; |
504 | } |
505 | // take over the storage from the old entry |
506 | new_storage->table_ref = new_dt; |
507 | table_manager.InsertEntry(table&: new_dt, entry: std::move(new_storage)); |
508 | } |
509 | |
510 | void LocalStorage::AddColumn(DataTable &old_dt, DataTable &new_dt, ColumnDefinition &new_column, |
511 | optional_ptr<Expression> default_value) { |
512 | // check if there are any pending appends for the old version of the table |
513 | auto storage = table_manager.MoveEntry(table&: old_dt); |
514 | if (!storage) { |
515 | return; |
516 | } |
517 | auto new_storage = make_shared<LocalTableStorage>(args&: context, args&: new_dt, args&: *storage, args&: new_column, args&: default_value); |
518 | table_manager.InsertEntry(table&: new_dt, entry: std::move(new_storage)); |
519 | } |
520 | |
521 | void LocalStorage::DropColumn(DataTable &old_dt, DataTable &new_dt, idx_t removed_column) { |
522 | // check if there are any pending appends for the old version of the table |
523 | auto storage = table_manager.MoveEntry(table&: old_dt); |
524 | if (!storage) { |
525 | return; |
526 | } |
527 | auto new_storage = make_shared<LocalTableStorage>(args&: new_dt, args&: *storage, args&: removed_column); |
528 | table_manager.InsertEntry(table&: new_dt, entry: std::move(new_storage)); |
529 | } |
530 | |
531 | void LocalStorage::ChangeType(DataTable &old_dt, DataTable &new_dt, idx_t changed_idx, const LogicalType &target_type, |
532 | const vector<column_t> &bound_columns, Expression &cast_expr) { |
533 | // check if there are any pending appends for the old version of the table |
534 | auto storage = table_manager.MoveEntry(table&: old_dt); |
535 | if (!storage) { |
536 | return; |
537 | } |
538 | auto new_storage = |
539 | make_shared<LocalTableStorage>(args&: context, args&: new_dt, args&: *storage, args&: changed_idx, args: target_type, args: bound_columns, args&: cast_expr); |
540 | table_manager.InsertEntry(table&: new_dt, entry: std::move(new_storage)); |
541 | } |
542 | |
543 | void LocalStorage::FetchChunk(DataTable &table, Vector &row_ids, idx_t count, const vector<column_t> &col_ids, |
544 | DataChunk &chunk, ColumnFetchState &fetch_state) { |
545 | auto storage = table_manager.GetStorage(table); |
546 | if (!storage) { |
547 | throw InternalException("LocalStorage::FetchChunk - local storage not found" ); |
548 | } |
549 | |
550 | storage->row_groups->Fetch(transaction, result&: chunk, column_ids: col_ids, row_identifiers: row_ids, fetch_count: count, state&: fetch_state); |
551 | } |
552 | |
553 | TableIndexList &LocalStorage::GetIndexes(DataTable &table) { |
554 | auto storage = table_manager.GetStorage(table); |
555 | if (!storage) { |
556 | throw InternalException("LocalStorage::GetIndexes - local storage not found" ); |
557 | } |
558 | return storage->indexes; |
559 | } |
560 | |
561 | void LocalStorage::VerifyNewConstraint(DataTable &parent, const BoundConstraint &constraint) { |
562 | auto storage = table_manager.GetStorage(table&: parent); |
563 | if (!storage) { |
564 | return; |
565 | } |
566 | storage->row_groups->VerifyNewConstraint(parent, constraint); |
567 | } |
568 | |
569 | } // namespace duckdb |
570 | |