#include "duckdb/storage/table/column_data.hpp"
#include "duckdb/storage/table/column_checkpoint_state.hpp"
#include "duckdb/storage/table/column_segment.hpp"
#include "duckdb/storage/checkpoint/write_overflow_strings_to_disk.hpp"
#include "duckdb/storage/table/row_group.hpp"
#include "duckdb/storage/checkpoint/table_data_writer.hpp"

#include "duckdb/main/config.hpp"

#include <cstring>
9 | |
10 | namespace duckdb { |
11 | |
12 | ColumnCheckpointState::ColumnCheckpointState(RowGroup &row_group, ColumnData &column_data, |
13 | PartialBlockManager &partial_block_manager) |
14 | : row_group(row_group), column_data(column_data), partial_block_manager(partial_block_manager) { |
15 | } |
16 | |
17 | ColumnCheckpointState::~ColumnCheckpointState() { |
18 | } |
19 | |
20 | unique_ptr<BaseStatistics> ColumnCheckpointState::GetStatistics() { |
21 | D_ASSERT(global_stats); |
22 | return std::move(global_stats); |
23 | } |
24 | |
25 | PartialBlockForCheckpoint::PartialBlockForCheckpoint(ColumnData &data, ColumnSegment &segment, |
26 | BlockManager &block_manager, PartialBlockState state) |
27 | : PartialBlock(state), block_manager(block_manager), block(segment.block) { |
28 | AddSegmentToTail(data, segment, offset_in_block: 0); |
29 | } |
30 | |
31 | PartialBlockForCheckpoint::~PartialBlockForCheckpoint() { |
32 | D_ASSERT(IsFlushed() || Exception::UncaughtException()); |
33 | } |
34 | |
35 | bool PartialBlockForCheckpoint::IsFlushed() { |
36 | // segments are cleared on Flush |
37 | return segments.empty(); |
38 | } |
39 | |
40 | void PartialBlockForCheckpoint::AddUninitializedRegion(idx_t start, idx_t end) { |
41 | uninitialized_regions.push_back(x: {.start: start, .end: end}); |
42 | } |
43 | |
44 | void PartialBlockForCheckpoint::Flush(idx_t free_space_left) { |
45 | if (IsFlushed()) { |
46 | throw InternalException("Flush called on partial block that was already flushed" ); |
47 | } |
48 | // if we have any free space or uninitialized regions we need to zero-initialize them |
49 | if (free_space_left > 0 || !uninitialized_regions.empty()) { |
50 | auto handle = block_manager.buffer_manager.Pin(handle&: block); |
51 | // memset any uninitialized regions |
52 | for (auto &uninitialized : uninitialized_regions) { |
53 | memset(s: handle.Ptr() + uninitialized.start, c: 0, n: uninitialized.end - uninitialized.start); |
54 | } |
55 | // memset any free space at the end of the block to 0 prior to writing to disk |
56 | memset(s: handle.Ptr() + Storage::BLOCK_SIZE - free_space_left, c: 0, n: free_space_left); |
57 | } |
58 | // At this point, we've already copied all data from tail_segments |
59 | // into the page owned by first_segment. We flush all segment data to |
60 | // disk with the following call. |
61 | // persist the first segment to disk and point the remaining segments to the same block |
62 | bool fetch_new_block = state.block_id == INVALID_BLOCK; |
63 | if (fetch_new_block) { |
64 | state.block_id = block_manager.GetFreeBlockId(); |
65 | } |
66 | for (idx_t i = 0; i < segments.size(); i++) { |
67 | auto &segment = segments[i]; |
68 | segment.data.IncrementVersion(); |
69 | if (i == 0) { |
70 | // the first segment is converted to persistent - this writes the data for ALL segments to disk |
71 | D_ASSERT(segment.offset_in_block == 0); |
72 | segment.segment.ConvertToPersistent(block_manager: &block_manager, block_id: state.block_id); |
73 | // update the block after it has been converted to a persistent segment |
74 | block = segment.segment.block; |
75 | } else { |
76 | // subsequent segments are MARKED as persistent - they don't need to be rewritten |
77 | segment.segment.MarkAsPersistent(block, offset_in_block: segment.offset_in_block); |
78 | if (fetch_new_block) { |
79 | // if we fetched a new block we need to increase the reference count to the block |
80 | block_manager.IncreaseBlockReferenceCount(block_id: state.block_id); |
81 | } |
82 | } |
83 | } |
84 | Clear(); |
85 | } |
86 | |
87 | void PartialBlockForCheckpoint::Clear() { |
88 | uninitialized_regions.clear(); |
89 | block.reset(); |
90 | segments.clear(); |
91 | } |
92 | |
93 | void PartialBlockForCheckpoint::Merge(PartialBlock &other_p, idx_t offset, idx_t other_size) { |
94 | auto &other = other_p.Cast<PartialBlockForCheckpoint>(); |
95 | |
96 | auto &buffer_manager = block_manager.buffer_manager; |
97 | // pin the source block |
98 | auto old_handle = buffer_manager.Pin(handle&: other.block); |
99 | // pin the target block |
100 | auto new_handle = buffer_manager.Pin(handle&: block); |
101 | // memcpy the contents of the old block to the new block |
102 | memcpy(dest: new_handle.Ptr() + offset, src: old_handle.Ptr(), n: other_size); |
103 | |
104 | // now copy over all of the segments to the new block |
105 | // move over the uninitialized regions |
106 | for (auto ®ion : other.uninitialized_regions) { |
107 | region.start += offset; |
108 | region.end += offset; |
109 | uninitialized_regions.push_back(x: region); |
110 | } |
111 | |
112 | // move over the segments |
113 | for (auto &segment : other.segments) { |
114 | AddSegmentToTail(data&: segment.data, segment&: segment.segment, offset_in_block: segment.offset_in_block + offset); |
115 | } |
116 | other.Clear(); |
117 | } |
118 | |
119 | void PartialBlockForCheckpoint::AddSegmentToTail(ColumnData &data, ColumnSegment &segment, uint32_t offset_in_block) { |
120 | segments.emplace_back(args&: data, args&: segment, args&: offset_in_block); |
121 | } |
122 | |
123 | void ColumnCheckpointState::FlushSegment(unique_ptr<ColumnSegment> segment, idx_t segment_size) { |
124 | D_ASSERT(segment_size <= Storage::BLOCK_SIZE); |
125 | auto tuple_count = segment->count.load(); |
126 | if (tuple_count == 0) { // LCOV_EXCL_START |
127 | return; |
128 | } // LCOV_EXCL_STOP |
129 | |
130 | // merge the segment stats into the global stats |
131 | global_stats->Merge(other: segment->stats.statistics); |
132 | |
133 | // get the buffer of the segment and pin it |
134 | auto &db = column_data.GetDatabase(); |
135 | auto &buffer_manager = BufferManager::GetBufferManager(db); |
136 | block_id_t block_id = INVALID_BLOCK; |
137 | uint32_t offset_in_block = 0; |
138 | |
139 | if (!segment->stats.statistics.IsConstant()) { |
140 | // non-constant block |
141 | PartialBlockAllocation allocation = partial_block_manager.GetBlockAllocation(segment_size); |
142 | block_id = allocation.state.block_id; |
143 | offset_in_block = allocation.state.offset_in_block; |
144 | |
145 | if (allocation.partial_block) { |
146 | // Use an existing block. |
147 | D_ASSERT(offset_in_block > 0); |
148 | auto &pstate = allocation.partial_block->Cast<PartialBlockForCheckpoint>(); |
149 | // pin the source block |
150 | auto old_handle = buffer_manager.Pin(handle&: segment->block); |
151 | // pin the target block |
152 | auto new_handle = buffer_manager.Pin(handle&: pstate.block); |
153 | // memcpy the contents of the old block to the new block |
154 | memcpy(dest: new_handle.Ptr() + offset_in_block, src: old_handle.Ptr(), n: segment_size); |
155 | pstate.AddSegmentToTail(data&: column_data, segment&: *segment, offset_in_block); |
156 | } else { |
157 | // Create a new block for future reuse. |
158 | if (segment->SegmentSize() != Storage::BLOCK_SIZE) { |
159 | // the segment is smaller than the block size |
160 | // allocate a new block and copy the data over |
161 | D_ASSERT(segment->SegmentSize() < Storage::BLOCK_SIZE); |
162 | segment->Resize(segment_size: Storage::BLOCK_SIZE); |
163 | } |
164 | D_ASSERT(offset_in_block == 0); |
165 | allocation.partial_block = make_uniq<PartialBlockForCheckpoint>( |
166 | args&: column_data, args&: *segment, args&: *allocation.block_manager, args&: allocation.state); |
167 | } |
168 | // Writer will decide whether to reuse this block. |
169 | partial_block_manager.RegisterPartialBlock(allocation: std::move(allocation)); |
170 | } else { |
171 | // constant block: no need to write anything to disk besides the stats |
172 | // set up the compression function to constant |
173 | auto &config = DBConfig::GetConfig(db); |
174 | segment->function = |
175 | *config.GetCompressionFunction(type: CompressionType::COMPRESSION_CONSTANT, data_type: segment->type.InternalType()); |
176 | segment->ConvertToPersistent(block_manager: nullptr, INVALID_BLOCK); |
177 | } |
178 | |
179 | // construct the data pointer |
180 | DataPointer data_pointer(segment->stats.statistics.Copy()); |
181 | data_pointer.block_pointer.block_id = block_id; |
182 | data_pointer.block_pointer.offset = offset_in_block; |
183 | data_pointer.row_start = row_group.start; |
184 | if (!data_pointers.empty()) { |
185 | auto &last_pointer = data_pointers.back(); |
186 | data_pointer.row_start = last_pointer.row_start + last_pointer.tuple_count; |
187 | } |
188 | data_pointer.tuple_count = tuple_count; |
189 | data_pointer.compression_type = segment->function.get().type; |
190 | |
191 | // append the segment to the new segment tree |
192 | new_tree.AppendSegment(segment: std::move(segment)); |
193 | data_pointers.push_back(x: std::move(data_pointer)); |
194 | } |
195 | |
196 | void ColumnCheckpointState::WriteDataPointers(RowGroupWriter &writer) { |
197 | writer.WriteColumnDataPointers(column_checkpoint_state&: *this); |
198 | } |
199 | |
200 | } // namespace duckdb |
201 | |