1#include "duckdb/storage/data_table.hpp"
2
3#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
4#include "duckdb/common/chrono.hpp"
5#include "duckdb/common/exception.hpp"
6#include "duckdb/common/helper.hpp"
7#include "duckdb/common/vector_operations/vector_operations.hpp"
8#include "duckdb/execution/expression_executor.hpp"
9#include "duckdb/main/client_context.hpp"
10#include "duckdb/parser/constraints/list.hpp"
11#include "duckdb/planner/constraints/list.hpp"
12#include "duckdb/planner/expression_binder/check_binder.hpp"
13#include "duckdb/planner/table_filter.hpp"
14#include "duckdb/storage/checkpoint/table_data_writer.hpp"
15#include "duckdb/storage/storage_manager.hpp"
16#include "duckdb/storage/table_storage_info.hpp"
17#include "duckdb/storage/table/persistent_table_data.hpp"
18#include "duckdb/storage/table/row_group.hpp"
19#include "duckdb/storage/table/standard_column_data.hpp"
20#include "duckdb/transaction/duck_transaction.hpp"
21#include "duckdb/transaction/transaction_manager.hpp"
22#include "duckdb/execution/index/art/art.hpp"
23#include "duckdb/main/attached_database.hpp"
24#include "duckdb/common/types/conflict_manager.hpp"
25#include "duckdb/common/types/constraint_conflict_info.hpp"
26#include "duckdb/storage/table/append_state.hpp"
27#include "duckdb/storage/table/scan_state.hpp"
28
29namespace duckdb {
30
31DataTableInfo::DataTableInfo(AttachedDatabase &db, shared_ptr<TableIOManager> table_io_manager_p, string schema,
32 string table)
33 : db(db), table_io_manager(std::move(table_io_manager_p)), cardinality(0), schema(std::move(schema)),
34 table(std::move(table)) {
35}
36
37bool DataTableInfo::IsTemporary() const {
38 return db.IsTemporary();
39}
40
41DataTable::DataTable(AttachedDatabase &db, shared_ptr<TableIOManager> table_io_manager_p, const string &schema,
42 const string &table, vector<ColumnDefinition> column_definitions_p,
43 unique_ptr<PersistentTableData> data)
44 : info(make_shared<DataTableInfo>(args&: db, args: std::move(table_io_manager_p), args: schema, args: table)),
45 column_definitions(std::move(column_definitions_p)), db(db), is_root(true) {
46 // initialize the table with the existing data from disk, if any
47 auto types = GetTypes();
48 this->row_groups =
49 make_shared<RowGroupCollection>(args&: info, args&: TableIOManager::Get(table&: *this).GetBlockManagerForRowData(), args&: types, args: 0);
50 if (data && data->row_group_count > 0) {
51 this->row_groups->Initialize(data&: *data);
52 } else {
53 this->row_groups->InitializeEmpty();
54 D_ASSERT(row_groups->GetTotalRows() == 0);
55 }
56 row_groups->Verify();
57}
58
59DataTable::DataTable(ClientContext &context, DataTable &parent, ColumnDefinition &new_column, Expression *default_value)
60 : info(parent.info), db(parent.db), is_root(true) {
61 // add the column definitions from this DataTable
62 for (auto &column_def : parent.column_definitions) {
63 column_definitions.emplace_back(args: column_def.Copy());
64 }
65 column_definitions.emplace_back(args: new_column.Copy());
66 // prevent any new tuples from being added to the parent
67 lock_guard<mutex> parent_lock(parent.append_lock);
68
69 this->row_groups = parent.row_groups->AddColumn(context, new_column, default_value);
70
71 // also add this column to client local storage
72 auto &local_storage = LocalStorage::Get(context, db);
73 local_storage.AddColumn(old_dt&: parent, new_dt&: *this, new_column, default_value);
74
75 // this table replaces the previous table, hence the parent is no longer the root DataTable
76 parent.is_root = false;
77}
78
79DataTable::DataTable(ClientContext &context, DataTable &parent, idx_t removed_column)
80 : info(parent.info), db(parent.db), is_root(true) {
81 // prevent any new tuples from being added to the parent
82 lock_guard<mutex> parent_lock(parent.append_lock);
83
84 for (auto &column_def : parent.column_definitions) {
85 column_definitions.emplace_back(args: column_def.Copy());
86 }
87 // first check if there are any indexes that exist that point to the removed column
88 info->indexes.Scan(callback: [&](Index &index) {
89 for (auto &column_id : index.column_ids) {
90 if (column_id == removed_column) {
91 throw CatalogException("Cannot drop this column: an index depends on it!");
92 } else if (column_id > removed_column) {
93 throw CatalogException("Cannot drop this column: an index depends on a column after it!");
94 }
95 }
96 return false;
97 });
98
99 // erase the column definitions from this DataTable
100 D_ASSERT(removed_column < column_definitions.size());
101 column_definitions.erase(position: column_definitions.begin() + removed_column);
102
103 storage_t storage_idx = 0;
104 for (idx_t i = 0; i < column_definitions.size(); i++) {
105 auto &col = column_definitions[i];
106 col.SetOid(i);
107 if (col.Generated()) {
108 continue;
109 }
110 col.SetStorageOid(storage_idx++);
111 }
112
113 // alter the row_groups and remove the column from each of them
114 this->row_groups = parent.row_groups->RemoveColumn(col_idx: removed_column);
115
116 // scan the original table, and fill the new column with the transformed value
117 auto &local_storage = LocalStorage::Get(context, db);
118 local_storage.DropColumn(old_dt&: parent, new_dt&: *this, removed_column);
119
120 // this table replaces the previous table, hence the parent is no longer the root DataTable
121 parent.is_root = false;
122}
123
124// Alter column to add new constraint
125DataTable::DataTable(ClientContext &context, DataTable &parent, unique_ptr<BoundConstraint> constraint)
126 : info(parent.info), db(parent.db), row_groups(parent.row_groups), is_root(true) {
127
128 lock_guard<mutex> parent_lock(parent.append_lock);
129 for (auto &column_def : parent.column_definitions) {
130 column_definitions.emplace_back(args: column_def.Copy());
131 }
132
133 // Verify the new constraint against current persistent/local data
134 VerifyNewConstraint(context, parent, constraint: constraint.get());
135
136 // Get the local data ownership from old dt
137 auto &local_storage = LocalStorage::Get(context, db);
138 local_storage.MoveStorage(old_dt&: parent, new_dt&: *this);
139 // this table replaces the previous table, hence the parent is no longer the root DataTable
140 parent.is_root = false;
141}
142
143DataTable::DataTable(ClientContext &context, DataTable &parent, idx_t changed_idx, const LogicalType &target_type,
144 const vector<column_t> &bound_columns, Expression &cast_expr)
145 : info(parent.info), db(parent.db), is_root(true) {
146 // prevent any tuples from being added to the parent
147 lock_guard<mutex> lock(append_lock);
148 for (auto &column_def : parent.column_definitions) {
149 column_definitions.emplace_back(args: column_def.Copy());
150 }
151 // first check if there are any indexes that exist that point to the changed column
152 info->indexes.Scan(callback: [&](Index &index) {
153 for (auto &column_id : index.column_ids) {
154 if (column_id == changed_idx) {
155 throw CatalogException("Cannot change the type of this column: an index depends on it!");
156 }
157 }
158 return false;
159 });
160
161 // change the type in this DataTable
162 column_definitions[changed_idx].SetType(target_type);
163
164 // set up the statistics for the table
165 // the column that had its type changed will have the new statistics computed during conversion
166 this->row_groups = parent.row_groups->AlterType(context, changed_idx, target_type, bound_columns, cast_expr);
167
168 // scan the original table, and fill the new column with the transformed value
169 auto &local_storage = LocalStorage::Get(context, db);
170 local_storage.ChangeType(old_dt&: parent, new_dt&: *this, changed_idx, target_type, bound_columns, cast_expr);
171
172 // this table replaces the previous table, hence the parent is no longer the root DataTable
173 parent.is_root = false;
174}
175
176vector<LogicalType> DataTable::GetTypes() {
177 vector<LogicalType> types;
178 for (auto &it : column_definitions) {
179 types.push_back(x: it.Type());
180 }
181 return types;
182}
183
184TableIOManager &TableIOManager::Get(DataTable &table) {
185 return *table.info->table_io_manager;
186}
187
188//===--------------------------------------------------------------------===//
189// Scan
190//===--------------------------------------------------------------------===//
191void DataTable::InitializeScan(TableScanState &state, const vector<column_t> &column_ids,
192 TableFilterSet *table_filters) {
193 state.Initialize(column_ids, table_filters);
194 row_groups->InitializeScan(state&: state.table_state, column_ids, table_filters);
195}
196
197void DataTable::InitializeScan(DuckTransaction &transaction, TableScanState &state, const vector<column_t> &column_ids,
198 TableFilterSet *table_filters) {
199 InitializeScan(state, column_ids, table_filters);
200 auto &local_storage = LocalStorage::Get(transaction);
201 local_storage.InitializeScan(table&: *this, state&: state.local_state, table_filters);
202}
203
204void DataTable::InitializeScanWithOffset(TableScanState &state, const vector<column_t> &column_ids, idx_t start_row,
205 idx_t end_row) {
206 state.Initialize(column_ids);
207 row_groups->InitializeScanWithOffset(state&: state.table_state, column_ids, start_row, end_row);
208}
209
210idx_t DataTable::MaxThreads(ClientContext &context) {
211 idx_t parallel_scan_vector_count = RowGroup::ROW_GROUP_VECTOR_COUNT;
212 if (ClientConfig::GetConfig(context).verify_parallelism) {
213 parallel_scan_vector_count = 1;
214 }
215 idx_t parallel_scan_tuple_count = STANDARD_VECTOR_SIZE * parallel_scan_vector_count;
216 return GetTotalRows() / parallel_scan_tuple_count + 1;
217}
218
219void DataTable::InitializeParallelScan(ClientContext &context, ParallelTableScanState &state) {
220 row_groups->InitializeParallelScan(state&: state.scan_state);
221
222 auto &local_storage = LocalStorage::Get(context, db);
223 local_storage.InitializeParallelScan(table&: *this, state&: state.local_state);
224}
225
226bool DataTable::NextParallelScan(ClientContext &context, ParallelTableScanState &state, TableScanState &scan_state) {
227 if (row_groups->NextParallelScan(context, state&: state.scan_state, scan_state&: scan_state.table_state)) {
228 return true;
229 }
230 scan_state.table_state.batch_index = state.scan_state.batch_index;
231 auto &local_storage = LocalStorage::Get(context, db);
232 if (local_storage.NextParallelScan(context, table&: *this, state&: state.local_state, scan_state&: scan_state.local_state)) {
233 return true;
234 } else {
235 // finished all scans: no more scans remaining
236 return false;
237 }
238}
239
240void DataTable::Scan(DuckTransaction &transaction, DataChunk &result, TableScanState &state) {
241 // scan the persistent segments
242 if (state.table_state.Scan(transaction, result)) {
243 D_ASSERT(result.size() > 0);
244 return;
245 }
246
247 // scan the transaction-local segments
248 auto &local_storage = LocalStorage::Get(transaction);
249 local_storage.Scan(state&: state.local_state, column_ids: state.GetColumnIds(), result);
250}
251
252bool DataTable::CreateIndexScan(TableScanState &state, DataChunk &result, TableScanType type) {
253 return state.table_state.ScanCommitted(result, type);
254}
255
256//===--------------------------------------------------------------------===//
257// Fetch
258//===--------------------------------------------------------------------===//
259void DataTable::Fetch(DuckTransaction &transaction, DataChunk &result, const vector<column_t> &column_ids,
260 const Vector &row_identifiers, idx_t fetch_count, ColumnFetchState &state) {
261 row_groups->Fetch(transaction, result, column_ids, row_identifiers, fetch_count, state);
262}
263
264//===--------------------------------------------------------------------===//
265// Append
266//===--------------------------------------------------------------------===//
267static void VerifyNotNullConstraint(TableCatalogEntry &table, Vector &vector, idx_t count, const string &col_name) {
268 if (!VectorOperations::HasNull(input&: vector, count)) {
269 return;
270 }
271
272 throw ConstraintException("NOT NULL constraint failed: %s.%s", table.name, col_name);
273}
274
275// To avoid throwing an error at SELECT, instead this moves the error detection to INSERT
276static void VerifyGeneratedExpressionSuccess(ClientContext &context, TableCatalogEntry &table, DataChunk &chunk,
277 Expression &expr, column_t index) {
278 auto &col = table.GetColumn(idx: LogicalIndex(index));
279 D_ASSERT(col.Generated());
280 ExpressionExecutor executor(context, expr);
281 Vector result(col.Type());
282 try {
283 executor.ExecuteExpression(input&: chunk, result);
284 } catch (InternalException &ex) {
285 throw;
286 } catch (std::exception &ex) {
287 throw ConstraintException("Incorrect value for generated column \"%s %s AS (%s)\" : %s", col.Name(),
288 col.Type().ToString(), col.GeneratedExpression().ToString(), ex.what());
289 }
290}
291
292static void VerifyCheckConstraint(ClientContext &context, TableCatalogEntry &table, Expression &expr,
293 DataChunk &chunk) {
294 ExpressionExecutor executor(context, expr);
295 Vector result(LogicalType::INTEGER);
296 try {
297 executor.ExecuteExpression(input&: chunk, result);
298 } catch (std::exception &ex) {
299 throw ConstraintException("CHECK constraint failed: %s (Error: %s)", table.name, ex.what());
300 } catch (...) { // LCOV_EXCL_START
301 throw ConstraintException("CHECK constraint failed: %s (Unknown Error)", table.name);
302 } // LCOV_EXCL_STOP
303 UnifiedVectorFormat vdata;
304 result.ToUnifiedFormat(count: chunk.size(), data&: vdata);
305
306 auto dataptr = UnifiedVectorFormat::GetData<int32_t>(format: vdata);
307 for (idx_t i = 0; i < chunk.size(); i++) {
308 auto idx = vdata.sel->get_index(idx: i);
309 if (vdata.validity.RowIsValid(row_idx: idx) && dataptr[idx] == 0) {
310 throw ConstraintException("CHECK constraint failed: %s", table.name);
311 }
312 }
313}
314
315bool DataTable::IsForeignKeyIndex(const vector<PhysicalIndex> &fk_keys, Index &index, ForeignKeyType fk_type) {
316 if (fk_type == ForeignKeyType::FK_TYPE_PRIMARY_KEY_TABLE ? !index.IsUnique() : !index.IsForeign()) {
317 return false;
318 }
319 if (fk_keys.size() != index.column_ids.size()) {
320 return false;
321 }
322 for (auto &fk_key : fk_keys) {
323 bool is_found = false;
324 for (auto &index_key : index.column_ids) {
325 if (fk_key.index == index_key) {
326 is_found = true;
327 break;
328 }
329 }
330 if (!is_found) {
331 return false;
332 }
333 }
334 return true;
335}
336
337// Find the first index that is not null, and did not find a match
338static idx_t FirstMissingMatch(const ManagedSelection &matches) {
339 idx_t match_idx = 0;
340
341 for (idx_t i = 0; i < matches.Size(); i++) {
342 auto match = matches.IndexMapsToLocation(idx: match_idx, location: i);
343 match_idx += match;
344 if (!match) {
345 // This index is missing in the matches vector
346 return i;
347 }
348 }
349 return DConstants::INVALID_INDEX;
350}
351
352idx_t LocateErrorIndex(bool is_append, const ManagedSelection &matches) {
353 idx_t failed_index = DConstants::INVALID_INDEX;
354 if (!is_append) {
355 // We expected to find nothing, so the first error is the first match
356 failed_index = matches[0];
357 } else {
358 // We expected to find matches for all of them, so the first missing match is the first error
359 return FirstMissingMatch(matches);
360 }
361 return failed_index;
362}
363
364[[noreturn]] static void ThrowForeignKeyConstraintError(idx_t failed_index, bool is_append, Index &index,
365 DataChunk &input) {
366 auto verify_type = is_append ? VerifyExistenceType::APPEND_FK : VerifyExistenceType::DELETE_FK;
367
368 D_ASSERT(failed_index != DConstants::INVALID_INDEX);
369 D_ASSERT(index.type == IndexType::ART);
370 auto &art_index = index.Cast<ART>();
371 auto key_name = art_index.GenerateErrorKeyName(input, row: failed_index);
372 auto exception_msg = art_index.GenerateConstraintErrorMessage(verify_type, key_name);
373 throw ConstraintException(exception_msg);
374}
375
376bool IsForeignKeyConstraintError(bool is_append, idx_t input_count, const ManagedSelection &matches) {
377 if (is_append) {
378 // We need to find a match for all of the values
379 return matches.Count() != input_count;
380 } else {
381 // We should not find any matches
382 return matches.Count() != 0;
383 }
384}
385
386static bool IsAppend(VerifyExistenceType verify_type) {
387 return verify_type == VerifyExistenceType::APPEND_FK;
388}
389
390void DataTable::VerifyForeignKeyConstraint(const BoundForeignKeyConstraint &bfk, ClientContext &context,
391 DataChunk &chunk, VerifyExistenceType verify_type) {
392 const vector<PhysicalIndex> *src_keys_ptr = &bfk.info.fk_keys;
393 const vector<PhysicalIndex> *dst_keys_ptr = &bfk.info.pk_keys;
394
395 bool is_append = IsAppend(verify_type);
396 if (!is_append) {
397 src_keys_ptr = &bfk.info.pk_keys;
398 dst_keys_ptr = &bfk.info.fk_keys;
399 }
400
401 auto &table_entry_ptr =
402 Catalog::GetEntry<TableCatalogEntry>(context, INVALID_CATALOG, schema_name: bfk.info.schema, name: bfk.info.table);
403 // make the data chunk to check
404 vector<LogicalType> types;
405 for (auto &col : table_entry_ptr.GetColumns().Physical()) {
406 types.emplace_back(args: col.Type());
407 }
408 DataChunk dst_chunk;
409 dst_chunk.InitializeEmpty(types);
410 for (idx_t i = 0; i < src_keys_ptr->size(); i++) {
411 dst_chunk.data[(*dst_keys_ptr)[i].index].Reference(other&: chunk.data[(*src_keys_ptr)[i].index]);
412 }
413 dst_chunk.SetCardinality(chunk.size());
414 auto &data_table = table_entry_ptr.GetStorage();
415
416 idx_t count = dst_chunk.size();
417 if (count <= 0) {
418 return;
419 }
420
421 // Set up a way to record conflicts, rather than directly throw on them
422 unordered_set<column_t> empty_column_list;
423 ConflictInfo empty_conflict_info(empty_column_list, false);
424 ConflictManager regular_conflicts(verify_type, count, &empty_conflict_info);
425 ConflictManager transaction_conflicts(verify_type, count, &empty_conflict_info);
426 regular_conflicts.SetMode(ConflictManagerMode::SCAN);
427 transaction_conflicts.SetMode(ConflictManagerMode::SCAN);
428
429 data_table.info->indexes.VerifyForeignKey(fk_keys: *dst_keys_ptr, chunk&: dst_chunk, conflict_manager&: regular_conflicts);
430 regular_conflicts.Finalize();
431 auto &regular_matches = regular_conflicts.Conflicts();
432 // check whether or not the chunk can be inserted or deleted into the referenced table' transaction local storage
433 auto &local_storage = LocalStorage::Get(context, db);
434
435 bool error = IsForeignKeyConstraintError(is_append, input_count: count, matches: regular_matches);
436 bool transaction_error = false;
437
438 bool transaction_check = local_storage.Find(table&: data_table);
439 if (transaction_check) {
440 auto &transact_index = local_storage.GetIndexes(table&: data_table);
441 transact_index.VerifyForeignKey(fk_keys: *dst_keys_ptr, chunk&: dst_chunk, conflict_manager&: transaction_conflicts);
442 transaction_conflicts.Finalize();
443 auto &transaction_matches = transaction_conflicts.Conflicts();
444 transaction_error = IsForeignKeyConstraintError(is_append, input_count: count, matches: transaction_matches);
445 }
446
447 if (!transaction_error && !error) {
448 // No error occurred;
449 return;
450 }
451
452 // Some error occurred, and we likely want to throw
453 optional_ptr<Index> index;
454 optional_ptr<Index> transaction_index;
455
456 auto fk_type = is_append ? ForeignKeyType::FK_TYPE_PRIMARY_KEY_TABLE : ForeignKeyType::FK_TYPE_FOREIGN_KEY_TABLE;
457 // check whether or not the chunk can be inserted or deleted into the referenced table' storage
458 index = data_table.info->indexes.FindForeignKeyIndex(fk_keys: *dst_keys_ptr, fk_type);
459 if (transaction_check) {
460 auto &transact_index = local_storage.GetIndexes(table&: data_table);
461 // check whether or not the chunk can be inserted or deleted into the referenced table' storage
462 transaction_index = transact_index.FindForeignKeyIndex(fk_keys: *dst_keys_ptr, fk_type);
463 }
464
465 if (!transaction_check) {
466 // Only local state is checked, throw the error
467 D_ASSERT(error);
468 auto failed_index = LocateErrorIndex(is_append, matches: regular_matches);
469 D_ASSERT(failed_index != DConstants::INVALID_INDEX);
470 ThrowForeignKeyConstraintError(failed_index, is_append, index&: *index, input&: dst_chunk);
471 }
472 if (transaction_error && error && is_append) {
473 // When we want to do an append, we only throw if the foreign key does not exist in both transaction and local
474 // storage
475 auto &transaction_matches = transaction_conflicts.Conflicts();
476 idx_t failed_index = DConstants::INVALID_INDEX;
477 idx_t regular_idx = 0;
478 idx_t transaction_idx = 0;
479 for (idx_t i = 0; i < count; i++) {
480 bool in_regular = regular_matches.IndexMapsToLocation(idx: regular_idx, location: i);
481 regular_idx += in_regular;
482 bool in_transaction = transaction_matches.IndexMapsToLocation(idx: transaction_idx, location: i);
483 transaction_idx += in_transaction;
484
485 if (!in_regular && !in_transaction) {
486 // We need to find a match for all of the input values
487 // The failed index is i, it does not show up in either regular or transaction storage
488 failed_index = i;
489 break;
490 }
491 }
492 if (failed_index == DConstants::INVALID_INDEX) {
493 // We don't throw, every value was present in either regular or transaction storage
494 return;
495 }
496 ThrowForeignKeyConstraintError(failed_index, is_append: true, index&: *index, input&: dst_chunk);
497 }
498 if (!is_append && transaction_check) {
499 auto &transaction_matches = transaction_conflicts.Conflicts();
500 if (error) {
501 auto failed_index = LocateErrorIndex(is_append: false, matches: regular_matches);
502 D_ASSERT(failed_index != DConstants::INVALID_INDEX);
503 ThrowForeignKeyConstraintError(failed_index, is_append: false, index&: *index, input&: dst_chunk);
504 } else {
505 D_ASSERT(transaction_error);
506 D_ASSERT(transaction_matches.Count() != DConstants::INVALID_INDEX);
507 auto failed_index = LocateErrorIndex(is_append: false, matches: transaction_matches);
508 D_ASSERT(failed_index != DConstants::INVALID_INDEX);
509 ThrowForeignKeyConstraintError(failed_index, is_append: false, index&: *transaction_index, input&: dst_chunk);
510 }
511 }
512}
513
514void DataTable::VerifyAppendForeignKeyConstraint(const BoundForeignKeyConstraint &bfk, ClientContext &context,
515 DataChunk &chunk) {
516 VerifyForeignKeyConstraint(bfk, context, chunk, verify_type: VerifyExistenceType::APPEND_FK);
517}
518
519void DataTable::VerifyDeleteForeignKeyConstraint(const BoundForeignKeyConstraint &bfk, ClientContext &context,
520 DataChunk &chunk) {
521 VerifyForeignKeyConstraint(bfk, context, chunk, verify_type: VerifyExistenceType::DELETE_FK);
522}
523
524void DataTable::VerifyNewConstraint(ClientContext &context, DataTable &parent, const BoundConstraint *constraint) {
525 if (constraint->type != ConstraintType::NOT_NULL) {
526 throw NotImplementedException("FIXME: ALTER COLUMN with such constraint is not supported yet");
527 }
528
529 parent.row_groups->VerifyNewConstraint(parent, constraint: *constraint);
530 auto &local_storage = LocalStorage::Get(context, db);
531 local_storage.VerifyNewConstraint(parent, constraint: *constraint);
532}
533
534bool HasUniqueIndexes(TableIndexList &list) {
535 bool has_unique_index = false;
536 list.Scan(callback: [&](Index &index) {
537 if (index.IsUnique()) {
538 return has_unique_index = true;
539 return true;
540 }
541 return false;
542 });
543 return has_unique_index;
544}
545
546void DataTable::VerifyUniqueIndexes(TableIndexList &indexes, ClientContext &context, DataChunk &chunk,
547 ConflictManager *conflict_manager) {
548 //! check whether or not the chunk can be inserted into the indexes
549 if (!conflict_manager) {
550 // Only need to verify that no unique constraints are violated
551 indexes.Scan(callback: [&](Index &index) {
552 if (!index.IsUnique()) {
553 return false;
554 }
555 index.VerifyAppend(chunk);
556 return false;
557 });
558 return;
559 }
560
561 D_ASSERT(conflict_manager);
562 // The conflict manager is only provided when a ON CONFLICT clause was provided to the INSERT statement
563
564 idx_t matching_indexes = 0;
565 auto &conflict_info = conflict_manager->GetConflictInfo();
566 // First we figure out how many indexes match our conflict target
567 // So we can optimize accordingly
568 indexes.Scan(callback: [&](Index &index) {
569 matching_indexes += conflict_info.ConflictTargetMatches(index);
570 return false;
571 });
572 conflict_manager->SetMode(ConflictManagerMode::SCAN);
573 conflict_manager->SetIndexCount(matching_indexes);
574 // First we verify only the indexes that match our conflict target
575 unordered_set<Index *> checked_indexes;
576 indexes.Scan(callback: [&](Index &index) {
577 if (!index.IsUnique()) {
578 return false;
579 }
580 if (conflict_info.ConflictTargetMatches(index)) {
581 index.VerifyAppend(chunk, conflict_manager&: *conflict_manager);
582 checked_indexes.insert(x: &index);
583 }
584 return false;
585 });
586
587 conflict_manager->SetMode(ConflictManagerMode::THROW);
588 // Then we scan the other indexes, throwing if they cause conflicts on tuples that were not found during
589 // the scan
590 indexes.Scan(callback: [&](Index &index) {
591 if (!index.IsUnique()) {
592 return false;
593 }
594 if (checked_indexes.count(x: &index)) {
595 // Already checked this constraint
596 return false;
597 }
598 index.VerifyAppend(chunk, conflict_manager&: *conflict_manager);
599 return false;
600 });
601}
602
603void DataTable::VerifyAppendConstraints(TableCatalogEntry &table, ClientContext &context, DataChunk &chunk,
604 ConflictManager *conflict_manager) {
605 if (table.HasGeneratedColumns()) {
606 // Verify that the generated columns expression work with the inserted values
607 auto binder = Binder::CreateBinder(context);
608 physical_index_set_t bound_columns;
609 CheckBinder generated_check_binder(*binder, context, table.name, table.GetColumns(), bound_columns);
610 for (auto &col : table.GetColumns().Logical()) {
611 if (!col.Generated()) {
612 continue;
613 }
614 D_ASSERT(col.Type().id() != LogicalTypeId::ANY);
615 generated_check_binder.target_type = col.Type();
616 auto to_be_bound_expression = col.GeneratedExpression().Copy();
617 auto bound_expression = generated_check_binder.Bind(expr&: to_be_bound_expression);
618 VerifyGeneratedExpressionSuccess(context, table, chunk, expr&: *bound_expression, index: col.Oid());
619 }
620 }
621
622 if (HasUniqueIndexes(list&: info->indexes)) {
623 VerifyUniqueIndexes(indexes&: info->indexes, context, chunk, conflict_manager);
624 }
625
626 auto &constraints = table.GetConstraints();
627 auto &bound_constraints = table.GetBoundConstraints();
628 for (idx_t i = 0; i < bound_constraints.size(); i++) {
629 auto &base_constraint = constraints[i];
630 auto &constraint = bound_constraints[i];
631 switch (base_constraint->type) {
632 case ConstraintType::NOT_NULL: {
633 auto &bound_not_null = *reinterpret_cast<BoundNotNullConstraint *>(constraint.get());
634 auto &not_null = *reinterpret_cast<NotNullConstraint *>(base_constraint.get());
635 auto &col = table.GetColumns().GetColumn(index: LogicalIndex(not_null.index));
636 VerifyNotNullConstraint(table, vector&: chunk.data[bound_not_null.index.index], count: chunk.size(), col_name: col.Name());
637 break;
638 }
639 case ConstraintType::CHECK: {
640 auto &check = *reinterpret_cast<BoundCheckConstraint *>(constraint.get());
641 VerifyCheckConstraint(context, table, expr&: *check.expression, chunk);
642 break;
643 }
644 case ConstraintType::UNIQUE: {
645 // These were handled earlier on
646 break;
647 }
648 case ConstraintType::FOREIGN_KEY: {
649 auto &bfk = *reinterpret_cast<BoundForeignKeyConstraint *>(constraint.get());
650 if (bfk.info.type == ForeignKeyType::FK_TYPE_FOREIGN_KEY_TABLE ||
651 bfk.info.type == ForeignKeyType::FK_TYPE_SELF_REFERENCE_TABLE) {
652 VerifyAppendForeignKeyConstraint(bfk, context, chunk);
653 }
654 break;
655 }
656 default:
657 throw NotImplementedException("Constraint type not implemented!");
658 }
659 }
660}
661
662void DataTable::InitializeLocalAppend(LocalAppendState &state, ClientContext &context) {
663 if (!is_root) {
664 throw TransactionException("Transaction conflict: adding entries to a table that has been altered!");
665 }
666 auto &local_storage = LocalStorage::Get(context, db);
667 local_storage.InitializeAppend(state, table&: *this);
668}
669
670void DataTable::LocalAppend(LocalAppendState &state, TableCatalogEntry &table, ClientContext &context, DataChunk &chunk,
671 bool unsafe) {
672 if (chunk.size() == 0) {
673 return;
674 }
675 D_ASSERT(chunk.ColumnCount() == table.GetColumns().PhysicalColumnCount());
676 if (!is_root) {
677 throw TransactionException("Transaction conflict: adding entries to a table that has been altered!");
678 }
679
680 chunk.Verify();
681
682 // verify any constraints on the new chunk
683 if (!unsafe) {
684 VerifyAppendConstraints(table, context, chunk);
685 }
686
687 // append to the transaction local data
688 LocalStorage::Append(state, chunk);
689}
690
691void DataTable::FinalizeLocalAppend(LocalAppendState &state) {
692 LocalStorage::FinalizeAppend(state);
693}
694
695OptimisticDataWriter &DataTable::CreateOptimisticWriter(ClientContext &context) {
696 auto &local_storage = LocalStorage::Get(context, db);
697 return local_storage.CreateOptimisticWriter(table&: *this);
698}
699
700void DataTable::FinalizeOptimisticWriter(ClientContext &context, OptimisticDataWriter &writer) {
701 auto &local_storage = LocalStorage::Get(context, db);
702 local_storage.FinalizeOptimisticWriter(table&: *this, writer);
703}
704
705void DataTable::LocalMerge(ClientContext &context, RowGroupCollection &collection) {
706 auto &local_storage = LocalStorage::Get(context, db);
707 local_storage.LocalMerge(table&: *this, collection);
708}
709
710void DataTable::LocalAppend(TableCatalogEntry &table, ClientContext &context, DataChunk &chunk) {
711 LocalAppendState append_state;
712 auto &storage = table.GetStorage();
713 storage.InitializeLocalAppend(state&: append_state, context);
714 storage.LocalAppend(state&: append_state, table, context, chunk);
715 storage.FinalizeLocalAppend(state&: append_state);
716}
717
718void DataTable::LocalAppend(TableCatalogEntry &table, ClientContext &context, ColumnDataCollection &collection) {
719 LocalAppendState append_state;
720 auto &storage = table.GetStorage();
721 storage.InitializeLocalAppend(state&: append_state, context);
722 for (auto &chunk : collection.Chunks()) {
723 storage.LocalAppend(state&: append_state, table, context, chunk);
724 }
725 storage.FinalizeLocalAppend(state&: append_state);
726}
727
728void DataTable::AppendLock(TableAppendState &state) {
729 state.append_lock = unique_lock<mutex>(append_lock);
730 if (!is_root) {
731 throw TransactionException("Transaction conflict: adding entries to a table that has been altered!");
732 }
733 state.row_start = row_groups->GetTotalRows();
734 state.current_row = state.row_start;
735}
736
737void DataTable::InitializeAppend(DuckTransaction &transaction, TableAppendState &state, idx_t append_count) {
738 // obtain the append lock for this table
739 if (!state.append_lock) {
740 throw InternalException("DataTable::AppendLock should be called before DataTable::InitializeAppend");
741 }
742 row_groups->InitializeAppend(transaction, state, append_count);
743}
744
745void DataTable::Append(DataChunk &chunk, TableAppendState &state) {
746 D_ASSERT(is_root);
747 row_groups->Append(chunk, state);
748}
749
750void DataTable::ScanTableSegment(idx_t row_start, idx_t count, const std::function<void(DataChunk &chunk)> &function) {
751 idx_t end = row_start + count;
752
753 vector<column_t> column_ids;
754 vector<LogicalType> types;
755 for (idx_t i = 0; i < this->column_definitions.size(); i++) {
756 auto &col = this->column_definitions[i];
757 column_ids.push_back(x: i);
758 types.push_back(x: col.Type());
759 }
760 DataChunk chunk;
761 chunk.Initialize(allocator&: Allocator::Get(db), types);
762
763 CreateIndexScanState state;
764
765 InitializeScanWithOffset(state, column_ids, start_row: row_start, end_row: row_start + count);
766 auto row_start_aligned = state.table_state.row_group->start + state.table_state.vector_index * STANDARD_VECTOR_SIZE;
767
768 idx_t current_row = row_start_aligned;
769 while (current_row < end) {
770 state.table_state.ScanCommitted(result&: chunk, type: TableScanType::TABLE_SCAN_COMMITTED_ROWS);
771 if (chunk.size() == 0) {
772 break;
773 }
774 idx_t end_row = current_row + chunk.size();
775 // start of chunk is current_row
776 // end of chunk is end_row
777 // figure out if we need to write the entire chunk or just part of it
778 idx_t chunk_start = MaxValue<idx_t>(a: current_row, b: row_start);
779 idx_t chunk_end = MinValue<idx_t>(a: end_row, b: end);
780 D_ASSERT(chunk_start < chunk_end);
781 idx_t chunk_count = chunk_end - chunk_start;
782 if (chunk_count != chunk.size()) {
783 D_ASSERT(chunk_count <= chunk.size());
784 // need to slice the chunk before insert
785 idx_t start_in_chunk;
786 if (current_row >= row_start) {
787 start_in_chunk = 0;
788 } else {
789 start_in_chunk = row_start - current_row;
790 }
791 SelectionVector sel(start_in_chunk, chunk_count);
792 chunk.Slice(sel_vector: sel, count: chunk_count);
793 chunk.Verify();
794 }
795 function(chunk);
796 chunk.Reset();
797 current_row = end_row;
798 }
799}
800
801void DataTable::MergeStorage(RowGroupCollection &data, TableIndexList &indexes) {
802 row_groups->MergeStorage(data);
803 row_groups->Verify();
804}
805
806void DataTable::WriteToLog(WriteAheadLog &log, idx_t row_start, idx_t count) {
807 if (log.skip_writing) {
808 return;
809 }
810 log.WriteSetTable(schema&: info->schema, table&: info->table);
811 ScanTableSegment(row_start, count, function: [&](DataChunk &chunk) { log.WriteInsert(chunk); });
812}
813
814void DataTable::CommitAppend(transaction_t commit_id, idx_t row_start, idx_t count) {
815 lock_guard<mutex> lock(append_lock);
816 row_groups->CommitAppend(commit_id, row_start, count);
817 info->cardinality += count;
818}
819
820void DataTable::RevertAppendInternal(idx_t start_row, idx_t count) {
821 if (count == 0) {
822 // nothing to revert!
823 return;
824 }
825 // adjust the cardinality
826 info->cardinality = start_row;
827 D_ASSERT(is_root);
828 // revert appends made to row_groups
829 row_groups->RevertAppendInternal(start_row, count);
830}
831
832void DataTable::RevertAppend(idx_t start_row, idx_t count) {
833 lock_guard<mutex> lock(append_lock);
834
835 if (!info->indexes.Empty()) {
836 idx_t current_row_base = start_row;
837 row_t row_data[STANDARD_VECTOR_SIZE];
838 Vector row_identifiers(LogicalType::ROW_TYPE, data_ptr_cast(src: row_data));
839 ScanTableSegment(row_start: start_row, count, function: [&](DataChunk &chunk) {
840 for (idx_t i = 0; i < chunk.size(); i++) {
841 row_data[i] = current_row_base + i;
842 }
843 info->indexes.Scan(callback: [&](Index &index) {
844 index.Delete(entries&: chunk, row_identifiers);
845 return false;
846 });
847 current_row_base += chunk.size();
848 });
849 }
850 RevertAppendInternal(start_row, count);
851}
852
853//===--------------------------------------------------------------------===//
854// Indexes
855//===--------------------------------------------------------------------===//
856PreservedError DataTable::AppendToIndexes(TableIndexList &indexes, DataChunk &chunk, row_t row_start) {
857 PreservedError error;
858 if (indexes.Empty()) {
859 return error;
860 }
861 // first generate the vector of row identifiers
862 Vector row_identifiers(LogicalType::ROW_TYPE);
863 VectorOperations::GenerateSequence(result&: row_identifiers, count: chunk.size(), start: row_start, increment: 1);
864
865 vector<Index *> already_appended;
866 bool append_failed = false;
867 // now append the entries to the indices
868 indexes.Scan(callback: [&](Index &index) {
869 try {
870 error = index.Append(entries&: chunk, row_identifiers);
871 } catch (Exception &ex) {
872 error = PreservedError(ex);
873 } catch (std::exception &ex) {
874 error = PreservedError(ex);
875 }
876 if (error) {
877 append_failed = true;
878 return true;
879 }
880 already_appended.push_back(x: &index);
881 return false;
882 });
883
884 if (append_failed) {
885 // constraint violation!
886 // remove any appended entries from previous indexes (if any)
887 for (auto *index : already_appended) {
888 index->Delete(entries&: chunk, row_identifiers);
889 }
890 }
891 return error;
892}
893
894PreservedError DataTable::AppendToIndexes(DataChunk &chunk, row_t row_start) {
895 D_ASSERT(is_root);
896 return AppendToIndexes(indexes&: info->indexes, chunk, row_start);
897}
898
899void DataTable::RemoveFromIndexes(TableAppendState &state, DataChunk &chunk, row_t row_start) {
900 D_ASSERT(is_root);
901 if (info->indexes.Empty()) {
902 return;
903 }
904 // first generate the vector of row identifiers
905 Vector row_identifiers(LogicalType::ROW_TYPE);
906 VectorOperations::GenerateSequence(result&: row_identifiers, count: chunk.size(), start: row_start, increment: 1);
907
908 // now remove the entries from the indices
909 RemoveFromIndexes(state, chunk, row_identifiers);
910}
911
912void DataTable::RemoveFromIndexes(TableAppendState &state, DataChunk &chunk, Vector &row_identifiers) {
913 D_ASSERT(is_root);
914 info->indexes.Scan(callback: [&](Index &index) {
915 index.Delete(entries&: chunk, row_identifiers);
916 return false;
917 });
918}
919
920void DataTable::RemoveFromIndexes(Vector &row_identifiers, idx_t count) {
921 D_ASSERT(is_root);
922 row_groups->RemoveFromIndexes(indexes&: info->indexes, row_identifiers, count);
923}
924
925//===--------------------------------------------------------------------===//
926// Delete
927//===--------------------------------------------------------------------===//
928static bool TableHasDeleteConstraints(TableCatalogEntry &table) {
929 auto &bound_constraints = table.GetBoundConstraints();
930 for (auto &constraint : bound_constraints) {
931 switch (constraint->type) {
932 case ConstraintType::NOT_NULL:
933 case ConstraintType::CHECK:
934 case ConstraintType::UNIQUE:
935 break;
936 case ConstraintType::FOREIGN_KEY: {
937 auto &bfk = *reinterpret_cast<BoundForeignKeyConstraint *>(constraint.get());
938 if (bfk.info.type == ForeignKeyType::FK_TYPE_PRIMARY_KEY_TABLE ||
939 bfk.info.type == ForeignKeyType::FK_TYPE_SELF_REFERENCE_TABLE) {
940 return true;
941 }
942 break;
943 }
944 default:
945 throw NotImplementedException("Constraint type not implemented!");
946 }
947 }
948 return false;
949}
950
951void DataTable::VerifyDeleteConstraints(TableCatalogEntry &table, ClientContext &context, DataChunk &chunk) {
952 auto &bound_constraints = table.GetBoundConstraints();
953 for (auto &constraint : bound_constraints) {
954 switch (constraint->type) {
955 case ConstraintType::NOT_NULL:
956 case ConstraintType::CHECK:
957 case ConstraintType::UNIQUE:
958 break;
959 case ConstraintType::FOREIGN_KEY: {
960 auto &bfk = *reinterpret_cast<BoundForeignKeyConstraint *>(constraint.get());
961 if (bfk.info.type == ForeignKeyType::FK_TYPE_PRIMARY_KEY_TABLE ||
962 bfk.info.type == ForeignKeyType::FK_TYPE_SELF_REFERENCE_TABLE) {
963 VerifyDeleteForeignKeyConstraint(bfk, context, chunk);
964 }
965 break;
966 }
967 default:
968 throw NotImplementedException("Constraint type not implemented!");
969 }
970 }
971}
972
973idx_t DataTable::Delete(TableCatalogEntry &table, ClientContext &context, Vector &row_identifiers, idx_t count) {
974 D_ASSERT(row_identifiers.GetType().InternalType() == ROW_TYPE);
975 if (count == 0) {
976 return 0;
977 }
978
979 auto &transaction = DuckTransaction::Get(context, db);
980 auto &local_storage = LocalStorage::Get(transaction);
981 bool has_delete_constraints = TableHasDeleteConstraints(table);
982
983 row_identifiers.Flatten(count);
984 auto ids = FlatVector::GetData<row_t>(vector&: row_identifiers);
985
986 DataChunk verify_chunk;
987 vector<column_t> col_ids;
988 vector<LogicalType> types;
989 ColumnFetchState fetch_state;
990 if (has_delete_constraints) {
991 // initialize the chunk if there are any constraints to verify
992 for (idx_t i = 0; i < column_definitions.size(); i++) {
993 col_ids.push_back(x: column_definitions[i].StorageOid());
994 types.emplace_back(args: column_definitions[i].Type());
995 }
996 verify_chunk.Initialize(allocator&: Allocator::Get(context), types);
997 }
998 idx_t pos = 0;
999 idx_t delete_count = 0;
1000 while (pos < count) {
1001 idx_t start = pos;
1002 bool is_transaction_delete = ids[pos] >= MAX_ROW_ID;
1003 // figure out which batch of rows to delete now
1004 for (pos++; pos < count; pos++) {
1005 bool row_is_transaction_delete = ids[pos] >= MAX_ROW_ID;
1006 if (row_is_transaction_delete != is_transaction_delete) {
1007 break;
1008 }
1009 }
1010 idx_t current_offset = start;
1011 idx_t current_count = pos - start;
1012
1013 Vector offset_ids(row_identifiers, current_offset, pos);
1014 if (is_transaction_delete) {
1015 // transaction-local delete
1016 if (has_delete_constraints) {
1017 // perform the constraint verification
1018 local_storage.FetchChunk(table&: *this, row_ids&: offset_ids, count: current_count, col_ids, chunk&: verify_chunk, fetch_state);
1019 VerifyDeleteConstraints(table, context, chunk&: verify_chunk);
1020 }
1021 delete_count += local_storage.Delete(table&: *this, row_ids&: offset_ids, count: current_count);
1022 } else {
1023 // regular table delete
1024 if (has_delete_constraints) {
1025 // perform the constraint verification
1026 Fetch(transaction, result&: verify_chunk, column_ids: col_ids, row_identifiers: offset_ids, fetch_count: current_count, state&: fetch_state);
1027 VerifyDeleteConstraints(table, context, chunk&: verify_chunk);
1028 }
1029 delete_count += row_groups->Delete(transaction, table&: *this, ids: ids + current_offset, count: current_count);
1030 }
1031 }
1032 return delete_count;
1033}
1034
1035//===--------------------------------------------------------------------===//
1036// Update
1037//===--------------------------------------------------------------------===//
1038static void CreateMockChunk(vector<LogicalType> &types, const vector<PhysicalIndex> &column_ids, DataChunk &chunk,
1039 DataChunk &mock_chunk) {
1040 // construct a mock DataChunk
1041 mock_chunk.InitializeEmpty(types);
1042 for (column_t i = 0; i < column_ids.size(); i++) {
1043 mock_chunk.data[column_ids[i].index].Reference(other&: chunk.data[i]);
1044 }
1045 mock_chunk.SetCardinality(chunk.size());
1046}
1047
1048static bool CreateMockChunk(TableCatalogEntry &table, const vector<PhysicalIndex> &column_ids,
1049 physical_index_set_t &desired_column_ids, DataChunk &chunk, DataChunk &mock_chunk) {
1050 idx_t found_columns = 0;
1051 // check whether the desired columns are present in the UPDATE clause
1052 for (column_t i = 0; i < column_ids.size(); i++) {
1053 if (desired_column_ids.find(x: column_ids[i]) != desired_column_ids.end()) {
1054 found_columns++;
1055 }
1056 }
1057 if (found_columns == 0) {
1058 // no columns were found: no need to check the constraint again
1059 return false;
1060 }
1061 if (found_columns != desired_column_ids.size()) {
1062 // not all columns in UPDATE clause are present!
1063 // this should not be triggered at all as the binder should add these columns
1064 throw InternalException("Not all columns required for the CHECK constraint are present in the UPDATED chunk!");
1065 }
1066 // construct a mock DataChunk
1067 auto types = table.GetTypes();
1068 CreateMockChunk(types, column_ids, chunk, mock_chunk);
1069 return true;
1070}
1071
1072void DataTable::VerifyUpdateConstraints(ClientContext &context, TableCatalogEntry &table, DataChunk &chunk,
1073 const vector<PhysicalIndex> &column_ids) {
1074 auto &constraints = table.GetConstraints();
1075 auto &bound_constraints = table.GetBoundConstraints();
1076 for (idx_t i = 0; i < bound_constraints.size(); i++) {
1077 auto &base_constraint = constraints[i];
1078 auto &constraint = bound_constraints[i];
1079 switch (constraint->type) {
1080 case ConstraintType::NOT_NULL: {
1081 auto &bound_not_null = *reinterpret_cast<BoundNotNullConstraint *>(constraint.get());
1082 auto &not_null = *reinterpret_cast<NotNullConstraint *>(base_constraint.get());
1083 // check if the constraint is in the list of column_ids
1084 // FIXME: double usage of 'i'?
1085 for (idx_t i = 0; i < column_ids.size(); i++) {
1086 if (column_ids[i] == bound_not_null.index) {
1087 // found the column id: check the data in
1088 auto &col = table.GetColumn(idx: LogicalIndex(not_null.index));
1089 VerifyNotNullConstraint(table, vector&: chunk.data[i], count: chunk.size(), col_name: col.Name());
1090 break;
1091 }
1092 }
1093 break;
1094 }
1095 case ConstraintType::CHECK: {
1096 auto &check = *reinterpret_cast<BoundCheckConstraint *>(constraint.get());
1097
1098 DataChunk mock_chunk;
1099 if (CreateMockChunk(table, column_ids, desired_column_ids&: check.bound_columns, chunk, mock_chunk)) {
1100 VerifyCheckConstraint(context, table, expr&: *check.expression, chunk&: mock_chunk);
1101 }
1102 break;
1103 }
1104 case ConstraintType::UNIQUE:
1105 case ConstraintType::FOREIGN_KEY:
1106 break;
1107 default:
1108 throw NotImplementedException("Constraint type not implemented!");
1109 }
1110 }
1111 // update should not be called for indexed columns!
1112 // instead update should have been rewritten to delete + update on higher layer
1113#ifdef DEBUG
1114 info->indexes.Scan([&](Index &index) {
1115 D_ASSERT(!index.IndexIsUpdated(column_ids));
1116 return false;
1117 });
1118
1119#endif
1120}
1121
1122void DataTable::Update(TableCatalogEntry &table, ClientContext &context, Vector &row_ids,
1123 const vector<PhysicalIndex> &column_ids, DataChunk &updates) {
1124 D_ASSERT(row_ids.GetType().InternalType() == ROW_TYPE);
1125
1126 D_ASSERT(column_ids.size() == updates.ColumnCount());
1127 auto count = updates.size();
1128 updates.Verify();
1129 if (count == 0) {
1130 return;
1131 }
1132
1133 if (!is_root) {
1134 throw TransactionException("Transaction conflict: cannot update a table that has been altered!");
1135 }
1136
1137 // first verify that no constraints are violated
1138 VerifyUpdateConstraints(context, table, chunk&: updates, column_ids);
1139
1140 // now perform the actual update
1141 auto &transaction = DuckTransaction::Get(context, db);
1142
1143 updates.Flatten();
1144 row_ids.Flatten(count);
1145 auto ids = FlatVector::GetData<row_t>(vector&: row_ids);
1146 auto first_id = FlatVector::GetValue<row_t>(vector&: row_ids, idx: 0);
1147 if (first_id >= MAX_ROW_ID) {
1148 // update is in transaction-local storage: push update into local storage
1149 auto &local_storage = LocalStorage::Get(context, db);
1150 local_storage.Update(table&: *this, row_ids, column_ids, data&: updates);
1151 return;
1152 }
1153
1154 // update is in the row groups
1155 // we need to figure out for each id to which row group it belongs
1156 // usually all (or many) ids belong to the same row group
1157 // we iterate over the ids and check for every id if it belongs to the same row group as their predecessor
1158 row_groups->Update(transaction, ids, column_ids, updates);
1159}
1160
1161void DataTable::UpdateColumn(TableCatalogEntry &table, ClientContext &context, Vector &row_ids,
1162 const vector<column_t> &column_path, DataChunk &updates) {
1163 D_ASSERT(row_ids.GetType().InternalType() == ROW_TYPE);
1164 D_ASSERT(updates.ColumnCount() == 1);
1165 updates.Verify();
1166 if (updates.size() == 0) {
1167 return;
1168 }
1169
1170 if (!is_root) {
1171 throw TransactionException("Transaction conflict: cannot update a table that has been altered!");
1172 }
1173
1174 // now perform the actual update
1175 auto &transaction = DuckTransaction::Get(context, db);
1176
1177 updates.Flatten();
1178 row_ids.Flatten(count: updates.size());
1179 row_groups->UpdateColumn(transaction, row_ids, column_path, updates);
1180}
1181
1182//===--------------------------------------------------------------------===//
1183// Index Scan
1184//===--------------------------------------------------------------------===//
1185void DataTable::InitializeWALCreateIndexScan(CreateIndexScanState &state, const vector<column_t> &column_ids) {
1186 // we grab the append lock to make sure nothing is appended until AFTER we finish the index scan
1187 state.append_lock = std::unique_lock<mutex>(append_lock);
1188 InitializeScan(state, column_ids);
1189}
1190
1191void DataTable::WALAddIndex(ClientContext &context, unique_ptr<Index> index,
1192 const vector<unique_ptr<Expression>> &expressions) {
1193
1194 // if the data table is empty
1195 if (row_groups->IsEmpty()) {
1196 info->indexes.AddIndex(index: std::move(index));
1197 return;
1198 }
1199
1200 auto &allocator = Allocator::Get(db);
1201
1202 // intermediate holds scanned chunks of the underlying data to create the index
1203 DataChunk intermediate;
1204 vector<LogicalType> intermediate_types;
1205 vector<column_t> column_ids;
1206 for (auto &it : column_definitions) {
1207 intermediate_types.push_back(x: it.Type());
1208 column_ids.push_back(x: it.Oid());
1209 }
1210 column_ids.push_back(x: COLUMN_IDENTIFIER_ROW_ID);
1211 intermediate_types.emplace_back(args: LogicalType::ROW_TYPE);
1212
1213 intermediate.Initialize(allocator, types: intermediate_types);
1214
1215 // holds the result of executing the index expression on the intermediate chunks
1216 DataChunk result;
1217 result.Initialize(allocator, types: index->logical_types);
1218
1219 // initialize an index scan
1220 CreateIndexScanState state;
1221 InitializeWALCreateIndexScan(state, column_ids);
1222
1223 if (!is_root) {
1224 throw InternalException("Error during WAL replay. Cannot add an index to a table that has been altered.");
1225 }
1226
1227 // now start incrementally building the index
1228 {
1229 IndexLock lock;
1230 index->InitializeLock(state&: lock);
1231
1232 while (true) {
1233 intermediate.Reset();
1234 result.Reset();
1235 // scan a new chunk from the table to index
1236 CreateIndexScan(state, result&: intermediate, type: TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED);
1237 if (intermediate.size() == 0) {
1238 // finished scanning for index creation
1239 // release all locks
1240 break;
1241 }
1242 // resolve the expressions for this chunk
1243 index->ExecuteExpressions(input&: intermediate, result);
1244
1245 // insert into the index
1246 auto error = index->Insert(lock, input&: result, row_identifiers&: intermediate.data[intermediate.ColumnCount() - 1]);
1247 if (error) {
1248 throw InternalException("Error during WAL replay: %s", error.Message());
1249 }
1250 }
1251 }
1252
1253 info->indexes.AddIndex(index: std::move(index));
1254}
1255
1256//===--------------------------------------------------------------------===//
1257// Statistics
1258//===--------------------------------------------------------------------===//
1259unique_ptr<BaseStatistics> DataTable::GetStatistics(ClientContext &context, column_t column_id) {
1260 if (column_id == COLUMN_IDENTIFIER_ROW_ID) {
1261 return nullptr;
1262 }
1263 return row_groups->CopyStats(column_id);
1264}
1265
1266void DataTable::SetDistinct(column_t column_id, unique_ptr<DistinctStatistics> distinct_stats) {
1267 D_ASSERT(column_id != COLUMN_IDENTIFIER_ROW_ID);
1268 row_groups->SetDistinct(column_id, distinct_stats: std::move(distinct_stats));
1269}
1270
1271//===--------------------------------------------------------------------===//
1272// Checkpoint
1273//===--------------------------------------------------------------------===//
1274void DataTable::Checkpoint(TableDataWriter &writer) {
1275 // checkpoint each individual row group
1276 // FIXME: we might want to combine adjacent row groups in case they have had deletions...
1277 TableStatistics global_stats;
1278 row_groups->CopyStats(stats&: global_stats);
1279
1280 row_groups->Checkpoint(writer, global_stats);
1281
1282 // The rowgroup payload data has been written. Now write:
1283 // column stats
1284 // row-group pointers
1285 // table pointer
1286 // index data
1287 writer.FinalizeTable(global_stats: std::move(global_stats), info: info.get());
1288}
1289
1290void DataTable::CommitDropColumn(idx_t index) {
1291 row_groups->CommitDropColumn(index);
1292}
1293
1294idx_t DataTable::GetTotalRows() {
1295 return row_groups->GetTotalRows();
1296}
1297
1298void DataTable::CommitDropTable() {
1299 // commit a drop of this table: mark all blocks as modified so they can be reclaimed later on
1300 row_groups->CommitDropTable();
1301}
1302
1303//===--------------------------------------------------------------------===//
1304// GetColumnSegmentInfo
1305//===--------------------------------------------------------------------===//
1306vector<ColumnSegmentInfo> DataTable::GetColumnSegmentInfo() {
1307 return row_groups->GetColumnSegmentInfo();
1308}
1309
1310} // namespace duckdb
1311