#include "duckdb/storage/table/row_group_collection.hpp"
#include "duckdb/storage/table/persistent_table_data.hpp"
#include "duckdb/execution/expression_executor.hpp"
#include "duckdb/main/client_context.hpp"
#include "duckdb/storage/data_table.hpp"
#include "duckdb/planner/constraints/bound_not_null_constraint.hpp"
#include "duckdb/storage/checkpoint/table_data_writer.hpp"
#include "duckdb/storage/table/row_group_segment_tree.hpp"
#include "duckdb/storage/meta_block_reader.hpp"
#include "duckdb/storage/table/append_state.hpp"
#include "duckdb/storage/table/scan_state.hpp"
#include "duckdb/storage/table_storage_info.hpp"

namespace duckdb {

//===--------------------------------------------------------------------===//
// Row Group Segment Tree
//===--------------------------------------------------------------------===//
RowGroupSegmentTree::RowGroupSegmentTree(RowGroupCollection &collection)
    : SegmentTree<RowGroup, true>(), collection(collection), current_row_group(0), max_row_group(0) {
}

RowGroupSegmentTree::~RowGroupSegmentTree() {
}

void RowGroupSegmentTree::Initialize(PersistentTableData &data) {
    D_ASSERT(data.row_group_count > 0);
    current_row_group = 0;
    max_row_group = data.row_group_count;
    finished_loading = false;
    reader = make_uniq<MetaBlockReader>(collection.GetBlockManager(), data.block_id);
    reader->offset = data.offset;
}

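// LoadSegment is invoked lazily by the segment tree: row groups are
// deserialized from the metadata blocks one at a time, the first time they
// are requested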
unique_ptr<RowGroup> RowGroupSegmentTree::LoadSegment() {
    if (current_row_group >= max_row_group) {
        finished_loading = true;
        return nullptr;
    }
    auto row_group_pointer = RowGroup::Deserialize(*reader, collection.GetTypes());
    current_row_group++;
    return make_uniq<RowGroup>(collection, std::move(row_group_pointer));
}

//===--------------------------------------------------------------------===//
// Row Group Collection
//===--------------------------------------------------------------------===//
RowGroupCollection::RowGroupCollection(shared_ptr<DataTableInfo> info_p, BlockManager &block_manager,
                                       vector<LogicalType> types_p, idx_t row_start_p, idx_t total_rows_p)
    : block_manager(block_manager), total_rows(total_rows_p), info(std::move(info_p)), types(std::move(types_p)),
      row_start(row_start_p) {
    row_groups = make_shared<RowGroupSegmentTree>(*this);
}

idx_t RowGroupCollection::GetTotalRows() const {
    return total_rows.load();
}

const vector<LogicalType> &RowGroupCollection::GetTypes() const {
    return types;
}

Allocator &RowGroupCollection::GetAllocator() const {
    return Allocator::Get(info->db);
}

AttachedDatabase &RowGroupCollection::GetAttached() {
    return GetTableInfo().db;
}

DatabaseInstance &RowGroupCollection::GetDatabase() {
    return GetAttached().GetDatabase();
}

//===--------------------------------------------------------------------===//
// Initialize
//===--------------------------------------------------------------------===//
void RowGroupCollection::Initialize(PersistentTableData &data) {
    D_ASSERT(this->row_start == 0);
    auto l = row_groups->Lock();
    this->total_rows = data.total_rows;
    row_groups->Initialize(data);
    stats.Initialize(types, data);
}

void RowGroupCollection::InitializeEmpty() {
    stats.InitializeEmpty(types);
}

void RowGroupCollection::AppendRowGroup(SegmentLock &l, idx_t start_row) {
    D_ASSERT(start_row >= row_start);
    auto new_row_group = make_uniq<RowGroup>(*this, start_row, 0);
    new_row_group->InitializeEmpty(types);
    row_groups->AppendSegment(l, std::move(new_row_group));
}

RowGroup *RowGroupCollection::GetRowGroup(int64_t index) {
    return (RowGroup *)row_groups->GetSegmentByIndex(index);
}

idx_t RowGroupCollection::RowGroupCount() {
    return row_groups->GetSegmentCount();
}

void RowGroupCollection::Verify() {
#ifdef DEBUG
    idx_t current_total_rows = 0;
    row_groups->Verify();
    for (auto &row_group : row_groups->Segments()) {
        row_group.Verify();
        D_ASSERT(&row_group.GetCollection() == this);
        D_ASSERT(row_group.start == this->row_start + current_total_rows);
        current_total_rows += row_group.count;
    }
    D_ASSERT(current_total_rows == total_rows.load());
#endif
}

//===--------------------------------------------------------------------===//
// Scan
//===--------------------------------------------------------------------===//
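// initialize a full-table scan: start at the root row group and skip ahead
// past any row groups that report nothing to scan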
void RowGroupCollection::InitializeScan(CollectionScanState &state, const vector<column_t> &column_ids,
                                        TableFilterSet *table_filters) {
    auto row_group = row_groups->GetRootSegment();
    D_ASSERT(row_group);
    state.row_groups = row_groups.get();
    state.max_row = row_start + total_rows;
    state.Initialize(GetTypes());
    while (row_group && !row_group->InitializeScan(state)) {
        row_group = row_groups->GetNextSegment(row_group);
    }
}

void RowGroupCollection::InitializeCreateIndexScan(CreateIndexScanState &state) {
    state.segment_lock = row_groups->Lock();
}

void RowGroupCollection::InitializeScanWithOffset(CollectionScanState &state, const vector<column_t> &column_ids,
                                                  idx_t start_row, idx_t end_row) {
    auto row_group = row_groups->GetSegment(start_row);
    D_ASSERT(row_group);
    state.row_groups = row_groups.get();
    state.max_row = end_row;
    state.Initialize(GetTypes());
    idx_t start_vector = (start_row - row_group->start) / STANDARD_VECTOR_SIZE;
    if (!row_group->InitializeScanWithOffset(state, start_vector)) {
        throw InternalException("Failed to initialize row group scan with offset");
    }
}

bool RowGroupCollection::InitializeScanInRowGroup(CollectionScanState &state, RowGroupCollection &collection,
                                                  RowGroup &row_group, idx_t vector_index, idx_t max_row) {
    state.max_row = max_row;
    state.row_groups = collection.row_groups.get();
    if (!state.column_scans) {
        // initialize the scan state
        state.Initialize(collection.GetTypes());
    }
    return row_group.InitializeScanWithOffset(state, vector_index);
}

void RowGroupCollection::InitializeParallelScan(ParallelCollectionScanState &state) {
    state.collection = this;
    state.current_row_group = row_groups->GetRootSegment();
    state.vector_index = 0;
    state.max_row = row_start + total_rows;
    state.batch_index = 0;
    state.processed_rows = 0;
}

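// NextParallelScan hands out the next unit of work for a parallel scan.
// Normally a worker receives an entire row group at a time; when
// verify_parallelism is set, work is handed out one vector
// (STANDARD_VECTOR_SIZE rows) at a time to stress the parallel code paths.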
bool RowGroupCollection::NextParallelScan(ClientContext &context, ParallelCollectionScanState &state,
                                          CollectionScanState &scan_state) {
    while (true) {
        idx_t vector_index;
        idx_t max_row;
        RowGroupCollection *collection;
        RowGroup *row_group;
        {
            // select the next row group to scan from the parallel state
            lock_guard<mutex> l(state.lock);
            if (!state.current_row_group || state.current_row_group->count == 0) {
                // no more data left to scan
                break;
            }
            collection = state.collection;
            row_group = state.current_row_group;
            if (ClientConfig::GetConfig(context).verify_parallelism) {
                vector_index = state.vector_index;
                max_row = state.current_row_group->start +
                          MinValue<idx_t>(state.current_row_group->count,
                                          STANDARD_VECTOR_SIZE * state.vector_index + STANDARD_VECTOR_SIZE);
                D_ASSERT(vector_index * STANDARD_VECTOR_SIZE < state.current_row_group->count);
                state.vector_index++;
                if (state.vector_index * STANDARD_VECTOR_SIZE >= state.current_row_group->count) {
                    state.current_row_group = row_groups->GetNextSegment(state.current_row_group);
                    state.vector_index = 0;
                }
            } else {
                state.processed_rows += state.current_row_group->count;
                vector_index = 0;
                max_row = state.current_row_group->start + state.current_row_group->count;
                state.current_row_group = row_groups->GetNextSegment(state.current_row_group);
            }
            max_row = MinValue<idx_t>(max_row, state.max_row);
            scan_state.batch_index = ++state.batch_index;
        }
        D_ASSERT(collection);
        D_ASSERT(row_group);

        // initialize the scan for this row group
        bool need_to_scan = InitializeScanInRowGroup(scan_state, *collection, *row_group, vector_index, max_row);
        if (!need_to_scan) {
            // skip this row group
            continue;
        }
        return true;
    }
    return false;
}

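// example usage (sketch, assuming a valid transaction and a running count):
//
//   collection.Scan(transaction, [&](DataChunk &chunk) {
//       count += chunk.size();
//       return true; // keep scanning; return false to abort the scan
//   });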
bool RowGroupCollection::Scan(DuckTransaction &transaction, const vector<column_t> &column_ids,
                              const std::function<bool(DataChunk &chunk)> &fun) {
    vector<LogicalType> scan_types;
    for (idx_t i = 0; i < column_ids.size(); i++) {
        scan_types.push_back(types[column_ids[i]]);
    }
    DataChunk chunk;
    chunk.Initialize(GetAllocator(), scan_types);

    // initialize the scan
    TableScanState state;
    state.Initialize(column_ids, nullptr);
    InitializeScan(state.local_state, column_ids, nullptr);

    while (true) {
        chunk.Reset();
        state.local_state.Scan(transaction, chunk);
        if (chunk.size() == 0) {
            return true;
        }
        if (!fun(chunk)) {
            return false;
        }
    }
}

bool RowGroupCollection::Scan(DuckTransaction &transaction, const std::function<bool(DataChunk &chunk)> &fun) {
    vector<column_t> column_ids;
    column_ids.reserve(types.size());
    for (idx_t i = 0; i < types.size(); i++) {
        column_ids.push_back(i);
    }
    return Scan(transaction, column_ids, fun);
}

//===--------------------------------------------------------------------===//
// Fetch
//===--------------------------------------------------------------------===//
void RowGroupCollection::Fetch(TransactionData transaction, DataChunk &result, const vector<column_t> &column_ids,
                               const Vector &row_identifiers, idx_t fetch_count, ColumnFetchState &state) {
    // figure out which row_group to fetch from
    auto row_ids = FlatVector::GetData<row_t>(row_identifiers);
    idx_t count = 0;
    for (idx_t i = 0; i < fetch_count; i++) {
        auto row_id = row_ids[i];
        RowGroup *row_group;
        {
            idx_t segment_index;
            auto l = row_groups->Lock();
            if (!row_groups->TryGetSegmentIndex(l, row_id, segment_index)) {
                // in parallel append scenarios it is possible for the row_id to not yet be present in any row group
                continue;
            }
            row_group = row_groups->GetSegmentByIndex(l, segment_index);
        }
        if (!row_group->Fetch(transaction, row_id - row_group->start)) {
            continue;
        }
        row_group->FetchRow(transaction, state, column_ids, row_id, result, count);
        count++;
    }
    result.SetCardinality(count);
}

//===--------------------------------------------------------------------===//
// Append
//===--------------------------------------------------------------------===//
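// The append protocol: InitializeAppend pins the last row group (creating the
// first one for an empty collection) and, when the row count is known up
// front, reserves the version info immediately. Append then writes one chunk
// at a time; FinalizeAppend adds the version info when the count was not
// known up front, and CommitAppend stamps the rows with the commit id.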
TableAppendState::TableAppendState()
    : row_group_append_state(*this), total_append_count(0), start_row_group(nullptr), transaction(0, 0), remaining(0) {
}

TableAppendState::~TableAppendState() {
    D_ASSERT(Exception::UncaughtException() || remaining == 0);
}

bool RowGroupCollection::IsEmpty() const {
    auto l = row_groups->Lock();
    return IsEmpty(l);
}

bool RowGroupCollection::IsEmpty(SegmentLock &l) const {
    return row_groups->IsEmpty(l);
}

void RowGroupCollection::InitializeAppend(TransactionData transaction, TableAppendState &state, idx_t append_count) {
    state.row_start = total_rows;
    state.current_row = state.row_start;
    state.total_append_count = 0;

    // start writing to the row_groups
    auto l = row_groups->Lock();
    if (IsEmpty(l)) {
        // empty row group collection: append an empty first row group
        AppendRowGroup(l, row_start);
    }
    state.start_row_group = row_groups->GetLastSegment(l);
    D_ASSERT(this->row_start + total_rows == state.start_row_group->start + state.start_row_group->count);
    state.start_row_group->InitializeAppend(state.row_group_append_state);
    state.remaining = append_count;
    state.transaction = transaction;
    if (state.remaining > 0) {
        state.start_row_group->AppendVersionInfo(transaction, state.remaining);
        total_rows += state.remaining;
    }
}

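// overload used when the number of rows to append is not known up front; no
// version info is reserved here, so FinalizeAppend must add it afterwards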
void RowGroupCollection::InitializeAppend(TableAppendState &state) {
    TransactionData tdata(0, 0);
    InitializeAppend(tdata, state, 0);
}

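// Append a chunk to the collection. A chunk can straddle a row group
// boundary: the prefix that fits is appended to the current row group, the
// chunk is sliced to the remainder, and a new row group is created for it.
// Returns true if a new row group was created by this append.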
bool RowGroupCollection::Append(DataChunk &chunk, TableAppendState &state) {
    D_ASSERT(chunk.ColumnCount() == types.size());
    chunk.Verify();

    bool new_row_group = false;
    idx_t append_count = chunk.size();
    idx_t remaining = chunk.size();
    state.total_append_count += append_count;
    while (true) {
        auto current_row_group = state.row_group_append_state.row_group;
        // check how much we can fit into the current row_group
        idx_t append_count =
            MinValue<idx_t>(remaining, RowGroup::ROW_GROUP_SIZE - state.row_group_append_state.offset_in_row_group);
        if (append_count > 0) {
            current_row_group->Append(state.row_group_append_state, chunk, append_count);
            // merge the stats
            auto stats_lock = stats.GetLock();
            for (idx_t i = 0; i < types.size(); i++) {
                current_row_group->MergeIntoStatistics(i, stats.GetStats(i).Statistics());
            }
        }
        remaining -= append_count;
        if (state.remaining > 0) {
            state.remaining -= append_count;
        }
        if (remaining > 0) {
            // we expect max 1 iteration of this loop (i.e. a single chunk should never overflow more than one
            // row_group)
            D_ASSERT(chunk.size() == remaining + append_count);
            // slice the input chunk
            if (remaining < chunk.size()) {
                SelectionVector sel(remaining);
                for (idx_t i = 0; i < remaining; i++) {
                    sel.set_index(i, append_count + i);
                }
                chunk.Slice(sel, remaining);
            }
            // append a new row_group
            new_row_group = true;
            auto next_start = current_row_group->start + state.row_group_append_state.offset_in_row_group;

            auto l = row_groups->Lock();
            AppendRowGroup(l, next_start);
            // set up the append state for this row_group
            auto last_row_group = row_groups->GetLastSegment(l);
            last_row_group->InitializeAppend(state.row_group_append_state);
            if (state.remaining > 0) {
                last_row_group->AppendVersionInfo(state.transaction, state.remaining);
            }
            continue;
        } else {
            break;
        }
    }
    state.current_row += append_count;
    auto stats_lock = stats.GetLock();
    for (idx_t col_idx = 0; col_idx < types.size(); col_idx++) {
        stats.GetStats(col_idx).UpdateDistinctStatistics(chunk.data[col_idx], chunk.size());
    }
    return new_row_group;
}

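// add the version info for appends that were initialized without a known row
// count (see the zero-count InitializeAppend overload) and update total_rows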
void RowGroupCollection::FinalizeAppend(TransactionData transaction, TableAppendState &state) {
    auto remaining = state.total_append_count;
    auto row_group = state.start_row_group;
    while (remaining > 0) {
        auto append_count = MinValue<idx_t>(remaining, RowGroup::ROW_GROUP_SIZE - row_group->count);
        row_group->AppendVersionInfo(transaction, append_count);
        remaining -= append_count;
        row_group = row_groups->GetNextSegment(row_group);
    }
    total_rows += state.total_append_count;

    state.total_append_count = 0;
    state.start_row_group = nullptr;

    Verify();
}

void RowGroupCollection::CommitAppend(transaction_t commit_id, idx_t row_start, idx_t count) {
    auto row_group = row_groups->GetSegment(row_start);
    D_ASSERT(row_group);
    idx_t current_row = row_start;
    idx_t remaining = count;
    while (true) {
        idx_t start_in_row_group = current_row - row_group->start;
        idx_t append_count = MinValue<idx_t>(row_group->count - start_in_row_group, remaining);

        row_group->CommitAppend(commit_id, start_in_row_group, append_count);

        current_row += append_count;
        remaining -= append_count;
        if (remaining == 0) {
            break;
        }
        row_group = row_groups->GetNextSegment(row_group);
    }
}

void RowGroupCollection::RevertAppendInternal(idx_t start_row, idx_t count) {
    if (total_rows != start_row + count) {
        throw InternalException("Interleaved appends: this should no longer happen");
    }
    total_rows = start_row;

    auto l = row_groups->Lock();
    // find the segment index that the current row belongs to
    idx_t segment_index = row_groups->GetSegmentIndex(l, start_row);
    auto segment = row_groups->GetSegmentByIndex(l, segment_index);
    auto &info = *segment;

    // remove any segments AFTER this segment: they should be deleted entirely
    row_groups->EraseSegments(l, segment_index);

    info.next = nullptr;
    info.RevertAppend(start_row);
}

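// MergeStorage moves all row groups out of "data" (typically
// transaction-local storage that is being flushed) into this collection,
// renumbering their starting rows and merging the table statistics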
void RowGroupCollection::MergeStorage(RowGroupCollection &data) {
    D_ASSERT(data.types == types);
    auto index = row_start + total_rows.load();
    auto segments = data.row_groups->MoveSegments();
    for (auto &entry : segments) {
        auto &row_group = entry.node;
        row_group->MoveToCollection(*this, index);
        index += row_group->count;
        row_groups->AppendSegment(std::move(row_group));
    }
    stats.MergeStats(data.stats);
    total_rows += data.total_rows.load();
}

//===--------------------------------------------------------------------===//
// Delete
//===--------------------------------------------------------------------===//
idx_t RowGroupCollection::Delete(TransactionData transaction, DataTable &table, row_t *ids, idx_t count) {
    idx_t delete_count = 0;
    // the deletes are applied within the row groups:
    // we need to figure out for each id to which row group it belongs
    // usually all (or many) ids belong to the same row group
    // we iterate over the ids and check for every id if it belongs to the same row group as its predecessor
    idx_t pos = 0;
    do {
        idx_t start = pos;
        auto row_group = row_groups->GetSegment(ids[start]);
        for (pos++; pos < count; pos++) {
            D_ASSERT(ids[pos] >= 0);
            // check if this id still belongs to this row group
            if (idx_t(ids[pos]) < row_group->start) {
                // id is before row_group start -> it does not
                break;
            }
            if (idx_t(ids[pos]) >= row_group->start + row_group->count) {
                // id is after row group end -> it does not
                break;
            }
        }
        delete_count += row_group->Delete(transaction, table, ids + start, pos - start);
    } while (pos < count);
    return delete_count;
}

//===--------------------------------------------------------------------===//
// Update
//===--------------------------------------------------------------------===//
void RowGroupCollection::Update(TransactionData transaction, row_t *ids, const vector<PhysicalIndex> &column_ids,
                                DataChunk &updates) {
    idx_t pos = 0;
    do {
        idx_t start = pos;
        auto row_group = row_groups->GetSegment(ids[pos]);
        row_t base_id =
            row_group->start + ((ids[pos] - row_group->start) / STANDARD_VECTOR_SIZE * STANDARD_VECTOR_SIZE);
        row_t max_id = MinValue<row_t>(base_id + STANDARD_VECTOR_SIZE, row_group->start + row_group->count);
        for (pos++; pos < updates.size(); pos++) {
            D_ASSERT(ids[pos] >= 0);
            // check if this id still belongs to this vector in this row group
            if (ids[pos] < base_id) {
                // id is before vector start -> it does not
                break;
            }
            if (ids[pos] >= max_id) {
                // id is after the maximum id in this vector -> it does not
                break;
            }
        }
        row_group->Update(transaction, updates, ids, start, pos - start, column_ids);

        auto l = stats.GetLock();
        for (idx_t i = 0; i < column_ids.size(); i++) {
            auto column_id = column_ids[i];
            stats.MergeStats(*l, column_id.index, *row_group->GetStatistics(column_id.index));
        }
    } while (pos < updates.size());
}

void RowGroupCollection::RemoveFromIndexes(TableIndexList &indexes, Vector &row_identifiers, idx_t count) {
    auto row_ids = FlatVector::GetData<row_t>(row_identifiers);

    // initialize the fetch state
    // FIXME: we do not need to fetch all columns, only the columns required by the indices!
    TableScanState state;
    vector<column_t> column_ids;
    column_ids.reserve(types.size());
    for (idx_t i = 0; i < types.size(); i++) {
        column_ids.push_back(i);
    }
    state.Initialize(std::move(column_ids));
    state.table_state.max_row = row_start + total_rows;

    // initialize the fetch chunk
    DataChunk result;
    result.Initialize(GetAllocator(), types);

    SelectionVector sel(STANDARD_VECTOR_SIZE);
    // now iterate over the row ids
    for (idx_t r = 0; r < count;) {
        result.Reset();
        // figure out which row_group to fetch from
        auto row_id = row_ids[r];
        auto row_group = row_groups->GetSegment(row_id);
        auto row_group_vector_idx = (row_id - row_group->start) / STANDARD_VECTOR_SIZE;
        auto base_row_id = row_group_vector_idx * STANDARD_VECTOR_SIZE + row_group->start;

        // fetch the current vector
        state.table_state.Initialize(GetTypes());
        row_group->InitializeScanWithOffset(state.table_state, row_group_vector_idx);
        row_group->ScanCommitted(state.table_state, result, TableScanType::TABLE_SCAN_COMMITTED_ROWS);
        result.Verify();

        // check for any remaining row ids if they also fall into this vector
        // we try to handle as many rows as possible at the same time
        idx_t sel_count = 0;
        for (; r < count; r++) {
            idx_t current_row = idx_t(row_ids[r]);
            if (current_row < base_row_id || current_row >= base_row_id + result.size()) {
                // this row-id does not fall into the current chunk - break
                break;
            }
            auto row_in_vector = current_row - base_row_id;
            D_ASSERT(row_in_vector < result.size());
            sel.set_index(sel_count++, row_in_vector);
        }
        D_ASSERT(sel_count > 0);
        // slice the vector with all rows that are present in this vector and erase from the index
        result.Slice(sel, sel_count);

        indexes.Scan([&](Index &index) {
            index.Delete(result, row_identifiers);
            return false;
        });
    }
}

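// UpdateColumn updates a single (possibly nested) column identified by a
// column path; only the first row id is used to locate the row group, so all
// row ids are expected to fall within that same row group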
void RowGroupCollection::UpdateColumn(TransactionData transaction, Vector &row_ids, const vector<column_t> &column_path,
                                      DataChunk &updates) {
    auto first_id = FlatVector::GetValue<row_t>(row_ids, 0);
    if (first_id >= MAX_ROW_ID) {
        throw NotImplementedException("Cannot update a column-path on transaction local data");
    }
    // find the row_group this id belongs to
    auto primary_column_idx = column_path[0];
    auto row_group = row_groups->GetSegment(first_id);
    row_group->UpdateColumn(transaction, updates, row_ids, column_path);

    row_group->MergeIntoStatistics(primary_column_idx, stats.GetStats(primary_column_idx).Statistics());
}

//===--------------------------------------------------------------------===//
// Checkpoint
//===--------------------------------------------------------------------===//
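// write every row group to disk through its own row group writer; the
// resulting pointers are handed back to the TableDataWriter so they can be
// serialized as part of the table metadata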
void RowGroupCollection::Checkpoint(TableDataWriter &writer, TableStatistics &global_stats) {
    for (auto &row_group : row_groups->Segments()) {
        auto rowg_writer = writer.GetRowGroupWriter(row_group);
        auto pointer = row_group.Checkpoint(*rowg_writer, global_stats);
        writer.AddRowGroup(std::move(pointer), std::move(rowg_writer));
    }
}

//===--------------------------------------------------------------------===//
// CommitDrop
//===--------------------------------------------------------------------===//
void RowGroupCollection::CommitDropColumn(idx_t index) {
    for (auto &row_group : row_groups->Segments()) {
        row_group.CommitDropColumn(index);
    }
}

void RowGroupCollection::CommitDropTable() {
    for (auto &row_group : row_groups->Segments()) {
        row_group.CommitDrop();
    }
}

//===--------------------------------------------------------------------===//
// GetColumnSegmentInfo
//===--------------------------------------------------------------------===//
vector<ColumnSegmentInfo> RowGroupCollection::GetColumnSegmentInfo() {
    vector<ColumnSegmentInfo> result;
    for (auto &row_group : row_groups->Segments()) {
        row_group.GetColumnSegmentInfo(row_group.index, result);
    }
    return result;
}

//===--------------------------------------------------------------------===//
// Alter
//===--------------------------------------------------------------------===//
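// ALTER TABLE operations do not modify the collection in place: each of the
// functions below builds and returns a new RowGroupCollection, transforming
// the row groups one at a time and carrying the statistics over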
shared_ptr<RowGroupCollection> RowGroupCollection::AddColumn(ClientContext &context, ColumnDefinition &new_column,
                                                             Expression *default_value) {
    idx_t new_column_idx = types.size();
    auto new_types = types;
    new_types.push_back(new_column.GetType());
    auto result =
        make_shared<RowGroupCollection>(info, block_manager, std::move(new_types), row_start, total_rows.load());

    ExpressionExecutor executor(context);
    DataChunk dummy_chunk;
    Vector default_vector(new_column.GetType());
    if (!default_value) {
        FlatVector::Validity(default_vector).SetAllInvalid(STANDARD_VECTOR_SIZE);
    } else {
        executor.AddExpression(*default_value);
    }

    result->stats.InitializeAddColumn(stats, new_column.GetType());
    auto &new_column_stats = result->stats.GetStats(new_column_idx);

    // fill the column with its DEFAULT value, or NULL if none is specified
    auto new_stats = make_uniq<SegmentStatistics>(new_column.GetType());
    for (auto &current_row_group : row_groups->Segments()) {
        auto new_row_group = current_row_group.AddColumn(*result, new_column, executor, default_value, default_vector);
        // merge in the statistics
        new_row_group->MergeIntoStatistics(new_column_idx, new_column_stats.Statistics());

        result->row_groups->AppendSegment(std::move(new_row_group));
    }
    return result;
}

shared_ptr<RowGroupCollection> RowGroupCollection::RemoveColumn(idx_t col_idx) {
    D_ASSERT(col_idx < types.size());
    auto new_types = types;
    new_types.erase(new_types.begin() + col_idx);

    auto result =
        make_shared<RowGroupCollection>(info, block_manager, std::move(new_types), row_start, total_rows.load());
    result->stats.InitializeRemoveColumn(stats, col_idx);

    for (auto &current_row_group : row_groups->Segments()) {
        auto new_row_group = current_row_group.RemoveColumn(*result, col_idx);
        result->row_groups->AppendSegment(std::move(new_row_group));
    }
    return result;
}

shared_ptr<RowGroupCollection> RowGroupCollection::AlterType(ClientContext &context, idx_t changed_idx,
                                                             const LogicalType &target_type,
                                                             vector<column_t> bound_columns, Expression &cast_expr) {
    D_ASSERT(changed_idx < types.size());
    auto new_types = types;
    new_types[changed_idx] = target_type;

    auto result =
        make_shared<RowGroupCollection>(info, block_manager, std::move(new_types), row_start, total_rows.load());
    result->stats.InitializeAlterType(stats, changed_idx, target_type);

    vector<LogicalType> scan_types;
    for (idx_t i = 0; i < bound_columns.size(); i++) {
        if (bound_columns[i] == COLUMN_IDENTIFIER_ROW_ID) {
            scan_types.emplace_back(LogicalType::ROW_TYPE);
        } else {
            scan_types.push_back(types[bound_columns[i]]);
        }
    }
    DataChunk scan_chunk;
    scan_chunk.Initialize(GetAllocator(), scan_types);

    ExpressionExecutor executor(context);
    executor.AddExpression(cast_expr);

    TableScanState scan_state;
    scan_state.Initialize(bound_columns);
    scan_state.table_state.max_row = row_start + total_rows;

    // now alter the type of the column within all of the row_groups individually
    auto &changed_stats = result->stats.GetStats(changed_idx);
    for (auto &current_row_group : row_groups->Segments()) {
        auto new_row_group = current_row_group.AlterType(*result, target_type, changed_idx, executor,
                                                         scan_state.table_state, scan_chunk);
        new_row_group->MergeIntoStatistics(changed_idx, changed_stats.Statistics());
        result->row_groups->AppendSegment(std::move(new_row_group));
    }

    return result;
}

void RowGroupCollection::VerifyNewConstraint(DataTable &parent, const BoundConstraint &constraint) {
    if (total_rows == 0) {
        return;
    }
    // scan the original table, check if there's any null value
    auto &not_null_constraint = constraint.Cast<BoundNotNullConstraint>();
    vector<LogicalType> scan_types;
    auto physical_index = not_null_constraint.index.index;
    D_ASSERT(physical_index < types.size());
    scan_types.push_back(types[physical_index]);
    DataChunk scan_chunk;
    scan_chunk.Initialize(GetAllocator(), scan_types);

    CreateIndexScanState state;
    vector<column_t> cids;
    cids.push_back(physical_index);
    // use ScanCommitted to scan the latest committed data
    state.Initialize(cids, nullptr);
    InitializeScan(state.table_state, cids, nullptr);
    InitializeCreateIndexScan(state);
    while (true) {
        scan_chunk.Reset();
        state.table_state.ScanCommitted(scan_chunk, state.segment_lock,
                                        TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED);
        if (scan_chunk.size() == 0) {
            break;
        }
        // check the constraint
        if (VectorOperations::HasNull(scan_chunk.data[0], scan_chunk.size())) {
            throw ConstraintException("NOT NULL constraint failed: %s.%s", info->table,
                                      parent.column_definitions[physical_index].GetName());
        }
    }
}

//===--------------------------------------------------------------------===//
// Statistics
//===--------------------------------------------------------------------===//
void RowGroupCollection::CopyStats(TableStatistics &other_stats) {
    stats.CopyStats(other_stats);
}

unique_ptr<BaseStatistics> RowGroupCollection::CopyStats(column_t column_id) {
    return stats.CopyStats(column_id);
}

void RowGroupCollection::SetDistinct(column_t column_id, unique_ptr<DistinctStatistics> distinct_stats) {
    D_ASSERT(column_id != COLUMN_IDENTIFIER_ROW_ID);
    auto stats_guard = stats.GetLock();
    stats.GetStats(column_id).SetDistinct(std::move(distinct_stats));
}

} // namespace duckdb