#include "duckdb/storage/table/column_data.hpp"
#include "duckdb/storage/table/column_checkpoint_state.hpp"
#include "duckdb/storage/table/column_segment.hpp"
#include "duckdb/storage/checkpoint/write_overflow_strings_to_disk.hpp"
#include "duckdb/storage/table/row_group.hpp"
#include "duckdb/storage/checkpoint/table_data_writer.hpp"

#include "duckdb/main/config.hpp"

namespace duckdb {

ColumnCheckpointState::ColumnCheckpointState(RowGroup &row_group, ColumnData &column_data,
                                             PartialBlockManager &partial_block_manager)
    : row_group(row_group), column_data(column_data), partial_block_manager(partial_block_manager) {
}

ColumnCheckpointState::~ColumnCheckpointState() {
}

unique_ptr<BaseStatistics> ColumnCheckpointState::GetStatistics() {
	D_ASSERT(global_stats);
	return std::move(global_stats);
}

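// A PartialBlockForCheckpoint tracks a single storage block that is shared by multiple small
// column segments during a checkpoint. Instead of writing every segment to its own block,
// segments are appended at increasing offsets and the block is written out once on Flush.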
PartialBlockForCheckpoint::PartialBlockForCheckpoint(ColumnData &data, ColumnSegment &segment,
                                                     BlockManager &block_manager, PartialBlockState state)
    : PartialBlock(state), block_manager(block_manager), block(segment.block) {
	AddSegmentToTail(data, segment, 0);
}

PartialBlockForCheckpoint::~PartialBlockForCheckpoint() {
	D_ASSERT(IsFlushed() || Exception::UncaughtException());
}

bool PartialBlockForCheckpoint::IsFlushed() {
	// segments are cleared on Flush
	return segments.empty();
}

void PartialBlockForCheckpoint::AddUninitializedRegion(idx_t start, idx_t end) {
	uninitialized_regions.push_back({start, end});
}

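// Flush writes the contents of the partial block to disk exactly once: the first segment is
// converted to a persistent segment (which performs the actual write), and every other segment
// is merely re-pointed at the same block.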
void PartialBlockForCheckpoint::Flush(idx_t free_space_left) {
	if (IsFlushed()) {
		throw InternalException("Flush called on partial block that was already flushed");
	}
	// if we have any free space or uninitialized regions we need to zero-initialize them,
	// so that we do not write stale buffer contents to disk
	if (free_space_left > 0 || !uninitialized_regions.empty()) {
		auto handle = block_manager.buffer_manager.Pin(block);
		// memset any uninitialized regions
		for (auto &uninitialized : uninitialized_regions) {
			memset(handle.Ptr() + uninitialized.start, 0, uninitialized.end - uninitialized.start);
		}
		// memset any free space at the end of the block to 0 prior to writing to disk
		memset(handle.Ptr() + Storage::BLOCK_SIZE - free_space_left, 0, free_space_left);
	}
	// at this point, the data of all segments has already been copied into the buffer of this
	// partial block (see FlushSegment and Merge); the loop below performs the actual write:
	// persist the first segment to disk and point the remaining segments to the same block
	bool fetch_new_block = state.block_id == INVALID_BLOCK;
	if (fetch_new_block) {
		state.block_id = block_manager.GetFreeBlockId();
	}
	for (idx_t i = 0; i < segments.size(); i++) {
		auto &segment = segments[i];
		segment.data.IncrementVersion();
		if (i == 0) {
			// the first segment is converted to persistent - this writes the data for ALL segments to disk
			D_ASSERT(segment.offset_in_block == 0);
			segment.segment.ConvertToPersistent(&block_manager, state.block_id);
			// update the block after it has been converted to a persistent segment
			block = segment.segment.block;
		} else {
			// subsequent segments are MARKED as persistent - they don't need to be rewritten
			segment.segment.MarkAsPersistent(block, segment.offset_in_block);
			if (fetch_new_block) {
				// if we fetched a new block we need to increase the reference count to the block
				block_manager.IncreaseBlockReferenceCount(state.block_id);
			}
		}
	}
	Clear();
}

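// Clear drops the buffer reference and all segment references; after this the partial block
// reports IsFlushed() as true and must not be flushed again.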
void PartialBlockForCheckpoint::Clear() {
	uninitialized_regions.clear();
	block.reset();
	segments.clear();
}

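// Merge absorbs another partial block into this one: the other block's data is copied to the
// given offset in this block, and its segments and uninitialized regions are re-registered
// here with their offsets shifted accordingly.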
void PartialBlockForCheckpoint::Merge(PartialBlock &other_p, idx_t offset, idx_t other_size) {
	auto &other = other_p.Cast<PartialBlockForCheckpoint>();

	auto &buffer_manager = block_manager.buffer_manager;
	// pin the source block
	auto old_handle = buffer_manager.Pin(other.block);
	// pin the target block
	auto new_handle = buffer_manager.Pin(block);
	// memcpy the contents of the old block to the new block
	memcpy(new_handle.Ptr() + offset, old_handle.Ptr(), other_size);

	// move over the uninitialized regions, shifting them by the offset at which the
	// old block was placed in this block
	for (auto &region : other.uninitialized_regions) {
		region.start += offset;
		region.end += offset;
		uninitialized_regions.push_back(region);
	}

	// move over the segments
	for (auto &segment : other.segments) {
		AddSegmentToTail(segment.data, segment.segment, segment.offset_in_block + offset);
	}
	other.Clear();
}

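// register a segment at the given offset within this partial block; callers are responsible
// for making sure the segment's data actually resides at that offset in the block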
void PartialBlockForCheckpoint::AddSegmentToTail(ColumnData &data, ColumnSegment &segment, uint32_t offset_in_block) {
	segments.emplace_back(data, segment, offset_in_block);
}

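// FlushSegment writes a finished in-memory segment out as part of the checkpoint. Non-constant
// segments are placed in a (possibly shared) partial block; constant segments are fully
// described by their statistics and require no block at all. In both cases a DataPointer
// describing the segment is recorded for the table metadata.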
void ColumnCheckpointState::FlushSegment(unique_ptr<ColumnSegment> segment, idx_t segment_size) {
	D_ASSERT(segment_size <= Storage::BLOCK_SIZE);
	auto tuple_count = segment->count.load();
	if (tuple_count == 0) { // LCOV_EXCL_START
		return;
	} // LCOV_EXCL_STOP

	// merge the segment stats into the global stats
	global_stats->Merge(segment->stats.statistics);

	// get the buffer of the segment and pin it
	auto &db = column_data.GetDatabase();
	auto &buffer_manager = BufferManager::GetBufferManager(db);
	block_id_t block_id = INVALID_BLOCK;
	uint32_t offset_in_block = 0;

	if (!segment->stats.statistics.IsConstant()) {
		// non-constant block
		PartialBlockAllocation allocation = partial_block_manager.GetBlockAllocation(segment_size);
		block_id = allocation.state.block_id;
		offset_in_block = allocation.state.offset_in_block;

		if (allocation.partial_block) {
			// use an existing partial block
			D_ASSERT(offset_in_block > 0);
			auto &pstate = allocation.partial_block->Cast<PartialBlockForCheckpoint>();
			// pin the source block
			auto old_handle = buffer_manager.Pin(segment->block);
			// pin the target block
			auto new_handle = buffer_manager.Pin(pstate.block);
			// memcpy the contents of the old block to the new block
			memcpy(new_handle.Ptr() + offset_in_block, old_handle.Ptr(), segment_size);
			pstate.AddSegmentToTail(column_data, *segment, offset_in_block);
		} else {
			// create a new partial block that future segments can be merged into
			if (segment->SegmentSize() != Storage::BLOCK_SIZE) {
				// the segment is smaller than the block size
				// allocate a new block and copy the data over
				D_ASSERT(segment->SegmentSize() < Storage::BLOCK_SIZE);
				segment->Resize(Storage::BLOCK_SIZE);
			}
			D_ASSERT(offset_in_block == 0);
			allocation.partial_block = make_uniq<PartialBlockForCheckpoint>(
			    column_data, *segment, *allocation.block_manager, allocation.state);
		}
		// the partial block manager will decide whether this block can be reused
		partial_block_manager.RegisterPartialBlock(std::move(allocation));
	} else {
		// constant block: no need to write anything to disk besides the stats
		// set up the compression function to constant
		auto &config = DBConfig::GetConfig(db);
		segment->function =
		    *config.GetCompressionFunction(CompressionType::COMPRESSION_CONSTANT, segment->type.InternalType());
		segment->ConvertToPersistent(nullptr, INVALID_BLOCK);
	}

	// construct the data pointer
	DataPointer data_pointer(segment->stats.statistics.Copy());
	data_pointer.block_pointer.block_id = block_id;
	data_pointer.block_pointer.offset = offset_in_block;
	data_pointer.row_start = row_group.start;
	if (!data_pointers.empty()) {
		auto &last_pointer = data_pointers.back();
		data_pointer.row_start = last_pointer.row_start + last_pointer.tuple_count;
	}
	data_pointer.tuple_count = tuple_count;
	data_pointer.compression_type = segment->function.get().type;

	// append the segment to the new segment tree
	new_tree.AppendSegment(std::move(segment));
	data_pointers.push_back(std::move(data_pointer));
}

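// write the data pointers gathered by FlushSegment to the table metadata; these record, for
// every segment, where its data lives on disk and how many tuples it holds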
void ColumnCheckpointState::WriteDataPointers(RowGroupWriter &writer) {
	writer.WriteColumnDataPointers(*this);
}

} // namespace duckdb