1 | #include "duckdb/storage/table/row_group.hpp" |
2 | #include "duckdb/common/types/vector.hpp" |
3 | #include "duckdb/transaction/transaction.hpp" |
4 | #include "duckdb/common/exception.hpp" |
5 | #include "duckdb/common/field_writer.hpp" |
6 | #include "duckdb/storage/table/column_data.hpp" |
7 | #include "duckdb/storage/table/column_checkpoint_state.hpp" |
8 | #include "duckdb/storage/table/update_segment.hpp" |
9 | #include "duckdb/storage/table_storage_info.hpp" |
10 | #include "duckdb/common/chrono.hpp" |
11 | #include "duckdb/planner/table_filter.hpp" |
12 | #include "duckdb/execution/expression_executor.hpp" |
13 | #include "duckdb/storage/checkpoint/table_data_writer.hpp" |
14 | #include "duckdb/storage/meta_block_reader.hpp" |
15 | #include "duckdb/transaction/duck_transaction_manager.hpp" |
16 | #include "duckdb/main/database.hpp" |
17 | #include "duckdb/main/attached_database.hpp" |
18 | #include "duckdb/transaction/duck_transaction.hpp" |
19 | #include "duckdb/storage/table/append_state.hpp" |
20 | #include "duckdb/storage/table/scan_state.hpp" |
21 | |
22 | namespace duckdb { |
23 | |
24 | constexpr const idx_t RowGroup::ROW_GROUP_VECTOR_COUNT; |
25 | constexpr const idx_t RowGroup::ROW_GROUP_SIZE; |
26 | |
27 | RowGroup::RowGroup(RowGroupCollection &collection, idx_t start, idx_t count) |
28 | : SegmentBase<RowGroup>(start, count), collection(collection) { |
29 | Verify(); |
30 | } |
31 | |
32 | RowGroup::RowGroup(RowGroupCollection &collection, RowGroupPointer &&pointer) |
33 | : SegmentBase<RowGroup>(pointer.row_start, pointer.tuple_count), collection(collection) { |
34 | // deserialize the columns |
35 | if (pointer.data_pointers.size() != collection.GetTypes().size()) { |
36 | throw IOException("Row group column count is unaligned with table column count. Corrupt file?" ); |
37 | } |
38 | this->column_pointers = std::move(pointer.data_pointers); |
39 | this->columns.resize(new_size: column_pointers.size()); |
40 | this->is_loaded = unique_ptr<atomic<bool>[]>(new atomic<bool>[columns.size()]); |
41 | for (idx_t c = 0; c < columns.size(); c++) { |
42 | this->is_loaded[c] = false; |
43 | } |
44 | this->version_info = std::move(pointer.versions); |
45 | |
46 | Verify(); |
47 | } |
48 | |
49 | void RowGroup::MoveToCollection(RowGroupCollection &collection, idx_t new_start) { |
50 | this->collection = collection; |
51 | this->start = new_start; |
52 | for (auto &column : GetColumns()) { |
53 | column->SetStart(new_start); |
54 | } |
55 | if (version_info) { |
56 | version_info->SetStart(new_start); |
57 | } |
58 | } |
59 | |
60 | void VersionNode::SetStart(idx_t start) { |
61 | idx_t current_start = start; |
62 | for (idx_t i = 0; i < RowGroup::ROW_GROUP_VECTOR_COUNT; i++) { |
63 | if (info[i]) { |
64 | info[i]->start = current_start; |
65 | } |
66 | current_start += STANDARD_VECTOR_SIZE; |
67 | } |
68 | } |
69 | |
70 | RowGroup::~RowGroup() { |
71 | } |
72 | |
73 | vector<shared_ptr<ColumnData>> &RowGroup::GetColumns() { |
74 | // ensure all columns are loaded |
75 | for (idx_t c = 0; c < GetColumnCount(); c++) { |
76 | GetColumn(c); |
77 | } |
78 | return columns; |
79 | } |
80 | |
81 | idx_t RowGroup::GetColumnCount() const { |
82 | return columns.size(); |
83 | } |
84 | |
85 | ColumnData &RowGroup::GetColumn(storage_t c) { |
86 | D_ASSERT(c < columns.size()); |
87 | if (!is_loaded) { |
88 | // not being lazy loaded |
89 | D_ASSERT(columns[c]); |
90 | return *columns[c]; |
91 | } |
92 | if (is_loaded[c]) { |
93 | D_ASSERT(columns[c]); |
94 | return *columns[c]; |
95 | } |
96 | lock_guard<mutex> l(row_group_lock); |
97 | if (columns[c]) { |
98 | D_ASSERT(is_loaded[c]); |
99 | return *columns[c]; |
100 | } |
101 | if (column_pointers.size() != columns.size()) { |
102 | throw InternalException("Lazy loading a column but the pointer was not set" ); |
103 | } |
104 | auto &block_manager = GetCollection().GetBlockManager(); |
105 | auto &types = GetCollection().GetTypes(); |
106 | auto &block_pointer = column_pointers[c]; |
107 | MetaBlockReader column_data_reader(block_manager, block_pointer.block_id); |
108 | column_data_reader.offset = block_pointer.offset; |
109 | this->columns[c] = |
110 | ColumnData::Deserialize(block_manager&: GetBlockManager(), info&: GetTableInfo(), column_index: c, start_row: start, source&: column_data_reader, type: types[c], parent: nullptr); |
111 | is_loaded[c] = true; |
112 | return *columns[c]; |
113 | } |
114 | |
115 | DatabaseInstance &RowGroup::GetDatabase() { |
116 | return GetCollection().GetDatabase(); |
117 | } |
118 | |
119 | BlockManager &RowGroup::GetBlockManager() { |
120 | return GetCollection().GetBlockManager(); |
121 | } |
122 | DataTableInfo &RowGroup::GetTableInfo() { |
123 | return GetCollection().GetTableInfo(); |
124 | } |
125 | |
126 | void RowGroup::InitializeEmpty(const vector<LogicalType> &types) { |
127 | // set up the segment trees for the column segments |
128 | D_ASSERT(columns.empty()); |
129 | for (idx_t i = 0; i < types.size(); i++) { |
130 | auto column_data = ColumnData::CreateColumn(block_manager&: GetBlockManager(), info&: GetTableInfo(), column_index: i, start_row: start, type: types[i]); |
131 | columns.push_back(x: std::move(column_data)); |
132 | } |
133 | } |
134 | |
135 | void ColumnScanState::Initialize(const LogicalType &type) { |
136 | if (type.id() == LogicalTypeId::VALIDITY) { |
137 | // validity - nothing to initialize |
138 | return; |
139 | } |
140 | if (type.InternalType() == PhysicalType::STRUCT) { |
141 | // validity + struct children |
142 | auto &struct_children = StructType::GetChildTypes(type); |
143 | child_states.resize(new_size: struct_children.size() + 1); |
144 | for (idx_t i = 0; i < struct_children.size(); i++) { |
145 | child_states[i + 1].Initialize(type: struct_children[i].second); |
146 | } |
147 | } else if (type.InternalType() == PhysicalType::LIST) { |
148 | // validity + list child |
149 | child_states.resize(new_size: 2); |
150 | child_states[1].Initialize(type: ListType::GetChildType(type)); |
151 | } else { |
152 | // validity |
153 | child_states.resize(new_size: 1); |
154 | } |
155 | } |
156 | |
157 | void CollectionScanState::Initialize(const vector<LogicalType> &types) { |
158 | auto &column_ids = GetColumnIds(); |
159 | column_scans = make_unsafe_uniq_array<ColumnScanState>(n: column_ids.size()); |
160 | for (idx_t i = 0; i < column_ids.size(); i++) { |
161 | if (column_ids[i] == COLUMN_IDENTIFIER_ROW_ID) { |
162 | continue; |
163 | } |
164 | column_scans[i].Initialize(type: types[column_ids[i]]); |
165 | } |
166 | } |
167 | |
168 | bool RowGroup::InitializeScanWithOffset(CollectionScanState &state, idx_t vector_offset) { |
169 | auto &column_ids = state.GetColumnIds(); |
170 | auto filters = state.GetFilters(); |
171 | if (filters) { |
172 | if (!CheckZonemap(filters&: *filters, column_ids)) { |
173 | return false; |
174 | } |
175 | } |
176 | |
177 | state.row_group = this; |
178 | state.vector_index = vector_offset; |
179 | state.max_row_group_row = |
180 | this->start > state.max_row ? 0 : MinValue<idx_t>(a: this->count, b: state.max_row - this->start); |
181 | D_ASSERT(state.column_scans); |
182 | for (idx_t i = 0; i < column_ids.size(); i++) { |
183 | const auto &column = column_ids[i]; |
184 | if (column != COLUMN_IDENTIFIER_ROW_ID) { |
185 | auto &column_data = GetColumn(c: column); |
186 | column_data.InitializeScanWithOffset(state&: state.column_scans[i], row_idx: start + vector_offset * STANDARD_VECTOR_SIZE); |
187 | } else { |
188 | state.column_scans[i].current = nullptr; |
189 | } |
190 | } |
191 | return true; |
192 | } |
193 | |
194 | bool RowGroup::InitializeScan(CollectionScanState &state) { |
195 | auto &column_ids = state.GetColumnIds(); |
196 | auto filters = state.GetFilters(); |
197 | if (filters) { |
198 | if (!CheckZonemap(filters&: *filters, column_ids)) { |
199 | return false; |
200 | } |
201 | } |
202 | state.row_group = this; |
203 | state.vector_index = 0; |
204 | state.max_row_group_row = |
205 | this->start > state.max_row ? 0 : MinValue<idx_t>(a: this->count, b: state.max_row - this->start); |
206 | if (state.max_row_group_row == 0) { |
207 | return false; |
208 | } |
209 | D_ASSERT(state.column_scans); |
210 | for (idx_t i = 0; i < column_ids.size(); i++) { |
211 | auto column = column_ids[i]; |
212 | if (column != COLUMN_IDENTIFIER_ROW_ID) { |
213 | auto &column_data = GetColumn(c: column); |
214 | column_data.InitializeScan(state&: state.column_scans[i]); |
215 | } else { |
216 | state.column_scans[i].current = nullptr; |
217 | } |
218 | } |
219 | return true; |
220 | } |
221 | |
222 | unique_ptr<RowGroup> RowGroup::AlterType(RowGroupCollection &new_collection, const LogicalType &target_type, |
223 | idx_t changed_idx, ExpressionExecutor &executor, |
224 | CollectionScanState &scan_state, DataChunk &scan_chunk) { |
225 | Verify(); |
226 | |
227 | // construct a new column data for this type |
228 | auto column_data = ColumnData::CreateColumn(block_manager&: GetBlockManager(), info&: GetTableInfo(), column_index: changed_idx, start_row: start, type: target_type); |
229 | |
230 | ColumnAppendState append_state; |
231 | column_data->InitializeAppend(state&: append_state); |
232 | |
233 | // scan the original table, and fill the new column with the transformed value |
234 | scan_state.Initialize(types: GetCollection().GetTypes()); |
235 | InitializeScan(state&: scan_state); |
236 | |
237 | DataChunk append_chunk; |
238 | vector<LogicalType> append_types; |
239 | append_types.push_back(x: target_type); |
240 | append_chunk.Initialize(allocator&: Allocator::DefaultAllocator(), types: append_types); |
241 | auto &append_vector = append_chunk.data[0]; |
242 | while (true) { |
243 | // scan the table |
244 | scan_chunk.Reset(); |
245 | ScanCommitted(state&: scan_state, result&: scan_chunk, type: TableScanType::TABLE_SCAN_COMMITTED_ROWS); |
246 | if (scan_chunk.size() == 0) { |
247 | break; |
248 | } |
249 | // execute the expression |
250 | append_chunk.Reset(); |
251 | executor.ExecuteExpression(input&: scan_chunk, result&: append_vector); |
252 | column_data->Append(state&: append_state, vector&: append_vector, count: scan_chunk.size()); |
253 | } |
254 | |
255 | // set up the row_group based on this row_group |
256 | auto row_group = make_uniq<RowGroup>(args&: new_collection, args&: this->start, args&: this->count); |
257 | row_group->version_info = version_info; |
258 | auto &cols = GetColumns(); |
259 | for (idx_t i = 0; i < cols.size(); i++) { |
260 | if (i == changed_idx) { |
261 | // this is the altered column: use the new column |
262 | row_group->columns.push_back(x: std::move(column_data)); |
263 | } else { |
264 | // this column was not altered: use the data directly |
265 | row_group->columns.push_back(x: cols[i]); |
266 | } |
267 | } |
268 | row_group->Verify(); |
269 | return row_group; |
270 | } |
271 | |
272 | unique_ptr<RowGroup> RowGroup::AddColumn(RowGroupCollection &new_collection, ColumnDefinition &new_column, |
273 | ExpressionExecutor &executor, Expression *default_value, Vector &result) { |
274 | Verify(); |
275 | |
276 | // construct a new column data for the new column |
277 | auto added_column = |
278 | ColumnData::CreateColumn(block_manager&: GetBlockManager(), info&: GetTableInfo(), column_index: GetColumnCount(), start_row: start, type: new_column.Type()); |
279 | |
280 | idx_t rows_to_write = this->count; |
281 | if (rows_to_write > 0) { |
282 | DataChunk dummy_chunk; |
283 | |
284 | ColumnAppendState state; |
285 | added_column->InitializeAppend(state); |
286 | for (idx_t i = 0; i < rows_to_write; i += STANDARD_VECTOR_SIZE) { |
287 | idx_t rows_in_this_vector = MinValue<idx_t>(a: rows_to_write - i, STANDARD_VECTOR_SIZE); |
288 | if (default_value) { |
289 | dummy_chunk.SetCardinality(rows_in_this_vector); |
290 | executor.ExecuteExpression(input&: dummy_chunk, result); |
291 | } |
292 | added_column->Append(state, vector&: result, count: rows_in_this_vector); |
293 | } |
294 | } |
295 | |
296 | // set up the row_group based on this row_group |
297 | auto row_group = make_uniq<RowGroup>(args&: new_collection, args&: this->start, args&: this->count); |
298 | row_group->version_info = version_info; |
299 | row_group->columns = GetColumns(); |
300 | // now add the new column |
301 | row_group->columns.push_back(x: std::move(added_column)); |
302 | |
303 | row_group->Verify(); |
304 | return row_group; |
305 | } |
306 | |
307 | unique_ptr<RowGroup> RowGroup::RemoveColumn(RowGroupCollection &new_collection, idx_t removed_column) { |
308 | Verify(); |
309 | |
310 | D_ASSERT(removed_column < columns.size()); |
311 | |
312 | auto row_group = make_uniq<RowGroup>(args&: new_collection, args&: this->start, args&: this->count); |
313 | row_group->version_info = version_info; |
314 | // copy over all columns except for the removed one |
315 | auto &cols = GetColumns(); |
316 | for (idx_t i = 0; i < cols.size(); i++) { |
317 | if (i != removed_column) { |
318 | row_group->columns.push_back(x: cols[i]); |
319 | } |
320 | } |
321 | |
322 | row_group->Verify(); |
323 | return row_group; |
324 | } |
325 | |
326 | void RowGroup::CommitDrop() { |
327 | for (idx_t column_idx = 0; column_idx < GetColumnCount(); column_idx++) { |
328 | CommitDropColumn(index: column_idx); |
329 | } |
330 | } |
331 | |
332 | void RowGroup::CommitDropColumn(idx_t column_idx) { |
333 | GetColumn(c: column_idx).CommitDropColumn(); |
334 | } |
335 | |
336 | void RowGroup::NextVector(CollectionScanState &state) { |
337 | state.vector_index++; |
338 | const auto &column_ids = state.GetColumnIds(); |
339 | for (idx_t i = 0; i < column_ids.size(); i++) { |
340 | const auto &column = column_ids[i]; |
341 | if (column == COLUMN_IDENTIFIER_ROW_ID) { |
342 | continue; |
343 | } |
344 | D_ASSERT(column < columns.size()); |
345 | GetColumn(c: column).Skip(state&: state.column_scans[i]); |
346 | } |
347 | } |
348 | |
349 | bool RowGroup::CheckZonemap(TableFilterSet &filters, const vector<storage_t> &column_ids) { |
350 | for (auto &entry : filters.filters) { |
351 | auto column_index = entry.first; |
352 | auto &filter = entry.second; |
353 | const auto &base_column_index = column_ids[column_index]; |
354 | if (!GetColumn(c: base_column_index).CheckZonemap(filter&: *filter)) { |
355 | return false; |
356 | } |
357 | } |
358 | return true; |
359 | } |
360 | |
361 | bool RowGroup::CheckZonemapSegments(CollectionScanState &state) { |
362 | auto &column_ids = state.GetColumnIds(); |
363 | auto filters = state.GetFilters(); |
364 | if (!filters) { |
365 | return true; |
366 | } |
367 | for (auto &entry : filters->filters) { |
368 | D_ASSERT(entry.first < column_ids.size()); |
369 | auto column_idx = entry.first; |
370 | const auto &base_column_idx = column_ids[column_idx]; |
371 | bool read_segment = GetColumn(c: base_column_idx).CheckZonemap(state&: state.column_scans[column_idx], filter&: *entry.second); |
372 | if (!read_segment) { |
373 | idx_t target_row = |
374 | state.column_scans[column_idx].current->start + state.column_scans[column_idx].current->count; |
375 | D_ASSERT(target_row >= this->start); |
376 | D_ASSERT(target_row <= this->start + this->count); |
377 | idx_t target_vector_index = (target_row - this->start) / STANDARD_VECTOR_SIZE; |
378 | if (state.vector_index == target_vector_index) { |
379 | // we can't skip any full vectors because this segment contains less than a full vector |
380 | // for now we just bail-out |
381 | // FIXME: we could check if we can ALSO skip the next segments, in which case skipping a full vector |
382 | // might be possible |
383 | // we don't care that much though, since a single segment that fits less than a full vector is |
384 | // exceedingly rare |
385 | return true; |
386 | } |
387 | while (state.vector_index < target_vector_index) { |
388 | NextVector(state); |
389 | } |
390 | return false; |
391 | } |
392 | } |
393 | |
394 | return true; |
395 | } |
396 | |
397 | template <TableScanType TYPE> |
398 | void RowGroup::TemplatedScan(TransactionData transaction, CollectionScanState &state, DataChunk &result) { |
399 | const bool ALLOW_UPDATES = TYPE != TableScanType::TABLE_SCAN_COMMITTED_ROWS_DISALLOW_UPDATES && |
400 | TYPE != TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED; |
401 | auto table_filters = state.GetFilters(); |
402 | const auto &column_ids = state.GetColumnIds(); |
403 | auto adaptive_filter = state.GetAdaptiveFilter(); |
404 | while (true) { |
405 | if (state.vector_index * STANDARD_VECTOR_SIZE >= state.max_row_group_row) { |
406 | // exceeded the amount of rows to scan |
407 | return; |
408 | } |
409 | idx_t current_row = state.vector_index * STANDARD_VECTOR_SIZE; |
410 | auto max_count = MinValue<idx_t>(STANDARD_VECTOR_SIZE, b: state.max_row_group_row - current_row); |
411 | |
412 | //! first check the zonemap if we have to scan this partition |
413 | if (!CheckZonemapSegments(state)) { |
414 | continue; |
415 | } |
416 | // second, scan the version chunk manager to figure out which tuples to load for this transaction |
417 | idx_t count; |
418 | SelectionVector valid_sel(STANDARD_VECTOR_SIZE); |
419 | if (TYPE == TableScanType::TABLE_SCAN_REGULAR) { |
420 | count = state.row_group->GetSelVector(transaction, vector_idx: state.vector_index, sel_vector&: valid_sel, max_count); |
421 | if (count == 0) { |
422 | // nothing to scan for this vector, skip the entire vector |
423 | NextVector(state); |
424 | continue; |
425 | } |
426 | } else if (TYPE == TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED) { |
427 | count = state.row_group->GetCommittedSelVector(start_time: transaction.start_time, transaction_id: transaction.transaction_id, |
428 | vector_idx: state.vector_index, sel_vector&: valid_sel, max_count); |
429 | if (count == 0) { |
430 | // nothing to scan for this vector, skip the entire vector |
431 | NextVector(state); |
432 | continue; |
433 | } |
434 | } else { |
435 | count = max_count; |
436 | } |
437 | if (count == max_count && !table_filters) { |
438 | // scan all vectors completely: full scan without deletions or table filters |
439 | for (idx_t i = 0; i < column_ids.size(); i++) { |
440 | const auto &column = column_ids[i]; |
441 | if (column == COLUMN_IDENTIFIER_ROW_ID) { |
442 | // scan row id |
443 | D_ASSERT(result.data[i].GetType().InternalType() == ROW_TYPE); |
444 | result.data[i].Sequence(start: this->start + current_row, increment: 1, count); |
445 | } else { |
446 | auto &col_data = GetColumn(c: column); |
447 | if (TYPE != TableScanType::TABLE_SCAN_REGULAR) { |
448 | col_data.ScanCommitted(vector_index: state.vector_index, state&: state.column_scans[i], result&: result.data[i], |
449 | allow_updates: ALLOW_UPDATES); |
450 | } else { |
451 | col_data.Scan(transaction, vector_index: state.vector_index, state&: state.column_scans[i], result&: result.data[i]); |
452 | } |
453 | } |
454 | } |
455 | } else { |
456 | // partial scan: we have deletions or table filters |
457 | idx_t approved_tuple_count = count; |
458 | SelectionVector sel; |
459 | if (count != max_count) { |
460 | sel.Initialize(other: valid_sel); |
461 | } else { |
462 | sel.Initialize(sel: nullptr); |
463 | } |
464 | //! first, we scan the columns with filters, fetch their data and generate a selection vector. |
465 | //! get runtime statistics |
466 | auto start_time = high_resolution_clock::now(); |
467 | if (table_filters) { |
468 | D_ASSERT(adaptive_filter); |
469 | D_ASSERT(ALLOW_UPDATES); |
470 | for (idx_t i = 0; i < table_filters->filters.size(); i++) { |
471 | auto tf_idx = adaptive_filter->permutation[i]; |
472 | auto col_idx = column_ids[tf_idx]; |
473 | auto &col_data = GetColumn(c: col_idx); |
474 | col_data.Select(transaction, vector_index: state.vector_index, state&: state.column_scans[tf_idx], result&: result.data[tf_idx], |
475 | sel, count&: approved_tuple_count, filter: *table_filters->filters[tf_idx]); |
476 | } |
477 | for (auto &table_filter : table_filters->filters) { |
478 | result.data[table_filter.first].Slice(sel, count: approved_tuple_count); |
479 | } |
480 | } |
481 | if (approved_tuple_count == 0) { |
482 | // all rows were filtered out by the table filters |
483 | // skip this vector in all the scans that were not scanned yet |
484 | D_ASSERT(table_filters); |
485 | result.Reset(); |
486 | for (idx_t i = 0; i < column_ids.size(); i++) { |
487 | auto col_idx = column_ids[i]; |
488 | if (col_idx == COLUMN_IDENTIFIER_ROW_ID) { |
489 | continue; |
490 | } |
491 | if (table_filters->filters.find(x: i) == table_filters->filters.end()) { |
492 | auto &col_data = GetColumn(c: col_idx); |
493 | col_data.Skip(state&: state.column_scans[i]); |
494 | } |
495 | } |
496 | state.vector_index++; |
497 | continue; |
498 | } |
499 | //! Now we use the selection vector to fetch data for the other columns. |
500 | for (idx_t i = 0; i < column_ids.size(); i++) { |
501 | if (!table_filters || table_filters->filters.find(x: i) == table_filters->filters.end()) { |
502 | auto column = column_ids[i]; |
503 | if (column == COLUMN_IDENTIFIER_ROW_ID) { |
504 | D_ASSERT(result.data[i].GetType().InternalType() == PhysicalType::INT64); |
505 | result.data[i].SetVectorType(VectorType::FLAT_VECTOR); |
506 | auto result_data = FlatVector::GetData<int64_t>(vector&: result.data[i]); |
507 | for (size_t sel_idx = 0; sel_idx < approved_tuple_count; sel_idx++) { |
508 | result_data[sel_idx] = this->start + current_row + sel.get_index(idx: sel_idx); |
509 | } |
510 | } else { |
511 | auto &col_data = GetColumn(c: column); |
512 | if (TYPE == TableScanType::TABLE_SCAN_REGULAR) { |
513 | col_data.FilterScan(transaction, vector_index: state.vector_index, state&: state.column_scans[i], result&: result.data[i], |
514 | sel, count: approved_tuple_count); |
515 | } else { |
516 | col_data.FilterScanCommitted(vector_index: state.vector_index, state&: state.column_scans[i], result&: result.data[i], sel, |
517 | count: approved_tuple_count, allow_updates: ALLOW_UPDATES); |
518 | } |
519 | } |
520 | } |
521 | } |
522 | auto end_time = high_resolution_clock::now(); |
523 | if (adaptive_filter && table_filters->filters.size() > 1) { |
524 | adaptive_filter->AdaptRuntimeStatistics(duration: duration_cast<duration<double>>(d: end_time - start_time).count()); |
525 | } |
526 | D_ASSERT(approved_tuple_count > 0); |
527 | count = approved_tuple_count; |
528 | } |
529 | result.SetCardinality(count); |
530 | state.vector_index++; |
531 | break; |
532 | } |
533 | } |
534 | |
535 | void RowGroup::Scan(TransactionData transaction, CollectionScanState &state, DataChunk &result) { |
536 | TemplatedScan<TableScanType::TABLE_SCAN_REGULAR>(transaction, state, result); |
537 | } |
538 | |
539 | void RowGroup::ScanCommitted(CollectionScanState &state, DataChunk &result, TableScanType type) { |
540 | auto &transaction_manager = DuckTransactionManager::Get(db&: GetCollection().GetAttached()); |
541 | |
542 | auto lowest_active_start = transaction_manager.LowestActiveStart(); |
543 | auto lowest_active_id = transaction_manager.LowestActiveId(); |
544 | TransactionData data(lowest_active_id, lowest_active_start); |
545 | switch (type) { |
546 | case TableScanType::TABLE_SCAN_COMMITTED_ROWS: |
547 | TemplatedScan<TableScanType::TABLE_SCAN_COMMITTED_ROWS>(transaction: data, state, result); |
548 | break; |
549 | case TableScanType::TABLE_SCAN_COMMITTED_ROWS_DISALLOW_UPDATES: |
550 | TemplatedScan<TableScanType::TABLE_SCAN_COMMITTED_ROWS_DISALLOW_UPDATES>(transaction: data, state, result); |
551 | break; |
552 | case TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED: |
553 | TemplatedScan<TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED>(transaction: data, state, result); |
554 | break; |
555 | default: |
556 | throw InternalException("Unrecognized table scan type" ); |
557 | } |
558 | } |
559 | |
560 | ChunkInfo *RowGroup::GetChunkInfo(idx_t vector_idx) { |
561 | if (!version_info) { |
562 | return nullptr; |
563 | } |
564 | return version_info->info[vector_idx].get(); |
565 | } |
566 | |
567 | idx_t RowGroup::GetSelVector(TransactionData transaction, idx_t vector_idx, SelectionVector &sel_vector, |
568 | idx_t max_count) { |
569 | lock_guard<mutex> lock(row_group_lock); |
570 | |
571 | auto info = GetChunkInfo(vector_idx); |
572 | if (!info) { |
573 | return max_count; |
574 | } |
575 | return info->GetSelVector(transaction, sel_vector, max_count); |
576 | } |
577 | |
578 | idx_t RowGroup::GetCommittedSelVector(transaction_t start_time, transaction_t transaction_id, idx_t vector_idx, |
579 | SelectionVector &sel_vector, idx_t max_count) { |
580 | lock_guard<mutex> lock(row_group_lock); |
581 | |
582 | auto info = GetChunkInfo(vector_idx); |
583 | if (!info) { |
584 | return max_count; |
585 | } |
586 | return info->GetCommittedSelVector(min_start_id: start_time, min_transaction_id: transaction_id, sel_vector, max_count); |
587 | } |
588 | |
589 | bool RowGroup::Fetch(TransactionData transaction, idx_t row) { |
590 | D_ASSERT(row < this->count); |
591 | lock_guard<mutex> lock(row_group_lock); |
592 | |
593 | idx_t vector_index = row / STANDARD_VECTOR_SIZE; |
594 | auto info = GetChunkInfo(vector_idx: vector_index); |
595 | if (!info) { |
596 | return true; |
597 | } |
598 | return info->Fetch(transaction, row: row - vector_index * STANDARD_VECTOR_SIZE); |
599 | } |
600 | |
601 | void RowGroup::FetchRow(TransactionData transaction, ColumnFetchState &state, const vector<column_t> &column_ids, |
602 | row_t row_id, DataChunk &result, idx_t result_idx) { |
603 | for (idx_t col_idx = 0; col_idx < column_ids.size(); col_idx++) { |
604 | auto column = column_ids[col_idx]; |
605 | if (column == COLUMN_IDENTIFIER_ROW_ID) { |
606 | // row id column: fill in the row ids |
607 | D_ASSERT(result.data[col_idx].GetType().InternalType() == PhysicalType::INT64); |
608 | result.data[col_idx].SetVectorType(VectorType::FLAT_VECTOR); |
609 | auto data = FlatVector::GetData<row_t>(vector&: result.data[col_idx]); |
610 | data[result_idx] = row_id; |
611 | } else { |
612 | // regular column: fetch data from the base column |
613 | auto &col_data = GetColumn(c: column); |
614 | col_data.FetchRow(transaction, state, row_id, result&: result.data[col_idx], result_idx); |
615 | } |
616 | } |
617 | } |
618 | |
619 | void RowGroup::AppendVersionInfo(TransactionData transaction, idx_t count) { |
620 | idx_t row_group_start = this->count.load(); |
621 | idx_t row_group_end = row_group_start + count; |
622 | if (row_group_end > RowGroup::ROW_GROUP_SIZE) { |
623 | row_group_end = RowGroup::ROW_GROUP_SIZE; |
624 | } |
625 | lock_guard<mutex> lock(row_group_lock); |
626 | |
627 | // create the version_info if it doesn't exist yet |
628 | if (!version_info) { |
629 | version_info = make_shared<VersionNode>(); |
630 | } |
631 | idx_t start_vector_idx = row_group_start / STANDARD_VECTOR_SIZE; |
632 | idx_t end_vector_idx = (row_group_end - 1) / STANDARD_VECTOR_SIZE; |
633 | for (idx_t vector_idx = start_vector_idx; vector_idx <= end_vector_idx; vector_idx++) { |
634 | idx_t start = vector_idx == start_vector_idx ? row_group_start - start_vector_idx * STANDARD_VECTOR_SIZE : 0; |
635 | idx_t end = |
636 | vector_idx == end_vector_idx ? row_group_end - end_vector_idx * STANDARD_VECTOR_SIZE : STANDARD_VECTOR_SIZE; |
637 | if (start == 0 && end == STANDARD_VECTOR_SIZE) { |
638 | // entire vector is encapsulated by append: append a single constant |
639 | auto constant_info = make_uniq<ChunkConstantInfo>(args: this->start + vector_idx * STANDARD_VECTOR_SIZE); |
640 | constant_info->insert_id = transaction.transaction_id; |
641 | constant_info->delete_id = NOT_DELETED_ID; |
642 | version_info->info[vector_idx] = std::move(constant_info); |
643 | } else { |
644 | // part of a vector is encapsulated: append to that part |
645 | ChunkVectorInfo *info; |
646 | if (!version_info->info[vector_idx]) { |
647 | // first time appending to this vector: create new info |
648 | auto insert_info = make_uniq<ChunkVectorInfo>(args: this->start + vector_idx * STANDARD_VECTOR_SIZE); |
649 | info = insert_info.get(); |
650 | version_info->info[vector_idx] = std::move(insert_info); |
651 | } else { |
652 | D_ASSERT(version_info->info[vector_idx]->type == ChunkInfoType::VECTOR_INFO); |
653 | // use existing vector |
654 | info = &version_info->info[vector_idx]->Cast<ChunkVectorInfo>(); |
655 | } |
656 | info->Append(start, end, commit_id: transaction.transaction_id); |
657 | } |
658 | } |
659 | this->count = row_group_end; |
660 | } |
661 | |
662 | void RowGroup::CommitAppend(transaction_t commit_id, idx_t row_group_start, idx_t count) { |
663 | D_ASSERT(version_info.get()); |
664 | idx_t row_group_end = row_group_start + count; |
665 | lock_guard<mutex> lock(row_group_lock); |
666 | |
667 | idx_t start_vector_idx = row_group_start / STANDARD_VECTOR_SIZE; |
668 | idx_t end_vector_idx = (row_group_end - 1) / STANDARD_VECTOR_SIZE; |
669 | for (idx_t vector_idx = start_vector_idx; vector_idx <= end_vector_idx; vector_idx++) { |
670 | idx_t start = vector_idx == start_vector_idx ? row_group_start - start_vector_idx * STANDARD_VECTOR_SIZE : 0; |
671 | idx_t end = |
672 | vector_idx == end_vector_idx ? row_group_end - end_vector_idx * STANDARD_VECTOR_SIZE : STANDARD_VECTOR_SIZE; |
673 | |
674 | auto info = version_info->info[vector_idx].get(); |
675 | info->CommitAppend(commit_id, start, end); |
676 | } |
677 | } |
678 | |
679 | void RowGroup::RevertAppend(idx_t row_group_start) { |
680 | if (!version_info) { |
681 | return; |
682 | } |
683 | idx_t start_row = row_group_start - this->start; |
684 | idx_t start_vector_idx = (start_row + (STANDARD_VECTOR_SIZE - 1)) / STANDARD_VECTOR_SIZE; |
685 | for (idx_t vector_idx = start_vector_idx; vector_idx < RowGroup::ROW_GROUP_VECTOR_COUNT; vector_idx++) { |
686 | version_info->info[vector_idx].reset(); |
687 | } |
688 | for (auto &column : columns) { |
689 | column->RevertAppend(start_row: row_group_start); |
690 | } |
691 | this->count = MinValue<idx_t>(a: row_group_start - this->start, b: this->count); |
692 | Verify(); |
693 | } |
694 | |
695 | void RowGroup::InitializeAppend(RowGroupAppendState &append_state) { |
696 | append_state.row_group = this; |
697 | append_state.offset_in_row_group = this->count; |
698 | // for each column, initialize the append state |
699 | append_state.states = make_unsafe_uniq_array<ColumnAppendState>(n: GetColumnCount()); |
700 | for (idx_t i = 0; i < GetColumnCount(); i++) { |
701 | auto &col_data = GetColumn(c: i); |
702 | col_data.InitializeAppend(state&: append_state.states[i]); |
703 | } |
704 | } |
705 | |
706 | void RowGroup::Append(RowGroupAppendState &state, DataChunk &chunk, idx_t append_count) { |
707 | // append to the current row_group |
708 | for (idx_t i = 0; i < GetColumnCount(); i++) { |
709 | auto &col_data = GetColumn(c: i); |
710 | col_data.Append(state&: state.states[i], vector&: chunk.data[i], count: append_count); |
711 | } |
712 | state.offset_in_row_group += append_count; |
713 | } |
714 | |
715 | void RowGroup::Update(TransactionData transaction, DataChunk &update_chunk, row_t *ids, idx_t offset, idx_t count, |
716 | const vector<PhysicalIndex> &column_ids) { |
717 | #ifdef DEBUG |
718 | for (size_t i = offset; i < offset + count; i++) { |
719 | D_ASSERT(ids[i] >= row_t(this->start) && ids[i] < row_t(this->start + this->count)); |
720 | } |
721 | #endif |
722 | for (idx_t i = 0; i < column_ids.size(); i++) { |
723 | auto column = column_ids[i]; |
724 | D_ASSERT(column.index != COLUMN_IDENTIFIER_ROW_ID); |
725 | auto &col_data = GetColumn(c: column.index); |
726 | D_ASSERT(col_data.type.id() == update_chunk.data[i].GetType().id()); |
727 | if (offset > 0) { |
728 | Vector sliced_vector(update_chunk.data[i], offset, offset + count); |
729 | sliced_vector.Flatten(count); |
730 | col_data.Update(transaction, column_index: column.index, update_vector&: sliced_vector, row_ids: ids + offset, update_count: count); |
731 | } else { |
732 | col_data.Update(transaction, column_index: column.index, update_vector&: update_chunk.data[i], row_ids: ids, update_count: count); |
733 | } |
734 | MergeStatistics(column_idx: column.index, other: *col_data.GetUpdateStatistics()); |
735 | } |
736 | } |
737 | |
738 | void RowGroup::UpdateColumn(TransactionData transaction, DataChunk &updates, Vector &row_ids, |
739 | const vector<column_t> &column_path) { |
740 | D_ASSERT(updates.ColumnCount() == 1); |
741 | auto ids = FlatVector::GetData<row_t>(vector&: row_ids); |
742 | |
743 | auto primary_column_idx = column_path[0]; |
744 | D_ASSERT(primary_column_idx != COLUMN_IDENTIFIER_ROW_ID); |
745 | D_ASSERT(primary_column_idx < columns.size()); |
746 | auto &col_data = GetColumn(c: primary_column_idx); |
747 | col_data.UpdateColumn(transaction, column_path, update_vector&: updates.data[0], row_ids: ids, update_count: updates.size(), depth: 1); |
748 | MergeStatistics(column_idx: primary_column_idx, other: *col_data.GetUpdateStatistics()); |
749 | } |
750 | |
751 | unique_ptr<BaseStatistics> RowGroup::GetStatistics(idx_t column_idx) { |
752 | auto &col_data = GetColumn(c: column_idx); |
753 | lock_guard<mutex> slock(stats_lock); |
754 | return col_data.GetStatistics(); |
755 | } |
756 | |
757 | void RowGroup::MergeStatistics(idx_t column_idx, const BaseStatistics &other) { |
758 | auto &col_data = GetColumn(c: column_idx); |
759 | lock_guard<mutex> slock(stats_lock); |
760 | col_data.MergeStatistics(other); |
761 | } |
762 | |
763 | void RowGroup::MergeIntoStatistics(idx_t column_idx, BaseStatistics &other) { |
764 | auto &col_data = GetColumn(c: column_idx); |
765 | lock_guard<mutex> slock(stats_lock); |
766 | col_data.MergeIntoStatistics(other); |
767 | } |
768 | |
769 | RowGroupWriteData RowGroup::WriteToDisk(PartialBlockManager &manager, |
770 | const vector<CompressionType> &compression_types) { |
771 | RowGroupWriteData result; |
772 | result.states.reserve(n: columns.size()); |
773 | result.statistics.reserve(n: columns.size()); |
774 | |
775 | // Checkpoint the individual columns of the row group |
776 | // Here we're iterating over columns. Each column can have multiple segments. |
777 | // (Some columns will be wider than others, and require different numbers |
778 | // of blocks to encode.) Segments cannot span blocks. |
779 | // |
780 | // Some of these columns are composite (list, struct). The data is written |
781 | // first sequentially, and the pointers are written later, so that the |
782 | // pointers all end up densely packed, and thus more cache-friendly. |
783 | for (idx_t column_idx = 0; column_idx < GetColumnCount(); column_idx++) { |
784 | auto &column = GetColumn(c: column_idx); |
785 | ColumnCheckpointInfo checkpoint_info {compression_types[column_idx]}; |
786 | auto checkpoint_state = column.Checkpoint(row_group&: *this, partial_block_manager&: manager, checkpoint_info); |
787 | D_ASSERT(checkpoint_state); |
788 | |
789 | auto stats = checkpoint_state->GetStatistics(); |
790 | D_ASSERT(stats); |
791 | |
792 | result.statistics.push_back(x: stats->Copy()); |
793 | result.states.push_back(x: std::move(checkpoint_state)); |
794 | } |
795 | D_ASSERT(result.states.size() == result.statistics.size()); |
796 | return result; |
797 | } |
798 | |
799 | RowGroupPointer RowGroup::Checkpoint(RowGroupWriter &writer, TableStatistics &global_stats) { |
800 | RowGroupPointer row_group_pointer; |
801 | |
802 | vector<CompressionType> compression_types; |
803 | compression_types.reserve(n: columns.size()); |
804 | for (idx_t column_idx = 0; column_idx < GetColumnCount(); column_idx++) { |
805 | compression_types.push_back(x: writer.GetColumnCompressionType(i: column_idx)); |
806 | } |
807 | auto result = WriteToDisk(manager&: writer.GetPartialBlockManager(), compression_types); |
808 | for (idx_t column_idx = 0; column_idx < GetColumnCount(); column_idx++) { |
809 | global_stats.GetStats(i: column_idx).Statistics().Merge(other: result.statistics[column_idx]); |
810 | } |
811 | |
812 | // construct the row group pointer and write the column meta data to disk |
813 | D_ASSERT(result.states.size() == columns.size()); |
814 | row_group_pointer.row_start = start; |
815 | row_group_pointer.tuple_count = count; |
816 | for (auto &state : result.states) { |
817 | // get the current position of the table data writer |
818 | auto &data_writer = writer.GetPayloadWriter(); |
819 | auto pointer = data_writer.GetBlockPointer(); |
820 | |
821 | // store the stats and the data pointers in the row group pointers |
822 | row_group_pointer.data_pointers.push_back(x: pointer); |
823 | |
824 | // Write pointers to the column segments. |
825 | // |
826 | // Just as above, the state can refer to many other states, so this |
827 | // can cascade recursively into more pointer writes. |
828 | state->WriteDataPointers(writer); |
829 | } |
830 | row_group_pointer.versions = version_info; |
831 | Verify(); |
832 | return row_group_pointer; |
833 | } |
834 | |
835 | void RowGroup::CheckpointDeletes(VersionNode *versions, Serializer &serializer) { |
836 | if (!versions) { |
837 | // no version information: write nothing |
838 | serializer.Write<idx_t>(element: 0); |
839 | return; |
840 | } |
841 | // first count how many ChunkInfo's we need to deserialize |
842 | idx_t chunk_info_count = 0; |
843 | for (idx_t vector_idx = 0; vector_idx < RowGroup::ROW_GROUP_VECTOR_COUNT; vector_idx++) { |
844 | auto chunk_info = versions->info[vector_idx].get(); |
845 | if (!chunk_info) { |
846 | continue; |
847 | } |
848 | chunk_info_count++; |
849 | } |
850 | // now serialize the actual version information |
851 | serializer.Write<idx_t>(element: chunk_info_count); |
852 | for (idx_t vector_idx = 0; vector_idx < RowGroup::ROW_GROUP_VECTOR_COUNT; vector_idx++) { |
853 | auto chunk_info = versions->info[vector_idx].get(); |
854 | if (!chunk_info) { |
855 | continue; |
856 | } |
857 | serializer.Write<idx_t>(element: vector_idx); |
858 | chunk_info->Serialize(serialize&: serializer); |
859 | } |
860 | } |
861 | |
862 | shared_ptr<VersionNode> RowGroup::DeserializeDeletes(Deserializer &source) { |
863 | auto chunk_count = source.Read<idx_t>(); |
864 | if (chunk_count == 0) { |
865 | // no deletes |
866 | return nullptr; |
867 | } |
868 | auto version_info = make_shared<VersionNode>(); |
869 | for (idx_t i = 0; i < chunk_count; i++) { |
870 | idx_t vector_index = source.Read<idx_t>(); |
871 | if (vector_index >= RowGroup::ROW_GROUP_VECTOR_COUNT) { |
872 | throw Exception("In DeserializeDeletes, vector_index is out of range for the row group. Corrupted file?" ); |
873 | } |
874 | version_info->info[vector_index] = ChunkInfo::Deserialize(source); |
875 | } |
876 | return version_info; |
877 | } |
878 | |
879 | void RowGroup::Serialize(RowGroupPointer &pointer, Serializer &main_serializer) { |
880 | FieldWriter writer(main_serializer); |
881 | writer.WriteField<uint64_t>(element: pointer.row_start); |
882 | writer.WriteField<uint64_t>(element: pointer.tuple_count); |
883 | auto &serializer = writer.GetSerializer(); |
884 | for (auto &data_pointer : pointer.data_pointers) { |
885 | serializer.Write<block_id_t>(element: data_pointer.block_id); |
886 | serializer.Write<uint64_t>(element: data_pointer.offset); |
887 | } |
888 | CheckpointDeletes(versions: pointer.versions.get(), serializer); |
889 | writer.Finalize(); |
890 | } |
891 | |
892 | RowGroupPointer RowGroup::Deserialize(Deserializer &main_source, const vector<LogicalType> &columns) { |
893 | RowGroupPointer result; |
894 | |
895 | FieldReader reader(main_source); |
896 | result.row_start = reader.ReadRequired<uint64_t>(); |
897 | result.tuple_count = reader.ReadRequired<uint64_t>(); |
898 | |
899 | auto physical_columns = columns.size(); |
900 | result.data_pointers.reserve(n: physical_columns); |
901 | |
902 | auto &source = reader.GetSource(); |
903 | for (idx_t i = 0; i < physical_columns; i++) { |
904 | BlockPointer pointer; |
905 | pointer.block_id = source.Read<block_id_t>(); |
906 | pointer.offset = source.Read<uint64_t>(); |
907 | result.data_pointers.push_back(x: pointer); |
908 | } |
909 | result.versions = DeserializeDeletes(source); |
910 | |
911 | reader.Finalize(); |
912 | return result; |
913 | } |
914 | |
915 | //===--------------------------------------------------------------------===// |
916 | // GetColumnSegmentInfo |
917 | //===--------------------------------------------------------------------===// |
918 | void RowGroup::GetColumnSegmentInfo(idx_t row_group_index, vector<ColumnSegmentInfo> &result) { |
919 | for (idx_t col_idx = 0; col_idx < GetColumnCount(); col_idx++) { |
920 | auto &col_data = GetColumn(c: col_idx); |
921 | col_data.GetColumnSegmentInfo(row_group_index, col_path: {col_idx}, result); |
922 | } |
923 | } |
924 | |
925 | //===--------------------------------------------------------------------===// |
926 | // Version Delete Information |
927 | //===--------------------------------------------------------------------===// |
928 | class VersionDeleteState { |
929 | public: |
930 | VersionDeleteState(RowGroup &info, TransactionData transaction, DataTable &table, idx_t base_row) |
931 | : info(info), transaction(transaction), table(table), current_info(nullptr), |
932 | current_chunk(DConstants::INVALID_INDEX), count(0), base_row(base_row), delete_count(0) { |
933 | } |
934 | |
935 | RowGroup &info; |
936 | TransactionData transaction; |
937 | DataTable &table; |
938 | ChunkVectorInfo *current_info; |
939 | idx_t current_chunk; |
940 | row_t rows[STANDARD_VECTOR_SIZE]; |
941 | idx_t count; |
942 | idx_t base_row; |
943 | idx_t chunk_row; |
944 | idx_t delete_count; |
945 | |
946 | public: |
947 | void Delete(row_t row_id); |
948 | void Flush(); |
949 | }; |
950 | |
951 | idx_t RowGroup::Delete(TransactionData transaction, DataTable &table, row_t *ids, idx_t count) { |
952 | lock_guard<mutex> lock(row_group_lock); |
953 | VersionDeleteState del_state(*this, transaction, table, this->start); |
954 | |
955 | // obtain a write lock |
956 | for (idx_t i = 0; i < count; i++) { |
957 | D_ASSERT(ids[i] >= 0); |
958 | D_ASSERT(idx_t(ids[i]) >= this->start && idx_t(ids[i]) < this->start + this->count); |
959 | del_state.Delete(row_id: ids[i] - this->start); |
960 | } |
961 | del_state.Flush(); |
962 | return del_state.delete_count; |
963 | } |
964 | |
965 | void RowGroup::Verify() { |
966 | #ifdef DEBUG |
967 | for (auto &column : GetColumns()) { |
968 | column->Verify(*this); |
969 | } |
970 | #endif |
971 | } |
972 | |
973 | void VersionDeleteState::Delete(row_t row_id) { |
974 | D_ASSERT(row_id >= 0); |
975 | idx_t vector_idx = row_id / STANDARD_VECTOR_SIZE; |
976 | idx_t idx_in_vector = row_id - vector_idx * STANDARD_VECTOR_SIZE; |
977 | if (current_chunk != vector_idx) { |
978 | Flush(); |
979 | |
980 | if (!info.version_info) { |
981 | info.version_info = make_shared<VersionNode>(); |
982 | } |
983 | |
984 | if (!info.version_info->info[vector_idx]) { |
985 | // no info yet: create it |
986 | info.version_info->info[vector_idx] = |
987 | make_uniq<ChunkVectorInfo>(args: info.start + vector_idx * STANDARD_VECTOR_SIZE); |
988 | } else if (info.version_info->info[vector_idx]->type == ChunkInfoType::CONSTANT_INFO) { |
989 | auto &constant = info.version_info->info[vector_idx]->Cast<ChunkConstantInfo>(); |
990 | // info exists but it's a constant info: convert to a vector info |
991 | auto new_info = make_uniq<ChunkVectorInfo>(args: info.start + vector_idx * STANDARD_VECTOR_SIZE); |
992 | new_info->insert_id = constant.insert_id.load(); |
993 | for (idx_t i = 0; i < STANDARD_VECTOR_SIZE; i++) { |
994 | new_info->inserted[i] = constant.insert_id.load(); |
995 | } |
996 | info.version_info->info[vector_idx] = std::move(new_info); |
997 | } |
998 | D_ASSERT(info.version_info->info[vector_idx]->type == ChunkInfoType::VECTOR_INFO); |
999 | current_info = &info.version_info->info[vector_idx]->Cast<ChunkVectorInfo>(); |
1000 | current_chunk = vector_idx; |
1001 | chunk_row = vector_idx * STANDARD_VECTOR_SIZE; |
1002 | } |
1003 | rows[count++] = idx_in_vector; |
1004 | } |
1005 | |
1006 | void VersionDeleteState::Flush() { |
1007 | if (count == 0) { |
1008 | return; |
1009 | } |
1010 | // it is possible for delete statements to delete the same tuple multiple times when combined with a USING clause |
1011 | // in the current_info->Delete, we check which tuples are actually deleted (excluding duplicate deletions) |
1012 | // this is returned in the actual_delete_count |
1013 | auto actual_delete_count = current_info->Delete(transaction_id: transaction.transaction_id, rows, count); |
1014 | delete_count += actual_delete_count; |
1015 | if (transaction.transaction && actual_delete_count > 0) { |
1016 | // now push the delete into the undo buffer, but only if any deletes were actually performed |
1017 | transaction.transaction->PushDelete(table, vinfo: current_info, rows, count: actual_delete_count, base_row: base_row + chunk_row); |
1018 | } |
1019 | count = 0; |
1020 | } |
1021 | |
1022 | } // namespace duckdb |
1023 | |