#include "duckdb/storage/data_table.hpp"

#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/common/helper.hpp"
#include "duckdb/common/vector_operations/vector_operations.hpp"
#include "duckdb/execution/expression_executor.hpp"
#include "duckdb/planner/constraints/list.hpp"
#include "duckdb/transaction/transaction.hpp"
#include "duckdb/transaction/transaction_manager.hpp"
#include "duckdb/storage/table/transient_segment.hpp"
#include "duckdb/storage/storage_manager.hpp"

using namespace duckdb;
using namespace std;
using namespace chrono;

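// Load-from-disk constructor: builds one ColumnData segment tree per column and, if
// persistent segments are present, positions the version managers so that transient
// appends start directly after the persisted rows.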
DataTable::DataTable(StorageManager &storage, string schema, string table, vector<TypeId> types_,
                     unique_ptr<vector<unique_ptr<PersistentSegment>>[]> data)
    : info(make_shared<DataTableInfo>(schema, table)), types(types_), storage(storage),
      persistent_manager(make_shared<VersionManager>(*info)), transient_manager(make_shared<VersionManager>(*info)),
      is_root(true) {
    // set up the segment trees for the column segments
    for (idx_t i = 0; i < types.size(); i++) {
        auto column_data = make_shared<ColumnData>(*storage.buffer_manager, *info);
        column_data->type = types[i];
        column_data->column_idx = i;
        columns.push_back(move(column_data));
    }

    // initialize the table with the existing data from disk, if any
    if (data && data[0].size() > 0) {
        // first append all the segments to the set of column segments
        for (idx_t i = 0; i < types.size(); i++) {
            columns[i]->Initialize(data[i]);
            if (columns[i]->persistent_rows != columns[0]->persistent_rows) {
                throw Exception("Column length mismatch in table load!");
            }
        }
        persistent_manager->max_row = columns[0]->persistent_rows;
        transient_manager->base_row = persistent_manager->max_row;
    }
}

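// ALTER TABLE ... ADD COLUMN: the new DataTable shares its table info, version
// managers and existing column data with the parent; it appends one extra column and
// back-fills it with the DEFAULT value (or NULL) for all existing rows.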
DataTable::DataTable(ClientContext &context, DataTable &parent, ColumnDefinition &new_column, Expression *default_value)
    : info(parent.info), types(parent.types), storage(parent.storage), persistent_manager(parent.persistent_manager),
      transient_manager(parent.transient_manager), columns(parent.columns), is_root(true) {
    // prevent any new tuples from being added to the parent
    lock_guard<mutex> parent_lock(parent.append_lock);
    // add the new column to this DataTable
    auto new_column_type = GetInternalType(new_column.type);
    idx_t new_column_idx = columns.size();

    types.push_back(new_column_type);
    auto column_data = make_shared<ColumnData>(*storage.buffer_manager, *info);
    column_data->type = new_column_type;
    column_data->column_idx = new_column_idx;
    columns.push_back(move(column_data));

    // fill the column with its DEFAULT value, or NULL if none is specified
    idx_t rows_to_write = persistent_manager->max_row + transient_manager->max_row;
    if (rows_to_write > 0) {
        ExpressionExecutor executor;
        DataChunk dummy_chunk;
        Vector result(new_column_type);
        if (!default_value) {
            FlatVector::Nullmask(result).set();
        } else {
            executor.AddExpression(*default_value);
        }

        ColumnAppendState state;
        columns[new_column_idx]->InitializeAppend(state);
        for (idx_t i = 0; i < rows_to_write; i += STANDARD_VECTOR_SIZE) {
            idx_t rows_in_this_vector = std::min(rows_to_write - i, (idx_t)STANDARD_VECTOR_SIZE);
            if (default_value) {
                dummy_chunk.SetCardinality(rows_in_this_vector);
                executor.ExecuteExpression(dummy_chunk, result);
            }
            columns[new_column_idx]->Append(state, result, rows_in_this_vector);
        }
    }
    // also add this column to client local storage
    Transaction::GetTransaction(context).storage.AddColumn(&parent, this, new_column, default_value);

    // this table replaces the previous table, hence the parent is no longer the root DataTable
    parent.is_root = false;
}

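// ALTER TABLE ... DROP COLUMN: the new DataTable shares storage with the parent but
// erases the dropped column; this is refused if any index references the column, or a
// column stored after it (since those column ids would shift).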
DataTable::DataTable(ClientContext &context, DataTable &parent, idx_t removed_column)
    : info(parent.info), types(parent.types), storage(parent.storage), persistent_manager(parent.persistent_manager),
      transient_manager(parent.transient_manager), columns(parent.columns), is_root(true) {
    // prevent any new tuples from being added to the parent
    lock_guard<mutex> parent_lock(parent.append_lock);
    // first check if there are any indexes that exist that point to the removed column
    for (auto &index : info->indexes) {
        for (auto &column_id : index->column_ids) {
            if (column_id == removed_column) {
                throw CatalogException("Cannot drop this column: an index depends on it!");
            } else if (column_id > removed_column) {
                throw CatalogException("Cannot drop this column: an index depends on a column after it!");
            }
        }
    }
    // erase the column from this DataTable
    assert(removed_column < types.size());
    types.erase(types.begin() + removed_column);
    columns.erase(columns.begin() + removed_column);

    // this table replaces the previous table, hence the parent is no longer the root DataTable
    parent.is_root = false;
}

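// ALTER TABLE ... ALTER TYPE: the new DataTable shares storage with the parent except
// for the changed column, which is rebuilt by scanning the parent and appending the
// result of the cast expression, one vector at a time.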
DataTable::DataTable(ClientContext &context, DataTable &parent, idx_t changed_idx, SQLType target_type,
                     vector<column_t> bound_columns, Expression &cast_expr)
    : info(parent.info), types(parent.types), storage(parent.storage), persistent_manager(parent.persistent_manager),
      transient_manager(parent.transient_manager), columns(parent.columns), is_root(true) {

    // prevent any new tuples from being added to the parent
    CreateIndexScanState scan_state;
    parent.InitializeCreateIndexScan(scan_state, bound_columns);

    // first check if there are any indexes that exist that point to the changed column
    for (auto &index : info->indexes) {
        for (auto &column_id : index->column_ids) {
            if (column_id == changed_idx) {
                throw CatalogException("Cannot change the type of this column: an index depends on it!");
            }
        }
    }
    // change the type in this DataTable
    auto new_type = GetInternalType(target_type);
    types[changed_idx] = new_type;

    // construct a new column data for this type
    auto column_data = make_shared<ColumnData>(*storage.buffer_manager, *info);
    column_data->type = new_type;
    column_data->column_idx = changed_idx;

    ColumnAppendState append_state;
    column_data->InitializeAppend(append_state);

    // scan the original table, and fill the new column with the transformed value
    auto &transaction = Transaction::GetTransaction(context);

    vector<TypeId> types;
    for (idx_t i = 0; i < bound_columns.size(); i++) {
        if (bound_columns[i] == COLUMN_IDENTIFIER_ROW_ID) {
            types.push_back(ROW_TYPE);
        } else {
            types.push_back(parent.types[bound_columns[i]]);
        }
    }

    DataChunk scan_chunk;
    scan_chunk.Initialize(types);

    ExpressionExecutor executor;
    executor.AddExpression(cast_expr);

    Vector append_vector(new_type);
    while (true) {
        // scan the table
        scan_chunk.Reset();
        parent.CreateIndexScan(scan_state, scan_chunk);
        if (scan_chunk.size() == 0) {
            break;
        }
        // execute the expression
        executor.ExecuteExpression(scan_chunk, append_vector);
        column_data->Append(append_state, append_vector, scan_chunk.size());
    }
    // also add this column to client local storage
    transaction.storage.ChangeType(&parent, this, changed_idx, target_type, bound_columns, cast_expr);

    columns[changed_idx] = move(column_data);

    // this table replaces the previous table, hence the parent is no longer the root DataTable
    parent.is_root = false;
}

//===--------------------------------------------------------------------===//
// Scan
//===--------------------------------------------------------------------===//
void DataTable::InitializeScan(TableScanState &state, vector<column_t> column_ids,
                               unordered_map<idx_t, vector<TableFilter>> *table_filters) {
    // initialize a column scan state for each column
    state.column_scans = unique_ptr<ColumnScanState[]>(new ColumnScanState[column_ids.size()]);
    for (idx_t i = 0; i < column_ids.size(); i++) {
        auto column = column_ids[i];
        if (column != COLUMN_IDENTIFIER_ROW_ID) {
            columns[column]->InitializeScan(state.column_scans[i]);
        }
    }
    state.column_ids = move(column_ids);
    // initialize the chunk scan state
    state.offset = 0;
    state.current_persistent_row = 0;
    state.max_persistent_row = persistent_manager->max_row;
    state.current_transient_row = 0;
    state.max_transient_row = transient_manager->max_row;
    if (table_filters && table_filters->size() > 0) {
        state.adaptive_filter = make_unique<AdaptiveFilter>(*table_filters);
    }
}

void DataTable::InitializeScan(Transaction &transaction, TableScanState &state, vector<column_t> column_ids,
                               unordered_map<idx_t, vector<TableFilter>> *table_filters) {
    InitializeScan(state, move(column_ids), table_filters);
    transaction.storage.InitializeScan(this, state.local_state);
}

void DataTable::Scan(Transaction &transaction, DataChunk &result, TableScanState &state,
                     unordered_map<idx_t, vector<TableFilter>> &table_filters) {
    // scan the persistent segments
    while (ScanBaseTable(transaction, result, state, state.current_persistent_row, state.max_persistent_row, 0,
                         *persistent_manager, table_filters)) {
        if (result.size() > 0) {
            return;
        }
    }
    // scan the transient segments
    while (ScanBaseTable(transaction, result, state, state.current_transient_row, state.max_transient_row,
                         persistent_manager->max_row, *transient_manager, table_filters)) {
        if (result.size() > 0) {
            return;
        }
    }

    // scan the transaction-local segments
    transaction.storage.Scan(state.local_state, state.column_ids, result, &table_filters);
}

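// Zonemap (min/max) check for a numeric segment: returns true if, given the segment
// statistics, the predicate could match some value in the segment, i.e. the segment
// cannot safely be skipped.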
template <class T> bool checkZonemap(TableScanState &state, TableFilter &table_filter, T constant) {
    T *min = (T *)state.column_scans[table_filter.column_index].current->stats.minimum.get();
    T *max = (T *)state.column_scans[table_filter.column_index].current->stats.maximum.get();
    switch (table_filter.comparison_type) {
    case ExpressionType::COMPARE_EQUAL:
        return constant >= *min && constant <= *max;
    case ExpressionType::COMPARE_GREATERTHANOREQUALTO:
        return constant <= *max;
    case ExpressionType::COMPARE_GREATERTHAN:
        return constant < *max;
    case ExpressionType::COMPARE_LESSTHANOREQUALTO:
        return constant >= *min;
    case ExpressionType::COMPARE_LESSTHAN:
        return constant > *min;
    default:
        throw NotImplementedException("Operation not implemented");
    }
}

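// Zonemap check for VARCHAR segments. The string statistics only cover a short prefix
// of the values (the caller in CheckZonemap below truncates the constant to 7 bytes),
// so this comparison is necessarily conservative.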
bool checkZonemapString(TableScanState &state, TableFilter &table_filter, const char *constant) {
    char *min = (char *)state.column_scans[table_filter.column_index].current->stats.minimum.get();
    char *max = (char *)state.column_scans[table_filter.column_index].current->stats.maximum.get();
    int min_comp = strcmp(min, constant);
    int max_comp = strcmp(max, constant);
    switch (table_filter.comparison_type) {
    case ExpressionType::COMPARE_EQUAL:
        return min_comp <= 0 && max_comp >= 0;
    case ExpressionType::COMPARE_GREATERTHANOREQUALTO:
    case ExpressionType::COMPARE_GREATERTHAN:
        return max_comp >= 0;
    case ExpressionType::COMPARE_LESSTHAN:
    case ExpressionType::COMPARE_LESSTHANOREQUALTO:
        return min_comp <= 0;
    default:
        throw NotImplementedException("Operation not implemented");
    }
}

bool DataTable::CheckZonemap(TableScanState &state, unordered_map<idx_t, vector<TableFilter>> &table_filters,
                             idx_t &current_row) {
    bool readSegment = true;
    for (auto &table_filter : table_filters) {
        for (auto &predicate_constant : table_filter.second) {
            if (!state.column_scans[predicate_constant.column_index].segment_checked) {
                state.column_scans[predicate_constant.column_index].segment_checked = true;
                if (!state.column_scans[predicate_constant.column_index].current) {
                    return true;
                }
                switch (state.column_scans[predicate_constant.column_index].current->type) {
                case TypeId::INT8: {
                    int8_t constant = predicate_constant.constant.value_.tinyint;
                    readSegment &= checkZonemap<int8_t>(state, predicate_constant, constant);
                    break;
                }
                case TypeId::INT16: {
                    int16_t constant = predicate_constant.constant.value_.smallint;
                    readSegment &= checkZonemap<int16_t>(state, predicate_constant, constant);
                    break;
                }
                case TypeId::INT32: {
                    int32_t constant = predicate_constant.constant.value_.integer;
                    readSegment &= checkZonemap<int32_t>(state, predicate_constant, constant);
                    break;
                }
                case TypeId::INT64: {
                    int64_t constant = predicate_constant.constant.value_.bigint;
                    readSegment &= checkZonemap<int64_t>(state, predicate_constant, constant);
                    break;
                }
                case TypeId::FLOAT: {
                    float constant = predicate_constant.constant.value_.float_;
                    readSegment &= checkZonemap<float>(state, predicate_constant, constant);
                    break;
                }
                case TypeId::DOUBLE: {
                    double constant = predicate_constant.constant.value_.double_;
                    readSegment &= checkZonemap<double>(state, predicate_constant, constant);
                    break;
                }
                case TypeId::VARCHAR: {
                    //! we can only compare the first 7 bytes
                    size_t value_size = predicate_constant.constant.str_value.size() > 7
                                            ? 7
                                            : predicate_constant.constant.str_value.size();
                    string constant;
                    for (size_t i = 0; i < value_size; i++) {
                        constant += predicate_constant.constant.str_value[i];
                    }
                    readSegment &= checkZonemapString(state, predicate_constant, constant.c_str());
                    break;
                }
                default:
                    throw NotImplementedException("Unimplemented type for uncompressed segment");
                }
            }
            if (!readSegment) {
                //! We can skip this partition
                idx_t vectorsToSkip =
                    ceil((double)(state.column_scans[predicate_constant.column_index].current->count +
                                  state.column_scans[predicate_constant.column_index].current->start - current_row) /
                         STANDARD_VECTOR_SIZE);
                for (idx_t i = 0; i < vectorsToSkip; ++i) {
                    state.NextVector();
                    current_row += STANDARD_VECTOR_SIZE;
                }
                return false;
            }
        }
    }

    return true;
}

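// Scan one vector's worth of rows from either the persistent or the transient part of
// the table. Returns false once current_row reaches max_row; otherwise fills `result`
// (possibly with zero rows, if the vector was skipped) and returns true.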
bool DataTable::ScanBaseTable(Transaction &transaction, DataChunk &result, TableScanState &state, idx_t &current_row,
                              idx_t max_row, idx_t base_row, VersionManager &manager,
                              unordered_map<idx_t, vector<TableFilter>> &table_filters) {
    if (current_row >= max_row) {
        // exceeded the number of rows to scan
        return false;
    }
    idx_t max_count = std::min((idx_t)STANDARD_VECTOR_SIZE, max_row - current_row);
    idx_t vector_offset = current_row / STANDARD_VECTOR_SIZE;
    //! first check the zonemap to see whether we need to scan this partition at all
    if (!CheckZonemap(state, table_filters, current_row)) {
        return true;
    }
    // second, scan the version chunk manager to figure out which tuples to load for this transaction
    SelectionVector valid_sel(STANDARD_VECTOR_SIZE);
    idx_t count = manager.GetSelVector(transaction, vector_offset, valid_sel, max_count);
    if (count == 0) {
        // nothing to scan for this vector, skip the entire vector
        state.NextVector();
        current_row += STANDARD_VECTOR_SIZE;
        return true;
    }
    idx_t approved_tuple_count = count;
    if (count == max_count && table_filters.empty()) {
        //! If we don't have any deleted tuples or filters we can just run a regular scan
        for (idx_t i = 0; i < state.column_ids.size(); i++) {
            auto column = state.column_ids[i];
            if (column == COLUMN_IDENTIFIER_ROW_ID) {
                // scan row id
                assert(result.data[i].type == ROW_TYPE);
                result.data[i].Sequence(base_row + current_row, 1);
            } else {
                columns[column]->Scan(transaction, state.column_scans[i], result.data[i]);
            }
        }
    } else {
        SelectionVector sel;

        if (count != max_count) {
            sel.Initialize(valid_sel);
        } else {
            sel.Initialize(FlatVector::IncrementalSelectionVector);
        }
        //! First, we scan the columns with filters, fetch their data and generate a selection vector.
        //! get runtime statistics
        auto start_time = high_resolution_clock::now();
        for (idx_t i = 0; i < table_filters.size(); i++) {
            auto tf_idx = state.adaptive_filter->permutation[i];
            columns[tf_idx]->Select(transaction, state.column_scans[tf_idx], result.data[tf_idx], sel,
                                    approved_tuple_count, table_filters[tf_idx]);
        }
        for (auto &table_filter : table_filters) {
            result.data[table_filter.first].Slice(sel, approved_tuple_count);
        }
        //! Now we use the selection vector to fetch data for the other columns.
        for (idx_t i = 0; i < state.column_ids.size(); i++) {
            if (table_filters.find(i) == table_filters.end()) {
                auto column = state.column_ids[i];
                if (column == COLUMN_IDENTIFIER_ROW_ID) {
                    assert(result.data[i].type == TypeId::INT64);
                    result.data[i].vector_type = VectorType::FLAT_VECTOR;
                    auto result_data = (int64_t *)FlatVector::GetData(result.data[i]);
                    for (size_t sel_idx = 0; sel_idx < approved_tuple_count; sel_idx++) {
                        result_data[sel_idx] = base_row + current_row + sel.get_index(sel_idx);
                    }
                } else {
                    columns[column]->FilterScan(transaction, state.column_scans[i], result.data[i], sel,
                                                approved_tuple_count);
                }
            }
        }
        auto end_time = high_resolution_clock::now();
        if (state.adaptive_filter && table_filters.size() > 1) {
            state.adaptive_filter->AdaptRuntimeStatistics(
                duration_cast<duration<double>>(end_time - start_time).count());
        }
    }

    result.SetCardinality(approved_tuple_count);
    current_row += STANDARD_VECTOR_SIZE;
    return true;
}

//===--------------------------------------------------------------------===//
// Index Scan
//===--------------------------------------------------------------------===//
void DataTable::InitializeIndexScan(Transaction &transaction, TableIndexScanState &state, Index &index,
                                    vector<column_t> column_ids) {
    state.index = &index;
    state.column_ids = move(column_ids);
    transaction.storage.InitializeScan(this, state.local_state);
}

void DataTable::InitializeIndexScan(Transaction &transaction, TableIndexScanState &state, Index &index, Value value,
                                    ExpressionType expr_type, vector<column_t> column_ids) {
    InitializeIndexScan(transaction, state, index, move(column_ids));
    state.index_state = index.InitializeScanSinglePredicate(transaction, state.column_ids, value, expr_type);
}

void DataTable::InitializeIndexScan(Transaction &transaction, TableIndexScanState &state, Index &index, Value low_value,
                                    ExpressionType low_type, Value high_value, ExpressionType high_type,
                                    vector<column_t> column_ids) {
    InitializeIndexScan(transaction, state, index, move(column_ids));
    state.index_state =
        index.InitializeScanTwoPredicates(transaction, state.column_ids, low_value, low_type, high_value, high_type);
}

void DataTable::IndexScan(Transaction &transaction, DataChunk &result, TableIndexScanState &state) {
    // clear any previously pinned blocks
    state.fetch_state.handles.clear();
    // scan the index
    state.index->Scan(transaction, *this, state, result);
    if (result.size() > 0) {
        return;
    }
    // scan the local structure
    transaction.storage.Scan(state.local_state, state.column_ids, result);
}

//===--------------------------------------------------------------------===//
// Fetch
//===--------------------------------------------------------------------===//
void DataTable::Fetch(Transaction &transaction, DataChunk &result, vector<column_t> &column_ids,
                      Vector &row_identifiers, idx_t fetch_count, TableIndexScanState &state) {
    // first figure out which row identifiers we should use for this transaction by looking at the VersionManagers
    row_t rows[STANDARD_VECTOR_SIZE];
    idx_t count = FetchRows(transaction, row_identifiers, fetch_count, rows);

    if (count == 0) {
        // no rows to use
        return;
    }
    // for each of the remaining rows, now fetch the data
    result.SetCardinality(count);
    for (idx_t col_idx = 0; col_idx < column_ids.size(); col_idx++) {
        auto column = column_ids[col_idx];
        if (column == COLUMN_IDENTIFIER_ROW_ID) {
            // row id column: fill in the row ids
            assert(result.data[col_idx].type == TypeId::INT64);
            result.data[col_idx].vector_type = VectorType::FLAT_VECTOR;
            auto data = FlatVector::GetData<row_t>(result.data[col_idx]);
            for (idx_t i = 0; i < count; i++) {
                data[i] = rows[i];
            }
        } else {
            // regular column: fetch data from the base column
            for (idx_t i = 0; i < count; i++) {
                auto row_id = rows[i];
                columns[column]->FetchRow(state.fetch_state, transaction, row_id, result.data[col_idx], i);
            }
        }
    }
}

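// Resolve which of the given row identifiers are visible to this transaction by
// consulting the appropriate version manager; visible row ids are written to
// result_rows and their count is returned.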
idx_t DataTable::FetchRows(Transaction &transaction, Vector &row_identifiers, idx_t fetch_count, row_t result_rows[]) {
    assert(row_identifiers.type == ROW_TYPE);

    // obtain a read lock on the version managers
    auto l1 = persistent_manager->lock.GetSharedLock();
    auto l2 = transient_manager->lock.GetSharedLock();

    // now iterate over the row ids and figure out which rows to use
    idx_t count = 0;

    auto row_ids = FlatVector::GetData<row_t>(row_identifiers);
    for (idx_t i = 0; i < fetch_count; i++) {
        auto row_id = row_ids[i];
        bool use_row;
        if ((idx_t)row_id < persistent_manager->max_row) {
            // persistent row: use persistent manager
            use_row = persistent_manager->Fetch(transaction, row_id);
        } else {
            // transient row: use transient manager
            use_row = transient_manager->Fetch(transaction, row_id);
        }
        if (use_row) {
            // row is not deleted; use the row
            result_rows[count++] = row_id;
        }
    }
    return count;
}

//===--------------------------------------------------------------------===//
// Append
//===--------------------------------------------------------------------===//
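// Constraint verification helpers: these run over a chunk before it is appended (or
// before an update is applied) and throw a ConstraintException on any violation.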
static void VerifyNotNullConstraint(TableCatalogEntry &table, Vector &vector, idx_t count, string &col_name) {
    if (VectorOperations::HasNull(vector, count)) {
        throw ConstraintException("NOT NULL constraint failed: %s.%s", table.name.c_str(), col_name.c_str());
    }
}

static void VerifyCheckConstraint(TableCatalogEntry &table, Expression &expr, DataChunk &chunk) {
    ExpressionExecutor executor(expr);
    Vector result(TypeId::INT32);
    try {
        executor.ExecuteExpression(chunk, result);
    } catch (Exception &ex) {
        throw ConstraintException("CHECK constraint failed: %s (Error: %s)", table.name.c_str(), ex.what());
    } catch (...) {
        throw ConstraintException("CHECK constraint failed: %s (Unknown Error)", table.name.c_str());
    }
    VectorData vdata;
    result.Orrify(chunk.size(), vdata);

    auto dataptr = (int32_t *)vdata.data;
    for (idx_t i = 0; i < chunk.size(); i++) {
        auto idx = vdata.sel->get_index(i);
        if (!(*vdata.nullmask)[idx] && dataptr[idx] == 0) {
            throw ConstraintException("CHECK constraint failed: %s", table.name.c_str());
        }
    }
}

void DataTable::VerifyAppendConstraints(TableCatalogEntry &table, DataChunk &chunk) {
    for (auto &constraint : table.bound_constraints) {
        switch (constraint->type) {
        case ConstraintType::NOT_NULL: {
            auto &not_null = *reinterpret_cast<BoundNotNullConstraint *>(constraint.get());
            VerifyNotNullConstraint(table, chunk.data[not_null.index], chunk.size(),
                                    table.columns[not_null.index].name);
            break;
        }
        case ConstraintType::CHECK: {
            auto &check = *reinterpret_cast<BoundCheckConstraint *>(constraint.get());
            VerifyCheckConstraint(table, *check.expression, chunk);
            break;
        }
        case ConstraintType::UNIQUE: {
            //! check whether or not the chunk can be inserted into the indexes
            for (auto &index : info->indexes) {
                index->VerifyAppend(chunk);
            }
            break;
        }
        case ConstraintType::FOREIGN_KEY:
        default:
            throw NotImplementedException("Constraint type not implemented!");
        }
    }
}

void DataTable::Append(TableCatalogEntry &table, ClientContext &context, DataChunk &chunk) {
    if (chunk.size() == 0) {
        return;
    }
    if (chunk.column_count() != table.columns.size()) {
        throw CatalogException("Mismatch in column count for append");
    }
    if (!is_root) {
        throw TransactionException("Transaction conflict: adding entries to a table that has been altered!");
    }

    chunk.Verify();

    // verify any constraints on the new chunk
    VerifyAppendConstraints(table, chunk);

    // append to the transaction local data
    auto &transaction = Transaction::GetTransaction(context);
    transaction.storage.Append(this, chunk);
}

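// Note that DataTable::Append above only buffers rows in transaction-local storage;
// InitializeAppend/Append below perform the actual write into the table under the
// append lock (driven by the local storage layer, e.g. when a transaction commits).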
void DataTable::InitializeAppend(TableAppendState &state) {
    // obtain the append lock for this table
    state.append_lock = unique_lock<mutex>(append_lock);
    if (!is_root) {
        throw TransactionException("Transaction conflict: adding entries to a table that has been altered!");
    }
    // obtain locks on all indexes for the table
    state.index_locks = unique_ptr<IndexLock[]>(new IndexLock[info->indexes.size()]);
    for (idx_t i = 0; i < info->indexes.size(); i++) {
        info->indexes[i]->InitializeLock(state.index_locks[i]);
    }
    // for each column, initialize the append state
    state.states = unique_ptr<ColumnAppendState[]>(new ColumnAppendState[types.size()]);
    for (idx_t i = 0; i < types.size(); i++) {
        columns[i]->InitializeAppend(state.states[i]);
    }
    state.row_start = transient_manager->max_row;
    state.current_row = state.row_start;
}

void DataTable::Append(Transaction &transaction, transaction_t commit_id, DataChunk &chunk, TableAppendState &state) {
    assert(is_root);
    assert(chunk.column_count() == types.size());
    chunk.Verify();

    // set up the inserted info in the version manager
    transient_manager->Append(transaction, state.current_row, chunk.size(), commit_id);

    // append the physical data to each of the entries
    for (idx_t i = 0; i < types.size(); i++) {
        columns[i]->Append(state.states[i], chunk.data[i], chunk.size());
    }
    info->cardinality += chunk.size();
    state.current_row += chunk.size();
}

void DataTable::RevertAppend(TableAppendState &state) {
    if (state.row_start == state.current_row) {
        // nothing to revert!
        return;
    }
    assert(is_root);
    // revert changes in the base columns
    for (idx_t i = 0; i < types.size(); i++) {
        columns[i]->RevertAppend(state.row_start);
    }
    // adjust the cardinality
    info->cardinality -= state.current_row - state.row_start;
    transient_manager->max_row = state.row_start;
    // revert changes in the transient manager
    transient_manager->RevertAppend(state.row_start, state.current_row);
}

//===--------------------------------------------------------------------===//
// Indexes
//===--------------------------------------------------------------------===//
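// Append the chunk to all indexes. If any index reports a constraint violation, the
// entries already added to the preceding indexes are removed again, so the append is
// all-or-nothing across indexes.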
bool DataTable::AppendToIndexes(TableAppendState &state, DataChunk &chunk, row_t row_start) {
    assert(is_root);
    if (info->indexes.size() == 0) {
        return true;
    }
    // first generate the vector of row identifiers
    Vector row_identifiers(ROW_TYPE);
    VectorOperations::GenerateSequence(row_identifiers, chunk.size(), row_start, 1);

    idx_t failed_index = INVALID_INDEX;
    // now append the entries to the indices
    for (idx_t i = 0; i < info->indexes.size(); i++) {
        if (!info->indexes[i]->Append(state.index_locks[i], chunk, row_identifiers)) {
            failed_index = i;
            break;
        }
    }
    if (failed_index != INVALID_INDEX) {
        // constraint violation!
        // remove any appended entries from previous indexes (if any)
        for (idx_t i = 0; i < failed_index; i++) {
            info->indexes[i]->Delete(state.index_locks[i], chunk, row_identifiers);
        }
        return false;
    }
    return true;
}

void DataTable::RemoveFromIndexes(TableAppendState &state, DataChunk &chunk, row_t row_start) {
    assert(is_root);
    if (info->indexes.size() == 0) {
        return;
    }
    // first generate the vector of row identifiers
    Vector row_identifiers(ROW_TYPE);
    VectorOperations::GenerateSequence(row_identifiers, chunk.size(), row_start, 1);

    // now remove the entries from the indices
    RemoveFromIndexes(state, chunk, row_identifiers);
}

void DataTable::RemoveFromIndexes(TableAppendState &state, DataChunk &chunk, Vector &row_identifiers) {
    assert(is_root);
    for (idx_t i = 0; i < info->indexes.size(); i++) {
        info->indexes[i]->Delete(state.index_locks[i], chunk, row_identifiers);
    }
}

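// Variant that fetches the column data for the given row identifiers itself. Note
// that it anchors the fetch at row_ids[0] and addresses rows modulo
// STANDARD_VECTOR_SIZE, so all row identifiers are expected to fall within a single
// vector of the table.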
void DataTable::RemoveFromIndexes(Vector &row_identifiers, idx_t count) {
    assert(is_root);
    auto row_ids = FlatVector::GetData<row_t>(row_identifiers);
    // create a selection vector from the row_ids
    SelectionVector sel(STANDARD_VECTOR_SIZE);
    for (idx_t i = 0; i < count; i++) {
        sel.set_index(i, row_ids[i] % STANDARD_VECTOR_SIZE);
    }

    // fetch the data for these row identifiers
    DataChunk result;
    result.Initialize(types);
    // FIXME: we do not need to fetch all columns, only the columns required by the indices!
    auto states = unique_ptr<ColumnScanState[]>(new ColumnScanState[types.size()]);
    for (idx_t i = 0; i < types.size(); i++) {
        columns[i]->Fetch(states[i], row_ids[0], result.data[i]);
    }
    result.Slice(sel, count);
    for (idx_t i = 0; i < info->indexes.size(); i++) {
        info->indexes[i]->Delete(result, row_identifiers);
    }
}

//===--------------------------------------------------------------------===//
// Delete
//===--------------------------------------------------------------------===//
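// Deletes are routed by row id: ids at or above MAX_ROW_ID live in transaction-local
// storage, ids below persistent_manager->max_row in the persistent version manager,
// and everything in between in the transient version manager.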
void DataTable::Delete(TableCatalogEntry &table, ClientContext &context, Vector &row_identifiers, idx_t count) {
    assert(row_identifiers.type == ROW_TYPE);
    if (count == 0) {
        return;
    }

    auto &transaction = Transaction::GetTransaction(context);

    row_identifiers.Normalify(count);
    auto ids = FlatVector::GetData<row_t>(row_identifiers);
    auto first_id = ids[0];

    if (first_id >= MAX_ROW_ID) {
        // deletion is in transaction-local storage: push delete into local chunk collection
        transaction.storage.Delete(this, row_identifiers, count);
    } else if ((idx_t)first_id < persistent_manager->max_row) {
        // deletion is in persistent storage: delete in the persistent version manager
        persistent_manager->Delete(transaction, this, row_identifiers, count);
    } else {
        // deletion is in transient storage: delete in the transient version manager
        transient_manager->Delete(transaction, this, row_identifiers, count);
    }
}

//===--------------------------------------------------------------------===//
// Update
//===--------------------------------------------------------------------===//
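// Build a table-shaped chunk whose vectors reference the update chunk's vectors at
// the positions given by column_ids, so that constraint expressions bound against the
// full table layout can be evaluated over an UPDATE chunk.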
static void CreateMockChunk(vector<TypeId> &types, vector<column_t> &column_ids, DataChunk &chunk,
                            DataChunk &mock_chunk) {
    // construct a mock DataChunk
    mock_chunk.InitializeEmpty(types);
    for (column_t i = 0; i < column_ids.size(); i++) {
        mock_chunk.data[column_ids[i]].Reference(chunk.data[i]);
    }
    mock_chunk.SetCardinality(chunk.size());
}

static bool CreateMockChunk(TableCatalogEntry &table, vector<column_t> &column_ids,
                            unordered_set<column_t> &desired_column_ids, DataChunk &chunk, DataChunk &mock_chunk) {
    idx_t found_columns = 0;
    // check whether the desired columns are present in the UPDATE clause
    for (column_t i = 0; i < column_ids.size(); i++) {
        if (desired_column_ids.find(column_ids[i]) != desired_column_ids.end()) {
            found_columns++;
        }
    }
    if (found_columns == 0) {
        // no columns were found: no need to check the constraint again
        return false;
    }
    if (found_columns != desired_column_ids.size()) {
        // FIXME: not all columns in UPDATE clause are present!
        // this should not be triggered at all as the binder should add these columns
        throw NotImplementedException(
            "Not all columns required for the CHECK constraint are present in the UPDATED chunk!");
    }
    // construct a mock DataChunk
    auto types = table.GetTypes();
    CreateMockChunk(types, column_ids, chunk, mock_chunk);
    return true;
}

void DataTable::VerifyUpdateConstraints(TableCatalogEntry &table, DataChunk &chunk, vector<column_t> &column_ids) {
    for (auto &constraint : table.bound_constraints) {
        switch (constraint->type) {
        case ConstraintType::NOT_NULL: {
            auto &not_null = *reinterpret_cast<BoundNotNullConstraint *>(constraint.get());
            // check if the constraint is in the list of column_ids
            for (idx_t i = 0; i < column_ids.size(); i++) {
                if (column_ids[i] == not_null.index) {
                    // found the column id: check the data in the update chunk
                    VerifyNotNullConstraint(table, chunk.data[i], chunk.size(), table.columns[not_null.index].name);
                    break;
                }
            }
            break;
        }
        case ConstraintType::CHECK: {
            auto &check = *reinterpret_cast<BoundCheckConstraint *>(constraint.get());

            DataChunk mock_chunk;
            if (CreateMockChunk(table, column_ids, check.bound_columns, chunk, mock_chunk)) {
                VerifyCheckConstraint(table, *check.expression, mock_chunk);
            }
            break;
        }
        case ConstraintType::UNIQUE:
        case ConstraintType::FOREIGN_KEY:
            break;
        default:
            throw NotImplementedException("Constraint type not implemented!");
        }
    }
    // update should not be called for indexed columns!
    // instead, the update should have been rewritten to a delete + insert in a higher layer
#ifdef DEBUG
    for (idx_t i = 0; i < info->indexes.size(); i++) {
        assert(!info->indexes[i]->IndexIsUpdated(column_ids));
    }
#endif
}

void DataTable::Update(TableCatalogEntry &table, ClientContext &context, Vector &row_ids, vector<column_t> &column_ids,
                       DataChunk &updates) {
    assert(row_ids.type == ROW_TYPE);

    updates.Verify();
    if (updates.size() == 0) {
        return;
    }

    // first verify that no constraints are violated
    VerifyUpdateConstraints(table, updates, column_ids);

    // now perform the actual update
    auto &transaction = Transaction::GetTransaction(context);

    updates.Normalify();
    row_ids.Normalify(updates.size());
    auto first_id = FlatVector::GetValue<row_t>(row_ids, 0);
    if (first_id >= MAX_ROW_ID) {
        // update is in transaction-local storage: push update into local storage
        transaction.storage.Update(this, row_ids, column_ids, updates);
        return;
    }

    for (idx_t i = 0; i < column_ids.size(); i++) {
        auto column = column_ids[i];
        assert(column != COLUMN_IDENTIFIER_ROW_ID);

        columns[column]->Update(transaction, updates.data[i], row_ids, updates.size());
    }
}

//===--------------------------------------------------------------------===//
// Create Index Scan
//===--------------------------------------------------------------------===//
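// The create-index scan holds the append lock plus shared locks on both version
// managers for the duration of the scan, so no rows can be appended or deleted while
// an index is being built.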
void DataTable::InitializeCreateIndexScan(CreateIndexScanState &state, vector<column_t> column_ids) {
    // we grab the append lock to make sure nothing is appended until AFTER we finish the index scan
    state.append_lock = unique_lock<mutex>(append_lock);
    // get a read lock on the VersionManagers to prevent any further deletions
    state.locks.push_back(persistent_manager->lock.GetSharedLock());
    state.locks.push_back(transient_manager->lock.GetSharedLock());

    InitializeScan(state, column_ids);
}

void DataTable::CreateIndexScan(CreateIndexScanState &state, DataChunk &result) {
    // scan the persistent segments
    if (ScanCreateIndex(state, result, state.current_persistent_row, state.max_persistent_row, 0)) {
        return;
    }
    // scan the transient segments
    if (ScanCreateIndex(state, result, state.current_transient_row, state.max_transient_row,
                        state.max_persistent_row)) {
        return;
    }
}

bool DataTable::ScanCreateIndex(CreateIndexScanState &state, DataChunk &result, idx_t &current_row, idx_t max_row,
                                idx_t base_row) {
    if (current_row >= max_row) {
        return false;
    }
    idx_t count = std::min((idx_t)STANDARD_VECTOR_SIZE, max_row - current_row);

    // scan the base columns to fetch the actual data
    // note that we insert all data into the index, even if it is marked as deleted
    // FIXME: tuples that are already "cleaned up" do not need to be inserted into the index!
    for (idx_t i = 0; i < state.column_ids.size(); i++) {
        auto column = state.column_ids[i];
        if (column == COLUMN_IDENTIFIER_ROW_ID) {
            // scan row id
            assert(result.data[i].type == ROW_TYPE);
            result.data[i].Sequence(base_row + current_row, 1);
        } else {
            // scan actual base column
            columns[column]->IndexScan(state.column_scans[i], result.data[i]);
        }
    }
    result.SetCardinality(count);

    current_row += STANDARD_VECTOR_SIZE;
    return count > 0;
}

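// Build a new index over the table: scan the indexed columns (plus the row ids),
// evaluate the index expressions over each chunk, and insert the results. Creating a
// unique index fails here if the existing data already contains duplicates.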
void DataTable::AddIndex(unique_ptr<Index> index, vector<unique_ptr<Expression>> &expressions) {
    DataChunk result;
    result.Initialize(index->types);

    DataChunk intermediate;
    vector<TypeId> intermediate_types;
    auto column_ids = index->column_ids;
    column_ids.push_back(COLUMN_IDENTIFIER_ROW_ID);
    for (auto &id : index->column_ids) {
        intermediate_types.push_back(types[id]);
    }
    intermediate_types.push_back(ROW_TYPE);
    intermediate.Initialize(intermediate_types);

    // initialize an index scan
    CreateIndexScanState state;
    InitializeCreateIndexScan(state, column_ids);

    if (!is_root) {
        throw TransactionException("Transaction conflict: cannot add an index to a table that has been altered!");
    }

    // now start incrementally building the index
    IndexLock lock;
    index->InitializeLock(lock);
    ExpressionExecutor executor(expressions);
    while (true) {
        intermediate.Reset();
        // scan a new chunk from the table to index
        CreateIndexScan(state, intermediate);
        if (intermediate.size() == 0) {
            // finished scanning for index creation
            // release all locks
            break;
        }
        // resolve the expressions for this chunk
        executor.Execute(intermediate, result);

        // insert into the index
        if (!index->Insert(lock, result, intermediate.data[intermediate.column_count() - 1])) {
            throw ConstraintException("Cannot create unique index, table contains duplicate data on indexed column(s)");
        }
    }
    info->indexes.push_back(move(index));
}
