#include "duckdb/storage/table/column_data.hpp"
#include "duckdb/storage/table/column_checkpoint_state.hpp"
#include "duckdb/storage/table/column_segment.hpp"
#include "duckdb/storage/checkpoint/write_overflow_strings_to_disk.hpp"
#include "duckdb/storage/table/row_group.hpp"
#include "duckdb/storage/checkpoint/table_data_writer.hpp"

#include "duckdb/main/config.hpp"

#include <cstring>
#include <utility>

namespace duckdb {

12ColumnCheckpointState::ColumnCheckpointState(RowGroup &row_group, ColumnData &column_data,
13 PartialBlockManager &partial_block_manager)
14 : row_group(row_group), column_data(column_data), partial_block_manager(partial_block_manager) {
15}
16
17ColumnCheckpointState::~ColumnCheckpointState() {
18}
19
20unique_ptr<BaseStatistics> ColumnCheckpointState::GetStatistics() {
21 D_ASSERT(global_stats);
22 return std::move(global_stats);
23}
24
25PartialBlockForCheckpoint::PartialBlockForCheckpoint(ColumnData &data, ColumnSegment &segment,
26 BlockManager &block_manager, PartialBlockState state)
27 : PartialBlock(state), block_manager(block_manager), block(segment.block) {
28 AddSegmentToTail(data, segment, offset_in_block: 0);
29}
30
31PartialBlockForCheckpoint::~PartialBlockForCheckpoint() {
32 D_ASSERT(IsFlushed() || Exception::UncaughtException());
33}
34
35bool PartialBlockForCheckpoint::IsFlushed() {
36 // segments are cleared on Flush
37 return segments.empty();
38}
39
40void PartialBlockForCheckpoint::AddUninitializedRegion(idx_t start, idx_t end) {
41 uninitialized_regions.push_back(x: {.start: start, .end: end});
42}
43
44void PartialBlockForCheckpoint::Flush(idx_t free_space_left) {
45 if (IsFlushed()) {
46 throw InternalException("Flush called on partial block that was already flushed");
47 }
48 // if we have any free space or uninitialized regions we need to zero-initialize them
49 if (free_space_left > 0 || !uninitialized_regions.empty()) {
50 auto handle = block_manager.buffer_manager.Pin(handle&: block);
51 // memset any uninitialized regions
52 for (auto &uninitialized : uninitialized_regions) {
53 memset(s: handle.Ptr() + uninitialized.start, c: 0, n: uninitialized.end - uninitialized.start);
54 }
55 // memset any free space at the end of the block to 0 prior to writing to disk
56 memset(s: handle.Ptr() + Storage::BLOCK_SIZE - free_space_left, c: 0, n: free_space_left);
57 }
58 // At this point, we've already copied all data from tail_segments
59 // into the page owned by first_segment. We flush all segment data to
60 // disk with the following call.
61 // persist the first segment to disk and point the remaining segments to the same block
62 bool fetch_new_block = state.block_id == INVALID_BLOCK;
63 if (fetch_new_block) {
64 state.block_id = block_manager.GetFreeBlockId();
65 }
66 for (idx_t i = 0; i < segments.size(); i++) {
67 auto &segment = segments[i];
68 segment.data.IncrementVersion();
69 if (i == 0) {
70 // the first segment is converted to persistent - this writes the data for ALL segments to disk
71 D_ASSERT(segment.offset_in_block == 0);
72 segment.segment.ConvertToPersistent(block_manager: &block_manager, block_id: state.block_id);
73 // update the block after it has been converted to a persistent segment
74 block = segment.segment.block;
75 } else {
76 // subsequent segments are MARKED as persistent - they don't need to be rewritten
77 segment.segment.MarkAsPersistent(block, offset_in_block: segment.offset_in_block);
78 if (fetch_new_block) {
79 // if we fetched a new block we need to increase the reference count to the block
80 block_manager.IncreaseBlockReferenceCount(block_id: state.block_id);
81 }
82 }
83 }
84 Clear();
85}
86
87void PartialBlockForCheckpoint::Clear() {
88 uninitialized_regions.clear();
89 block.reset();
90 segments.clear();
91}
92
93void PartialBlockForCheckpoint::Merge(PartialBlock &other_p, idx_t offset, idx_t other_size) {
94 auto &other = other_p.Cast<PartialBlockForCheckpoint>();
95
96 auto &buffer_manager = block_manager.buffer_manager;
97 // pin the source block
98 auto old_handle = buffer_manager.Pin(handle&: other.block);
99 // pin the target block
100 auto new_handle = buffer_manager.Pin(handle&: block);
101 // memcpy the contents of the old block to the new block
102 memcpy(dest: new_handle.Ptr() + offset, src: old_handle.Ptr(), n: other_size);
103
104 // now copy over all of the segments to the new block
105 // move over the uninitialized regions
106 for (auto &region : other.uninitialized_regions) {
107 region.start += offset;
108 region.end += offset;
109 uninitialized_regions.push_back(x: region);
110 }
111
112 // move over the segments
113 for (auto &segment : other.segments) {
114 AddSegmentToTail(data&: segment.data, segment&: segment.segment, offset_in_block: segment.offset_in_block + offset);
115 }
116 other.Clear();
117}
118
119void PartialBlockForCheckpoint::AddSegmentToTail(ColumnData &data, ColumnSegment &segment, uint32_t offset_in_block) {
120 segments.emplace_back(args&: data, args&: segment, args&: offset_in_block);
121}
122
123void ColumnCheckpointState::FlushSegment(unique_ptr<ColumnSegment> segment, idx_t segment_size) {
124 D_ASSERT(segment_size <= Storage::BLOCK_SIZE);
125 auto tuple_count = segment->count.load();
126 if (tuple_count == 0) { // LCOV_EXCL_START
127 return;
128 } // LCOV_EXCL_STOP
129
130 // merge the segment stats into the global stats
131 global_stats->Merge(other: segment->stats.statistics);
132
133 // get the buffer of the segment and pin it
134 auto &db = column_data.GetDatabase();
135 auto &buffer_manager = BufferManager::GetBufferManager(db);
136 block_id_t block_id = INVALID_BLOCK;
137 uint32_t offset_in_block = 0;
138
139 if (!segment->stats.statistics.IsConstant()) {
140 // non-constant block
141 PartialBlockAllocation allocation = partial_block_manager.GetBlockAllocation(segment_size);
142 block_id = allocation.state.block_id;
143 offset_in_block = allocation.state.offset_in_block;
144
145 if (allocation.partial_block) {
146 // Use an existing block.
147 D_ASSERT(offset_in_block > 0);
148 auto &pstate = allocation.partial_block->Cast<PartialBlockForCheckpoint>();
149 // pin the source block
150 auto old_handle = buffer_manager.Pin(handle&: segment->block);
151 // pin the target block
152 auto new_handle = buffer_manager.Pin(handle&: pstate.block);
153 // memcpy the contents of the old block to the new block
154 memcpy(dest: new_handle.Ptr() + offset_in_block, src: old_handle.Ptr(), n: segment_size);
155 pstate.AddSegmentToTail(data&: column_data, segment&: *segment, offset_in_block);
156 } else {
157 // Create a new block for future reuse.
158 if (segment->SegmentSize() != Storage::BLOCK_SIZE) {
159 // the segment is smaller than the block size
160 // allocate a new block and copy the data over
161 D_ASSERT(segment->SegmentSize() < Storage::BLOCK_SIZE);
162 segment->Resize(segment_size: Storage::BLOCK_SIZE);
163 }
164 D_ASSERT(offset_in_block == 0);
165 allocation.partial_block = make_uniq<PartialBlockForCheckpoint>(
166 args&: column_data, args&: *segment, args&: *allocation.block_manager, args&: allocation.state);
167 }
168 // Writer will decide whether to reuse this block.
169 partial_block_manager.RegisterPartialBlock(allocation: std::move(allocation));
170 } else {
171 // constant block: no need to write anything to disk besides the stats
172 // set up the compression function to constant
173 auto &config = DBConfig::GetConfig(db);
174 segment->function =
175 *config.GetCompressionFunction(type: CompressionType::COMPRESSION_CONSTANT, data_type: segment->type.InternalType());
176 segment->ConvertToPersistent(block_manager: nullptr, INVALID_BLOCK);
177 }
178
179 // construct the data pointer
180 DataPointer data_pointer(segment->stats.statistics.Copy());
181 data_pointer.block_pointer.block_id = block_id;
182 data_pointer.block_pointer.offset = offset_in_block;
183 data_pointer.row_start = row_group.start;
184 if (!data_pointers.empty()) {
185 auto &last_pointer = data_pointers.back();
186 data_pointer.row_start = last_pointer.row_start + last_pointer.tuple_count;
187 }
188 data_pointer.tuple_count = tuple_count;
189 data_pointer.compression_type = segment->function.get().type;
190
191 // append the segment to the new segment tree
192 new_tree.AppendSegment(segment: std::move(segment));
193 data_pointers.push_back(x: std::move(data_pointer));
194}
195
196void ColumnCheckpointState::WriteDataPointers(RowGroupWriter &writer) {
197 writer.WriteColumnDataPointers(column_checkpoint_state&: *this);
198}
199
} // namespace duckdb