| 1 | #include "duckdb/transaction/local_storage.hpp" |
| 2 | #include "duckdb/execution/index/art/art.hpp" |
| 3 | #include "duckdb/storage/table/append_state.hpp" |
| 4 | #include "duckdb/storage/write_ahead_log.hpp" |
| 5 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
| 6 | #include "duckdb/storage/uncompressed_segment.hpp" |
| 7 | |
| 8 | using namespace duckdb; |
| 9 | using namespace std; |
| 10 | |
| 11 | LocalTableStorage::LocalTableStorage(DataTable &table) : max_row(0) { |
| 12 | for (auto &index : table.info->indexes) { |
| 13 | assert(index->type == IndexType::ART); |
| 14 | auto &art = (ART &)*index; |
| 15 | if (art.is_unique) { |
| 16 | // unique index: create a local ART index that maintains the same unique constraint |
| 17 | vector<unique_ptr<Expression>> unbound_expressions; |
| 18 | for (auto &expr : art.unbound_expressions) { |
| 19 | unbound_expressions.push_back(expr->Copy()); |
| 20 | } |
| 21 | indexes.push_back(make_unique<ART>(art.column_ids, move(unbound_expressions), true)); |
| 22 | } |
| 23 | } |
| 24 | } |
| 25 | |
// out-of-line destructor: keeps destruction of the owned members (indexes,
// deleted_entries, collection) in this translation unit where their full
// definitions are visible
LocalTableStorage::~LocalTableStorage() {
}
| 28 | |
| 29 | void LocalTableStorage::InitializeScan(LocalScanState &state) { |
| 30 | state.storage = this; |
| 31 | |
| 32 | state.chunk_index = 0; |
| 33 | state.max_index = collection.chunks.size() - 1; |
| 34 | state.last_chunk_count = collection.chunks.back()->size(); |
| 35 | } |
| 36 | |
// Discard all transaction-local state for this table: the appended chunks,
// the local unique-index copies and the per-chunk deletion markers.
// Commit() calls this once the local data has been merged into the base table.
void LocalTableStorage::Clear() {
	collection.chunks.clear();
	indexes.clear();
	deleted_entries.clear();
}
| 42 | |
| 43 | void LocalStorage::InitializeScan(DataTable *table, LocalScanState &state) { |
| 44 | auto entry = table_storage.find(table); |
| 45 | if (entry == table_storage.end()) { |
| 46 | // no local storage for table: set scan to nullptr |
| 47 | state.storage = nullptr; |
| 48 | return; |
| 49 | } |
| 50 | state.storage = entry->second.get(); |
| 51 | state.storage->InitializeScan(state); |
| 52 | } |
| 53 | |
// Scan the transaction-local storage: produces at most one chunk of local data
// per call, skipping locally deleted rows and applying the optional table
// filters. An empty result chunk signals that the scan is exhausted.
void LocalStorage::Scan(LocalScanState &state, const vector<column_t> &column_ids, DataChunk &result,
                        unordered_map<idx_t, vector<TableFilter>> *table_filters) {
	if (!state.storage || state.chunk_index > state.max_index) {
		// no local storage for this table, or all chunks consumed: empty result
		result.Reset();
		return;
	}
	auto &chunk = *state.storage->collection.chunks[state.chunk_index];
	// in the final chunk, only scan up to the count recorded at scan init so
	// rows appended after InitializeScan are not visible
	idx_t chunk_count = state.chunk_index == state.max_index ? state.last_chunk_count : chunk.size();
	idx_t count = chunk_count;

	// first create a selection vector from the deleted entries (if any)
	SelectionVector valid_sel(STANDARD_VECTOR_SIZE);
	auto entry = state.storage->deleted_entries.find(state.chunk_index);
	if (entry != state.storage->deleted_entries.end()) {
		// deleted entries! build a selection vector of the surviving rows
		auto deleted = entry->second.get();
		idx_t new_count = 0;
		for (idx_t i = 0; i < count; i++) {
			if (!deleted[i]) {
				valid_sel.set_index(new_count++, i);
			}
		}
		if (new_count == 0 && count > 0) {
			// all entries in this chunk were deleted: recurse into the next chunk
			state.chunk_index++;
			Scan(state, column_ids, result, table_filters);
			return;
		}
		count = new_count;
	}

	// sel holds the rows to emit: the deletion-filtered selection, or the
	// identity selection when no rows were deleted
	SelectionVector sel;
	if (count != chunk_count) {
		sel.Initialize(valid_sel);
	} else {
		sel.Initialize(FlatVector::IncrementalSelectionVector);
	}
	// now scan the vectors of the chunk
	for (idx_t i = 0; i < column_ids.size(); i++) {
		auto id = column_ids[i];
		if (id == COLUMN_IDENTIFIER_ROW_ID) {
			// row identifier: local row ids start at MAX_ROW_ID, offset by the
			// position of this chunk in the local collection
			result.data[i].Sequence(MAX_ROW_ID + state.chunk_index * STANDARD_VECTOR_SIZE, 1);
		} else {
			// reference the local chunk data directly (zero-copy)
			result.data[i].Reference(chunk.data[id]);
		}
		idx_t approved_tuple_count = count;
		if (table_filters) {
			auto column_filters = table_filters->find(i);
			if (column_filters != table_filters->end()) {
				//! We have filters to apply on this column: shrink sel/count
				for (auto &column_filter : column_filters->second) {
					// NOTE(review): the nullmask is copied here per filter;
					// filterSelection apparently takes it by reference — confirm
					// the copy (rather than a reference binding) is intended
					nullmask_t nullmask = FlatVector::Nullmask(result.data[i]);
					UncompressedSegment::filterSelection(sel, result.data[i], column_filter, approved_tuple_count,
					                                     nullmask);
				}
				count = approved_tuple_count;
			}
		}
	}
	if (count == 0) {
		// all entries in this chunk were filtered out: continue on next chunk
		state.chunk_index++;
		Scan(state, column_ids, result, table_filters);
		return;
	}
	if (count == chunk_count) {
		// nothing deleted or filtered: emit the chunk as-is
		result.SetCardinality(count);
	} else {
		// some rows were removed: slice the result down to the survivors
		result.Slice(sel, count);
	}
	state.chunk_index++;
}
| 128 | |
| 129 | void LocalStorage::Append(DataTable *table, DataChunk &chunk) { |
| 130 | auto entry = table_storage.find(table); |
| 131 | LocalTableStorage *storage; |
| 132 | if (entry == table_storage.end()) { |
| 133 | auto new_storage = make_unique<LocalTableStorage>(*table); |
| 134 | storage = new_storage.get(); |
| 135 | table_storage.insert(make_pair(table, move(new_storage))); |
| 136 | } else { |
| 137 | storage = entry->second.get(); |
| 138 | } |
| 139 | // append to unique indices (if any) |
| 140 | if (storage->indexes.size() > 0) { |
| 141 | idx_t base_id = MAX_ROW_ID + storage->collection.count; |
| 142 | |
| 143 | // first generate the vector of row identifiers |
| 144 | Vector row_ids(ROW_TYPE); |
| 145 | VectorOperations::GenerateSequence(row_ids, chunk.size(), base_id, 1); |
| 146 | |
| 147 | // now append the entries to the indices |
| 148 | for (auto &index : storage->indexes) { |
| 149 | if (!index->Append(chunk, row_ids)) { |
| 150 | throw ConstraintException("PRIMARY KEY or UNIQUE constraint violated: duplicated key" ); |
| 151 | } |
| 152 | } |
| 153 | } |
| 154 | |
| 155 | //! Append to the chunk |
| 156 | storage->collection.Append(chunk); |
| 157 | } |
| 158 | |
| 159 | LocalTableStorage *LocalStorage::GetStorage(DataTable *table) { |
| 160 | auto entry = table_storage.find(table); |
| 161 | assert(entry != table_storage.end()); |
| 162 | return entry->second.get(); |
| 163 | } |
| 164 | |
| 165 | static idx_t GetChunk(Vector &row_ids) { |
| 166 | auto ids = FlatVector::GetData<row_t>(row_ids); |
| 167 | auto first_id = ids[0] - MAX_ROW_ID; |
| 168 | |
| 169 | return first_id / STANDARD_VECTOR_SIZE; |
| 170 | } |
| 171 | |
| 172 | void LocalStorage::Delete(DataTable *table, Vector &row_ids, idx_t count) { |
| 173 | auto storage = GetStorage(table); |
| 174 | // figure out the chunk from which these row ids came |
| 175 | idx_t chunk_idx = GetChunk(row_ids); |
| 176 | assert(chunk_idx < storage->collection.chunks.size()); |
| 177 | |
| 178 | // get a pointer to the deleted entries for this chunk |
| 179 | bool *deleted; |
| 180 | auto entry = storage->deleted_entries.find(chunk_idx); |
| 181 | if (entry == storage->deleted_entries.end()) { |
| 182 | // nothing deleted yet, add the deleted entries |
| 183 | auto del_entries = unique_ptr<bool[]>(new bool[STANDARD_VECTOR_SIZE]); |
| 184 | memset(del_entries.get(), 0, sizeof(bool) * STANDARD_VECTOR_SIZE); |
| 185 | deleted = del_entries.get(); |
| 186 | storage->deleted_entries.insert(make_pair(chunk_idx, move(del_entries))); |
| 187 | } else { |
| 188 | deleted = entry->second.get(); |
| 189 | } |
| 190 | |
| 191 | // now actually mark the entries as deleted in the deleted vector |
| 192 | idx_t base_index = MAX_ROW_ID + chunk_idx * STANDARD_VECTOR_SIZE; |
| 193 | |
| 194 | auto ids = FlatVector::GetData<row_t>(row_ids); |
| 195 | for (idx_t i = 0; i < count; i++) { |
| 196 | auto id = ids[i] - base_index; |
| 197 | deleted[id] = true; |
| 198 | } |
| 199 | } |
| 200 | |
| 201 | template <class T> |
| 202 | static void update_data(Vector &data_vector, Vector &update_vector, Vector &row_ids, idx_t count, idx_t base_index) { |
| 203 | VectorData udata; |
| 204 | update_vector.Orrify(count, udata); |
| 205 | |
| 206 | auto target = FlatVector::GetData<T>(data_vector); |
| 207 | auto &nullmask = FlatVector::Nullmask(data_vector); |
| 208 | auto ids = FlatVector::GetData<row_t>(row_ids); |
| 209 | auto updates = (T *)udata.data; |
| 210 | |
| 211 | for (idx_t i = 0; i < count; i++) { |
| 212 | auto uidx = udata.sel->get_index(i); |
| 213 | |
| 214 | auto id = ids[i] - base_index; |
| 215 | target[id] = updates[uidx]; |
| 216 | nullmask[id] = (*udata.nullmask)[uidx]; |
| 217 | } |
| 218 | } |
| 219 | |
// Dispatch an in-place update of `data` with the values in `updates` to the
// typed implementation matching the column's physical type.
// Only fixed-size numeric types are supported; anything else throws.
static void update_chunk(Vector &data, Vector &updates, Vector &row_ids, idx_t count, idx_t base_index) {
	assert(data.type == updates.type);
	assert(row_ids.type == ROW_TYPE);

	switch (data.type) {
	case TypeId::INT8:
		update_data<int8_t>(data, updates, row_ids, count, base_index);
		break;
	case TypeId::INT16:
		update_data<int16_t>(data, updates, row_ids, count, base_index);
		break;
	case TypeId::INT32:
		update_data<int32_t>(data, updates, row_ids, count, base_index);
		break;
	case TypeId::INT64:
		update_data<int64_t>(data, updates, row_ids, count, base_index);
		break;
	case TypeId::FLOAT:
		update_data<float>(data, updates, row_ids, count, base_index);
		break;
	case TypeId::DOUBLE:
		update_data<double>(data, updates, row_ids, count, base_index);
		break;
	default:
		// variable-size / non-numeric types cannot be updated in place here
		throw Exception("Unsupported type for in-place update" );
	}
}
| 247 | |
| 248 | void LocalStorage::Update(DataTable *table, Vector &row_ids, vector<column_t> &column_ids, DataChunk &data) { |
| 249 | auto storage = GetStorage(table); |
| 250 | // figure out the chunk from which these row ids came |
| 251 | idx_t chunk_idx = GetChunk(row_ids); |
| 252 | assert(chunk_idx < storage->collection.chunks.size()); |
| 253 | |
| 254 | idx_t base_index = MAX_ROW_ID + chunk_idx * STANDARD_VECTOR_SIZE; |
| 255 | |
| 256 | // now perform the actual update |
| 257 | auto &chunk = *storage->collection.chunks[chunk_idx]; |
| 258 | for (idx_t i = 0; i < column_ids.size(); i++) { |
| 259 | auto col_idx = column_ids[i]; |
| 260 | update_chunk(chunk.data[col_idx], data.data[i], row_ids, data.size(), base_index); |
| 261 | } |
| 262 | } |
| 263 | |
| 264 | template <class T> bool LocalStorage::ScanTableStorage(DataTable *table, LocalTableStorage *storage, T &&fun) { |
| 265 | vector<column_t> column_ids; |
| 266 | for (idx_t i = 0; i < table->types.size(); i++) { |
| 267 | column_ids.push_back(i); |
| 268 | } |
| 269 | |
| 270 | DataChunk chunk; |
| 271 | chunk.Initialize(table->types); |
| 272 | |
| 273 | // initialize the scan |
| 274 | LocalScanState state; |
| 275 | storage->InitializeScan(state); |
| 276 | |
| 277 | while (true) { |
| 278 | Scan(state, column_ids, chunk); |
| 279 | if (chunk.size() == 0) { |
| 280 | return true; |
| 281 | } |
| 282 | if (!fun(chunk)) { |
| 283 | return false; |
| 284 | } |
| 285 | } |
| 286 | } |
| 287 | |
// Commit the transaction-local storage: append all locally stored chunks to
// their base tables (and the tables' indexes) under the given commit id,
// writing the inserts to the WAL for non-temporary tables.
// The per-table append states are recorded in commit_state so RevertCommit
// can undo the appends if a later part of the commit fails.
// Throws ConstraintException if an index append detects a duplicate key.
void LocalStorage::Commit(LocalStorage::CommitState &commit_state, Transaction &transaction, WriteAheadLog *log,
                          transaction_t commit_id) {
	// commit local storage, iterate over all entries in the table storage map
	for (auto &entry : table_storage) {
		auto table = entry.first;
		auto storage = entry.second.get();

		// initialize the append state
		auto append_state_ptr = make_unique<TableAppendState>();
		auto &append_state = *append_state_ptr;
		// record it in commit_state BEFORE appending, so a partially performed
		// append can still be reverted via RevertCommit
		commit_state.append_states[table] = move(append_state_ptr);
		table->InitializeAppend(append_state);

		// temporary tables are not written to the WAL
		if (log && !table->info->IsTemporary()) {
			log->WriteSetTable(table->info->schema, table->info->table);
		}

		// scan all chunks in this storage
		ScanTableStorage(table, storage, [&](DataChunk &chunk) -> bool {
			// append this chunk to the indexes of the table
			if (!table->AppendToIndexes(append_state, chunk, append_state.current_row)) {
				throw ConstraintException("PRIMARY KEY or UNIQUE constraint violated: duplicated key" );
			}

			// append to base table
			table->Append(transaction, commit_id, chunk, append_state);
			// if there is a WAL, write the chunk to there as well
			if (log && !table->info->IsTemporary()) {
				log->WriteInsert(chunk);
			}
			return true;
		});
	}
	// finished commit: clear local storage
	for (auto &entry : table_storage) {
		entry.second->Clear();
	}
	table_storage.clear();
}
| 328 | |
// Undo a (partially) performed Commit: for every table that was appended to,
// remove the committed rows from the table's indexes again (only relevant for
// non-temporary tables that have indexes) and revert the physical append.
void LocalStorage::RevertCommit(LocalStorage::CommitState &commit_state) {
	for (auto &entry : commit_state.append_states) {
		auto table = entry.first;
		auto storage = table_storage[table].get();
		auto &append_state = *entry.second;
		if (table->info->indexes.size() > 0 && !table->info->IsTemporary()) {
			// replay the local data to compute the row ids that were inserted
			row_t current_row = append_state.row_start;
			// remove the data from the indexes, if there are any indexes
			ScanTableStorage(table, storage, [&](DataChunk &chunk) -> bool {
				// remove this chunk's entries from the indexes of the table
				table->RemoveFromIndexes(append_state, chunk, current_row);

				current_row += chunk.size();
				if (current_row >= append_state.current_row) {
					// finished deleting all rows from the index: abort now
					return false;
				}
				return true;
			});
		}

		// undo the physical append to the base table
		table->RevertAppend(*entry.second);
	}
}
| 353 | |
// ALTER TABLE ... ADD COLUMN with pending local appends: migrate the local
// storage from the old DataTable version to the new one, appending the new
// column (filled with default_value if given, otherwise NULL) to every
// locally stored chunk.
void LocalStorage::AddColumn(DataTable *old_dt, DataTable *new_dt, ColumnDefinition &new_column,
                             Expression *default_value) {
	// check if there are any pending appends for the old version of the table
	auto entry = table_storage.find(old_dt);
	if (entry == table_storage.end()) {
		return;
	}
	// take over the storage from the old entry
	auto new_storage = move(entry->second);

	// now add the new column filled with the default value to all chunks
	auto new_column_type = GetInternalType(new_column.type);
	ExpressionExecutor executor;
	// dummy_chunk only supplies the cardinality when evaluating the default
	// expression; it carries no actual column data
	DataChunk dummy_chunk;
	if (default_value) {
		executor.AddExpression(*default_value);
	}

	new_storage->collection.types.push_back(new_column_type);
	for (idx_t chunk_idx = 0; chunk_idx < new_storage->collection.chunks.size(); chunk_idx++) {
		auto &chunk = new_storage->collection.chunks[chunk_idx];
		Vector result(new_column_type);
		if (default_value) {
			// evaluate the default expression once per chunk, at chunk size
			dummy_chunk.SetCardinality(chunk->size());
			executor.ExecuteExpression(dummy_chunk, result);
		} else {
			// no default: the new column is entirely NULL
			FlatVector::Nullmask(result).set();
		}
		chunk->data.push_back(move(result));
	}

	// re-register the migrated storage under the new table version
	table_storage.erase(entry);
	table_storage[new_dt] = move(new_storage);
}
| 388 | |
| 389 | void LocalStorage::ChangeType(DataTable *old_dt, DataTable *new_dt, idx_t changed_idx, SQLType target_type, |
| 390 | vector<column_t> bound_columns, Expression &cast_expr) { |
| 391 | // check if there are any pending appends for the old version of the table |
| 392 | auto entry = table_storage.find(old_dt); |
| 393 | if (entry == table_storage.end()) { |
| 394 | return; |
| 395 | } |
| 396 | throw NotImplementedException("FIXME: ALTER TYPE with transaction local data not currently supported" ); |
| 397 | } |
| 398 | |