1 | #include "duckdb/storage/table/column_data.hpp" |
2 | |
3 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
4 | #include "duckdb/function/compression_function.hpp" |
5 | #include "duckdb/planner/table_filter.hpp" |
6 | #include "duckdb/storage/data_pointer.hpp" |
7 | #include "duckdb/storage/data_table.hpp" |
8 | #include "duckdb/storage/statistics/distinct_statistics.hpp" |
9 | #include "duckdb/storage/storage_manager.hpp" |
10 | #include "duckdb/storage/table/column_data_checkpointer.hpp" |
11 | #include "duckdb/storage/table/list_column_data.hpp" |
12 | #include "duckdb/storage/table/standard_column_data.hpp" |
13 | |
14 | #include "duckdb/storage/table/struct_column_data.hpp" |
15 | #include "duckdb/storage/table/update_segment.hpp" |
16 | #include "duckdb/storage/table_storage_info.hpp" |
17 | #include "duckdb/storage/table/append_state.hpp" |
18 | #include "duckdb/storage/table/scan_state.hpp" |
19 | #include "duckdb/main/attached_database.hpp" |
20 | |
21 | namespace duckdb { |
22 | |
23 | ColumnData::ColumnData(BlockManager &block_manager, DataTableInfo &info, idx_t column_index, idx_t start_row, |
24 | LogicalType type_p, optional_ptr<ColumnData> parent) |
25 | : start(start_row), count(0), block_manager(block_manager), info(info), column_index(column_index), |
26 | type(std::move(type_p)), parent(parent), version(0) { |
27 | if (!parent) { |
28 | stats = make_uniq<SegmentStatistics>(args&: type); |
29 | } |
30 | } |
31 | |
32 | ColumnData::~ColumnData() { |
33 | } |
34 | |
35 | void ColumnData::SetStart(idx_t new_start) { |
36 | this->start = new_start; |
37 | idx_t offset = 0; |
38 | for (auto &segment : data.Segments()) { |
39 | segment.start = start + offset; |
40 | offset += segment.count; |
41 | } |
42 | data.Reinitialize(); |
43 | } |
44 | |
45 | DatabaseInstance &ColumnData::GetDatabase() const { |
46 | return info.db.GetDatabase(); |
47 | } |
48 | |
49 | DataTableInfo &ColumnData::GetTableInfo() const { |
50 | return info; |
51 | } |
52 | |
53 | const LogicalType &ColumnData::RootType() const { |
54 | if (parent) { |
55 | return parent->RootType(); |
56 | } |
57 | return type; |
58 | } |
59 | |
60 | void ColumnData::IncrementVersion() { |
61 | version++; |
62 | } |
63 | |
64 | idx_t ColumnData::GetMaxEntry() { |
65 | return count; |
66 | } |
67 | |
68 | void ColumnData::InitializeScan(ColumnScanState &state) { |
69 | state.current = data.GetRootSegment(); |
70 | state.segment_tree = &data; |
71 | state.row_index = state.current ? state.current->start : 0; |
72 | state.internal_index = state.row_index; |
73 | state.initialized = false; |
74 | state.version = version; |
75 | state.scan_state.reset(); |
76 | state.last_offset = 0; |
77 | } |
78 | |
79 | void ColumnData::InitializeScanWithOffset(ColumnScanState &state, idx_t row_idx) { |
80 | state.current = data.GetSegment(row_number: row_idx); |
81 | state.segment_tree = &data; |
82 | state.row_index = row_idx; |
83 | state.internal_index = state.current->start; |
84 | state.initialized = false; |
85 | state.version = version; |
86 | state.scan_state.reset(); |
87 | state.last_offset = 0; |
88 | } |
89 | |
90 | idx_t ColumnData::ScanVector(ColumnScanState &state, Vector &result, idx_t remaining) { |
91 | state.previous_states.clear(); |
92 | if (state.version != version) { |
93 | InitializeScanWithOffset(state, row_idx: state.row_index); |
94 | state.current->InitializeScan(state); |
95 | state.initialized = true; |
96 | } else if (!state.initialized) { |
97 | D_ASSERT(state.current); |
98 | state.current->InitializeScan(state); |
99 | state.internal_index = state.current->start; |
100 | state.initialized = true; |
101 | } |
102 | D_ASSERT(data.HasSegment(state.current)); |
103 | D_ASSERT(state.version == version); |
104 | D_ASSERT(state.internal_index <= state.row_index); |
105 | if (state.internal_index < state.row_index) { |
106 | state.current->Skip(state); |
107 | } |
108 | D_ASSERT(state.current->type == type); |
109 | idx_t initial_remaining = remaining; |
110 | while (remaining > 0) { |
111 | D_ASSERT(state.row_index >= state.current->start && |
112 | state.row_index <= state.current->start + state.current->count); |
113 | idx_t scan_count = MinValue<idx_t>(a: remaining, b: state.current->start + state.current->count - state.row_index); |
114 | idx_t result_offset = initial_remaining - remaining; |
115 | if (scan_count > 0) { |
116 | state.current->Scan(state, scan_count, result, result_offset, entire_vector: scan_count == initial_remaining); |
117 | |
118 | state.row_index += scan_count; |
119 | remaining -= scan_count; |
120 | } |
121 | |
122 | if (remaining > 0) { |
123 | auto next = data.GetNextSegment(segment: state.current); |
124 | if (!next) { |
125 | break; |
126 | } |
127 | state.previous_states.emplace_back(args: std::move(state.scan_state)); |
128 | state.current = next; |
129 | state.current->InitializeScan(state); |
130 | state.segment_checked = false; |
131 | D_ASSERT(state.row_index >= state.current->start && |
132 | state.row_index <= state.current->start + state.current->count); |
133 | } |
134 | } |
135 | state.internal_index = state.row_index; |
136 | return initial_remaining - remaining; |
137 | } |
138 | |
139 | template <bool SCAN_COMMITTED, bool ALLOW_UPDATES> |
140 | idx_t ColumnData::ScanVector(TransactionData transaction, idx_t vector_index, ColumnScanState &state, Vector &result) { |
141 | auto scan_count = ScanVector(state, result, STANDARD_VECTOR_SIZE); |
142 | |
143 | lock_guard<mutex> update_guard(update_lock); |
144 | if (updates) { |
145 | if (!ALLOW_UPDATES && updates->HasUncommittedUpdates(vector_index)) { |
146 | throw TransactionException("Cannot create index with outstanding updates" ); |
147 | } |
148 | result.Flatten(count: scan_count); |
149 | if (SCAN_COMMITTED) { |
150 | updates->FetchCommitted(vector_index, result); |
151 | } else { |
152 | updates->FetchUpdates(transaction, vector_index, result); |
153 | } |
154 | } |
155 | return scan_count; |
156 | } |
157 | |
158 | template idx_t ColumnData::ScanVector<false, false>(TransactionData transaction, idx_t vector_index, |
159 | ColumnScanState &state, Vector &result); |
160 | template idx_t ColumnData::ScanVector<true, false>(TransactionData transaction, idx_t vector_index, |
161 | ColumnScanState &state, Vector &result); |
162 | template idx_t ColumnData::ScanVector<false, true>(TransactionData transaction, idx_t vector_index, |
163 | ColumnScanState &state, Vector &result); |
164 | template idx_t ColumnData::ScanVector<true, true>(TransactionData transaction, idx_t vector_index, |
165 | ColumnScanState &state, Vector &result); |
166 | |
167 | idx_t ColumnData::Scan(TransactionData transaction, idx_t vector_index, ColumnScanState &state, Vector &result) { |
168 | return ScanVector<false, true>(transaction, vector_index, state, result); |
169 | } |
170 | |
171 | idx_t ColumnData::ScanCommitted(idx_t vector_index, ColumnScanState &state, Vector &result, bool allow_updates) { |
172 | if (allow_updates) { |
173 | return ScanVector<true, true>(transaction: TransactionData(0, 0), vector_index, state, result); |
174 | } else { |
175 | return ScanVector<true, false>(transaction: TransactionData(0, 0), vector_index, state, result); |
176 | } |
177 | } |
178 | |
179 | void ColumnData::ScanCommittedRange(idx_t row_group_start, idx_t offset_in_row_group, idx_t count, Vector &result) { |
180 | ColumnScanState child_state; |
181 | InitializeScanWithOffset(state&: child_state, row_idx: row_group_start + offset_in_row_group); |
182 | auto scan_count = ScanVector(state&: child_state, result, remaining: count); |
183 | if (updates) { |
184 | result.Flatten(count: scan_count); |
185 | updates->FetchCommittedRange(start_row: offset_in_row_group, count, result); |
186 | } |
187 | } |
188 | |
189 | idx_t ColumnData::ScanCount(ColumnScanState &state, Vector &result, idx_t count) { |
190 | if (count == 0) { |
191 | return 0; |
192 | } |
193 | // ScanCount can only be used if there are no updates |
194 | D_ASSERT(!updates); |
195 | return ScanVector(state, result, remaining: count); |
196 | } |
197 | |
198 | void ColumnData::Select(TransactionData transaction, idx_t vector_index, ColumnScanState &state, Vector &result, |
199 | SelectionVector &sel, idx_t &count, const TableFilter &filter) { |
200 | idx_t scan_count = Scan(transaction, vector_index, state, result); |
201 | result.Flatten(count: scan_count); |
202 | ColumnSegment::FilterSelection(sel, result, filter, approved_tuple_count&: count, mask&: FlatVector::Validity(vector&: result)); |
203 | } |
204 | |
205 | void ColumnData::FilterScan(TransactionData transaction, idx_t vector_index, ColumnScanState &state, Vector &result, |
206 | SelectionVector &sel, idx_t count) { |
207 | Scan(transaction, vector_index, state, result); |
208 | result.Slice(sel, count); |
209 | } |
210 | |
211 | void ColumnData::FilterScanCommitted(idx_t vector_index, ColumnScanState &state, Vector &result, SelectionVector &sel, |
212 | idx_t count, bool allow_updates) { |
213 | ScanCommitted(vector_index, state, result, allow_updates); |
214 | result.Slice(sel, count); |
215 | } |
216 | |
217 | void ColumnData::Skip(ColumnScanState &state, idx_t count) { |
218 | state.Next(count); |
219 | } |
220 | |
221 | void ColumnData::Append(BaseStatistics &stats, ColumnAppendState &state, Vector &vector, idx_t count) { |
222 | UnifiedVectorFormat vdata; |
223 | vector.ToUnifiedFormat(count, data&: vdata); |
224 | AppendData(stats, state, vdata, count); |
225 | } |
226 | |
227 | void ColumnData::Append(ColumnAppendState &state, Vector &vector, idx_t count) { |
228 | if (parent || !stats) { |
229 | throw InternalException("ColumnData::Append called on a column with a parent or without stats" ); |
230 | } |
231 | Append(stats&: stats->statistics, state, vector, count); |
232 | } |
233 | |
234 | bool ColumnData::CheckZonemap(TableFilter &filter) { |
235 | if (!stats) { |
236 | throw InternalException("ColumnData::CheckZonemap called on a column without stats" ); |
237 | } |
238 | auto propagate_result = filter.CheckStatistics(stats&: stats->statistics); |
239 | if (propagate_result == FilterPropagateResult::FILTER_ALWAYS_FALSE || |
240 | propagate_result == FilterPropagateResult::FILTER_FALSE_OR_NULL) { |
241 | return false; |
242 | } |
243 | return true; |
244 | } |
245 | |
246 | unique_ptr<BaseStatistics> ColumnData::GetStatistics() { |
247 | if (!stats) { |
248 | throw InternalException("ColumnData::GetStatistics called on a column without stats" ); |
249 | } |
250 | return stats->statistics.ToUnique(); |
251 | } |
252 | |
253 | void ColumnData::MergeStatistics(const BaseStatistics &other) { |
254 | if (!stats) { |
255 | throw InternalException("ColumnData::MergeStatistics called on a column without stats" ); |
256 | } |
257 | return stats->statistics.Merge(other); |
258 | } |
259 | |
260 | void ColumnData::MergeIntoStatistics(BaseStatistics &other) { |
261 | if (!stats) { |
262 | throw InternalException("ColumnData::MergeIntoStatistics called on a column without stats" ); |
263 | } |
264 | return other.Merge(other: stats->statistics); |
265 | } |
266 | |
267 | void ColumnData::InitializeAppend(ColumnAppendState &state) { |
268 | auto l = data.Lock(); |
269 | if (data.IsEmpty(l)) { |
270 | // no segments yet, append an empty segment |
271 | AppendTransientSegment(l, start_row: start); |
272 | } |
273 | auto segment = data.GetLastSegment(l); |
274 | if (segment->segment_type == ColumnSegmentType::PERSISTENT || !segment->function.get().init_append) { |
275 | // we cannot append to this segment - append a new segment |
276 | auto total_rows = segment->start + segment->count; |
277 | AppendTransientSegment(l, start_row: total_rows); |
278 | state.current = data.GetLastSegment(l); |
279 | } else { |
280 | state.current = segment; |
281 | } |
282 | |
283 | D_ASSERT(state.current->segment_type == ColumnSegmentType::TRANSIENT); |
284 | state.current->InitializeAppend(state); |
285 | D_ASSERT(state.current->function.get().append); |
286 | } |
287 | |
288 | void ColumnData::AppendData(BaseStatistics &stats, ColumnAppendState &state, UnifiedVectorFormat &vdata, idx_t count) { |
289 | idx_t offset = 0; |
290 | this->count += count; |
291 | while (true) { |
292 | // append the data from the vector |
293 | idx_t copied_elements = state.current->Append(state, data&: vdata, offset, count); |
294 | stats.Merge(other: state.current->stats.statistics); |
295 | if (copied_elements == count) { |
296 | // finished copying everything |
297 | break; |
298 | } |
299 | |
300 | // we couldn't fit everything we wanted in the current column segment, create a new one |
301 | { |
302 | auto l = data.Lock(); |
303 | AppendTransientSegment(l, start_row: state.current->start + state.current->count); |
304 | state.current = data.GetLastSegment(l); |
305 | state.current->InitializeAppend(state); |
306 | } |
307 | offset += copied_elements; |
308 | count -= copied_elements; |
309 | } |
310 | } |
311 | |
312 | void ColumnData::RevertAppend(row_t start_row) { |
313 | auto l = data.Lock(); |
314 | // check if this row is in the segment tree at all |
315 | auto last_segment = data.GetLastSegment(l); |
316 | if (idx_t(start_row) >= last_segment->start + last_segment->count) { |
317 | // the start row is equal to the final portion of the column data: nothing was ever appended here |
318 | D_ASSERT(idx_t(start_row) == last_segment->start + last_segment->count); |
319 | return; |
320 | } |
321 | // find the segment index that the current row belongs to |
322 | idx_t segment_index = data.GetSegmentIndex(l, row_number: start_row); |
323 | auto segment = data.GetSegmentByIndex(l, index: segment_index); |
324 | auto &transient = *segment; |
325 | D_ASSERT(transient.segment_type == ColumnSegmentType::TRANSIENT); |
326 | |
327 | // remove any segments AFTER this segment: they should be deleted entirely |
328 | data.EraseSegments(l, segment_start: segment_index); |
329 | |
330 | this->count = start_row - this->start; |
331 | segment->next = nullptr; |
332 | transient.RevertAppend(start_row); |
333 | } |
334 | |
335 | idx_t ColumnData::Fetch(ColumnScanState &state, row_t row_id, Vector &result) { |
336 | D_ASSERT(row_id >= 0); |
337 | D_ASSERT(idx_t(row_id) >= start); |
338 | // perform the fetch within the segment |
339 | state.row_index = start + ((row_id - start) / STANDARD_VECTOR_SIZE * STANDARD_VECTOR_SIZE); |
340 | state.current = data.GetSegment(row_number: state.row_index); |
341 | state.internal_index = state.current->start; |
342 | return ScanVector(state, result, STANDARD_VECTOR_SIZE); |
343 | } |
344 | |
345 | void ColumnData::FetchRow(TransactionData transaction, ColumnFetchState &state, row_t row_id, Vector &result, |
346 | idx_t result_idx) { |
347 | auto segment = data.GetSegment(row_number: row_id); |
348 | |
349 | // now perform the fetch within the segment |
350 | segment->FetchRow(state, row_id, result, result_idx); |
351 | // merge any updates made to this row |
352 | lock_guard<mutex> update_guard(update_lock); |
353 | if (updates) { |
354 | updates->FetchRow(transaction, row_id, result, result_idx); |
355 | } |
356 | } |
357 | |
358 | void ColumnData::Update(TransactionData transaction, idx_t column_index, Vector &update_vector, row_t *row_ids, |
359 | idx_t update_count) { |
360 | lock_guard<mutex> update_guard(update_lock); |
361 | if (!updates) { |
362 | updates = make_uniq<UpdateSegment>(args&: *this); |
363 | } |
364 | Vector base_vector(type); |
365 | ColumnScanState state; |
366 | auto fetch_count = Fetch(state, row_id: row_ids[0], result&: base_vector); |
367 | |
368 | base_vector.Flatten(count: fetch_count); |
369 | updates->Update(transaction, column_index, update&: update_vector, ids: row_ids, count: update_count, base_data&: base_vector); |
370 | } |
371 | |
372 | void ColumnData::UpdateColumn(TransactionData transaction, const vector<column_t> &column_path, Vector &update_vector, |
373 | row_t *row_ids, idx_t update_count, idx_t depth) { |
374 | // this method should only be called at the end of the path in the base column case |
375 | D_ASSERT(depth >= column_path.size()); |
376 | ColumnData::Update(transaction, column_index: column_path[0], update_vector, row_ids, update_count); |
377 | } |
378 | |
379 | unique_ptr<BaseStatistics> ColumnData::GetUpdateStatistics() { |
380 | lock_guard<mutex> update_guard(update_lock); |
381 | return updates ? updates->GetStatistics() : nullptr; |
382 | } |
383 | |
384 | void ColumnData::AppendTransientSegment(SegmentLock &l, idx_t start_row) { |
385 | idx_t segment_size = Storage::BLOCK_SIZE; |
386 | if (start_row == idx_t(MAX_ROW_ID)) { |
387 | #if STANDARD_VECTOR_SIZE < 1024 |
388 | segment_size = 1024 * GetTypeIdSize(type.InternalType()); |
389 | #else |
390 | segment_size = STANDARD_VECTOR_SIZE * GetTypeIdSize(type: type.InternalType()); |
391 | #endif |
392 | } |
393 | auto new_segment = ColumnSegment::CreateTransientSegment(db&: GetDatabase(), type, start: start_row, segment_size); |
394 | data.AppendSegment(l, segment: std::move(new_segment)); |
395 | } |
396 | |
397 | void ColumnData::CommitDropColumn() { |
398 | for (auto &segment_p : data.Segments()) { |
399 | auto &segment = segment_p; |
400 | if (segment.segment_type == ColumnSegmentType::PERSISTENT) { |
401 | auto block_id = segment.GetBlockId(); |
402 | if (block_id != INVALID_BLOCK) { |
403 | block_manager.MarkBlockAsModified(block_id); |
404 | } |
405 | } |
406 | } |
407 | } |
408 | |
409 | unique_ptr<ColumnCheckpointState> ColumnData::CreateCheckpointState(RowGroup &row_group, |
410 | PartialBlockManager &partial_block_manager) { |
411 | return make_uniq<ColumnCheckpointState>(args&: row_group, args&: *this, args&: partial_block_manager); |
412 | } |
413 | |
414 | void ColumnData::CheckpointScan(ColumnSegment &segment, ColumnScanState &state, idx_t row_group_start, idx_t count, |
415 | Vector &scan_vector) { |
416 | segment.Scan(state, scan_count: count, result&: scan_vector, result_offset: 0, entire_vector: true); |
417 | if (updates) { |
418 | scan_vector.Flatten(count); |
419 | updates->FetchCommittedRange(start_row: state.row_index - row_group_start, count, result&: scan_vector); |
420 | } |
421 | } |
422 | |
423 | unique_ptr<ColumnCheckpointState> ColumnData::Checkpoint(RowGroup &row_group, |
424 | PartialBlockManager &partial_block_manager, |
425 | ColumnCheckpointInfo &checkpoint_info) { |
426 | // scan the segments of the column data |
427 | // set up the checkpoint state |
428 | auto checkpoint_state = CreateCheckpointState(row_group, partial_block_manager); |
429 | checkpoint_state->global_stats = BaseStatistics::CreateEmpty(type).ToUnique(); |
430 | |
431 | auto l = data.Lock(); |
432 | auto nodes = data.MoveSegments(l); |
433 | if (nodes.empty()) { |
434 | // empty table: flush the empty list |
435 | return checkpoint_state; |
436 | } |
437 | lock_guard<mutex> update_guard(update_lock); |
438 | |
439 | ColumnDataCheckpointer checkpointer(*this, row_group, *checkpoint_state, checkpoint_info); |
440 | checkpointer.Checkpoint(nodes: std::move(nodes)); |
441 | |
442 | // replace the old tree with the new one |
443 | data.Replace(l, other&: checkpoint_state->new_tree); |
444 | version++; |
445 | |
446 | return checkpoint_state; |
447 | } |
448 | |
449 | void ColumnData::DeserializeColumn(Deserializer &source) { |
450 | // load the data pointers for the column |
451 | this->count = 0; |
452 | idx_t data_pointer_count = source.Read<idx_t>(); |
453 | for (idx_t data_ptr = 0; data_ptr < data_pointer_count; data_ptr++) { |
454 | // read the data pointer |
455 | auto row_start = source.Read<idx_t>(); |
456 | auto tuple_count = source.Read<idx_t>(); |
457 | auto block_pointer_block_id = source.Read<block_id_t>(); |
458 | auto block_pointer_offset = source.Read<uint32_t>(); |
459 | auto compression_type = source.Read<CompressionType>(); |
460 | auto segment_stats = BaseStatistics::Deserialize(source, type); |
461 | if (stats) { |
462 | stats->statistics.Merge(other: segment_stats); |
463 | } |
464 | |
465 | DataPointer data_pointer(std::move(segment_stats)); |
466 | data_pointer.row_start = row_start; |
467 | data_pointer.tuple_count = tuple_count; |
468 | data_pointer.block_pointer.block_id = block_pointer_block_id; |
469 | data_pointer.block_pointer.offset = block_pointer_offset; |
470 | data_pointer.compression_type = compression_type; |
471 | |
472 | this->count += tuple_count; |
473 | |
474 | // create a persistent segment |
475 | auto segment = ColumnSegment::CreatePersistentSegment( |
476 | db&: GetDatabase(), block_manager, id: data_pointer.block_pointer.block_id, offset: data_pointer.block_pointer.offset, type_p: type, |
477 | start: data_pointer.row_start, count: data_pointer.tuple_count, compression_type: data_pointer.compression_type, |
478 | statistics: std::move(data_pointer.statistics)); |
479 | data.AppendSegment(segment: std::move(segment)); |
480 | } |
481 | } |
482 | |
483 | shared_ptr<ColumnData> ColumnData::Deserialize(BlockManager &block_manager, DataTableInfo &info, idx_t column_index, |
484 | idx_t start_row, Deserializer &source, const LogicalType &type, |
485 | optional_ptr<ColumnData> parent) { |
486 | auto entry = ColumnData::CreateColumn(block_manager, info, column_index, start_row, type, parent); |
487 | entry->DeserializeColumn(source); |
488 | return entry; |
489 | } |
490 | |
491 | void ColumnData::GetColumnSegmentInfo(idx_t row_group_index, vector<idx_t> col_path, |
492 | vector<ColumnSegmentInfo> &result) { |
493 | D_ASSERT(!col_path.empty()); |
494 | |
495 | // convert the column path to a string |
496 | string col_path_str = "[" ; |
497 | for (idx_t i = 0; i < col_path.size(); i++) { |
498 | if (i > 0) { |
499 | col_path_str += ", " ; |
500 | } |
501 | col_path_str += to_string(val: col_path[i]); |
502 | } |
503 | col_path_str += "]" ; |
504 | |
505 | // iterate over the segments |
506 | idx_t segment_idx = 0; |
507 | auto segment = (ColumnSegment *)data.GetRootSegment(); |
508 | while (segment) { |
509 | ColumnSegmentInfo column_info; |
510 | column_info.row_group_index = row_group_index; |
511 | column_info.column_id = col_path[0]; |
512 | column_info.column_path = col_path_str; |
513 | column_info.segment_idx = segment_idx; |
514 | column_info.segment_type = type.ToString(); |
515 | column_info.segment_start = segment->start; |
516 | column_info.segment_count = segment->count; |
517 | column_info.compression_type = CompressionTypeToString(type: segment->function.get().type); |
518 | column_info.segment_stats = segment->stats.statistics.ToString(); |
519 | { |
520 | lock_guard<mutex> ulock(update_lock); |
521 | column_info.has_updates = updates ? true : false; |
522 | } |
523 | // persistent |
524 | // block_id |
525 | // block_offset |
526 | if (segment->segment_type == ColumnSegmentType::PERSISTENT) { |
527 | column_info.persistent = true; |
528 | column_info.block_id = segment->GetBlockId(); |
529 | column_info.block_offset = segment->GetBlockOffset(); |
530 | } else { |
531 | column_info.persistent = false; |
532 | } |
533 | result.emplace_back(args&: column_info); |
534 | |
535 | segment_idx++; |
536 | segment = (ColumnSegment *)data.GetNextSegment(segment); |
537 | } |
538 | } |
539 | |
540 | void ColumnData::Verify(RowGroup &parent) { |
541 | #ifdef DEBUG |
542 | D_ASSERT(this->start == parent.start); |
543 | data.Verify(); |
544 | if (type.InternalType() == PhysicalType::STRUCT) { |
545 | // structs don't have segments |
546 | D_ASSERT(!data.GetRootSegment()); |
547 | return; |
548 | } |
549 | idx_t current_index = 0; |
550 | idx_t current_start = this->start; |
551 | idx_t total_count = 0; |
552 | for (auto &segment : data.Segments()) { |
553 | D_ASSERT(segment.index == current_index); |
554 | D_ASSERT(segment.start == current_start); |
555 | current_start += segment.count; |
556 | total_count += segment.count; |
557 | current_index++; |
558 | } |
559 | D_ASSERT(this->count == total_count); |
560 | #endif |
561 | } |
562 | |
563 | template <class RET, class OP> |
564 | static RET CreateColumnInternal(BlockManager &block_manager, DataTableInfo &info, idx_t column_index, idx_t start_row, |
565 | const LogicalType &type, optional_ptr<ColumnData> parent) { |
566 | if (type.InternalType() == PhysicalType::STRUCT) { |
567 | return OP::template Create<StructColumnData>(block_manager, info, column_index, start_row, type, parent); |
568 | } else if (type.InternalType() == PhysicalType::LIST) { |
569 | return OP::template Create<ListColumnData>(block_manager, info, column_index, start_row, type, parent); |
570 | } else if (type.id() == LogicalTypeId::VALIDITY) { |
571 | return OP::template Create<ValidityColumnData>(block_manager, info, column_index, start_row, *parent); |
572 | } |
573 | return OP::template Create<StandardColumnData>(block_manager, info, column_index, start_row, type, parent); |
574 | } |
575 | |
576 | shared_ptr<ColumnData> ColumnData::CreateColumn(BlockManager &block_manager, DataTableInfo &info, idx_t column_index, |
577 | idx_t start_row, const LogicalType &type, |
578 | optional_ptr<ColumnData> parent) { |
579 | return CreateColumnInternal<shared_ptr<ColumnData>, SharedConstructor>(block_manager, info, column_index, start_row, |
580 | type, parent); |
581 | } |
582 | |
583 | unique_ptr<ColumnData> ColumnData::CreateColumnUnique(BlockManager &block_manager, DataTableInfo &info, |
584 | idx_t column_index, idx_t start_row, const LogicalType &type, |
585 | optional_ptr<ColumnData> parent) { |
586 | return CreateColumnInternal<unique_ptr<ColumnData>, UniqueConstructor>(block_manager, info, column_index, start_row, |
587 | type, parent); |
588 | } |
589 | |
590 | } // namespace duckdb |
591 | |