#include "duckdb/storage/table/row_group_collection.hpp"
#include "duckdb/storage/table/persistent_table_data.hpp"
#include "duckdb/execution/expression_executor.hpp"
#include "duckdb/main/client_context.hpp"
#include "duckdb/storage/data_table.hpp"
#include "duckdb/planner/constraints/bound_not_null_constraint.hpp"
#include "duckdb/storage/checkpoint/table_data_writer.hpp"
#include "duckdb/storage/table/row_group_segment_tree.hpp"
#include "duckdb/storage/meta_block_reader.hpp"
#include "duckdb/storage/table/append_state.hpp"
#include "duckdb/storage/table/scan_state.hpp"
#include "duckdb/storage/table_storage_info.hpp"

namespace duckdb {

//===--------------------------------------------------------------------===//
// Row Group Segment Tree
//===--------------------------------------------------------------------===//
RowGroupSegmentTree::RowGroupSegmentTree(RowGroupCollection &collection)
    : SegmentTree<RowGroup, true>(), collection(collection), current_row_group(0), max_row_group(0) {
}
RowGroupSegmentTree::~RowGroupSegmentTree() {
}

void RowGroupSegmentTree::Initialize(PersistentTableData &data) {
    D_ASSERT(data.row_group_count > 0);
    current_row_group = 0;
    max_row_group = data.row_group_count;
    finished_loading = false;
    reader = make_uniq<MetaBlockReader>(collection.GetBlockManager(), data.block_id);
    reader->offset = data.offset;
}

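// Deserializes the next row group from the metadata reader, or returns nullptr
// once all persisted row groups have been read. Row groups are thus loaded
// lazily, one at a time, as the segment tree is traversed.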
unique_ptr<RowGroup> RowGroupSegmentTree::LoadSegment() {
    if (current_row_group >= max_row_group) {
        finished_loading = true;
        return nullptr;
    }
    auto row_group_pointer = RowGroup::Deserialize(*reader, collection.GetTypes());
    current_row_group++;
    return make_uniq<RowGroup>(collection, std::move(row_group_pointer));
}

//===--------------------------------------------------------------------===//
// Row Group Collection
//===--------------------------------------------------------------------===//
RowGroupCollection::RowGroupCollection(shared_ptr<DataTableInfo> info_p, BlockManager &block_manager,
                                       vector<LogicalType> types_p, idx_t row_start_p, idx_t total_rows_p)
    : block_manager(block_manager), total_rows(total_rows_p), info(std::move(info_p)), types(std::move(types_p)),
      row_start(row_start_p) {
    row_groups = make_shared<RowGroupSegmentTree>(*this);
}

idx_t RowGroupCollection::GetTotalRows() const {
    return total_rows.load();
}

const vector<LogicalType> &RowGroupCollection::GetTypes() const {
    return types;
}

Allocator &RowGroupCollection::GetAllocator() const {
    return Allocator::Get(info->db);
}

AttachedDatabase &RowGroupCollection::GetAttached() {
    return GetTableInfo().db;
}

DatabaseInstance &RowGroupCollection::GetDatabase() {
    return GetAttached().GetDatabase();
}

//===--------------------------------------------------------------------===//
// Initialize
//===--------------------------------------------------------------------===//
void RowGroupCollection::Initialize(PersistentTableData &data) {
    D_ASSERT(this->row_start == 0);
    auto l = row_groups->Lock();
    this->total_rows = data.total_rows;
    row_groups->Initialize(data);
    stats.Initialize(types, data);
}

void RowGroupCollection::InitializeEmpty() {
    stats.InitializeEmpty(types);
}

void RowGroupCollection::AppendRowGroup(SegmentLock &l, idx_t start_row) {
    D_ASSERT(start_row >= row_start);
    auto new_row_group = make_uniq<RowGroup>(*this, start_row, 0);
    new_row_group->InitializeEmpty(types);
    row_groups->AppendSegment(l, std::move(new_row_group));
}

RowGroup *RowGroupCollection::GetRowGroup(int64_t index) {
    return (RowGroup *)row_groups->GetSegmentByIndex(index);
}

idx_t RowGroupCollection::RowGroupCount() {
    return row_groups->GetSegmentCount();
}

void RowGroupCollection::Verify() {
#ifdef DEBUG
    idx_t current_total_rows = 0;
    row_groups->Verify();
    for (auto &row_group : row_groups->Segments()) {
        row_group.Verify();
        D_ASSERT(&row_group.GetCollection() == this);
        D_ASSERT(row_group.start == this->row_start + current_total_rows);
        current_total_rows += row_group.count;
    }
    D_ASSERT(current_total_rows == total_rows.load());
#endif
}

//===--------------------------------------------------------------------===//
// Scan
//===--------------------------------------------------------------------===//
void RowGroupCollection::InitializeScan(CollectionScanState &state, const vector<column_t> &column_ids,
                                        TableFilterSet *table_filters) {
    auto row_group = row_groups->GetRootSegment();
    D_ASSERT(row_group);
    state.row_groups = row_groups.get();
    state.max_row = row_start + total_rows;
    state.Initialize(GetTypes());
    while (row_group && !row_group->InitializeScan(state)) {
        row_group = row_groups->GetNextSegment(row_group);
    }
}

void RowGroupCollection::InitializeCreateIndexScan(CreateIndexScanState &state) {
    state.segment_lock = row_groups->Lock();
}

void RowGroupCollection::InitializeScanWithOffset(CollectionScanState &state, const vector<column_t> &column_ids,
                                                  idx_t start_row, idx_t end_row) {
    auto row_group = row_groups->GetSegment(start_row);
    D_ASSERT(row_group);
    state.row_groups = row_groups.get();
    state.max_row = end_row;
    state.Initialize(GetTypes());
    idx_t start_vector = (start_row - row_group->start) / STANDARD_VECTOR_SIZE;
    if (!row_group->InitializeScanWithOffset(state, start_vector)) {
        throw InternalException("Failed to initialize row group scan with offset");
    }
}

bool RowGroupCollection::InitializeScanInRowGroup(CollectionScanState &state, RowGroupCollection &collection,
                                                  RowGroup &row_group, idx_t vector_index, idx_t max_row) {
    state.max_row = max_row;
    state.row_groups = collection.row_groups.get();
    if (!state.column_scans) {
        // initialize the scan state
        state.Initialize(collection.GetTypes());
    }
    return row_group.InitializeScanWithOffset(state, vector_index);
}

void RowGroupCollection::InitializeParallelScan(ParallelCollectionScanState &state) {
    state.collection = this;
    state.current_row_group = row_groups->GetRootSegment();
    state.vector_index = 0;
    state.max_row = row_start + total_rows;
    state.batch_index = 0;
    state.processed_rows = 0;
}

bool RowGroupCollection::NextParallelScan(ClientContext &context, ParallelCollectionScanState &state,
                                          CollectionScanState &scan_state) {
    while (true) {
        idx_t vector_index;
        idx_t max_row;
        RowGroupCollection *collection;
        RowGroup *row_group;
        {
            // select the next row group to scan from the parallel state
            lock_guard<mutex> l(state.lock);
            if (!state.current_row_group || state.current_row_group->count == 0) {
                // no more data left to scan
                break;
            }
            collection = state.collection;
            row_group = state.current_row_group;
            if (ClientConfig::GetConfig(context).verify_parallelism) {
                vector_index = state.vector_index;
                max_row = state.current_row_group->start +
                          MinValue<idx_t>(state.current_row_group->count,
                                          STANDARD_VECTOR_SIZE * state.vector_index + STANDARD_VECTOR_SIZE);
                D_ASSERT(vector_index * STANDARD_VECTOR_SIZE < state.current_row_group->count);
                state.vector_index++;
                if (state.vector_index * STANDARD_VECTOR_SIZE >= state.current_row_group->count) {
                    state.current_row_group = row_groups->GetNextSegment(state.current_row_group);
                    state.vector_index = 0;
                }
            } else {
                state.processed_rows += state.current_row_group->count;
                vector_index = 0;
                max_row = state.current_row_group->start + state.current_row_group->count;
                state.current_row_group = row_groups->GetNextSegment(state.current_row_group);
            }
            max_row = MinValue<idx_t>(max_row, state.max_row);
            scan_state.batch_index = ++state.batch_index;
        }
        D_ASSERT(collection);
        D_ASSERT(row_group);

        // initialize the scan for this row group
        bool need_to_scan = InitializeScanInRowGroup(scan_state, *collection, *row_group, vector_index, max_row);
        if (!need_to_scan) {
            // skip this row group
            continue;
        }
        return true;
    }
    return false;
}
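
// Illustrative worker loop (a sketch, not part of this translation unit): each
// thread repeatedly pulls the next unit of work from the shared
// ParallelCollectionScanState until NextParallelScan returns false. The
// `context`, `transaction`, `parallel_state`, `scan_state` and `chunk` objects
// are assumed to be set up by the caller (see InitializeParallelScan);
// ConsumeChunk is a hypothetical per-thread consumer.
//
//   while (collection.NextParallelScan(context, parallel_state, scan_state)) {
//       while (true) {
//           chunk.Reset();
//           scan_state.Scan(transaction, chunk);
//           if (chunk.size() == 0) {
//               break; // this work unit is exhausted; fetch the next one
//           }
//           ConsumeChunk(chunk);
//       }
//   }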

bool RowGroupCollection::Scan(DuckTransaction &transaction, const vector<column_t> &column_ids,
                              const std::function<bool(DataChunk &chunk)> &fun) {
    vector<LogicalType> scan_types;
    for (idx_t i = 0; i < column_ids.size(); i++) {
        scan_types.push_back(types[column_ids[i]]);
    }
    DataChunk chunk;
    chunk.Initialize(GetAllocator(), scan_types);

    // initialize the scan
    TableScanState state;
    state.Initialize(column_ids, nullptr);
    InitializeScan(state.local_state, column_ids, nullptr);

    while (true) {
        chunk.Reset();
        state.local_state.Scan(transaction, chunk);
        if (chunk.size() == 0) {
            return true;
        }
        if (!fun(chunk)) {
            return false;
        }
    }
}
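
// Usage sketch (assumes a valid DuckTransaction `transaction` and at least one
// column): scan only column 0 and abort early once a NULL is encountered. The
// callback returning false stops the scan and makes Scan return false as well.
//
//   vector<column_t> column_ids {0};
//   bool no_nulls = collection.Scan(transaction, column_ids, [&](DataChunk &chunk) {
//       return !VectorOperations::HasNull(chunk.data[0], chunk.size());
//   });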

bool RowGroupCollection::Scan(DuckTransaction &transaction, const std::function<bool(DataChunk &chunk)> &fun) {
    vector<column_t> column_ids;
    column_ids.reserve(types.size());
    for (idx_t i = 0; i < types.size(); i++) {
        column_ids.push_back(i);
    }
    return Scan(transaction, column_ids, fun);
}

//===--------------------------------------------------------------------===//
// Fetch
//===--------------------------------------------------------------------===//
void RowGroupCollection::Fetch(TransactionData transaction, DataChunk &result, const vector<column_t> &column_ids,
                               const Vector &row_identifiers, idx_t fetch_count, ColumnFetchState &state) {
    // figure out which row_group to fetch from
    auto row_ids = FlatVector::GetData<row_t>(row_identifiers);
    idx_t count = 0;
    for (idx_t i = 0; i < fetch_count; i++) {
        auto row_id = row_ids[i];
        RowGroup *row_group;
        {
            idx_t segment_index;
            auto l = row_groups->Lock();
            if (!row_groups->TryGetSegmentIndex(l, row_id, segment_index)) {
                // in parallel append scenarios it is possible for the row_id to
                // not (yet) be present in the set of row groups: skip it
                continue;
            }
            row_group = row_groups->GetSegmentByIndex(l, segment_index);
        }
        if (!row_group->Fetch(transaction, row_id - row_group->start)) {
            continue;
        }
        row_group->FetchRow(transaction, state, column_ids, row_id, result, count);
        count++;
    }
    result.SetCardinality(count);
}
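
// Usage sketch (assumes `row_ids` is a flat Vector of ROW_TYPE and `result` was
// initialized with the types of `column_ids`): fetch up to `n` rows by row id.
// Row ids that are not visible to the transaction are skipped, so the result
// cardinality may be smaller than `n`.
//
//   ColumnFetchState fetch_state;
//   collection.Fetch(transaction, result, column_ids, row_ids, n, fetch_state);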

//===--------------------------------------------------------------------===//
// Append
//===--------------------------------------------------------------------===//
TableAppendState::TableAppendState()
    : row_group_append_state(*this), total_append_count(0), start_row_group(nullptr), transaction(0, 0), remaining(0) {
}

TableAppendState::~TableAppendState() {
    D_ASSERT(Exception::UncaughtException() || remaining == 0);
}

bool RowGroupCollection::IsEmpty() const {
    auto l = row_groups->Lock();
    return IsEmpty(l);
}

bool RowGroupCollection::IsEmpty(SegmentLock &l) const {
    return row_groups->IsEmpty(l);
}

void RowGroupCollection::InitializeAppend(TransactionData transaction, TableAppendState &state, idx_t append_count) {
    state.row_start = total_rows;
    state.current_row = state.row_start;
    state.total_append_count = 0;

    // start writing to the row_groups
    auto l = row_groups->Lock();
    if (IsEmpty(l)) {
        // empty row group collection: empty first row group
        AppendRowGroup(l, row_start);
    }
    state.start_row_group = row_groups->GetLastSegment(l);
    D_ASSERT(this->row_start + total_rows == state.start_row_group->start + state.start_row_group->count);
    state.start_row_group->InitializeAppend(state.row_group_append_state);
    state.remaining = append_count;
    state.transaction = transaction;
    if (state.remaining > 0) {
        state.start_row_group->AppendVersionInfo(transaction, state.remaining);
        total_rows += state.remaining;
    }
}

void RowGroupCollection::InitializeAppend(TableAppendState &state) {
    TransactionData tdata(0, 0);
    InitializeAppend(tdata, state, 0);
}

bool RowGroupCollection::Append(DataChunk &chunk, TableAppendState &state) {
    D_ASSERT(chunk.ColumnCount() == types.size());
    chunk.Verify();

    bool new_row_group = false;
    idx_t append_count = chunk.size();
    idx_t remaining = chunk.size();
    state.total_append_count += append_count;
    while (true) {
        auto current_row_group = state.row_group_append_state.row_group;
        // check how much we can fit into the current row_group
        idx_t append_count =
            MinValue<idx_t>(remaining, RowGroup::ROW_GROUP_SIZE - state.row_group_append_state.offset_in_row_group);
        if (append_count > 0) {
            current_row_group->Append(state.row_group_append_state, chunk, append_count);
            // merge the stats
            auto stats_lock = stats.GetLock();
            for (idx_t i = 0; i < types.size(); i++) {
                current_row_group->MergeIntoStatistics(i, stats.GetStats(i).Statistics());
            }
        }
        remaining -= append_count;
        if (state.remaining > 0) {
            state.remaining -= append_count;
        }
        if (remaining > 0) {
            // we expect max 1 iteration of this loop (i.e. a single chunk should never overflow more than one
            // row_group)
            D_ASSERT(chunk.size() == remaining + append_count);
            // slice the input chunk
            if (remaining < chunk.size()) {
                SelectionVector sel(remaining);
                for (idx_t i = 0; i < remaining; i++) {
                    sel.set_index(i, append_count + i);
                }
                chunk.Slice(sel, remaining);
            }
            // append a new row_group
            new_row_group = true;
            auto next_start = current_row_group->start + state.row_group_append_state.offset_in_row_group;

            auto l = row_groups->Lock();
            AppendRowGroup(l, next_start);
            // set up the append state for this row_group
            auto last_row_group = row_groups->GetLastSegment(l);
            last_row_group->InitializeAppend(state.row_group_append_state);
            if (state.remaining > 0) {
                last_row_group->AppendVersionInfo(state.transaction, state.remaining);
            }
            continue;
        } else {
            break;
        }
    }
    state.current_row += append_count;
    auto stats_lock = stats.GetLock();
    for (idx_t col_idx = 0; col_idx < types.size(); col_idx++) {
        stats.GetStats(col_idx).UpdateDistinctStatistics(chunk.data[col_idx], chunk.size());
    }
    return new_row_group;
}

void RowGroupCollection::FinalizeAppend(TransactionData transaction, TableAppendState &state) {
    auto remaining = state.total_append_count;
    auto row_group = state.start_row_group;
    while (remaining > 0) {
        auto append_count = MinValue<idx_t>(remaining, RowGroup::ROW_GROUP_SIZE - row_group->count);
        row_group->AppendVersionInfo(transaction, append_count);
        remaining -= append_count;
        row_group = row_groups->GetNextSegment(row_group);
    }
    total_rows += state.total_append_count;

    state.total_append_count = 0;
    state.start_row_group = nullptr;

    Verify();
}
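
// Sketch of the append lifecycle (illustrative; `transaction` and `chunks` are
// assumed to come from the caller). With an append_count of 0 the version info
// is not set up eagerly, so FinalizeAppend must be called afterwards to install
// it and to bump the total row count:
//
//   TableAppendState append_state;
//   collection.InitializeAppend(transaction, append_state, 0);
//   for (auto &chunk : chunks) {
//       collection.Append(chunk, append_state); // returns true if a new row group was created
//   }
//   collection.FinalizeAppend(transaction, append_state);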

void RowGroupCollection::CommitAppend(transaction_t commit_id, idx_t row_start, idx_t count) {
    auto row_group = row_groups->GetSegment(row_start);
    D_ASSERT(row_group);
    idx_t current_row = row_start;
    idx_t remaining = count;
    while (true) {
        idx_t start_in_row_group = current_row - row_group->start;
        idx_t append_count = MinValue<idx_t>(row_group->count - start_in_row_group, remaining);

        row_group->CommitAppend(commit_id, start_in_row_group, append_count);

        current_row += append_count;
        remaining -= append_count;
        if (remaining == 0) {
            break;
        }
        row_group = row_groups->GetNextSegment(row_group);
    }
}

void RowGroupCollection::RevertAppendInternal(idx_t start_row, idx_t count) {
    if (total_rows != start_row + count) {
        throw InternalException("Interleaved appends: this should no longer happen");
    }
    total_rows = start_row;

    auto l = row_groups->Lock();
    // find the segment index that the current row belongs to
    idx_t segment_index = row_groups->GetSegmentIndex(l, start_row);
    auto segment = row_groups->GetSegmentByIndex(l, segment_index);
    auto &info = *segment;

    // remove any segments AFTER this segment: they should be deleted entirely
    row_groups->EraseSegments(l, segment_index);

    info.next = nullptr;
    info.RevertAppend(start_row);
}

void RowGroupCollection::MergeStorage(RowGroupCollection &data) {
    D_ASSERT(data.types == types);
    auto index = row_start + total_rows.load();
    auto segments = data.row_groups->MoveSegments();
    for (auto &entry : segments) {
        auto &row_group = entry.node;
        row_group->MoveToCollection(*this, index);
        index += row_group->count;
        row_groups->AppendSegment(std::move(row_group));
    }
    stats.MergeStats(data.stats);
    total_rows += data.total_rows.load();
}
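
// Note: MergeStorage moves the row groups out of `data` and re-bases them onto
// the end of this collection, leaving `data` empty. This is typically used to
// merge transaction-local storage into the base table in one bulk operation
// instead of re-appending the data chunk by chunk.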

//===--------------------------------------------------------------------===//
// Delete
//===--------------------------------------------------------------------===//
idx_t RowGroupCollection::Delete(TransactionData transaction, DataTable &table, row_t *ids, idx_t count) {
    idx_t delete_count = 0;
    // deletes are executed within the row groups:
    // for each id we need to figure out which row group it belongs to
    // usually all (or many) ids belong to the same row group, so we iterate
    // over the ids and check for every id whether it belongs to the same row group as its predecessor
    idx_t pos = 0;
    do {
        idx_t start = pos;
        auto row_group = row_groups->GetSegment(ids[start]);
        for (pos++; pos < count; pos++) {
            D_ASSERT(ids[pos] >= 0);
            // check if this id still belongs to this row group
            if (idx_t(ids[pos]) < row_group->start) {
                // id is before row_group start -> it does not
                break;
            }
            if (idx_t(ids[pos]) >= row_group->start + row_group->count) {
                // id is after row group end -> it does not
                break;
            }
        }
        delete_count += row_group->Delete(transaction, table, ids + start, pos - start);
    } while (pos < count);
    return delete_count;
}
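
// Usage sketch (assumes `ids` holds valid row identifiers for this table). The
// ids do not have to be sorted, but sorted ids keep the per-row-group batches
// large, which is the fast path the loop above is written for.
//
//   row_t ids[] = {0, 1, 5, 130000}; // the last id may fall in a later row group
//   idx_t delete_count = collection.Delete(transaction, table, ids, 4);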

//===--------------------------------------------------------------------===//
// Update
//===--------------------------------------------------------------------===//
void RowGroupCollection::Update(TransactionData transaction, row_t *ids, const vector<PhysicalIndex> &column_ids,
                                DataChunk &updates) {
    idx_t pos = 0;
    do {
        idx_t start = pos;
        auto row_group = row_groups->GetSegment(ids[pos]);
        row_t base_id =
            row_group->start + ((ids[pos] - row_group->start) / STANDARD_VECTOR_SIZE * STANDARD_VECTOR_SIZE);
        row_t max_id = MinValue<row_t>(base_id + STANDARD_VECTOR_SIZE, row_group->start + row_group->count);
        for (pos++; pos < updates.size(); pos++) {
            D_ASSERT(ids[pos] >= 0);
            // check if this id still belongs to this vector in this row group
            if (ids[pos] < base_id) {
                // id is before vector start -> it does not
                break;
            }
            if (ids[pos] >= max_id) {
                // id is after the maximum id in this vector -> it does not
                break;
            }
        }
        row_group->Update(transaction, updates, ids, start, pos - start, column_ids);

        auto l = stats.GetLock();
        for (idx_t i = 0; i < column_ids.size(); i++) {
            auto column_id = column_ids[i];
            stats.MergeStats(*l, column_id.index, *row_group->GetStatistics(column_id.index));
        }
    } while (pos < updates.size());
}
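
// Note: unlike Delete, Update batches ids at vector granularity (see base_id /
// max_id above), since RowGroup::Update applies updates one vector at a time;
// `updates` is expected to carry one row per id for each updated column.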

void RowGroupCollection::RemoveFromIndexes(TableIndexList &indexes, Vector &row_identifiers, idx_t count) {
    auto row_ids = FlatVector::GetData<row_t>(row_identifiers);

    // initialize the fetch state
    // FIXME: we do not need to fetch all columns, only the columns required by the indices!
    TableScanState state;
    vector<column_t> column_ids;
    column_ids.reserve(types.size());
    for (idx_t i = 0; i < types.size(); i++) {
        column_ids.push_back(i);
    }
    state.Initialize(std::move(column_ids));
    state.table_state.max_row = row_start + total_rows;

    // initialize the fetch chunk
    DataChunk result;
    result.Initialize(GetAllocator(), types);

    SelectionVector sel(STANDARD_VECTOR_SIZE);
    // now iterate over the row ids
    for (idx_t r = 0; r < count;) {
        result.Reset();
        // figure out which row_group to fetch from
        auto row_id = row_ids[r];
        auto row_group = row_groups->GetSegment(row_id);
        auto row_group_vector_idx = (row_id - row_group->start) / STANDARD_VECTOR_SIZE;
        auto base_row_id = row_group_vector_idx * STANDARD_VECTOR_SIZE + row_group->start;

        // fetch the current vector
        state.table_state.Initialize(GetTypes());
        row_group->InitializeScanWithOffset(state.table_state, row_group_vector_idx);
        row_group->ScanCommitted(state.table_state, result, TableScanType::TABLE_SCAN_COMMITTED_ROWS);
        result.Verify();
        // check if any remaining row ids also fall into this vector
        // we try to handle as many rows as possible at the same time
        idx_t sel_count = 0;
        for (; r < count; r++) {
            idx_t current_row = idx_t(row_ids[r]);
            if (current_row < base_row_id || current_row >= base_row_id + result.size()) {
                // this row-id does not fall into the current chunk - break
                break;
            }
            auto row_in_vector = current_row - base_row_id;
            D_ASSERT(row_in_vector < result.size());
            sel.set_index(sel_count++, row_in_vector);
        }
        D_ASSERT(sel_count > 0);
        // slice the vector with all rows that are present in this vector and erase from the index
        result.Slice(sel, sel_count);

        indexes.Scan([&](Index &index) {
            index.Delete(result, row_identifiers);
            return false;
        });
    }
}

void RowGroupCollection::UpdateColumn(TransactionData transaction, Vector &row_ids, const vector<column_t> &column_path,
                                      DataChunk &updates) {
    auto first_id = FlatVector::GetValue<row_t>(row_ids, 0);
    if (first_id >= MAX_ROW_ID) {
        throw NotImplementedException("Cannot update a column-path on transaction local data");
    }
    // find the row_group this id belongs to
    auto primary_column_idx = column_path[0];
    auto row_group = row_groups->GetSegment(first_id);
    row_group->UpdateColumn(transaction, updates, row_ids, column_path);

    row_group->MergeIntoStatistics(primary_column_idx, stats.GetStats(primary_column_idx).Statistics());
}

//===--------------------------------------------------------------------===//
// Checkpoint
//===--------------------------------------------------------------------===//
void RowGroupCollection::Checkpoint(TableDataWriter &writer, TableStatistics &global_stats) {
    for (auto &row_group : row_groups->Segments()) {
        auto rowg_writer = writer.GetRowGroupWriter(row_group);
        auto pointer = row_group.Checkpoint(*rowg_writer, global_stats);
        writer.AddRowGroup(std::move(pointer), std::move(rowg_writer));
    }
}
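
// Note: each row group is written out through its own RowGroupWriter; the
// returned pointer (row start, tuple count and the block pointers of the
// column data) is handed back to the TableDataWriter, which serializes it as
// part of the table metadata.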

//===--------------------------------------------------------------------===//
// CommitDrop
//===--------------------------------------------------------------------===//
void RowGroupCollection::CommitDropColumn(idx_t index) {
    for (auto &row_group : row_groups->Segments()) {
        row_group.CommitDropColumn(index);
    }
}

void RowGroupCollection::CommitDropTable() {
    for (auto &row_group : row_groups->Segments()) {
        row_group.CommitDrop();
    }
}

//===--------------------------------------------------------------------===//
// GetColumnSegmentInfo
//===--------------------------------------------------------------------===//
vector<ColumnSegmentInfo> RowGroupCollection::GetColumnSegmentInfo() {
    vector<ColumnSegmentInfo> result;
    for (auto &row_group : row_groups->Segments()) {
        row_group.GetColumnSegmentInfo(row_group.index, result);
    }
    return result;
}

//===--------------------------------------------------------------------===//
// Alter
//===--------------------------------------------------------------------===//
shared_ptr<RowGroupCollection> RowGroupCollection::AddColumn(ClientContext &context, ColumnDefinition &new_column,
                                                             Expression *default_value) {
    idx_t new_column_idx = types.size();
    auto new_types = types;
    new_types.push_back(new_column.GetType());
    auto result =
        make_shared<RowGroupCollection>(info, block_manager, std::move(new_types), row_start, total_rows.load());

    ExpressionExecutor executor(context);
    DataChunk dummy_chunk;
    Vector default_vector(new_column.GetType());
    if (!default_value) {
        FlatVector::Validity(default_vector).SetAllInvalid(STANDARD_VECTOR_SIZE);
    } else {
        executor.AddExpression(*default_value);
    }

    result->stats.InitializeAddColumn(stats, new_column.GetType());
    auto &new_column_stats = result->stats.GetStats(new_column_idx);

    // fill the column with its DEFAULT value, or NULL if none is specified
    auto new_stats = make_uniq<SegmentStatistics>(new_column.GetType());
    for (auto &current_row_group : row_groups->Segments()) {
        auto new_row_group = current_row_group.AddColumn(*result, new_column, executor, default_value, default_vector);
        // merge in the statistics
        new_row_group->MergeIntoStatistics(new_column_idx, new_column_stats.Statistics());

        result->row_groups->AppendSegment(std::move(new_row_group));
    }
    return result;
}
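
// Note: AddColumn builds a new collection in which the existing column data is
// reused rather than copied; only the new column is materialized, row group by
// row group, by evaluating the DEFAULT expression (or filling NULLs) for every
// row.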

shared_ptr<RowGroupCollection> RowGroupCollection::RemoveColumn(idx_t col_idx) {
    D_ASSERT(col_idx < types.size());
    auto new_types = types;
    new_types.erase(new_types.begin() + col_idx);

    auto result =
        make_shared<RowGroupCollection>(info, block_manager, std::move(new_types), row_start, total_rows.load());
    result->stats.InitializeRemoveColumn(stats, col_idx);

    for (auto &current_row_group : row_groups->Segments()) {
        auto new_row_group = current_row_group.RemoveColumn(*result, col_idx);
        result->row_groups->AppendSegment(std::move(new_row_group));
    }
    return result;
}

shared_ptr<RowGroupCollection> RowGroupCollection::AlterType(ClientContext &context, idx_t changed_idx,
                                                             const LogicalType &target_type,
                                                             vector<column_t> bound_columns, Expression &cast_expr) {
    D_ASSERT(changed_idx < types.size());
    auto new_types = types;
    new_types[changed_idx] = target_type;

    auto result =
        make_shared<RowGroupCollection>(info, block_manager, std::move(new_types), row_start, total_rows.load());
    result->stats.InitializeAlterType(stats, changed_idx, target_type);

    vector<LogicalType> scan_types;
    for (idx_t i = 0; i < bound_columns.size(); i++) {
        if (bound_columns[i] == COLUMN_IDENTIFIER_ROW_ID) {
            scan_types.emplace_back(LogicalType::ROW_TYPE);
        } else {
            scan_types.push_back(types[bound_columns[i]]);
        }
    }
    DataChunk scan_chunk;
    scan_chunk.Initialize(GetAllocator(), scan_types);

    ExpressionExecutor executor(context);
    executor.AddExpression(cast_expr);

    TableScanState scan_state;
    scan_state.Initialize(bound_columns);
    scan_state.table_state.max_row = row_start + total_rows;

    // now alter the type of the column within all of the row_groups individually
    auto &changed_stats = result->stats.GetStats(changed_idx);
    for (auto &current_row_group : row_groups->Segments()) {
        auto new_row_group = current_row_group.AlterType(*result, target_type, changed_idx, executor,
                                                         scan_state.table_state, scan_chunk);
        new_row_group->MergeIntoStatistics(changed_idx, changed_stats.Statistics());
        result->row_groups->AppendSegment(std::move(new_row_group));
    }

    return result;
}

void RowGroupCollection::VerifyNewConstraint(DataTable &parent, const BoundConstraint &constraint) {
    if (total_rows == 0) {
        return;
    }
    // scan the original table, check if there's any null value
    auto &not_null_constraint = constraint.Cast<BoundNotNullConstraint>();
    vector<LogicalType> scan_types;
    auto physical_index = not_null_constraint.index.index;
    D_ASSERT(physical_index < types.size());
    scan_types.push_back(types[physical_index]);
    DataChunk scan_chunk;
    scan_chunk.Initialize(GetAllocator(), scan_types);

    CreateIndexScanState state;
    vector<column_t> cids;
    cids.push_back(physical_index);
    // use ScanCommitted to scan the latest committed data
    state.Initialize(cids, nullptr);
    InitializeScan(state.table_state, cids, nullptr);
    InitializeCreateIndexScan(state);
    while (true) {
        scan_chunk.Reset();
        state.table_state.ScanCommitted(scan_chunk, state.segment_lock,
                                        TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED);
        if (scan_chunk.size() == 0) {
            break;
        }
        // check the constraint
        if (VectorOperations::HasNull(scan_chunk.data[0], scan_chunk.size())) {
            throw ConstraintException("NOT NULL constraint failed: %s.%s", info->table,
                                      parent.column_definitions[physical_index].GetName());
        }
    }
}
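
// Note: only NOT NULL constraints are expected to reach this function (hence
// the unconditional Cast<BoundNotNullConstraint> above); the scan reads just
// the constrained column and fails on the first NULL it encounters.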

//===--------------------------------------------------------------------===//
// Statistics
//===--------------------------------------------------------------------===//
void RowGroupCollection::CopyStats(TableStatistics &other_stats) {
    stats.CopyStats(other_stats);
}

unique_ptr<BaseStatistics> RowGroupCollection::CopyStats(column_t column_id) {
    return stats.CopyStats(column_id);
}

void RowGroupCollection::SetDistinct(column_t column_id, unique_ptr<DistinctStatistics> distinct_stats) {
    D_ASSERT(column_id != COLUMN_IDENTIFIER_ROW_ID);
    auto stats_guard = stats.GetLock();
    stats.GetStats(column_id).SetDistinct(std::move(distinct_stats));
}

} // namespace duckdb