#include "duckdb/storage/table/row_group_collection.hpp"
#include "duckdb/storage/table/persistent_table_data.hpp"
#include "duckdb/execution/expression_executor.hpp"
#include "duckdb/main/client_context.hpp"
#include "duckdb/storage/data_table.hpp"
#include "duckdb/planner/constraints/bound_not_null_constraint.hpp"
#include "duckdb/storage/checkpoint/table_data_writer.hpp"
#include "duckdb/storage/table/row_group_segment_tree.hpp"
#include "duckdb/storage/meta_block_reader.hpp"
#include "duckdb/storage/table/append_state.hpp"
#include "duckdb/storage/table/scan_state.hpp"
#include "duckdb/storage/table_storage_info.hpp"

namespace duckdb {

//===--------------------------------------------------------------------===//
// Row Group Segment Tree
//===--------------------------------------------------------------------===//
RowGroupSegmentTree::RowGroupSegmentTree(RowGroupCollection &collection)
    : SegmentTree<RowGroup, true>(), collection(collection), current_row_group(0), max_row_group(0) {
}
RowGroupSegmentTree::~RowGroupSegmentTree() {
}

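// Set up lazy loading of the persisted row groups: remember how many row groups to read and
// position a MetaBlockReader at the serialized row group metadata.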
void RowGroupSegmentTree::Initialize(PersistentTableData &data) {
	D_ASSERT(data.row_group_count > 0);
	current_row_group = 0;
	max_row_group = data.row_group_count;
	finished_loading = false;
	reader = make_uniq<MetaBlockReader>(collection.GetBlockManager(), data.block_id);
	reader->offset = data.offset;
}

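// Deserialize the next row group from the metadata reader, or return nullptr once all
// persisted row groups have been loaded.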
unique_ptr<RowGroup> RowGroupSegmentTree::LoadSegment() {
	if (current_row_group >= max_row_group) {
		finished_loading = true;
		return nullptr;
	}
	auto row_group_pointer = RowGroup::Deserialize(*reader, collection.GetTypes());
	current_row_group++;
	return make_uniq<RowGroup>(collection, std::move(row_group_pointer));
}

//===--------------------------------------------------------------------===//
// Row Group Collection
//===--------------------------------------------------------------------===//
RowGroupCollection::RowGroupCollection(shared_ptr<DataTableInfo> info_p, BlockManager &block_manager,
                                       vector<LogicalType> types_p, idx_t row_start_p, idx_t total_rows_p)
    : block_manager(block_manager), total_rows(total_rows_p), info(std::move(info_p)), types(std::move(types_p)),
      row_start(row_start_p) {
	row_groups = make_shared<RowGroupSegmentTree>(*this);
}

idx_t RowGroupCollection::GetTotalRows() const {
	return total_rows.load();
}

const vector<LogicalType> &RowGroupCollection::GetTypes() const {
	return types;
}

Allocator &RowGroupCollection::GetAllocator() const {
	return Allocator::Get(info->db);
}

AttachedDatabase &RowGroupCollection::GetAttached() {
	return GetTableInfo().db;
}

DatabaseInstance &RowGroupCollection::GetDatabase() {
	return GetAttached().GetDatabase();
}

//===--------------------------------------------------------------------===//
// Initialize
//===--------------------------------------------------------------------===//
void RowGroupCollection::Initialize(PersistentTableData &data) {
	D_ASSERT(this->row_start == 0);
	auto l = row_groups->Lock();
	this->total_rows = data.total_rows;
	row_groups->Initialize(data);
	stats.Initialize(types, data);
}

void RowGroupCollection::InitializeEmpty() {
	stats.InitializeEmpty(types);
}

void RowGroupCollection::AppendRowGroup(SegmentLock &l, idx_t start_row) {
	D_ASSERT(start_row >= row_start);
	auto new_row_group = make_uniq<RowGroup>(*this, start_row, 0);
	new_row_group->InitializeEmpty(types);
	row_groups->AppendSegment(l, std::move(new_row_group));
}

RowGroup *RowGroupCollection::GetRowGroup(int64_t index) {
	return (RowGroup *)row_groups->GetSegmentByIndex(index);
}

idx_t RowGroupCollection::RowGroupCount() {
	return row_groups->GetSegmentCount();
}

void RowGroupCollection::Verify() {
#ifdef DEBUG
	idx_t current_total_rows = 0;
	row_groups->Verify();
	for (auto &row_group : row_groups->Segments()) {
		row_group.Verify();
		D_ASSERT(&row_group.GetCollection() == this);
		D_ASSERT(row_group.start == this->row_start + current_total_rows);
		current_total_rows += row_group.count;
	}
	D_ASSERT(current_total_rows == total_rows.load());
#endif
}

//===--------------------------------------------------------------------===//
// Scan
//===--------------------------------------------------------------------===//
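// Initialize a full scan over the collection, advancing past any leading row groups whose scan
// cannot be initialized (i.e. that can be skipped entirely).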
void RowGroupCollection::InitializeScan(CollectionScanState &state, const vector<column_t> &column_ids,
                                        TableFilterSet *table_filters) {
	auto row_group = row_groups->GetRootSegment();
	D_ASSERT(row_group);
	state.row_groups = row_groups.get();
	state.max_row = row_start + total_rows;
	state.Initialize(GetTypes());
	while (row_group && !row_group->InitializeScan(state)) {
		row_group = row_groups->GetNextSegment(row_group);
	}
}

void RowGroupCollection::InitializeCreateIndexScan(CreateIndexScanState &state) {
	state.segment_lock = row_groups->Lock();
}

void RowGroupCollection::InitializeScanWithOffset(CollectionScanState &state, const vector<column_t> &column_ids,
                                                  idx_t start_row, idx_t end_row) {
	auto row_group = row_groups->GetSegment(start_row);
	D_ASSERT(row_group);
	state.row_groups = row_groups.get();
	state.max_row = end_row;
	state.Initialize(GetTypes());
	idx_t start_vector = (start_row - row_group->start) / STANDARD_VECTOR_SIZE;
	if (!row_group->InitializeScanWithOffset(state, start_vector)) {
		throw InternalException("Failed to initialize row group scan with offset");
	}
}

bool RowGroupCollection::InitializeScanInRowGroup(CollectionScanState &state, RowGroupCollection &collection,
                                                  RowGroup &row_group, idx_t vector_index, idx_t max_row) {
	state.max_row = max_row;
	state.row_groups = collection.row_groups.get();
	if (!state.column_scans) {
		// initialize the scan state
		state.Initialize(collection.GetTypes());
	}
	return row_group.InitializeScanWithOffset(state, vector_index);
}

void RowGroupCollection::InitializeParallelScan(ParallelCollectionScanState &state) {
	state.collection = this;
	state.current_row_group = row_groups->GetRootSegment();
	state.vector_index = 0;
	state.max_row = row_start + total_rows;
	state.batch_index = 0;
	state.processed_rows = 0;
}

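// Hand out the next unit of work for a parallel scan. Under the shared lock a worker grabs either
// the next full row group or, when verify_parallelism is enabled, a single vector of it, then
// initializes its local scan state for that range. Returns false when no work is left.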
bool RowGroupCollection::NextParallelScan(ClientContext &context, ParallelCollectionScanState &state,
                                          CollectionScanState &scan_state) {
	while (true) {
		idx_t vector_index;
		idx_t max_row;
		RowGroupCollection *collection;
		RowGroup *row_group;
		{
			// select the next row group to scan from the parallel state
			lock_guard<mutex> l(state.lock);
			if (!state.current_row_group || state.current_row_group->count == 0) {
				// no more data left to scan
				break;
			}
			collection = state.collection;
			row_group = state.current_row_group;
			if (ClientConfig::GetConfig(context).verify_parallelism) {
				vector_index = state.vector_index;
				max_row = state.current_row_group->start +
				          MinValue<idx_t>(state.current_row_group->count,
				                          STANDARD_VECTOR_SIZE * state.vector_index + STANDARD_VECTOR_SIZE);
				D_ASSERT(vector_index * STANDARD_VECTOR_SIZE < state.current_row_group->count);
				state.vector_index++;
				if (state.vector_index * STANDARD_VECTOR_SIZE >= state.current_row_group->count) {
					state.current_row_group = row_groups->GetNextSegment(state.current_row_group);
					state.vector_index = 0;
				}
			} else {
				state.processed_rows += state.current_row_group->count;
				vector_index = 0;
				max_row = state.current_row_group->start + state.current_row_group->count;
				state.current_row_group = row_groups->GetNextSegment(state.current_row_group);
			}
			max_row = MinValue<idx_t>(max_row, state.max_row);
			scan_state.batch_index = ++state.batch_index;
		}
		D_ASSERT(collection);
		D_ASSERT(row_group);

		// initialize the scan for this row group
		bool need_to_scan = InitializeScanInRowGroup(scan_state, *collection, *row_group, vector_index, max_row);
		if (!need_to_scan) {
			// skip this row group
			continue;
		}
		return true;
	}
	return false;
}

bool RowGroupCollection::Scan(DuckTransaction &transaction, const vector<column_t> &column_ids,
                              const std::function<bool(DataChunk &chunk)> &fun) {
	vector<LogicalType> scan_types;
	for (idx_t i = 0; i < column_ids.size(); i++) {
		scan_types.push_back(types[column_ids[i]]);
	}
	DataChunk chunk;
	chunk.Initialize(GetAllocator(), scan_types);

	// initialize the scan
	TableScanState state;
	state.Initialize(column_ids, nullptr);
	InitializeScan(state.local_state, column_ids, nullptr);

	while (true) {
		chunk.Reset();
		state.local_state.Scan(transaction, chunk);
		if (chunk.size() == 0) {
			return true;
		}
		if (!fun(chunk)) {
			return false;
		}
	}
}

bool RowGroupCollection::Scan(DuckTransaction &transaction, const std::function<bool(DataChunk &chunk)> &fun) {
	vector<column_t> column_ids;
	column_ids.reserve(types.size());
	for (idx_t i = 0; i < types.size(); i++) {
		column_ids.push_back(i);
	}
	return Scan(transaction, column_ids, fun);
}

//===--------------------------------------------------------------------===//
// Fetch
//===--------------------------------------------------------------------===//
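// Fetch specific rows by row id: locate the row group that owns each id, check visibility for the
// transaction, and materialize the visible rows into the result chunk.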
void RowGroupCollection::Fetch(TransactionData transaction, DataChunk &result, const vector<column_t> &column_ids,
                               const Vector &row_identifiers, idx_t fetch_count, ColumnFetchState &state) {
	// figure out which row_group to fetch from
	auto row_ids = FlatVector::GetData<row_t>(row_identifiers);
	idx_t count = 0;
	for (idx_t i = 0; i < fetch_count; i++) {
		auto row_id = row_ids[i];
		RowGroup *row_group;
		{
			idx_t segment_index;
			auto l = row_groups->Lock();
			if (!row_groups->TryGetSegmentIndex(l, row_id, segment_index)) {
				// in parallel append scenarios it is possible for the row_id to not belong to any row group yet
				continue;
			}
			row_group = row_groups->GetSegmentByIndex(l, segment_index);
		}
		if (!row_group->Fetch(transaction, row_id - row_group->start)) {
			continue;
		}
		row_group->FetchRow(transaction, state, column_ids, row_id, result, count);
		count++;
	}
	result.SetCardinality(count);
}

//===--------------------------------------------------------------------===//
// Append
//===--------------------------------------------------------------------===//
TableAppendState::TableAppendState()
    : row_group_append_state(*this), total_append_count(0), start_row_group(nullptr), transaction(0, 0), remaining(0) {
}

TableAppendState::~TableAppendState() {
	D_ASSERT(Exception::UncaughtException() || remaining == 0);
}

bool RowGroupCollection::IsEmpty() const {
	auto l = row_groups->Lock();
	return IsEmpty(l);
}

bool RowGroupCollection::IsEmpty(SegmentLock &l) const {
	return row_groups->IsEmpty(l);
}

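// Prepare an append of append_count rows: make sure there is a row group to append to, point the
// append state at the last row group, and pre-allocate version info for the new rows.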
void RowGroupCollection::InitializeAppend(TransactionData transaction, TableAppendState &state, idx_t append_count) {
	state.row_start = total_rows;
	state.current_row = state.row_start;
	state.total_append_count = 0;

	// start writing to the row_groups
	auto l = row_groups->Lock();
	if (IsEmpty(l)) {
		// empty row group collection: append an empty first row group
		AppendRowGroup(l, row_start);
	}
	state.start_row_group = row_groups->GetLastSegment(l);
	D_ASSERT(this->row_start + total_rows == state.start_row_group->start + state.start_row_group->count);
	state.start_row_group->InitializeAppend(state.row_group_append_state);
	state.remaining = append_count;
	state.transaction = transaction;
	if (state.remaining > 0) {
		state.start_row_group->AppendVersionInfo(transaction, state.remaining);
		total_rows += state.remaining;
	}
}

void RowGroupCollection::InitializeAppend(TableAppendState &state) {
	TransactionData tdata(0, 0);
	InitializeAppend(tdata, state, 0);
}

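// Append a chunk to the collection. Rows are written into the current row group; if the chunk does
// not fit, the remainder is sliced off and written into a freshly appended row group. Column and
// distinct statistics are updated along the way. Returns true if a new row group was created.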
bool RowGroupCollection::Append(DataChunk &chunk, TableAppendState &state) {
	D_ASSERT(chunk.ColumnCount() == types.size());
	chunk.Verify();

	bool new_row_group = false;
	idx_t append_count = chunk.size();
	idx_t remaining = chunk.size();
	state.total_append_count += append_count;
	while (true) {
		auto current_row_group = state.row_group_append_state.row_group;
		// check how much we can fit into the current row_group
		idx_t append_count =
		    MinValue<idx_t>(remaining, RowGroup::ROW_GROUP_SIZE - state.row_group_append_state.offset_in_row_group);
		if (append_count > 0) {
			current_row_group->Append(state.row_group_append_state, chunk, append_count);
			// merge the stats
			auto stats_lock = stats.GetLock();
			for (idx_t i = 0; i < types.size(); i++) {
				current_row_group->MergeIntoStatistics(i, stats.GetStats(i).Statistics());
			}
		}
		remaining -= append_count;
		if (state.remaining > 0) {
			state.remaining -= append_count;
		}
		if (remaining > 0) {
			// we expect max 1 iteration of this loop (i.e. a single chunk should never overflow more than one
			// row_group)
			D_ASSERT(chunk.size() == remaining + append_count);
			// slice the input chunk
			if (remaining < chunk.size()) {
				SelectionVector sel(remaining);
				for (idx_t i = 0; i < remaining; i++) {
					sel.set_index(i, append_count + i);
				}
				chunk.Slice(sel, remaining);
			}
			// append a new row_group
			new_row_group = true;
			auto next_start = current_row_group->start + state.row_group_append_state.offset_in_row_group;

			auto l = row_groups->Lock();
			AppendRowGroup(l, next_start);
			// set up the append state for this row_group
			auto last_row_group = row_groups->GetLastSegment(l);
			last_row_group->InitializeAppend(state.row_group_append_state);
			if (state.remaining > 0) {
				last_row_group->AppendVersionInfo(state.transaction, state.remaining);
			}
			continue;
		} else {
			break;
		}
	}
	state.current_row += append_count;
	auto stats_lock = stats.GetLock();
	for (idx_t col_idx = 0; col_idx < types.size(); col_idx++) {
		stats.GetStats(col_idx).UpdateDistinctStatistics(chunk.data[col_idx], chunk.size());
	}
	return new_row_group;
}

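// Finalize an append for which version info was not reserved up front: distribute version info for
// the appended rows over the affected row groups and bump the total row count.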
void RowGroupCollection::FinalizeAppend(TransactionData transaction, TableAppendState &state) {
	auto remaining = state.total_append_count;
	auto row_group = state.start_row_group;
	while (remaining > 0) {
		auto append_count = MinValue<idx_t>(remaining, RowGroup::ROW_GROUP_SIZE - row_group->count);
		row_group->AppendVersionInfo(transaction, append_count);
		remaining -= append_count;
		row_group = row_groups->GetNextSegment(row_group);
	}
	total_rows += state.total_append_count;

	state.total_append_count = 0;
	state.start_row_group = nullptr;

	Verify();
}

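// Mark a previously appended range of rows as committed with the given commit id, walking the
// row groups that the range spans.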
void RowGroupCollection::CommitAppend(transaction_t commit_id, idx_t row_start, idx_t count) {
	auto row_group = row_groups->GetSegment(row_start);
	D_ASSERT(row_group);
	idx_t current_row = row_start;
	idx_t remaining = count;
	while (true) {
		idx_t start_in_row_group = current_row - row_group->start;
		idx_t append_count = MinValue<idx_t>(row_group->count - start_in_row_group, remaining);

		row_group->CommitAppend(commit_id, start_in_row_group, append_count);

		current_row += append_count;
		remaining -= append_count;
		if (remaining == 0) {
			break;
		}
		row_group = row_groups->GetNextSegment(row_group);
	}
}

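// Undo an append at the end of the collection: truncate back to start_row, erase any row groups
// that were created entirely by the reverted append, and revert the partially filled one.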
void RowGroupCollection::RevertAppendInternal(idx_t start_row, idx_t count) {
	if (total_rows != start_row + count) {
		throw InternalException("Interleaved appends: this should no longer happen");
	}
	total_rows = start_row;

	auto l = row_groups->Lock();
	// find the segment index that the current row belongs to
	idx_t segment_index = row_groups->GetSegmentIndex(l, start_row);
	auto segment = row_groups->GetSegmentByIndex(l, segment_index);
	auto &info = *segment;

	// remove any segments AFTER this segment: they should be deleted entirely
	row_groups->EraseSegments(l, segment_index);

	info.next = nullptr;
	info.RevertAppend(start_row);
}

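// Move all row groups of another collection (with matching types) into this one, rebasing their row
// numbers onto the end of this collection and merging the table statistics.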
void RowGroupCollection::MergeStorage(RowGroupCollection &data) {
	D_ASSERT(data.types == types);
	auto index = row_start + total_rows.load();
	auto segments = data.row_groups->MoveSegments();
	for (auto &entry : segments) {
		auto &row_group = entry.node;
		row_group->MoveToCollection(*this, index);
		index += row_group->count;
		row_groups->AppendSegment(std::move(row_group));
	}
	stats.MergeStats(data.stats);
	total_rows += data.total_rows.load();
}

//===--------------------------------------------------------------------===//
// Delete
//===--------------------------------------------------------------------===//
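// Delete rows by row id: batch runs of consecutive ids that fall into the same row group and forward
// each batch to that row group. Returns the number of rows actually deleted.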
idx_t RowGroupCollection::Delete(TransactionData transaction, DataTable &table, row_t *ids, idx_t count) {
	idx_t delete_count = 0;
	// deletes are executed per row group
	// we need to figure out for each id to which row group it belongs
	// usually all (or many) ids belong to the same row group
	// we iterate over the ids and check for every id if it belongs to the same row group as its predecessor
	idx_t pos = 0;
	do {
		idx_t start = pos;
		auto row_group = row_groups->GetSegment(ids[start]);
		for (pos++; pos < count; pos++) {
			D_ASSERT(ids[pos] >= 0);
			// check if this id still belongs to this row group
			if (idx_t(ids[pos]) < row_group->start) {
				// id is before row_group start -> it does not
				break;
			}
			if (idx_t(ids[pos]) >= row_group->start + row_group->count) {
				// id is after row group end -> it does not
				break;
			}
		}
		delete_count += row_group->Delete(transaction, table, ids + start, pos - start);
	} while (pos < count);
	return delete_count;
}

//===--------------------------------------------------------------------===//
// Update
//===--------------------------------------------------------------------===//
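// Update rows in place: group the row ids per vector within a row group, forward each batch to the
// owning row group, and merge the updated column statistics back into the table statistics.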
void RowGroupCollection::Update(TransactionData transaction, row_t *ids, const vector<PhysicalIndex> &column_ids,
                                DataChunk &updates) {
	idx_t pos = 0;
	do {
		idx_t start = pos;
		auto row_group = row_groups->GetSegment(ids[pos]);
		row_t base_id =
		    row_group->start + ((ids[pos] - row_group->start) / STANDARD_VECTOR_SIZE * STANDARD_VECTOR_SIZE);
		row_t max_id = MinValue<row_t>(base_id + STANDARD_VECTOR_SIZE, row_group->start + row_group->count);
		for (pos++; pos < updates.size(); pos++) {
			D_ASSERT(ids[pos] >= 0);
			// check if this id still belongs to this vector in this row group
			if (ids[pos] < base_id) {
				// id is before vector start -> it does not
				break;
			}
			if (ids[pos] >= max_id) {
				// id is after the maximum id in this vector -> it does not
				break;
			}
		}
		row_group->Update(transaction, updates, ids, start, pos - start, column_ids);

		auto l = stats.GetLock();
		for (idx_t i = 0; i < column_ids.size(); i++) {
			auto column_id = column_ids[i];
			stats.MergeStats(*l, column_id.index, *row_group->GetStatistics(column_id.index));
		}
	} while (pos < updates.size());
}

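// Remove a set of rows from all indexes of the table. The rows are re-fetched vector-by-vector from
// the row groups (all columns for now, see FIXME below) and the matching entries are deleted from
// each index.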
void RowGroupCollection::RemoveFromIndexes(TableIndexList &indexes, Vector &row_identifiers, idx_t count) {
	auto row_ids = FlatVector::GetData<row_t>(row_identifiers);

	// initialize the fetch state
	// FIXME: we do not need to fetch all columns, only the columns required by the indices!
	TableScanState state;
	vector<column_t> column_ids;
	column_ids.reserve(types.size());
	for (idx_t i = 0; i < types.size(); i++) {
		column_ids.push_back(i);
	}
	state.Initialize(std::move(column_ids));
	state.table_state.max_row = row_start + total_rows;

	// initialize the fetch chunk
	DataChunk result;
	result.Initialize(GetAllocator(), types);

	SelectionVector sel(STANDARD_VECTOR_SIZE);
	// now iterate over the row ids
	for (idx_t r = 0; r < count;) {
		result.Reset();
		// figure out which row_group to fetch from
		auto row_id = row_ids[r];
		auto row_group = row_groups->GetSegment(row_id);
		auto row_group_vector_idx = (row_id - row_group->start) / STANDARD_VECTOR_SIZE;
		auto base_row_id = row_group_vector_idx * STANDARD_VECTOR_SIZE + row_group->start;

		// fetch the current vector
		state.table_state.Initialize(GetTypes());
		row_group->InitializeScanWithOffset(state.table_state, row_group_vector_idx);
		row_group->ScanCommitted(state.table_state, result, TableScanType::TABLE_SCAN_COMMITTED_ROWS);
		result.Verify();

		// check for any remaining row ids if they also fall into this vector
		// we try to handle as many rows as possible at the same time
		idx_t sel_count = 0;
		for (; r < count; r++) {
			idx_t current_row = idx_t(row_ids[r]);
			if (current_row < base_row_id || current_row >= base_row_id + result.size()) {
				// this row-id does not fall into the current chunk - break
				break;
			}
			auto row_in_vector = current_row - base_row_id;
			D_ASSERT(row_in_vector < result.size());
			sel.set_index(sel_count++, row_in_vector);
		}
		D_ASSERT(sel_count > 0);
		// slice the vector with all rows that are present in this vector and erase from the index
		result.Slice(sel, sel_count);

		indexes.Scan([&](Index &index) {
			index.Delete(result, row_identifiers);
			return false;
		});
	}
}

void RowGroupCollection::UpdateColumn(TransactionData transaction, Vector &row_ids, const vector<column_t> &column_path,
                                      DataChunk &updates) {
	auto first_id = FlatVector::GetValue<row_t>(row_ids, 0);
	if (first_id >= MAX_ROW_ID) {
		throw NotImplementedException("Cannot update a column-path on transaction local data");
	}
	// find the row_group this id belongs to
	auto primary_column_idx = column_path[0];
	auto row_group = row_groups->GetSegment(first_id);
	row_group->UpdateColumn(transaction, updates, row_ids, column_path);

	row_group->MergeIntoStatistics(primary_column_idx, stats.GetStats(primary_column_idx).Statistics());
}

//===--------------------------------------------------------------------===//
// Checkpoint
//===--------------------------------------------------------------------===//
void RowGroupCollection::Checkpoint(TableDataWriter &writer, TableStatistics &global_stats) {
	for (auto &row_group : row_groups->Segments()) {
		auto rowg_writer = writer.GetRowGroupWriter(row_group);
		auto pointer = row_group.Checkpoint(*rowg_writer, global_stats);
		writer.AddRowGroup(std::move(pointer), std::move(rowg_writer));
	}
}

//===--------------------------------------------------------------------===//
// CommitDrop
//===--------------------------------------------------------------------===//
void RowGroupCollection::CommitDropColumn(idx_t index) {
	for (auto &row_group : row_groups->Segments()) {
		row_group.CommitDropColumn(index);
	}
}

void RowGroupCollection::CommitDropTable() {
	for (auto &row_group : row_groups->Segments()) {
		row_group.CommitDrop();
	}
}

//===--------------------------------------------------------------------===//
// GetColumnSegmentInfo
//===--------------------------------------------------------------------===//
vector<ColumnSegmentInfo> RowGroupCollection::GetColumnSegmentInfo() {
	vector<ColumnSegmentInfo> result;
	for (auto &row_group : row_groups->Segments()) {
		row_group.GetColumnSegmentInfo(row_group.index, result);
	}
	return result;
}

//===--------------------------------------------------------------------===//
// Alter
//===--------------------------------------------------------------------===//
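// ALTER TABLE ... ADD COLUMN: build a new collection with the additional column, filling it with the
// evaluated DEFAULT expression (or NULL when no default is given) and merging in the new statistics.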
shared_ptr<RowGroupCollection> RowGroupCollection::AddColumn(ClientContext &context, ColumnDefinition &new_column,
                                                             Expression *default_value) {
	idx_t new_column_idx = types.size();
	auto new_types = types;
	new_types.push_back(new_column.GetType());
	auto result =
	    make_shared<RowGroupCollection>(info, block_manager, std::move(new_types), row_start, total_rows.load());

	ExpressionExecutor executor(context);
	DataChunk dummy_chunk;
	Vector default_vector(new_column.GetType());
	if (!default_value) {
		FlatVector::Validity(default_vector).SetAllInvalid(STANDARD_VECTOR_SIZE);
	} else {
		executor.AddExpression(*default_value);
	}

	result->stats.InitializeAddColumn(stats, new_column.GetType());
	auto &new_column_stats = result->stats.GetStats(new_column_idx);

	// fill the column with its DEFAULT value, or NULL if none is specified
	auto new_stats = make_uniq<SegmentStatistics>(new_column.GetType());
	for (auto &current_row_group : row_groups->Segments()) {
		auto new_row_group = current_row_group.AddColumn(*result, new_column, executor, default_value, default_vector);
		// merge in the statistics
		new_row_group->MergeIntoStatistics(new_column_idx, new_column_stats.Statistics());

		result->row_groups->AppendSegment(std::move(new_row_group));
	}
	return result;
}

shared_ptr<RowGroupCollection> RowGroupCollection::RemoveColumn(idx_t col_idx) {
	D_ASSERT(col_idx < types.size());
	auto new_types = types;
	new_types.erase(new_types.begin() + col_idx);

	auto result =
	    make_shared<RowGroupCollection>(info, block_manager, std::move(new_types), row_start, total_rows.load());
	result->stats.InitializeRemoveColumn(stats, col_idx);

	for (auto &current_row_group : row_groups->Segments()) {
		auto new_row_group = current_row_group.RemoveColumn(*result, col_idx);
		result->row_groups->AppendSegment(std::move(new_row_group));
	}
	return result;
}

shared_ptr<RowGroupCollection> RowGroupCollection::AlterType(ClientContext &context, idx_t changed_idx,
                                                             const LogicalType &target_type,
                                                             vector<column_t> bound_columns, Expression &cast_expr) {
	D_ASSERT(changed_idx < types.size());
	auto new_types = types;
	new_types[changed_idx] = target_type;

	auto result =
	    make_shared<RowGroupCollection>(info, block_manager, std::move(new_types), row_start, total_rows.load());
	result->stats.InitializeAlterType(stats, changed_idx, target_type);

	vector<LogicalType> scan_types;
	for (idx_t i = 0; i < bound_columns.size(); i++) {
		if (bound_columns[i] == COLUMN_IDENTIFIER_ROW_ID) {
			scan_types.emplace_back(LogicalType::ROW_TYPE);
		} else {
			scan_types.push_back(types[bound_columns[i]]);
		}
	}
	DataChunk scan_chunk;
	scan_chunk.Initialize(GetAllocator(), scan_types);

	ExpressionExecutor executor(context);
	executor.AddExpression(cast_expr);

	TableScanState scan_state;
	scan_state.Initialize(bound_columns);
	scan_state.table_state.max_row = row_start + total_rows;

	// now alter the type of the column within all of the row_groups individually
	auto &changed_stats = result->stats.GetStats(changed_idx);
	for (auto &current_row_group : row_groups->Segments()) {
		auto new_row_group = current_row_group.AlterType(*result, target_type, changed_idx, executor,
		                                                 scan_state.table_state, scan_chunk);
		new_row_group->MergeIntoStatistics(changed_idx, changed_stats.Statistics());
		result->row_groups->AppendSegment(std::move(new_row_group));
	}

	return result;
}

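// Verify that a newly added NOT NULL constraint holds for the existing data by scanning the latest
// committed values of the constrained column and checking for NULLs.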
void RowGroupCollection::VerifyNewConstraint(DataTable &parent, const BoundConstraint &constraint) {
	if (total_rows == 0) {
		return;
	}
	// scan the original table, check if there's any null value
	auto &not_null_constraint = constraint.Cast<BoundNotNullConstraint>();
	vector<LogicalType> scan_types;
	auto physical_index = not_null_constraint.index.index;
	D_ASSERT(physical_index < types.size());
	scan_types.push_back(types[physical_index]);
	DataChunk scan_chunk;
	scan_chunk.Initialize(GetAllocator(), scan_types);

	CreateIndexScanState state;
	vector<column_t> cids;
	cids.push_back(physical_index);
	// Use ScanCommitted to scan the latest committed data
	state.Initialize(cids, nullptr);
	InitializeScan(state.table_state, cids, nullptr);
	InitializeCreateIndexScan(state);
	while (true) {
		scan_chunk.Reset();
		state.table_state.ScanCommitted(scan_chunk, state.segment_lock,
		                                TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED);
		if (scan_chunk.size() == 0) {
			break;
		}
		// Check constraint
		if (VectorOperations::HasNull(scan_chunk.data[0], scan_chunk.size())) {
			throw ConstraintException("NOT NULL constraint failed: %s.%s", info->table,
			                          parent.column_definitions[physical_index].GetName());
		}
	}
}

//===--------------------------------------------------------------------===//
// Statistics
//===--------------------------------------------------------------------===//
void RowGroupCollection::CopyStats(TableStatistics &other_stats) {
	stats.CopyStats(other_stats);
}

unique_ptr<BaseStatistics> RowGroupCollection::CopyStats(column_t column_id) {
	return stats.CopyStats(column_id);
}

void RowGroupCollection::SetDistinct(column_t column_id, unique_ptr<DistinctStatistics> distinct_stats) {
	D_ASSERT(column_id != COLUMN_IDENTIFIER_ROW_ID);
	auto stats_guard = stats.GetLock();
	stats.GetStats(column_id).SetDistinct(std::move(distinct_stats));
}

} // namespace duckdb