#include "duckdb/storage/table/row_group.hpp"
#include "duckdb/common/types/vector.hpp"
#include "duckdb/transaction/transaction.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/common/field_writer.hpp"
#include "duckdb/storage/table/column_data.hpp"
#include "duckdb/storage/table/column_checkpoint_state.hpp"
#include "duckdb/storage/table/update_segment.hpp"
#include "duckdb/storage/table_storage_info.hpp"
#include "duckdb/common/chrono.hpp"
#include "duckdb/planner/table_filter.hpp"
#include "duckdb/execution/expression_executor.hpp"
#include "duckdb/storage/checkpoint/table_data_writer.hpp"
#include "duckdb/storage/meta_block_reader.hpp"
#include "duckdb/transaction/duck_transaction_manager.hpp"
#include "duckdb/main/database.hpp"
#include "duckdb/main/attached_database.hpp"
#include "duckdb/transaction/duck_transaction.hpp"
#include "duckdb/storage/table/append_state.hpp"
#include "duckdb/storage/table/scan_state.hpp"

namespace duckdb {

constexpr const idx_t RowGroup::ROW_GROUP_VECTOR_COUNT;
constexpr const idx_t RowGroup::ROW_GROUP_SIZE;

RowGroup::RowGroup(RowGroupCollection &collection, idx_t start, idx_t count)
    : SegmentBase<RowGroup>(start, count), collection(collection) {
	Verify();
}

RowGroup::RowGroup(RowGroupCollection &collection, RowGroupPointer &&pointer)
    : SegmentBase<RowGroup>(pointer.row_start, pointer.tuple_count), collection(collection) {
	// deserialize the columns
	if (pointer.data_pointers.size() != collection.GetTypes().size()) {
		throw IOException("Row group column count is unaligned with table column count. Corrupt file?");
	}
	this->column_pointers = std::move(pointer.data_pointers);
	this->columns.resize(column_pointers.size());
	this->is_loaded = unique_ptr<atomic<bool>[]>(new atomic<bool>[columns.size()]);
	for (idx_t c = 0; c < columns.size(); c++) {
		this->is_loaded[c] = false;
	}
	this->version_info = std::move(pointer.versions);

	Verify();
}

void RowGroup::MoveToCollection(RowGroupCollection &collection, idx_t new_start) {
	this->collection = collection;
	this->start = new_start;
	for (auto &column : GetColumns()) {
		column->SetStart(new_start);
	}
	if (version_info) {
		version_info->SetStart(new_start);
	}
}

void VersionNode::SetStart(idx_t start) {
	idx_t current_start = start;
	for (idx_t i = 0; i < RowGroup::ROW_GROUP_VECTOR_COUNT; i++) {
		if (info[i]) {
			info[i]->start = current_start;
		}
		current_start += STANDARD_VECTOR_SIZE;
	}
}

RowGroup::~RowGroup() {
}

vector<shared_ptr<ColumnData>> &RowGroup::GetColumns() {
	// ensure all columns are loaded
	for (idx_t c = 0; c < GetColumnCount(); c++) {
		GetColumn(c);
	}
	return columns;
}

idx_t RowGroup::GetColumnCount() const {
	return columns.size();
}

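// Returns the ColumnData for the given column index, lazily deserializing it from its stored block pointer on first
// access. The check on is_loaded avoids taking the row group lock on the hot path.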
ColumnData &RowGroup::GetColumn(storage_t c) {
	D_ASSERT(c < columns.size());
	if (!is_loaded) {
		// not being lazy loaded
		D_ASSERT(columns[c]);
		return *columns[c];
	}
	if (is_loaded[c]) {
		D_ASSERT(columns[c]);
		return *columns[c];
	}
	lock_guard<mutex> l(row_group_lock);
	if (columns[c]) {
		D_ASSERT(is_loaded[c]);
		return *columns[c];
	}
	if (column_pointers.size() != columns.size()) {
		throw InternalException("Lazy loading a column but the pointer was not set");
	}
	auto &block_manager = GetCollection().GetBlockManager();
	auto &types = GetCollection().GetTypes();
	auto &block_pointer = column_pointers[c];
	MetaBlockReader column_data_reader(block_manager, block_pointer.block_id);
	column_data_reader.offset = block_pointer.offset;
	this->columns[c] =
	    ColumnData::Deserialize(GetBlockManager(), GetTableInfo(), c, start, column_data_reader, types[c], nullptr);
	is_loaded[c] = true;
	return *columns[c];
}

DatabaseInstance &RowGroup::GetDatabase() {
	return GetCollection().GetDatabase();
}

BlockManager &RowGroup::GetBlockManager() {
	return GetCollection().GetBlockManager();
}
DataTableInfo &RowGroup::GetTableInfo() {
	return GetCollection().GetTableInfo();
}

void RowGroup::InitializeEmpty(const vector<LogicalType> &types) {
	// set up the segment trees for the column segments
	D_ASSERT(columns.empty());
	for (idx_t i = 0; i < types.size(); i++) {
		auto column_data = ColumnData::CreateColumn(GetBlockManager(), GetTableInfo(), i, start, types[i]);
		columns.push_back(std::move(column_data));
	}
}

void ColumnScanState::Initialize(const LogicalType &type) {
	if (type.id() == LogicalTypeId::VALIDITY) {
		// validity - nothing to initialize
		return;
	}
	if (type.InternalType() == PhysicalType::STRUCT) {
		// validity + struct children
		auto &struct_children = StructType::GetChildTypes(type);
		child_states.resize(struct_children.size() + 1);
		for (idx_t i = 0; i < struct_children.size(); i++) {
			child_states[i + 1].Initialize(struct_children[i].second);
		}
	} else if (type.InternalType() == PhysicalType::LIST) {
		// validity + list child
		child_states.resize(2);
		child_states[1].Initialize(ListType::GetChildType(type));
	} else {
		// validity
		child_states.resize(1);
	}
}

void CollectionScanState::Initialize(const vector<LogicalType> &types) {
	auto &column_ids = GetColumnIds();
	column_scans = make_unsafe_uniq_array<ColumnScanState>(column_ids.size());
	for (idx_t i = 0; i < column_ids.size(); i++) {
		if (column_ids[i] == COLUMN_IDENTIFIER_ROW_ID) {
			continue;
		}
		column_scans[i].Initialize(types[column_ids[i]]);
	}
}

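// Scan initialization: both variants first prune the row group through the zonemaps of the filtered columns and
// return false if the entire row group can be skipped; otherwise they position every column scan state, either at
// the start of the row group or at the requested vector offset.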
bool RowGroup::InitializeScanWithOffset(CollectionScanState &state, idx_t vector_offset) {
	auto &column_ids = state.GetColumnIds();
	auto filters = state.GetFilters();
	if (filters) {
		if (!CheckZonemap(*filters, column_ids)) {
			return false;
		}
	}

	state.row_group = this;
	state.vector_index = vector_offset;
	state.max_row_group_row =
	    this->start > state.max_row ? 0 : MinValue<idx_t>(this->count, state.max_row - this->start);
	D_ASSERT(state.column_scans);
	for (idx_t i = 0; i < column_ids.size(); i++) {
		const auto &column = column_ids[i];
		if (column != COLUMN_IDENTIFIER_ROW_ID) {
			auto &column_data = GetColumn(column);
			column_data.InitializeScanWithOffset(state.column_scans[i], start + vector_offset * STANDARD_VECTOR_SIZE);
		} else {
			state.column_scans[i].current = nullptr;
		}
	}
	return true;
}

bool RowGroup::InitializeScan(CollectionScanState &state) {
	auto &column_ids = state.GetColumnIds();
	auto filters = state.GetFilters();
	if (filters) {
		if (!CheckZonemap(*filters, column_ids)) {
			return false;
		}
	}
	state.row_group = this;
	state.vector_index = 0;
	state.max_row_group_row =
	    this->start > state.max_row ? 0 : MinValue<idx_t>(this->count, state.max_row - this->start);
	if (state.max_row_group_row == 0) {
		return false;
	}
	D_ASSERT(state.column_scans);
	for (idx_t i = 0; i < column_ids.size(); i++) {
		auto column = column_ids[i];
		if (column != COLUMN_IDENTIFIER_ROW_ID) {
			auto &column_data = GetColumn(column);
			column_data.InitializeScan(state.column_scans[i]);
		} else {
			state.column_scans[i].current = nullptr;
		}
	}
	return true;
}

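// ALTER TABLE ... ALTER COLUMN TYPE: scans the committed rows of this row group, evaluates the cast expression and
// materializes the result into a freshly created column, then builds a new RowGroup that shares all unchanged
// columns with this one.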
unique_ptr<RowGroup> RowGroup::AlterType(RowGroupCollection &new_collection, const LogicalType &target_type,
                                         idx_t changed_idx, ExpressionExecutor &executor,
                                         CollectionScanState &scan_state, DataChunk &scan_chunk) {
	Verify();

	// construct a new column data for this type
	auto column_data = ColumnData::CreateColumn(GetBlockManager(), GetTableInfo(), changed_idx, start, target_type);

	ColumnAppendState append_state;
	column_data->InitializeAppend(append_state);

	// scan the original table, and fill the new column with the transformed value
	scan_state.Initialize(GetCollection().GetTypes());
	InitializeScan(scan_state);

	DataChunk append_chunk;
	vector<LogicalType> append_types;
	append_types.push_back(target_type);
	append_chunk.Initialize(Allocator::DefaultAllocator(), append_types);
	auto &append_vector = append_chunk.data[0];
	while (true) {
		// scan the table
		scan_chunk.Reset();
		ScanCommitted(scan_state, scan_chunk, TableScanType::TABLE_SCAN_COMMITTED_ROWS);
		if (scan_chunk.size() == 0) {
			break;
		}
		// execute the expression
		append_chunk.Reset();
		executor.ExecuteExpression(scan_chunk, append_vector);
		column_data->Append(append_state, append_vector, scan_chunk.size());
	}

	// set up the row_group based on this row_group
	auto row_group = make_uniq<RowGroup>(new_collection, this->start, this->count);
	row_group->version_info = version_info;
	auto &cols = GetColumns();
	for (idx_t i = 0; i < cols.size(); i++) {
		if (i == changed_idx) {
			// this is the altered column: use the new column
			row_group->columns.push_back(std::move(column_data));
		} else {
			// this column was not altered: use the data directly
			row_group->columns.push_back(cols[i]);
		}
	}
	row_group->Verify();
	return row_group;
}

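// ALTER TABLE ... ADD COLUMN: materializes the default value (or, when no default is given, the caller-provided
// result vector) for every existing row of this row group into a new column and appends it to a copy of the current
// column list.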
unique_ptr<RowGroup> RowGroup::AddColumn(RowGroupCollection &new_collection, ColumnDefinition &new_column,
                                         ExpressionExecutor &executor, Expression *default_value, Vector &result) {
	Verify();

	// construct a new column data for the new column
	auto added_column =
	    ColumnData::CreateColumn(GetBlockManager(), GetTableInfo(), GetColumnCount(), start, new_column.Type());

	idx_t rows_to_write = this->count;
	if (rows_to_write > 0) {
		DataChunk dummy_chunk;

		ColumnAppendState state;
		added_column->InitializeAppend(state);
		for (idx_t i = 0; i < rows_to_write; i += STANDARD_VECTOR_SIZE) {
			idx_t rows_in_this_vector = MinValue<idx_t>(rows_to_write - i, STANDARD_VECTOR_SIZE);
			if (default_value) {
				dummy_chunk.SetCardinality(rows_in_this_vector);
				executor.ExecuteExpression(dummy_chunk, result);
			}
			added_column->Append(state, result, rows_in_this_vector);
		}
	}

	// set up the row_group based on this row_group
	auto row_group = make_uniq<RowGroup>(new_collection, this->start, this->count);
	row_group->version_info = version_info;
	row_group->columns = GetColumns();
	// now add the new column
	row_group->columns.push_back(std::move(added_column));

	row_group->Verify();
	return row_group;
}

unique_ptr<RowGroup> RowGroup::RemoveColumn(RowGroupCollection &new_collection, idx_t removed_column) {
	Verify();

	D_ASSERT(removed_column < columns.size());

	auto row_group = make_uniq<RowGroup>(new_collection, this->start, this->count);
	row_group->version_info = version_info;
	// copy over all columns except for the removed one
	auto &cols = GetColumns();
	for (idx_t i = 0; i < cols.size(); i++) {
		if (i != removed_column) {
			row_group->columns.push_back(cols[i]);
		}
	}

	row_group->Verify();
	return row_group;
}

void RowGroup::CommitDrop() {
	for (idx_t column_idx = 0; column_idx < GetColumnCount(); column_idx++) {
		CommitDropColumn(column_idx);
	}
}

void RowGroup::CommitDropColumn(idx_t column_idx) {
	GetColumn(column_idx).CommitDropColumn();
}

void RowGroup::NextVector(CollectionScanState &state) {
	state.vector_index++;
	const auto &column_ids = state.GetColumnIds();
	for (idx_t i = 0; i < column_ids.size(); i++) {
		const auto &column = column_ids[i];
		if (column == COLUMN_IDENTIFIER_ROW_ID) {
			continue;
		}
		D_ASSERT(column < columns.size());
		GetColumn(column).Skip(state.column_scans[i]);
	}
}

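// Row-group level zonemap check: returns false if any filter proves that no row in this row group can match, in
// which case the caller skips the row group entirely.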
bool RowGroup::CheckZonemap(TableFilterSet &filters, const vector<storage_t> &column_ids) {
	for (auto &entry : filters.filters) {
		auto column_index = entry.first;
		auto &filter = entry.second;
		const auto &base_column_index = column_ids[column_index];
		if (!GetColumn(base_column_index).CheckZonemap(*filter)) {
			return false;
		}
	}
	return true;
}

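// Segment-level zonemap check: if the current segment of a filtered column cannot contain matches, fast-forward the
// scan past that segment by skipping whole vectors, and return false so the caller re-evaluates at the new position.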
bool RowGroup::CheckZonemapSegments(CollectionScanState &state) {
	auto &column_ids = state.GetColumnIds();
	auto filters = state.GetFilters();
	if (!filters) {
		return true;
	}
	for (auto &entry : filters->filters) {
		D_ASSERT(entry.first < column_ids.size());
		auto column_idx = entry.first;
		const auto &base_column_idx = column_ids[column_idx];
		bool read_segment = GetColumn(base_column_idx).CheckZonemap(state.column_scans[column_idx], *entry.second);
		if (!read_segment) {
			idx_t target_row =
			    state.column_scans[column_idx].current->start + state.column_scans[column_idx].current->count;
			D_ASSERT(target_row >= this->start);
			D_ASSERT(target_row <= this->start + this->count);
			idx_t target_vector_index = (target_row - this->start) / STANDARD_VECTOR_SIZE;
			if (state.vector_index == target_vector_index) {
				// we can't skip any full vectors because this segment contains less than a full vector
				// for now we just bail-out
				// FIXME: we could check if we can ALSO skip the next segments, in which case skipping a full vector
				// might be possible
				// we don't care that much though, since a single segment that fits less than a full vector is
				// exceedingly rare
				return true;
			}
			while (state.vector_index < target_vector_index) {
				NextVector(state);
			}
			return false;
		}
	}

	return true;
}

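// Core scan loop, templated on the scan type. Per vector it: (1) checks the segment zonemaps, (2) resolves row
// visibility for the transaction into a selection vector, (3) either scans all columns directly (no deletions, no
// table filters) or evaluates the table filters first and fetches the remaining columns through the resulting
// selection vector. The adaptive filter reorders filter evaluation based on measured runtime.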
template <TableScanType TYPE>
void RowGroup::TemplatedScan(TransactionData transaction, CollectionScanState &state, DataChunk &result) {
	const bool ALLOW_UPDATES = TYPE != TableScanType::TABLE_SCAN_COMMITTED_ROWS_DISALLOW_UPDATES &&
	                           TYPE != TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED;
	auto table_filters = state.GetFilters();
	const auto &column_ids = state.GetColumnIds();
	auto adaptive_filter = state.GetAdaptiveFilter();
	while (true) {
		if (state.vector_index * STANDARD_VECTOR_SIZE >= state.max_row_group_row) {
			// exceeded the amount of rows to scan
			return;
		}
		idx_t current_row = state.vector_index * STANDARD_VECTOR_SIZE;
		auto max_count = MinValue<idx_t>(STANDARD_VECTOR_SIZE, state.max_row_group_row - current_row);

		//! first check the zonemap if we have to scan this partition
		if (!CheckZonemapSegments(state)) {
			continue;
		}
		// second, scan the version chunk manager to figure out which tuples to load for this transaction
		idx_t count;
		SelectionVector valid_sel(STANDARD_VECTOR_SIZE);
		if (TYPE == TableScanType::TABLE_SCAN_REGULAR) {
			count = state.row_group->GetSelVector(transaction, state.vector_index, valid_sel, max_count);
			if (count == 0) {
				// nothing to scan for this vector, skip the entire vector
				NextVector(state);
				continue;
			}
		} else if (TYPE == TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED) {
			count = state.row_group->GetCommittedSelVector(transaction.start_time, transaction.transaction_id,
			                                               state.vector_index, valid_sel, max_count);
			if (count == 0) {
				// nothing to scan for this vector, skip the entire vector
				NextVector(state);
				continue;
			}
		} else {
			count = max_count;
		}
		if (count == max_count && !table_filters) {
			// scan all vectors completely: full scan without deletions or table filters
			for (idx_t i = 0; i < column_ids.size(); i++) {
				const auto &column = column_ids[i];
				if (column == COLUMN_IDENTIFIER_ROW_ID) {
					// scan row id
					D_ASSERT(result.data[i].GetType().InternalType() == ROW_TYPE);
					result.data[i].Sequence(this->start + current_row, 1, count);
				} else {
					auto &col_data = GetColumn(column);
					if (TYPE != TableScanType::TABLE_SCAN_REGULAR) {
						col_data.ScanCommitted(state.vector_index, state.column_scans[i], result.data[i],
						                       ALLOW_UPDATES);
					} else {
						col_data.Scan(transaction, state.vector_index, state.column_scans[i], result.data[i]);
					}
				}
			}
		} else {
			// partial scan: we have deletions or table filters
			idx_t approved_tuple_count = count;
			SelectionVector sel;
			if (count != max_count) {
				sel.Initialize(valid_sel);
			} else {
				sel.Initialize(nullptr);
			}
			//! first, we scan the columns with filters, fetch their data and generate a selection vector.
			//! get runtime statistics
			auto start_time = high_resolution_clock::now();
			if (table_filters) {
				D_ASSERT(adaptive_filter);
				D_ASSERT(ALLOW_UPDATES);
				for (idx_t i = 0; i < table_filters->filters.size(); i++) {
					auto tf_idx = adaptive_filter->permutation[i];
					auto col_idx = column_ids[tf_idx];
					auto &col_data = GetColumn(col_idx);
					col_data.Select(transaction, state.vector_index, state.column_scans[tf_idx], result.data[tf_idx],
					                sel, approved_tuple_count, *table_filters->filters[tf_idx]);
				}
				for (auto &table_filter : table_filters->filters) {
					result.data[table_filter.first].Slice(sel, approved_tuple_count);
				}
			}
			if (approved_tuple_count == 0) {
				// all rows were filtered out by the table filters
				// skip this vector in all the scans that were not scanned yet
				D_ASSERT(table_filters);
				result.Reset();
				for (idx_t i = 0; i < column_ids.size(); i++) {
					auto col_idx = column_ids[i];
					if (col_idx == COLUMN_IDENTIFIER_ROW_ID) {
						continue;
					}
					if (table_filters->filters.find(i) == table_filters->filters.end()) {
						auto &col_data = GetColumn(col_idx);
						col_data.Skip(state.column_scans[i]);
					}
				}
				state.vector_index++;
				continue;
			}
			//! Now we use the selection vector to fetch data for the other columns.
			for (idx_t i = 0; i < column_ids.size(); i++) {
				if (!table_filters || table_filters->filters.find(i) == table_filters->filters.end()) {
					auto column = column_ids[i];
					if (column == COLUMN_IDENTIFIER_ROW_ID) {
						D_ASSERT(result.data[i].GetType().InternalType() == PhysicalType::INT64);
						result.data[i].SetVectorType(VectorType::FLAT_VECTOR);
						auto result_data = FlatVector::GetData<int64_t>(result.data[i]);
						for (size_t sel_idx = 0; sel_idx < approved_tuple_count; sel_idx++) {
							result_data[sel_idx] = this->start + current_row + sel.get_index(sel_idx);
						}
					} else {
						auto &col_data = GetColumn(column);
						if (TYPE == TableScanType::TABLE_SCAN_REGULAR) {
							col_data.FilterScan(transaction, state.vector_index, state.column_scans[i], result.data[i],
							                    sel, approved_tuple_count);
						} else {
							col_data.FilterScanCommitted(state.vector_index, state.column_scans[i], result.data[i],
							                             sel, approved_tuple_count, ALLOW_UPDATES);
						}
					}
				}
			}
			auto end_time = high_resolution_clock::now();
			if (adaptive_filter && table_filters->filters.size() > 1) {
				adaptive_filter->AdaptRuntimeStatistics(duration_cast<duration<double>>(end_time - start_time).count());
			}
			D_ASSERT(approved_tuple_count > 0);
			count = approved_tuple_count;
		}
		result.SetCardinality(count);
		state.vector_index++;
		break;
	}
}

void RowGroup::Scan(TransactionData transaction, CollectionScanState &state, DataChunk &result) {
	TemplatedScan<TableScanType::TABLE_SCAN_REGULAR>(transaction, state, result);
}

void RowGroup::ScanCommitted(CollectionScanState &state, DataChunk &result, TableScanType type) {
	auto &transaction_manager = DuckTransactionManager::Get(GetCollection().GetAttached());

	auto lowest_active_start = transaction_manager.LowestActiveStart();
	auto lowest_active_id = transaction_manager.LowestActiveId();
	TransactionData data(lowest_active_id, lowest_active_start);
	switch (type) {
	case TableScanType::TABLE_SCAN_COMMITTED_ROWS:
		TemplatedScan<TableScanType::TABLE_SCAN_COMMITTED_ROWS>(data, state, result);
		break;
	case TableScanType::TABLE_SCAN_COMMITTED_ROWS_DISALLOW_UPDATES:
		TemplatedScan<TableScanType::TABLE_SCAN_COMMITTED_ROWS_DISALLOW_UPDATES>(data, state, result);
		break;
	case TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED:
		TemplatedScan<TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED>(data, state, result);
		break;
	default:
		throw InternalException("Unrecognized table scan type");
	}
}

ChunkInfo *RowGroup::GetChunkInfo(idx_t vector_idx) {
	if (!version_info) {
		return nullptr;
	}
	return version_info->info[vector_idx].get();
}

idx_t RowGroup::GetSelVector(TransactionData transaction, idx_t vector_idx, SelectionVector &sel_vector,
                             idx_t max_count) {
	lock_guard<mutex> lock(row_group_lock);

	auto info = GetChunkInfo(vector_idx);
	if (!info) {
		return max_count;
	}
	return info->GetSelVector(transaction, sel_vector, max_count);
}

idx_t RowGroup::GetCommittedSelVector(transaction_t start_time, transaction_t transaction_id, idx_t vector_idx,
                                      SelectionVector &sel_vector, idx_t max_count) {
	lock_guard<mutex> lock(row_group_lock);

	auto info = GetChunkInfo(vector_idx);
	if (!info) {
		return max_count;
	}
	return info->GetCommittedSelVector(start_time, transaction_id, sel_vector, max_count);
}

bool RowGroup::Fetch(TransactionData transaction, idx_t row) {
	D_ASSERT(row < this->count);
	lock_guard<mutex> lock(row_group_lock);

	idx_t vector_index = row / STANDARD_VECTOR_SIZE;
	auto info = GetChunkInfo(vector_index);
	if (!info) {
		return true;
	}
	return info->Fetch(transaction, row - vector_index * STANDARD_VECTOR_SIZE);
}

void RowGroup::FetchRow(TransactionData transaction, ColumnFetchState &state, const vector<column_t> &column_ids,
                        row_t row_id, DataChunk &result, idx_t result_idx) {
	for (idx_t col_idx = 0; col_idx < column_ids.size(); col_idx++) {
		auto column = column_ids[col_idx];
		if (column == COLUMN_IDENTIFIER_ROW_ID) {
			// row id column: fill in the row ids
			D_ASSERT(result.data[col_idx].GetType().InternalType() == PhysicalType::INT64);
			result.data[col_idx].SetVectorType(VectorType::FLAT_VECTOR);
			auto data = FlatVector::GetData<row_t>(result.data[col_idx]);
			data[result_idx] = row_id;
		} else {
			// regular column: fetch data from the base column
			auto &col_data = GetColumn(column);
			col_data.FetchRow(transaction, state, row_id, result.data[col_idx], result_idx);
		}
	}
}

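// Appends version information for newly inserted rows. Vectors that are fully covered by the append get a
// ChunkConstantInfo (all rows share the same insert id); partially covered vectors get (or extend) a ChunkVectorInfo
// with per-row insert ids. The rows become visible to other transactions once CommitAppend stamps the commit id.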
void RowGroup::AppendVersionInfo(TransactionData transaction, idx_t count) {
	idx_t row_group_start = this->count.load();
	idx_t row_group_end = row_group_start + count;
	if (row_group_end > RowGroup::ROW_GROUP_SIZE) {
		row_group_end = RowGroup::ROW_GROUP_SIZE;
	}
	lock_guard<mutex> lock(row_group_lock);

	// create the version_info if it doesn't exist yet
	if (!version_info) {
		version_info = make_shared<VersionNode>();
	}
	idx_t start_vector_idx = row_group_start / STANDARD_VECTOR_SIZE;
	idx_t end_vector_idx = (row_group_end - 1) / STANDARD_VECTOR_SIZE;
	for (idx_t vector_idx = start_vector_idx; vector_idx <= end_vector_idx; vector_idx++) {
		idx_t start = vector_idx == start_vector_idx ? row_group_start - start_vector_idx * STANDARD_VECTOR_SIZE : 0;
		idx_t end =
		    vector_idx == end_vector_idx ? row_group_end - end_vector_idx * STANDARD_VECTOR_SIZE : STANDARD_VECTOR_SIZE;
		if (start == 0 && end == STANDARD_VECTOR_SIZE) {
			// entire vector is encapsulated by append: append a single constant
			auto constant_info = make_uniq<ChunkConstantInfo>(this->start + vector_idx * STANDARD_VECTOR_SIZE);
			constant_info->insert_id = transaction.transaction_id;
			constant_info->delete_id = NOT_DELETED_ID;
			version_info->info[vector_idx] = std::move(constant_info);
		} else {
			// part of a vector is encapsulated: append to that part
			ChunkVectorInfo *info;
			if (!version_info->info[vector_idx]) {
				// first time appending to this vector: create new info
				auto insert_info = make_uniq<ChunkVectorInfo>(this->start + vector_idx * STANDARD_VECTOR_SIZE);
				info = insert_info.get();
				version_info->info[vector_idx] = std::move(insert_info);
			} else {
				D_ASSERT(version_info->info[vector_idx]->type == ChunkInfoType::VECTOR_INFO);
				// use existing vector
				info = &version_info->info[vector_idx]->Cast<ChunkVectorInfo>();
			}
			info->Append(start, end, transaction.transaction_id);
		}
	}
	this->count = row_group_end;
}

void RowGroup::CommitAppend(transaction_t commit_id, idx_t row_group_start, idx_t count) {
	D_ASSERT(version_info.get());
	idx_t row_group_end = row_group_start + count;
	lock_guard<mutex> lock(row_group_lock);

	idx_t start_vector_idx = row_group_start / STANDARD_VECTOR_SIZE;
	idx_t end_vector_idx = (row_group_end - 1) / STANDARD_VECTOR_SIZE;
	for (idx_t vector_idx = start_vector_idx; vector_idx <= end_vector_idx; vector_idx++) {
		idx_t start = vector_idx == start_vector_idx ? row_group_start - start_vector_idx * STANDARD_VECTOR_SIZE : 0;
		idx_t end =
		    vector_idx == end_vector_idx ? row_group_end - end_vector_idx * STANDARD_VECTOR_SIZE : STANDARD_VECTOR_SIZE;

		auto info = version_info->info[vector_idx].get();
		info->CommitAppend(commit_id, start, end);
	}
}

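// Rolls back an uncommitted append: drops the version info of all vectors past the revert point, reverts the
// per-column appends and shrinks the row count back to the revert point.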
void RowGroup::RevertAppend(idx_t row_group_start) {
	if (!version_info) {
		return;
	}
	idx_t start_row = row_group_start - this->start;
	idx_t start_vector_idx = (start_row + (STANDARD_VECTOR_SIZE - 1)) / STANDARD_VECTOR_SIZE;
	for (idx_t vector_idx = start_vector_idx; vector_idx < RowGroup::ROW_GROUP_VECTOR_COUNT; vector_idx++) {
		version_info->info[vector_idx].reset();
	}
	for (auto &column : columns) {
		column->RevertAppend(row_group_start);
	}
	this->count = MinValue<idx_t>(row_group_start - this->start, this->count);
	Verify();
}

void RowGroup::InitializeAppend(RowGroupAppendState &append_state) {
	append_state.row_group = this;
	append_state.offset_in_row_group = this->count;
	// for each column, initialize the append state
	append_state.states = make_unsafe_uniq_array<ColumnAppendState>(GetColumnCount());
	for (idx_t i = 0; i < GetColumnCount(); i++) {
		auto &col_data = GetColumn(i);
		col_data.InitializeAppend(append_state.states[i]);
	}
}

void RowGroup::Append(RowGroupAppendState &state, DataChunk &chunk, idx_t append_count) {
	// append to the current row_group
	for (idx_t i = 0; i < GetColumnCount(); i++) {
		auto &col_data = GetColumn(i);
		col_data.Append(state.states[i], chunk.data[i], append_count);
	}
	state.offset_in_row_group += append_count;
}

void RowGroup::Update(TransactionData transaction, DataChunk &update_chunk, row_t *ids, idx_t offset, idx_t count,
                      const vector<PhysicalIndex> &column_ids) {
#ifdef DEBUG
	for (size_t i = offset; i < offset + count; i++) {
		D_ASSERT(ids[i] >= row_t(this->start) && ids[i] < row_t(this->start + this->count));
	}
#endif
	for (idx_t i = 0; i < column_ids.size(); i++) {
		auto column = column_ids[i];
		D_ASSERT(column.index != COLUMN_IDENTIFIER_ROW_ID);
		auto &col_data = GetColumn(column.index);
		D_ASSERT(col_data.type.id() == update_chunk.data[i].GetType().id());
		if (offset > 0) {
			Vector sliced_vector(update_chunk.data[i], offset, offset + count);
			sliced_vector.Flatten(count);
			col_data.Update(transaction, column.index, sliced_vector, ids + offset, count);
		} else {
			col_data.Update(transaction, column.index, update_chunk.data[i], ids, count);
		}
		MergeStatistics(column.index, *col_data.GetUpdateStatistics());
	}
}

void RowGroup::UpdateColumn(TransactionData transaction, DataChunk &updates, Vector &row_ids,
                            const vector<column_t> &column_path) {
	D_ASSERT(updates.ColumnCount() == 1);
	auto ids = FlatVector::GetData<row_t>(row_ids);

	auto primary_column_idx = column_path[0];
	D_ASSERT(primary_column_idx != COLUMN_IDENTIFIER_ROW_ID);
	D_ASSERT(primary_column_idx < columns.size());
	auto &col_data = GetColumn(primary_column_idx);
	col_data.UpdateColumn(transaction, column_path, updates.data[0], ids, updates.size(), 1);
	MergeStatistics(primary_column_idx, *col_data.GetUpdateStatistics());
}

unique_ptr<BaseStatistics> RowGroup::GetStatistics(idx_t column_idx) {
	auto &col_data = GetColumn(column_idx);
	lock_guard<mutex> slock(stats_lock);
	return col_data.GetStatistics();
}

void RowGroup::MergeStatistics(idx_t column_idx, const BaseStatistics &other) {
	auto &col_data = GetColumn(column_idx);
	lock_guard<mutex> slock(stats_lock);
	col_data.MergeStatistics(other);
}

void RowGroup::MergeIntoStatistics(idx_t column_idx, BaseStatistics &other) {
	auto &col_data = GetColumn(column_idx);
	lock_guard<mutex> slock(stats_lock);
	col_data.MergeIntoStatistics(other);
}

RowGroupWriteData RowGroup::WriteToDisk(PartialBlockManager &manager,
                                        const vector<CompressionType> &compression_types) {
	RowGroupWriteData result;
	result.states.reserve(columns.size());
	result.statistics.reserve(columns.size());

	// Checkpoint the individual columns of the row group
	// Here we're iterating over columns. Each column can have multiple segments.
	// (Some columns will be wider than others, and require different numbers
	// of blocks to encode.) Segments cannot span blocks.
	//
	// Some of these columns are composite (list, struct). The data is written
	// first sequentially, and the pointers are written later, so that the
	// pointers all end up densely packed, and thus more cache-friendly.
	for (idx_t column_idx = 0; column_idx < GetColumnCount(); column_idx++) {
		auto &column = GetColumn(column_idx);
		ColumnCheckpointInfo checkpoint_info {compression_types[column_idx]};
		auto checkpoint_state = column.Checkpoint(*this, manager, checkpoint_info);
		D_ASSERT(checkpoint_state);

		auto stats = checkpoint_state->GetStatistics();
		D_ASSERT(stats);

		result.statistics.push_back(stats->Copy());
		result.states.push_back(std::move(checkpoint_state));
	}
	D_ASSERT(result.states.size() == result.statistics.size());
	return result;
}

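// Checkpoints the row group: writes every column to disk with the requested compression, merges the resulting
// column statistics into the global table statistics, and records in the RowGroupPointer the block pointers at
// which each column's metadata is written.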
RowGroupPointer RowGroup::Checkpoint(RowGroupWriter &writer, TableStatistics &global_stats) {
	RowGroupPointer row_group_pointer;

	vector<CompressionType> compression_types;
	compression_types.reserve(columns.size());
	for (idx_t column_idx = 0; column_idx < GetColumnCount(); column_idx++) {
		compression_types.push_back(writer.GetColumnCompressionType(column_idx));
	}
	auto result = WriteToDisk(writer.GetPartialBlockManager(), compression_types);
	for (idx_t column_idx = 0; column_idx < GetColumnCount(); column_idx++) {
		global_stats.GetStats(column_idx).Statistics().Merge(result.statistics[column_idx]);
	}

	// construct the row group pointer and write the column meta data to disk
	D_ASSERT(result.states.size() == columns.size());
	row_group_pointer.row_start = start;
	row_group_pointer.tuple_count = count;
	for (auto &state : result.states) {
		// get the current position of the table data writer
		auto &data_writer = writer.GetPayloadWriter();
		auto pointer = data_writer.GetBlockPointer();

		// store the stats and the data pointers in the row group pointers
		row_group_pointer.data_pointers.push_back(pointer);

		// Write pointers to the column segments.
		//
		// Just as above, the state can refer to many other states, so this
		// can cascade recursively into more pointer writes.
		state->WriteDataPointers(writer);
	}
	row_group_pointer.versions = version_info;
	Verify();
	return row_group_pointer;
}

void RowGroup::CheckpointDeletes(VersionNode *versions, Serializer &serializer) {
	if (!versions) {
		// no version information: write nothing
		serializer.Write<idx_t>(0);
		return;
	}
	// first count how many ChunkInfo's we need to serialize
	idx_t chunk_info_count = 0;
	for (idx_t vector_idx = 0; vector_idx < RowGroup::ROW_GROUP_VECTOR_COUNT; vector_idx++) {
		auto chunk_info = versions->info[vector_idx].get();
		if (!chunk_info) {
			continue;
		}
		chunk_info_count++;
	}
	// now serialize the actual version information
	serializer.Write<idx_t>(chunk_info_count);
	for (idx_t vector_idx = 0; vector_idx < RowGroup::ROW_GROUP_VECTOR_COUNT; vector_idx++) {
		auto chunk_info = versions->info[vector_idx].get();
		if (!chunk_info) {
			continue;
		}
		serializer.Write<idx_t>(vector_idx);
		chunk_info->Serialize(serializer);
	}
}

shared_ptr<VersionNode> RowGroup::DeserializeDeletes(Deserializer &source) {
	auto chunk_count = source.Read<idx_t>();
	if (chunk_count == 0) {
		// no deletes
		return nullptr;
	}
	auto version_info = make_shared<VersionNode>();
	for (idx_t i = 0; i < chunk_count; i++) {
		idx_t vector_index = source.Read<idx_t>();
		if (vector_index >= RowGroup::ROW_GROUP_VECTOR_COUNT) {
			throw Exception("In DeserializeDeletes, vector_index is out of range for the row group. Corrupted file?");
		}
		version_info->info[vector_index] = ChunkInfo::Deserialize(source);
	}
	return version_info;
}

void RowGroup::Serialize(RowGroupPointer &pointer, Serializer &main_serializer) {
	FieldWriter writer(main_serializer);
	writer.WriteField<uint64_t>(pointer.row_start);
	writer.WriteField<uint64_t>(pointer.tuple_count);
	auto &serializer = writer.GetSerializer();
	for (auto &data_pointer : pointer.data_pointers) {
		serializer.Write<block_id_t>(data_pointer.block_id);
		serializer.Write<uint64_t>(data_pointer.offset);
	}
	CheckpointDeletes(pointer.versions.get(), serializer);
	writer.Finalize();
}

RowGroupPointer RowGroup::Deserialize(Deserializer &main_source, const vector<LogicalType> &columns) {
	RowGroupPointer result;

	FieldReader reader(main_source);
	result.row_start = reader.ReadRequired<uint64_t>();
	result.tuple_count = reader.ReadRequired<uint64_t>();

	auto physical_columns = columns.size();
	result.data_pointers.reserve(physical_columns);

	auto &source = reader.GetSource();
	for (idx_t i = 0; i < physical_columns; i++) {
		BlockPointer pointer;
		pointer.block_id = source.Read<block_id_t>();
		pointer.offset = source.Read<uint64_t>();
		result.data_pointers.push_back(pointer);
	}
	result.versions = DeserializeDeletes(source);

	reader.Finalize();
	return result;
}

//===--------------------------------------------------------------------===//
// GetColumnSegmentInfo
//===--------------------------------------------------------------------===//
void RowGroup::GetColumnSegmentInfo(idx_t row_group_index, vector<ColumnSegmentInfo> &result) {
	for (idx_t col_idx = 0; col_idx < GetColumnCount(); col_idx++) {
		auto &col_data = GetColumn(col_idx);
		col_data.GetColumnSegmentInfo(row_group_index, {col_idx}, result);
	}
}

//===--------------------------------------------------------------------===//
// Version Delete Information
//===--------------------------------------------------------------------===//
class VersionDeleteState {
public:
	VersionDeleteState(RowGroup &info, TransactionData transaction, DataTable &table, idx_t base_row)
	    : info(info), transaction(transaction), table(table), current_info(nullptr),
	      current_chunk(DConstants::INVALID_INDEX), count(0), base_row(base_row), delete_count(0) {
	}

	RowGroup &info;
	TransactionData transaction;
	DataTable &table;
	ChunkVectorInfo *current_info;
	idx_t current_chunk;
	row_t rows[STANDARD_VECTOR_SIZE];
	idx_t count;
	idx_t base_row;
	idx_t chunk_row;
	idx_t delete_count;

public:
	void Delete(row_t row_id);
	void Flush();
};

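// Deletes the given row ids (table-wide ids, i.e. including the row group's start offset) from this row group.
// Deletions are buffered per vector in a VersionDeleteState and flushed into the corresponding ChunkVectorInfo;
// the return value is the number of rows that were actually deleted (duplicate deletions of the same row are not
// counted twice).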
idx_t RowGroup::Delete(TransactionData transaction, DataTable &table, row_t *ids, idx_t count) {
	lock_guard<mutex> lock(row_group_lock);
	VersionDeleteState del_state(*this, transaction, table, this->start);

	// obtain a write lock
	for (idx_t i = 0; i < count; i++) {
		D_ASSERT(ids[i] >= 0);
		D_ASSERT(idx_t(ids[i]) >= this->start && idx_t(ids[i]) < this->start + this->count);
		del_state.Delete(ids[i] - this->start);
	}
	del_state.Flush();
	return del_state.delete_count;
}

void RowGroup::Verify() {
#ifdef DEBUG
	for (auto &column : GetColumns()) {
		column->Verify(*this);
	}
#endif
}

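// Registers a single row (relative to the row group) for deletion. Deletes are buffered per vector: when the target
// vector changes, the pending rows are flushed first. Constant version info (a fully-inserted vector) is converted
// to a per-row ChunkVectorInfo before the delete can be recorded.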
void VersionDeleteState::Delete(row_t row_id) {
	D_ASSERT(row_id >= 0);
	idx_t vector_idx = row_id / STANDARD_VECTOR_SIZE;
	idx_t idx_in_vector = row_id - vector_idx * STANDARD_VECTOR_SIZE;
	if (current_chunk != vector_idx) {
		Flush();

		if (!info.version_info) {
			info.version_info = make_shared<VersionNode>();
		}

		if (!info.version_info->info[vector_idx]) {
			// no info yet: create it
			info.version_info->info[vector_idx] =
			    make_uniq<ChunkVectorInfo>(info.start + vector_idx * STANDARD_VECTOR_SIZE);
		} else if (info.version_info->info[vector_idx]->type == ChunkInfoType::CONSTANT_INFO) {
			auto &constant = info.version_info->info[vector_idx]->Cast<ChunkConstantInfo>();
			// info exists but it's a constant info: convert to a vector info
			auto new_info = make_uniq<ChunkVectorInfo>(info.start + vector_idx * STANDARD_VECTOR_SIZE);
			new_info->insert_id = constant.insert_id.load();
			for (idx_t i = 0; i < STANDARD_VECTOR_SIZE; i++) {
				new_info->inserted[i] = constant.insert_id.load();
			}
			info.version_info->info[vector_idx] = std::move(new_info);
		}
		D_ASSERT(info.version_info->info[vector_idx]->type == ChunkInfoType::VECTOR_INFO);
		current_info = &info.version_info->info[vector_idx]->Cast<ChunkVectorInfo>();
		current_chunk = vector_idx;
		chunk_row = vector_idx * STANDARD_VECTOR_SIZE;
	}
	rows[count++] = idx_in_vector;
}

void VersionDeleteState::Flush() {
	if (count == 0) {
		return;
	}
	// it is possible for delete statements to delete the same tuple multiple times when combined with a USING clause
	// in the current_info->Delete, we check which tuples are actually deleted (excluding duplicate deletions)
	// this is returned in the actual_delete_count
	auto actual_delete_count = current_info->Delete(transaction.transaction_id, rows, count);
	delete_count += actual_delete_count;
	if (transaction.transaction && actual_delete_count > 0) {
		// now push the delete into the undo buffer, but only if any deletes were actually performed
		transaction.transaction->PushDelete(table, current_info, rows, actual_delete_count, base_row + chunk_row);
	}
	count = 0;
}

} // namespace duckdb