1#include "duckdb/storage/table/column_data.hpp"
2
3#include "duckdb/common/vector_operations/vector_operations.hpp"
4#include "duckdb/function/compression_function.hpp"
5#include "duckdb/planner/table_filter.hpp"
6#include "duckdb/storage/data_pointer.hpp"
7#include "duckdb/storage/data_table.hpp"
8#include "duckdb/storage/statistics/distinct_statistics.hpp"
9#include "duckdb/storage/storage_manager.hpp"
10#include "duckdb/storage/table/column_data_checkpointer.hpp"
11#include "duckdb/storage/table/list_column_data.hpp"
12#include "duckdb/storage/table/standard_column_data.hpp"
13
14#include "duckdb/storage/table/struct_column_data.hpp"
15#include "duckdb/storage/table/update_segment.hpp"
16#include "duckdb/storage/table_storage_info.hpp"
17#include "duckdb/storage/table/append_state.hpp"
18#include "duckdb/storage/table/scan_state.hpp"
19#include "duckdb/main/attached_database.hpp"
20
21namespace duckdb {
22
23ColumnData::ColumnData(BlockManager &block_manager, DataTableInfo &info, idx_t column_index, idx_t start_row,
24 LogicalType type_p, optional_ptr<ColumnData> parent)
25 : start(start_row), count(0), block_manager(block_manager), info(info), column_index(column_index),
26 type(std::move(type_p)), parent(parent), version(0) {
27 if (!parent) {
28 stats = make_uniq<SegmentStatistics>(args&: type);
29 }
30}
31
32ColumnData::~ColumnData() {
33}
34
35void ColumnData::SetStart(idx_t new_start) {
36 this->start = new_start;
37 idx_t offset = 0;
38 for (auto &segment : data.Segments()) {
39 segment.start = start + offset;
40 offset += segment.count;
41 }
42 data.Reinitialize();
43}
44
45DatabaseInstance &ColumnData::GetDatabase() const {
46 return info.db.GetDatabase();
47}
48
49DataTableInfo &ColumnData::GetTableInfo() const {
50 return info;
51}
52
53const LogicalType &ColumnData::RootType() const {
54 if (parent) {
55 return parent->RootType();
56 }
57 return type;
58}
59
60void ColumnData::IncrementVersion() {
61 version++;
62}
63
64idx_t ColumnData::GetMaxEntry() {
65 return count;
66}
67
68void ColumnData::InitializeScan(ColumnScanState &state) {
69 state.current = data.GetRootSegment();
70 state.segment_tree = &data;
71 state.row_index = state.current ? state.current->start : 0;
72 state.internal_index = state.row_index;
73 state.initialized = false;
74 state.version = version;
75 state.scan_state.reset();
76 state.last_offset = 0;
77}
78
79void ColumnData::InitializeScanWithOffset(ColumnScanState &state, idx_t row_idx) {
80 state.current = data.GetSegment(row_number: row_idx);
81 state.segment_tree = &data;
82 state.row_index = row_idx;
83 state.internal_index = state.current->start;
84 state.initialized = false;
85 state.version = version;
86 state.scan_state.reset();
87 state.last_offset = 0;
88}
89
90idx_t ColumnData::ScanVector(ColumnScanState &state, Vector &result, idx_t remaining) {
91 state.previous_states.clear();
92 if (state.version != version) {
93 InitializeScanWithOffset(state, row_idx: state.row_index);
94 state.current->InitializeScan(state);
95 state.initialized = true;
96 } else if (!state.initialized) {
97 D_ASSERT(state.current);
98 state.current->InitializeScan(state);
99 state.internal_index = state.current->start;
100 state.initialized = true;
101 }
102 D_ASSERT(data.HasSegment(state.current));
103 D_ASSERT(state.version == version);
104 D_ASSERT(state.internal_index <= state.row_index);
105 if (state.internal_index < state.row_index) {
106 state.current->Skip(state);
107 }
108 D_ASSERT(state.current->type == type);
109 idx_t initial_remaining = remaining;
110 while (remaining > 0) {
111 D_ASSERT(state.row_index >= state.current->start &&
112 state.row_index <= state.current->start + state.current->count);
113 idx_t scan_count = MinValue<idx_t>(a: remaining, b: state.current->start + state.current->count - state.row_index);
114 idx_t result_offset = initial_remaining - remaining;
115 if (scan_count > 0) {
116 state.current->Scan(state, scan_count, result, result_offset, entire_vector: scan_count == initial_remaining);
117
118 state.row_index += scan_count;
119 remaining -= scan_count;
120 }
121
122 if (remaining > 0) {
123 auto next = data.GetNextSegment(segment: state.current);
124 if (!next) {
125 break;
126 }
127 state.previous_states.emplace_back(args: std::move(state.scan_state));
128 state.current = next;
129 state.current->InitializeScan(state);
130 state.segment_checked = false;
131 D_ASSERT(state.row_index >= state.current->start &&
132 state.row_index <= state.current->start + state.current->count);
133 }
134 }
135 state.internal_index = state.row_index;
136 return initial_remaining - remaining;
137}
138
139template <bool SCAN_COMMITTED, bool ALLOW_UPDATES>
140idx_t ColumnData::ScanVector(TransactionData transaction, idx_t vector_index, ColumnScanState &state, Vector &result) {
141 auto scan_count = ScanVector(state, result, STANDARD_VECTOR_SIZE);
142
143 lock_guard<mutex> update_guard(update_lock);
144 if (updates) {
145 if (!ALLOW_UPDATES && updates->HasUncommittedUpdates(vector_index)) {
146 throw TransactionException("Cannot create index with outstanding updates");
147 }
148 result.Flatten(count: scan_count);
149 if (SCAN_COMMITTED) {
150 updates->FetchCommitted(vector_index, result);
151 } else {
152 updates->FetchUpdates(transaction, vector_index, result);
153 }
154 }
155 return scan_count;
156}
157
158template idx_t ColumnData::ScanVector<false, false>(TransactionData transaction, idx_t vector_index,
159 ColumnScanState &state, Vector &result);
160template idx_t ColumnData::ScanVector<true, false>(TransactionData transaction, idx_t vector_index,
161 ColumnScanState &state, Vector &result);
162template idx_t ColumnData::ScanVector<false, true>(TransactionData transaction, idx_t vector_index,
163 ColumnScanState &state, Vector &result);
164template idx_t ColumnData::ScanVector<true, true>(TransactionData transaction, idx_t vector_index,
165 ColumnScanState &state, Vector &result);
166
167idx_t ColumnData::Scan(TransactionData transaction, idx_t vector_index, ColumnScanState &state, Vector &result) {
168 return ScanVector<false, true>(transaction, vector_index, state, result);
169}
170
171idx_t ColumnData::ScanCommitted(idx_t vector_index, ColumnScanState &state, Vector &result, bool allow_updates) {
172 if (allow_updates) {
173 return ScanVector<true, true>(transaction: TransactionData(0, 0), vector_index, state, result);
174 } else {
175 return ScanVector<true, false>(transaction: TransactionData(0, 0), vector_index, state, result);
176 }
177}
178
179void ColumnData::ScanCommittedRange(idx_t row_group_start, idx_t offset_in_row_group, idx_t count, Vector &result) {
180 ColumnScanState child_state;
181 InitializeScanWithOffset(state&: child_state, row_idx: row_group_start + offset_in_row_group);
182 auto scan_count = ScanVector(state&: child_state, result, remaining: count);
183 if (updates) {
184 result.Flatten(count: scan_count);
185 updates->FetchCommittedRange(start_row: offset_in_row_group, count, result);
186 }
187}
188
189idx_t ColumnData::ScanCount(ColumnScanState &state, Vector &result, idx_t count) {
190 if (count == 0) {
191 return 0;
192 }
193 // ScanCount can only be used if there are no updates
194 D_ASSERT(!updates);
195 return ScanVector(state, result, remaining: count);
196}
197
198void ColumnData::Select(TransactionData transaction, idx_t vector_index, ColumnScanState &state, Vector &result,
199 SelectionVector &sel, idx_t &count, const TableFilter &filter) {
200 idx_t scan_count = Scan(transaction, vector_index, state, result);
201 result.Flatten(count: scan_count);
202 ColumnSegment::FilterSelection(sel, result, filter, approved_tuple_count&: count, mask&: FlatVector::Validity(vector&: result));
203}
204
205void ColumnData::FilterScan(TransactionData transaction, idx_t vector_index, ColumnScanState &state, Vector &result,
206 SelectionVector &sel, idx_t count) {
207 Scan(transaction, vector_index, state, result);
208 result.Slice(sel, count);
209}
210
211void ColumnData::FilterScanCommitted(idx_t vector_index, ColumnScanState &state, Vector &result, SelectionVector &sel,
212 idx_t count, bool allow_updates) {
213 ScanCommitted(vector_index, state, result, allow_updates);
214 result.Slice(sel, count);
215}
216
217void ColumnData::Skip(ColumnScanState &state, idx_t count) {
218 state.Next(count);
219}
220
221void ColumnData::Append(BaseStatistics &stats, ColumnAppendState &state, Vector &vector, idx_t count) {
222 UnifiedVectorFormat vdata;
223 vector.ToUnifiedFormat(count, data&: vdata);
224 AppendData(stats, state, vdata, count);
225}
226
227void ColumnData::Append(ColumnAppendState &state, Vector &vector, idx_t count) {
228 if (parent || !stats) {
229 throw InternalException("ColumnData::Append called on a column with a parent or without stats");
230 }
231 Append(stats&: stats->statistics, state, vector, count);
232}
233
234bool ColumnData::CheckZonemap(TableFilter &filter) {
235 if (!stats) {
236 throw InternalException("ColumnData::CheckZonemap called on a column without stats");
237 }
238 auto propagate_result = filter.CheckStatistics(stats&: stats->statistics);
239 if (propagate_result == FilterPropagateResult::FILTER_ALWAYS_FALSE ||
240 propagate_result == FilterPropagateResult::FILTER_FALSE_OR_NULL) {
241 return false;
242 }
243 return true;
244}
245
246unique_ptr<BaseStatistics> ColumnData::GetStatistics() {
247 if (!stats) {
248 throw InternalException("ColumnData::GetStatistics called on a column without stats");
249 }
250 return stats->statistics.ToUnique();
251}
252
253void ColumnData::MergeStatistics(const BaseStatistics &other) {
254 if (!stats) {
255 throw InternalException("ColumnData::MergeStatistics called on a column without stats");
256 }
257 return stats->statistics.Merge(other);
258}
259
260void ColumnData::MergeIntoStatistics(BaseStatistics &other) {
261 if (!stats) {
262 throw InternalException("ColumnData::MergeIntoStatistics called on a column without stats");
263 }
264 return other.Merge(other: stats->statistics);
265}
266
267void ColumnData::InitializeAppend(ColumnAppendState &state) {
268 auto l = data.Lock();
269 if (data.IsEmpty(l)) {
270 // no segments yet, append an empty segment
271 AppendTransientSegment(l, start_row: start);
272 }
273 auto segment = data.GetLastSegment(l);
274 if (segment->segment_type == ColumnSegmentType::PERSISTENT || !segment->function.get().init_append) {
275 // we cannot append to this segment - append a new segment
276 auto total_rows = segment->start + segment->count;
277 AppendTransientSegment(l, start_row: total_rows);
278 state.current = data.GetLastSegment(l);
279 } else {
280 state.current = segment;
281 }
282
283 D_ASSERT(state.current->segment_type == ColumnSegmentType::TRANSIENT);
284 state.current->InitializeAppend(state);
285 D_ASSERT(state.current->function.get().append);
286}
287
288void ColumnData::AppendData(BaseStatistics &stats, ColumnAppendState &state, UnifiedVectorFormat &vdata, idx_t count) {
289 idx_t offset = 0;
290 this->count += count;
291 while (true) {
292 // append the data from the vector
293 idx_t copied_elements = state.current->Append(state, data&: vdata, offset, count);
294 stats.Merge(other: state.current->stats.statistics);
295 if (copied_elements == count) {
296 // finished copying everything
297 break;
298 }
299
300 // we couldn't fit everything we wanted in the current column segment, create a new one
301 {
302 auto l = data.Lock();
303 AppendTransientSegment(l, start_row: state.current->start + state.current->count);
304 state.current = data.GetLastSegment(l);
305 state.current->InitializeAppend(state);
306 }
307 offset += copied_elements;
308 count -= copied_elements;
309 }
310}
311
312void ColumnData::RevertAppend(row_t start_row) {
313 auto l = data.Lock();
314 // check if this row is in the segment tree at all
315 auto last_segment = data.GetLastSegment(l);
316 if (idx_t(start_row) >= last_segment->start + last_segment->count) {
317 // the start row is equal to the final portion of the column data: nothing was ever appended here
318 D_ASSERT(idx_t(start_row) == last_segment->start + last_segment->count);
319 return;
320 }
321 // find the segment index that the current row belongs to
322 idx_t segment_index = data.GetSegmentIndex(l, row_number: start_row);
323 auto segment = data.GetSegmentByIndex(l, index: segment_index);
324 auto &transient = *segment;
325 D_ASSERT(transient.segment_type == ColumnSegmentType::TRANSIENT);
326
327 // remove any segments AFTER this segment: they should be deleted entirely
328 data.EraseSegments(l, segment_start: segment_index);
329
330 this->count = start_row - this->start;
331 segment->next = nullptr;
332 transient.RevertAppend(start_row);
333}
334
335idx_t ColumnData::Fetch(ColumnScanState &state, row_t row_id, Vector &result) {
336 D_ASSERT(row_id >= 0);
337 D_ASSERT(idx_t(row_id) >= start);
338 // perform the fetch within the segment
339 state.row_index = start + ((row_id - start) / STANDARD_VECTOR_SIZE * STANDARD_VECTOR_SIZE);
340 state.current = data.GetSegment(row_number: state.row_index);
341 state.internal_index = state.current->start;
342 return ScanVector(state, result, STANDARD_VECTOR_SIZE);
343}
344
345void ColumnData::FetchRow(TransactionData transaction, ColumnFetchState &state, row_t row_id, Vector &result,
346 idx_t result_idx) {
347 auto segment = data.GetSegment(row_number: row_id);
348
349 // now perform the fetch within the segment
350 segment->FetchRow(state, row_id, result, result_idx);
351 // merge any updates made to this row
352 lock_guard<mutex> update_guard(update_lock);
353 if (updates) {
354 updates->FetchRow(transaction, row_id, result, result_idx);
355 }
356}
357
358void ColumnData::Update(TransactionData transaction, idx_t column_index, Vector &update_vector, row_t *row_ids,
359 idx_t update_count) {
360 lock_guard<mutex> update_guard(update_lock);
361 if (!updates) {
362 updates = make_uniq<UpdateSegment>(args&: *this);
363 }
364 Vector base_vector(type);
365 ColumnScanState state;
366 auto fetch_count = Fetch(state, row_id: row_ids[0], result&: base_vector);
367
368 base_vector.Flatten(count: fetch_count);
369 updates->Update(transaction, column_index, update&: update_vector, ids: row_ids, count: update_count, base_data&: base_vector);
370}
371
372void ColumnData::UpdateColumn(TransactionData transaction, const vector<column_t> &column_path, Vector &update_vector,
373 row_t *row_ids, idx_t update_count, idx_t depth) {
374 // this method should only be called at the end of the path in the base column case
375 D_ASSERT(depth >= column_path.size());
376 ColumnData::Update(transaction, column_index: column_path[0], update_vector, row_ids, update_count);
377}
378
379unique_ptr<BaseStatistics> ColumnData::GetUpdateStatistics() {
380 lock_guard<mutex> update_guard(update_lock);
381 return updates ? updates->GetStatistics() : nullptr;
382}
383
384void ColumnData::AppendTransientSegment(SegmentLock &l, idx_t start_row) {
385 idx_t segment_size = Storage::BLOCK_SIZE;
386 if (start_row == idx_t(MAX_ROW_ID)) {
387#if STANDARD_VECTOR_SIZE < 1024
388 segment_size = 1024 * GetTypeIdSize(type.InternalType());
389#else
390 segment_size = STANDARD_VECTOR_SIZE * GetTypeIdSize(type: type.InternalType());
391#endif
392 }
393 auto new_segment = ColumnSegment::CreateTransientSegment(db&: GetDatabase(), type, start: start_row, segment_size);
394 data.AppendSegment(l, segment: std::move(new_segment));
395}
396
397void ColumnData::CommitDropColumn() {
398 for (auto &segment_p : data.Segments()) {
399 auto &segment = segment_p;
400 if (segment.segment_type == ColumnSegmentType::PERSISTENT) {
401 auto block_id = segment.GetBlockId();
402 if (block_id != INVALID_BLOCK) {
403 block_manager.MarkBlockAsModified(block_id);
404 }
405 }
406 }
407}
408
409unique_ptr<ColumnCheckpointState> ColumnData::CreateCheckpointState(RowGroup &row_group,
410 PartialBlockManager &partial_block_manager) {
411 return make_uniq<ColumnCheckpointState>(args&: row_group, args&: *this, args&: partial_block_manager);
412}
413
414void ColumnData::CheckpointScan(ColumnSegment &segment, ColumnScanState &state, idx_t row_group_start, idx_t count,
415 Vector &scan_vector) {
416 segment.Scan(state, scan_count: count, result&: scan_vector, result_offset: 0, entire_vector: true);
417 if (updates) {
418 scan_vector.Flatten(count);
419 updates->FetchCommittedRange(start_row: state.row_index - row_group_start, count, result&: scan_vector);
420 }
421}
422
423unique_ptr<ColumnCheckpointState> ColumnData::Checkpoint(RowGroup &row_group,
424 PartialBlockManager &partial_block_manager,
425 ColumnCheckpointInfo &checkpoint_info) {
426 // scan the segments of the column data
427 // set up the checkpoint state
428 auto checkpoint_state = CreateCheckpointState(row_group, partial_block_manager);
429 checkpoint_state->global_stats = BaseStatistics::CreateEmpty(type).ToUnique();
430
431 auto l = data.Lock();
432 auto nodes = data.MoveSegments(l);
433 if (nodes.empty()) {
434 // empty table: flush the empty list
435 return checkpoint_state;
436 }
437 lock_guard<mutex> update_guard(update_lock);
438
439 ColumnDataCheckpointer checkpointer(*this, row_group, *checkpoint_state, checkpoint_info);
440 checkpointer.Checkpoint(nodes: std::move(nodes));
441
442 // replace the old tree with the new one
443 data.Replace(l, other&: checkpoint_state->new_tree);
444 version++;
445
446 return checkpoint_state;
447}
448
449void ColumnData::DeserializeColumn(Deserializer &source) {
450 // load the data pointers for the column
451 this->count = 0;
452 idx_t data_pointer_count = source.Read<idx_t>();
453 for (idx_t data_ptr = 0; data_ptr < data_pointer_count; data_ptr++) {
454 // read the data pointer
455 auto row_start = source.Read<idx_t>();
456 auto tuple_count = source.Read<idx_t>();
457 auto block_pointer_block_id = source.Read<block_id_t>();
458 auto block_pointer_offset = source.Read<uint32_t>();
459 auto compression_type = source.Read<CompressionType>();
460 auto segment_stats = BaseStatistics::Deserialize(source, type);
461 if (stats) {
462 stats->statistics.Merge(other: segment_stats);
463 }
464
465 DataPointer data_pointer(std::move(segment_stats));
466 data_pointer.row_start = row_start;
467 data_pointer.tuple_count = tuple_count;
468 data_pointer.block_pointer.block_id = block_pointer_block_id;
469 data_pointer.block_pointer.offset = block_pointer_offset;
470 data_pointer.compression_type = compression_type;
471
472 this->count += tuple_count;
473
474 // create a persistent segment
475 auto segment = ColumnSegment::CreatePersistentSegment(
476 db&: GetDatabase(), block_manager, id: data_pointer.block_pointer.block_id, offset: data_pointer.block_pointer.offset, type_p: type,
477 start: data_pointer.row_start, count: data_pointer.tuple_count, compression_type: data_pointer.compression_type,
478 statistics: std::move(data_pointer.statistics));
479 data.AppendSegment(segment: std::move(segment));
480 }
481}
482
483shared_ptr<ColumnData> ColumnData::Deserialize(BlockManager &block_manager, DataTableInfo &info, idx_t column_index,
484 idx_t start_row, Deserializer &source, const LogicalType &type,
485 optional_ptr<ColumnData> parent) {
486 auto entry = ColumnData::CreateColumn(block_manager, info, column_index, start_row, type, parent);
487 entry->DeserializeColumn(source);
488 return entry;
489}
490
491void ColumnData::GetColumnSegmentInfo(idx_t row_group_index, vector<idx_t> col_path,
492 vector<ColumnSegmentInfo> &result) {
493 D_ASSERT(!col_path.empty());
494
495 // convert the column path to a string
496 string col_path_str = "[";
497 for (idx_t i = 0; i < col_path.size(); i++) {
498 if (i > 0) {
499 col_path_str += ", ";
500 }
501 col_path_str += to_string(val: col_path[i]);
502 }
503 col_path_str += "]";
504
505 // iterate over the segments
506 idx_t segment_idx = 0;
507 auto segment = (ColumnSegment *)data.GetRootSegment();
508 while (segment) {
509 ColumnSegmentInfo column_info;
510 column_info.row_group_index = row_group_index;
511 column_info.column_id = col_path[0];
512 column_info.column_path = col_path_str;
513 column_info.segment_idx = segment_idx;
514 column_info.segment_type = type.ToString();
515 column_info.segment_start = segment->start;
516 column_info.segment_count = segment->count;
517 column_info.compression_type = CompressionTypeToString(type: segment->function.get().type);
518 column_info.segment_stats = segment->stats.statistics.ToString();
519 {
520 lock_guard<mutex> ulock(update_lock);
521 column_info.has_updates = updates ? true : false;
522 }
523 // persistent
524 // block_id
525 // block_offset
526 if (segment->segment_type == ColumnSegmentType::PERSISTENT) {
527 column_info.persistent = true;
528 column_info.block_id = segment->GetBlockId();
529 column_info.block_offset = segment->GetBlockOffset();
530 } else {
531 column_info.persistent = false;
532 }
533 result.emplace_back(args&: column_info);
534
535 segment_idx++;
536 segment = (ColumnSegment *)data.GetNextSegment(segment);
537 }
538}
539
540void ColumnData::Verify(RowGroup &parent) {
541#ifdef DEBUG
542 D_ASSERT(this->start == parent.start);
543 data.Verify();
544 if (type.InternalType() == PhysicalType::STRUCT) {
545 // structs don't have segments
546 D_ASSERT(!data.GetRootSegment());
547 return;
548 }
549 idx_t current_index = 0;
550 idx_t current_start = this->start;
551 idx_t total_count = 0;
552 for (auto &segment : data.Segments()) {
553 D_ASSERT(segment.index == current_index);
554 D_ASSERT(segment.start == current_start);
555 current_start += segment.count;
556 total_count += segment.count;
557 current_index++;
558 }
559 D_ASSERT(this->count == total_count);
560#endif
561}
562
563template <class RET, class OP>
564static RET CreateColumnInternal(BlockManager &block_manager, DataTableInfo &info, idx_t column_index, idx_t start_row,
565 const LogicalType &type, optional_ptr<ColumnData> parent) {
566 if (type.InternalType() == PhysicalType::STRUCT) {
567 return OP::template Create<StructColumnData>(block_manager, info, column_index, start_row, type, parent);
568 } else if (type.InternalType() == PhysicalType::LIST) {
569 return OP::template Create<ListColumnData>(block_manager, info, column_index, start_row, type, parent);
570 } else if (type.id() == LogicalTypeId::VALIDITY) {
571 return OP::template Create<ValidityColumnData>(block_manager, info, column_index, start_row, *parent);
572 }
573 return OP::template Create<StandardColumnData>(block_manager, info, column_index, start_row, type, parent);
574}
575
576shared_ptr<ColumnData> ColumnData::CreateColumn(BlockManager &block_manager, DataTableInfo &info, idx_t column_index,
577 idx_t start_row, const LogicalType &type,
578 optional_ptr<ColumnData> parent) {
579 return CreateColumnInternal<shared_ptr<ColumnData>, SharedConstructor>(block_manager, info, column_index, start_row,
580 type, parent);
581}
582
583unique_ptr<ColumnData> ColumnData::CreateColumnUnique(BlockManager &block_manager, DataTableInfo &info,
584 idx_t column_index, idx_t start_row, const LogicalType &type,
585 optional_ptr<ColumnData> parent) {
586 return CreateColumnInternal<unique_ptr<ColumnData>, UniqueConstructor>(block_manager, info, column_index, start_row,
587 type, parent);
588}
589
590} // namespace duckdb
591