#include "duckdb/storage/table/column_data.hpp"

#include "duckdb/common/vector_operations/vector_operations.hpp"
#include "duckdb/function/compression_function.hpp"
#include "duckdb/planner/table_filter.hpp"
#include "duckdb/storage/data_pointer.hpp"
#include "duckdb/storage/data_table.hpp"
#include "duckdb/storage/statistics/distinct_statistics.hpp"
#include "duckdb/storage/storage_manager.hpp"
#include "duckdb/storage/table/column_data_checkpointer.hpp"
#include "duckdb/storage/table/list_column_data.hpp"
#include "duckdb/storage/table/standard_column_data.hpp"
#include "duckdb/storage/table/struct_column_data.hpp"
#include "duckdb/storage/table/update_segment.hpp"
#include "duckdb/storage/table_storage_info.hpp"
#include "duckdb/storage/table/append_state.hpp"
#include "duckdb/storage/table/scan_state.hpp"
#include "duckdb/main/attached_database.hpp"

namespace duckdb {

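// ColumnData manages the segments, statistics and updates of a single (sub-)column.
// Child columns (validity masks, struct or list children) keep a pointer to their parent;
// only root columns (without a parent) allocate their own segment statistics.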
ColumnData::ColumnData(BlockManager &block_manager, DataTableInfo &info, idx_t column_index, idx_t start_row,
                       LogicalType type_p, optional_ptr<ColumnData> parent)
    : start(start_row), count(0), block_manager(block_manager), info(info), column_index(column_index),
      type(std::move(type_p)), parent(parent), version(0) {
	if (!parent) {
		stats = make_uniq<SegmentStatistics>(type);
	}
}

ColumnData::~ColumnData() {
}

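// SetStart re-bases the column at a new start row: every segment is shifted so that the
// segments stay contiguous, after which the segment tree is reinitialized.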
void ColumnData::SetStart(idx_t new_start) {
	this->start = new_start;
	idx_t offset = 0;
	for (auto &segment : data.Segments()) {
		segment.start = start + offset;
		offset += segment.count;
	}
	data.Reinitialize();
}

DatabaseInstance &ColumnData::GetDatabase() const {
	return info.db.GetDatabase();
}

DataTableInfo &ColumnData::GetTableInfo() const {
	return info;
}

const LogicalType &ColumnData::RootType() const {
	if (parent) {
		return parent->RootType();
	}
	return type;
}

void ColumnData::IncrementVersion() {
	version++;
}

idx_t ColumnData::GetMaxEntry() {
	return count;
}

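// InitializeScan positions the scan state at the root segment; the per-segment scan state
// itself is initialized lazily on the first ScanVector call.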
void ColumnData::InitializeScan(ColumnScanState &state) {
	state.current = data.GetRootSegment();
	state.segment_tree = &data;
	state.row_index = state.current ? state.current->start : 0;
	state.internal_index = state.row_index;
	state.initialized = false;
	state.version = version;
	state.scan_state.reset();
	state.last_offset = 0;
}

void ColumnData::InitializeScanWithOffset(ColumnScanState &state, idx_t row_idx) {
	state.current = data.GetSegment(row_idx);
	state.segment_tree = &data;
	state.row_index = row_idx;
	state.internal_index = state.current->start;
	state.initialized = false;
	state.version = version;
	state.scan_state.reset();
	state.last_offset = 0;
}

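// ScanVector scans up to "remaining" rows into "result", starting at state.row_index and
// moving on to the next segment whenever the current one is exhausted.
// Returns the number of rows that were actually scanned.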
idx_t ColumnData::ScanVector(ColumnScanState &state, Vector &result, idx_t remaining) {
	state.previous_states.clear();
	if (state.version != version) {
		InitializeScanWithOffset(state, state.row_index);
		state.current->InitializeScan(state);
		state.initialized = true;
	} else if (!state.initialized) {
		D_ASSERT(state.current);
		state.current->InitializeScan(state);
		state.internal_index = state.current->start;
		state.initialized = true;
	}
	D_ASSERT(data.HasSegment(state.current));
	D_ASSERT(state.version == version);
	D_ASSERT(state.internal_index <= state.row_index);
	if (state.internal_index < state.row_index) {
		state.current->Skip(state);
	}
	D_ASSERT(state.current->type == type);
	idx_t initial_remaining = remaining;
	while (remaining > 0) {
		D_ASSERT(state.row_index >= state.current->start &&
		         state.row_index <= state.current->start + state.current->count);
		idx_t scan_count = MinValue<idx_t>(remaining, state.current->start + state.current->count - state.row_index);
		idx_t result_offset = initial_remaining - remaining;
		if (scan_count > 0) {
			state.current->Scan(state, scan_count, result, result_offset, scan_count == initial_remaining);

			state.row_index += scan_count;
			remaining -= scan_count;
		}

		if (remaining > 0) {
			auto next = data.GetNextSegment(state.current);
			if (!next) {
				break;
			}
			state.previous_states.emplace_back(std::move(state.scan_state));
			state.current = next;
			state.current->InitializeScan(state);
			state.segment_checked = false;
			D_ASSERT(state.row_index >= state.current->start &&
			         state.row_index <= state.current->start + state.current->count);
		}
	}
	state.internal_index = state.row_index;
	return initial_remaining - remaining;
}

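// Templated scan: SCAN_COMMITTED chooses between the committed data and the view of the
// given transaction, while ALLOW_UPDATES controls whether outstanding (uncommitted)
// updates are permitted - index creation, for example, requires that there are none.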
template <bool SCAN_COMMITTED, bool ALLOW_UPDATES>
idx_t ColumnData::ScanVector(TransactionData transaction, idx_t vector_index, ColumnScanState &state, Vector &result) {
	auto scan_count = ScanVector(state, result, STANDARD_VECTOR_SIZE);

	lock_guard<mutex> update_guard(update_lock);
	if (updates) {
		if (!ALLOW_UPDATES && updates->HasUncommittedUpdates(vector_index)) {
			throw TransactionException("Cannot create index with outstanding updates");
		}
		result.Flatten(scan_count);
		if (SCAN_COMMITTED) {
			updates->FetchCommitted(vector_index, result);
		} else {
			updates->FetchUpdates(transaction, vector_index, result);
		}
	}
	return scan_count;
}

template idx_t ColumnData::ScanVector<false, false>(TransactionData transaction, idx_t vector_index,
                                                    ColumnScanState &state, Vector &result);
template idx_t ColumnData::ScanVector<true, false>(TransactionData transaction, idx_t vector_index,
                                                   ColumnScanState &state, Vector &result);
template idx_t ColumnData::ScanVector<false, true>(TransactionData transaction, idx_t vector_index,
                                                   ColumnScanState &state, Vector &result);
template idx_t ColumnData::ScanVector<true, true>(TransactionData transaction, idx_t vector_index,
                                                  ColumnScanState &state, Vector &result);

idx_t ColumnData::Scan(TransactionData transaction, idx_t vector_index, ColumnScanState &state, Vector &result) {
	return ScanVector<false, true>(transaction, vector_index, state, result);
}

idx_t ColumnData::ScanCommitted(idx_t vector_index, ColumnScanState &state, Vector &result, bool allow_updates) {
	if (allow_updates) {
		return ScanVector<true, true>(TransactionData(0, 0), vector_index, state, result);
	} else {
		return ScanVector<true, false>(TransactionData(0, 0), vector_index, state, result);
	}
}

void ColumnData::ScanCommittedRange(idx_t row_group_start, idx_t offset_in_row_group, idx_t count, Vector &result) {
	ColumnScanState child_state;
	InitializeScanWithOffset(child_state, row_group_start + offset_in_row_group);
	auto scan_count = ScanVector(child_state, result, count);
	if (updates) {
		result.Flatten(scan_count);
		updates->FetchCommittedRange(offset_in_row_group, count, result);
	}
}

idx_t ColumnData::ScanCount(ColumnScanState &state, Vector &result, idx_t count) {
	if (count == 0) {
		return 0;
	}
	// ScanCount can only be used if there are no updates
	D_ASSERT(!updates);
	return ScanVector(state, result, count);
}

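// Select scans a full vector, flattens it and then applies the table filter, shrinking the
// selection vector to the rows that pass.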
void ColumnData::Select(TransactionData transaction, idx_t vector_index, ColumnScanState &state, Vector &result,
                        SelectionVector &sel, idx_t &count, const TableFilter &filter) {
	idx_t scan_count = Scan(transaction, vector_index, state, result);
	result.Flatten(scan_count);
	ColumnSegment::FilterSelection(sel, result, filter, count, FlatVector::Validity(result));
}

void ColumnData::FilterScan(TransactionData transaction, idx_t vector_index, ColumnScanState &state, Vector &result,
                            SelectionVector &sel, idx_t count) {
	Scan(transaction, vector_index, state, result);
	result.Slice(sel, count);
}

void ColumnData::FilterScanCommitted(idx_t vector_index, ColumnScanState &state, Vector &result, SelectionVector &sel,
                                     idx_t count, bool allow_updates) {
	ScanCommitted(vector_index, state, result, allow_updates);
	result.Slice(sel, count);
}

void ColumnData::Skip(ColumnScanState &state, idx_t count) {
	state.Next(count);
}

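// Append converts the input vector to a unified format and appends it to the column,
// merging the statistics of the touched segments into "stats".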
void ColumnData::Append(BaseStatistics &stats, ColumnAppendState &state, Vector &vector, idx_t count) {
	UnifiedVectorFormat vdata;
	vector.ToUnifiedFormat(count, vdata);
	AppendData(stats, state, vdata, count);
}

void ColumnData::Append(ColumnAppendState &state, Vector &vector, idx_t count) {
	if (parent || !stats) {
		throw InternalException("ColumnData::Append called on a column with a parent or without stats");
	}
	Append(stats->statistics, state, vector, count);
}

bool ColumnData::CheckZonemap(TableFilter &filter) {
	if (!stats) {
		throw InternalException("ColumnData::CheckZonemap called on a column without stats");
	}
	auto propagate_result = filter.CheckStatistics(stats->statistics);
	if (propagate_result == FilterPropagateResult::FILTER_ALWAYS_FALSE ||
	    propagate_result == FilterPropagateResult::FILTER_FALSE_OR_NULL) {
		return false;
	}
	return true;
}

unique_ptr<BaseStatistics> ColumnData::GetStatistics() {
	if (!stats) {
		throw InternalException("ColumnData::GetStatistics called on a column without stats");
	}
	return stats->statistics.ToUnique();
}

void ColumnData::MergeStatistics(const BaseStatistics &other) {
	if (!stats) {
		throw InternalException("ColumnData::MergeStatistics called on a column without stats");
	}
	stats->statistics.Merge(other);
}

void ColumnData::MergeIntoStatistics(BaseStatistics &other) {
	if (!stats) {
		throw InternalException("ColumnData::MergeIntoStatistics called on a column without stats");
	}
	other.Merge(stats->statistics);
}

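// InitializeAppend ensures the last segment is a transient segment that supports appends;
// otherwise (e.g. the last segment is persistent) a new transient segment is added first.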
void ColumnData::InitializeAppend(ColumnAppendState &state) {
	auto l = data.Lock();
	if (data.IsEmpty(l)) {
		// no segments yet, append an empty segment
		AppendTransientSegment(l, start);
	}
	auto segment = data.GetLastSegment(l);
	if (segment->segment_type == ColumnSegmentType::PERSISTENT || !segment->function.get().init_append) {
		// we cannot append to this segment - append a new segment
		auto total_rows = segment->start + segment->count;
		AppendTransientSegment(l, total_rows);
		state.current = data.GetLastSegment(l);
	} else {
		state.current = segment;
	}

	D_ASSERT(state.current->segment_type == ColumnSegmentType::TRANSIENT);
	state.current->InitializeAppend(state);
	D_ASSERT(state.current->function.get().append);
}

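// AppendData appends "count" rows from the unified vector data, creating new transient
// segments whenever the current segment runs out of space.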
void ColumnData::AppendData(BaseStatistics &stats, ColumnAppendState &state, UnifiedVectorFormat &vdata, idx_t count) {
	idx_t offset = 0;
	this->count += count;
	while (true) {
		// append the data from the vector
		idx_t copied_elements = state.current->Append(state, vdata, offset, count);
		stats.Merge(state.current->stats.statistics);
		if (copied_elements == count) {
			// finished copying everything
			break;
		}

		// we couldn't fit everything we wanted in the current column segment, create a new one
		{
			auto l = data.Lock();
			AppendTransientSegment(l, state.current->start + state.current->count);
			state.current = data.GetLastSegment(l);
			state.current->InitializeAppend(state);
		}
		offset += copied_elements;
		count -= copied_elements;
	}
}

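// RevertAppend rolls back an append starting at "start_row": all segments after the
// segment containing that row are erased, and the containing transient segment is
// truncated back to the given row.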
void ColumnData::RevertAppend(row_t start_row) {
	auto l = data.Lock();
	// check if this row is in the segment tree at all
	auto last_segment = data.GetLastSegment(l);
	if (idx_t(start_row) >= last_segment->start + last_segment->count) {
		// the start row is equal to the final portion of the column data: nothing was ever appended here
		D_ASSERT(idx_t(start_row) == last_segment->start + last_segment->count);
		return;
	}
	// find the segment index that the current row belongs to
	idx_t segment_index = data.GetSegmentIndex(l, start_row);
	auto segment = data.GetSegmentByIndex(l, segment_index);
	auto &transient = *segment;
	D_ASSERT(transient.segment_type == ColumnSegmentType::TRANSIENT);

	// remove any segments AFTER this segment: they should be deleted entirely
	data.EraseSegments(l, segment_index);

	this->count = start_row - this->start;
	segment->next = nullptr;
	transient.RevertAppend(start_row);
}

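// Fetch reads the entire (vector-aligned) block of rows containing "row_id" into "result".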
idx_t ColumnData::Fetch(ColumnScanState &state, row_t row_id, Vector &result) {
	D_ASSERT(row_id >= 0);
	D_ASSERT(idx_t(row_id) >= start);
	// perform the fetch within the segment
	state.row_index = start + ((row_id - start) / STANDARD_VECTOR_SIZE * STANDARD_VECTOR_SIZE);
	state.current = data.GetSegment(state.row_index);
	state.internal_index = state.current->start;
	return ScanVector(state, result, STANDARD_VECTOR_SIZE);
}

void ColumnData::FetchRow(TransactionData transaction, ColumnFetchState &state, row_t row_id, Vector &result,
                          idx_t result_idx) {
	auto segment = data.GetSegment(row_id);

	// now perform the fetch within the segment
	segment->FetchRow(state, row_id, result, result_idx);
	// merge any updates made to this row
	lock_guard<mutex> update_guard(update_lock);
	if (updates) {
		updates->FetchRow(transaction, row_id, result, result_idx);
	}
}

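// Update registers an update for the given row ids with the update segment, lazily
// creating the update segment and fetching the base data the update is applied against.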
void ColumnData::Update(TransactionData transaction, idx_t column_index, Vector &update_vector, row_t *row_ids,
                        idx_t update_count) {
	lock_guard<mutex> update_guard(update_lock);
	if (!updates) {
		updates = make_uniq<UpdateSegment>(*this);
	}
	Vector base_vector(type);
	ColumnScanState state;
	auto fetch_count = Fetch(state, row_ids[0], base_vector);

	base_vector.Flatten(fetch_count);
	updates->Update(transaction, column_index, update_vector, row_ids, update_count, base_vector);
}

void ColumnData::UpdateColumn(TransactionData transaction, const vector<column_t> &column_path, Vector &update_vector,
                              row_t *row_ids, idx_t update_count, idx_t depth) {
	// this method should only be called at the end of the path in the base column case
	D_ASSERT(depth >= column_path.size());
	ColumnData::Update(transaction, column_path[0], update_vector, row_ids, update_count);
}

unique_ptr<BaseStatistics> ColumnData::GetUpdateStatistics() {
	lock_guard<mutex> update_guard(update_lock);
	return updates ? updates->GetStatistics() : nullptr;
}

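// AppendTransientSegment adds a new in-memory segment. A full block is allocated by
// default; when the segment starts at MAX_ROW_ID, a smaller vector-sized allocation is
// used instead.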
void ColumnData::AppendTransientSegment(SegmentLock &l, idx_t start_row) {
	idx_t segment_size = Storage::BLOCK_SIZE;
	if (start_row == idx_t(MAX_ROW_ID)) {
#if STANDARD_VECTOR_SIZE < 1024
		segment_size = 1024 * GetTypeIdSize(type.InternalType());
#else
		segment_size = STANDARD_VECTOR_SIZE * GetTypeIdSize(type.InternalType());
#endif
	}
	auto new_segment = ColumnSegment::CreateTransientSegment(GetDatabase(), type, start_row, segment_size);
	data.AppendSegment(l, std::move(new_segment));
}

void ColumnData::CommitDropColumn() {
	for (auto &segment_p : data.Segments()) {
		auto &segment = segment_p;
		if (segment.segment_type == ColumnSegmentType::PERSISTENT) {
			auto block_id = segment.GetBlockId();
			if (block_id != INVALID_BLOCK) {
				block_manager.MarkBlockAsModified(block_id);
			}
		}
	}
}

unique_ptr<ColumnCheckpointState> ColumnData::CreateCheckpointState(RowGroup &row_group,
                                                                    PartialBlockManager &partial_block_manager) {
	return make_uniq<ColumnCheckpointState>(row_group, *this, partial_block_manager);
}

void ColumnData::CheckpointScan(ColumnSegment &segment, ColumnScanState &state, idx_t row_group_start, idx_t count,
                                Vector &scan_vector) {
	segment.Scan(state, count, scan_vector, 0, true);
	if (updates) {
		scan_vector.Flatten(count);
		updates->FetchCommittedRange(state.row_index - row_group_start, count, scan_vector);
	}
}

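// Checkpoint writes the column to disk: the current segments are handed to a
// ColumnDataCheckpointer, which (re)compresses them and produces a new segment tree that
// then replaces the old one.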
unique_ptr<ColumnCheckpointState> ColumnData::Checkpoint(RowGroup &row_group,
                                                         PartialBlockManager &partial_block_manager,
                                                         ColumnCheckpointInfo &checkpoint_info) {
	// scan the segments of the column data
	// set up the checkpoint state
	auto checkpoint_state = CreateCheckpointState(row_group, partial_block_manager);
	checkpoint_state->global_stats = BaseStatistics::CreateEmpty(type).ToUnique();

	auto l = data.Lock();
	auto nodes = data.MoveSegments(l);
	if (nodes.empty()) {
		// empty table: flush the empty list
		return checkpoint_state;
	}
	lock_guard<mutex> update_guard(update_lock);

	ColumnDataCheckpointer checkpointer(*this, row_group, *checkpoint_state, checkpoint_info);
	checkpointer.Checkpoint(std::move(nodes));

	// replace the old tree with the new one
	data.Replace(l, checkpoint_state->new_tree);
	version++;

	return checkpoint_state;
}

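// DeserializeColumn reads the serialized data pointers and recreates one persistent
// segment per pointer, merging the per-segment statistics into the column statistics.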
void ColumnData::DeserializeColumn(Deserializer &source) {
	// load the data pointers for the column
	this->count = 0;
	idx_t data_pointer_count = source.Read<idx_t>();
	for (idx_t data_ptr = 0; data_ptr < data_pointer_count; data_ptr++) {
		// read the data pointer
		auto row_start = source.Read<idx_t>();
		auto tuple_count = source.Read<idx_t>();
		auto block_pointer_block_id = source.Read<block_id_t>();
		auto block_pointer_offset = source.Read<uint32_t>();
		auto compression_type = source.Read<CompressionType>();
		auto segment_stats = BaseStatistics::Deserialize(source, type);
		if (stats) {
			stats->statistics.Merge(segment_stats);
		}

		DataPointer data_pointer(std::move(segment_stats));
		data_pointer.row_start = row_start;
		data_pointer.tuple_count = tuple_count;
		data_pointer.block_pointer.block_id = block_pointer_block_id;
		data_pointer.block_pointer.offset = block_pointer_offset;
		data_pointer.compression_type = compression_type;

		this->count += tuple_count;

		// create a persistent segment
		auto segment = ColumnSegment::CreatePersistentSegment(
		    GetDatabase(), block_manager, data_pointer.block_pointer.block_id, data_pointer.block_pointer.offset, type,
		    data_pointer.row_start, data_pointer.tuple_count, data_pointer.compression_type,
		    std::move(data_pointer.statistics));
		data.AppendSegment(std::move(segment));
	}
}

shared_ptr<ColumnData> ColumnData::Deserialize(BlockManager &block_manager, DataTableInfo &info, idx_t column_index,
                                               idx_t start_row, Deserializer &source, const LogicalType &type,
                                               optional_ptr<ColumnData> parent) {
	auto entry = ColumnData::CreateColumn(block_manager, info, column_index, start_row, type, parent);
	entry->DeserializeColumn(source);
	return entry;
}

void ColumnData::GetColumnSegmentInfo(idx_t row_group_index, vector<idx_t> col_path,
                                      vector<ColumnSegmentInfo> &result) {
	D_ASSERT(!col_path.empty());

	// convert the column path to a string
	string col_path_str = "[";
	for (idx_t i = 0; i < col_path.size(); i++) {
		if (i > 0) {
			col_path_str += ", ";
		}
		col_path_str += to_string(col_path[i]);
	}
	col_path_str += "]";

	// iterate over the segments
	idx_t segment_idx = 0;
	auto segment = (ColumnSegment *)data.GetRootSegment();
	while (segment) {
		ColumnSegmentInfo column_info;
		column_info.row_group_index = row_group_index;
		column_info.column_id = col_path[0];
		column_info.column_path = col_path_str;
		column_info.segment_idx = segment_idx;
		column_info.segment_type = type.ToString();
		column_info.segment_start = segment->start;
		column_info.segment_count = segment->count;
		column_info.compression_type = CompressionTypeToString(segment->function.get().type);
		column_info.segment_stats = segment->stats.statistics.ToString();
		{
			lock_guard<mutex> ulock(update_lock);
			column_info.has_updates = updates ? true : false;
		}
		// persistent segments additionally report their block id and block offset
		if (segment->segment_type == ColumnSegmentType::PERSISTENT) {
			column_info.persistent = true;
			column_info.block_id = segment->GetBlockId();
			column_info.block_offset = segment->GetBlockOffset();
		} else {
			column_info.persistent = false;
		}
		result.emplace_back(column_info);

		segment_idx++;
		segment = (ColumnSegment *)data.GetNextSegment(segment);
	}
}

void ColumnData::Verify(RowGroup &parent) {
#ifdef DEBUG
	D_ASSERT(this->start == parent.start);
	data.Verify();
	if (type.InternalType() == PhysicalType::STRUCT) {
		// structs don't have segments
		D_ASSERT(!data.GetRootSegment());
		return;
	}
	idx_t current_index = 0;
	idx_t current_start = this->start;
	idx_t total_count = 0;
	for (auto &segment : data.Segments()) {
		D_ASSERT(segment.index == current_index);
		D_ASSERT(segment.start == current_start);
		current_start += segment.count;
		total_count += segment.count;
		current_index++;
	}
	D_ASSERT(this->count == total_count);
#endif
}

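// CreateColumnInternal dispatches on the column type to construct the matching
// ColumnData subclass (struct, list, validity or standard).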
template <class RET, class OP>
static RET CreateColumnInternal(BlockManager &block_manager, DataTableInfo &info, idx_t column_index, idx_t start_row,
                                const LogicalType &type, optional_ptr<ColumnData> parent) {
	if (type.InternalType() == PhysicalType::STRUCT) {
		return OP::template Create<StructColumnData>(block_manager, info, column_index, start_row, type, parent);
	} else if (type.InternalType() == PhysicalType::LIST) {
		return OP::template Create<ListColumnData>(block_manager, info, column_index, start_row, type, parent);
	} else if (type.id() == LogicalTypeId::VALIDITY) {
		return OP::template Create<ValidityColumnData>(block_manager, info, column_index, start_row, *parent);
	}
	return OP::template Create<StandardColumnData>(block_manager, info, column_index, start_row, type, parent);
}

shared_ptr<ColumnData> ColumnData::CreateColumn(BlockManager &block_manager, DataTableInfo &info, idx_t column_index,
                                                idx_t start_row, const LogicalType &type,
                                                optional_ptr<ColumnData> parent) {
	return CreateColumnInternal<shared_ptr<ColumnData>, SharedConstructor>(block_manager, info, column_index,
	                                                                       start_row, type, parent);
}

unique_ptr<ColumnData> ColumnData::CreateColumnUnique(BlockManager &block_manager, DataTableInfo &info,
                                                      idx_t column_index, idx_t start_row, const LogicalType &type,
                                                      optional_ptr<ColumnData> parent) {
	return CreateColumnInternal<unique_ptr<ColumnData>, UniqueConstructor>(block_manager, info, column_index,
	                                                                       start_row, type, parent);
}

} // namespace duckdb