1#include "duckdb/storage/table/column_data_checkpointer.hpp"
2#include "duckdb/main/config.hpp"
3#include "duckdb/storage/table/update_segment.hpp"
4#include "duckdb/storage/data_table.hpp"
5#include "duckdb/parser/column_definition.hpp"
6#include "duckdb/storage/table/scan_state.hpp"
7
8namespace duckdb {
9
10ColumnDataCheckpointer::ColumnDataCheckpointer(ColumnData &col_data_p, RowGroup &row_group_p,
11 ColumnCheckpointState &state_p, ColumnCheckpointInfo &checkpoint_info_p)
12 : col_data(col_data_p), row_group(row_group_p), state(state_p),
13 is_validity(GetType().id() == LogicalTypeId::VALIDITY),
14 intermediate(is_validity ? LogicalType::BOOLEAN : GetType(), true, is_validity),
15 checkpoint_info(checkpoint_info_p) {
16 auto &config = DBConfig::GetConfig(db&: GetDatabase());
17 auto functions = config.GetCompressionFunctions(data_type: GetType().InternalType());
18 for (auto &func : functions) {
19 compression_functions.push_back(x: &func.get());
20 }
21}
22
// Returns the database instance that owns the column being checkpointed.
DatabaseInstance &ColumnDataCheckpointer::GetDatabase() {
	return col_data.GetDatabase();
}
26
// Returns the logical type of the column being checkpointed.
const LogicalType &ColumnDataCheckpointer::GetType() const {
	return col_data.type;
}
30
// Returns the column data this checkpointer operates on.
ColumnData &ColumnDataCheckpointer::GetColumnData() {
	return col_data;
}
34
// Returns the row group that the checkpointed segments belong to.
RowGroup &ColumnDataCheckpointer::GetRowGroup() {
	return row_group;
}
38
// Returns the checkpoint state that accumulates the new segment tree and data pointers.
ColumnCheckpointState &ColumnDataCheckpointer::GetCheckpointState() {
	return state;
}
42
43void ColumnDataCheckpointer::ScanSegments(const std::function<void(Vector &, idx_t)> &callback) {
44 Vector scan_vector(intermediate.GetType(), nullptr);
45 for (idx_t segment_idx = 0; segment_idx < nodes.size(); segment_idx++) {
46 auto &segment = *nodes[segment_idx].node;
47 ColumnScanState scan_state;
48 scan_state.current = &segment;
49 segment.InitializeScan(state&: scan_state);
50
51 for (idx_t base_row_index = 0; base_row_index < segment.count; base_row_index += STANDARD_VECTOR_SIZE) {
52 scan_vector.Reference(other&: intermediate);
53
54 idx_t count = MinValue<idx_t>(a: segment.count - base_row_index, STANDARD_VECTOR_SIZE);
55 scan_state.row_index = segment.start + base_row_index;
56
57 col_data.CheckpointScan(segment, state&: scan_state, row_group_start: row_group.start, count, scan_vector);
58
59 callback(scan_vector, count);
60 }
61 }
62}
63
64CompressionType ForceCompression(vector<optional_ptr<CompressionFunction>> &compression_functions,
65 CompressionType compression_type) {
66 // On of the force_compression flags has been set
67 // check if this compression method is available
68 bool found = false;
69 for (idx_t i = 0; i < compression_functions.size(); i++) {
70 auto &compression_function = *compression_functions[i];
71 if (compression_function.type == compression_type) {
72 found = true;
73 break;
74 }
75 }
76 if (found) {
77 // the force_compression method is available
78 // clear all other compression methods
79 // except the uncompressed method, so we can fall back on that
80 for (idx_t i = 0; i < compression_functions.size(); i++) {
81 auto &compression_function = *compression_functions[i];
82 if (compression_function.type == CompressionType::COMPRESSION_UNCOMPRESSED) {
83 continue;
84 }
85 if (compression_function.type != compression_type) {
86 compression_functions[i] = nullptr;
87 }
88 }
89 }
90 return found ? compression_type : CompressionType::COMPRESSION_AUTO;
91}
92
93unique_ptr<AnalyzeState> ColumnDataCheckpointer::DetectBestCompressionMethod(idx_t &compression_idx) {
94 D_ASSERT(!compression_functions.empty());
95 auto &config = DBConfig::GetConfig(db&: GetDatabase());
96 CompressionType forced_method = CompressionType::COMPRESSION_AUTO;
97
98 auto compression_type = checkpoint_info.compression_type;
99 if (compression_type != CompressionType::COMPRESSION_AUTO) {
100 forced_method = ForceCompression(compression_functions, compression_type);
101 }
102 if (compression_type == CompressionType::COMPRESSION_AUTO &&
103 config.options.force_compression != CompressionType::COMPRESSION_AUTO) {
104 forced_method = ForceCompression(compression_functions, compression_type: config.options.force_compression);
105 }
106 // set up the analyze states for each compression method
107 vector<unique_ptr<AnalyzeState>> analyze_states;
108 analyze_states.reserve(n: compression_functions.size());
109 for (idx_t i = 0; i < compression_functions.size(); i++) {
110 if (!compression_functions[i]) {
111 analyze_states.push_back(x: nullptr);
112 continue;
113 }
114 analyze_states.push_back(x: compression_functions[i]->init_analyze(col_data, col_data.type.InternalType()));
115 }
116
117 // scan over all the segments and run the analyze step
118 ScanSegments(callback: [&](Vector &scan_vector, idx_t count) {
119 for (idx_t i = 0; i < compression_functions.size(); i++) {
120 if (!compression_functions[i]) {
121 continue;
122 }
123 auto success = compression_functions[i]->analyze(*analyze_states[i], scan_vector, count);
124 if (!success) {
125 // could not use this compression function on this data set
126 // erase it
127 compression_functions[i] = nullptr;
128 analyze_states[i].reset();
129 }
130 }
131 });
132
133 // now that we have passed over all the data, we need to figure out the best method
134 // we do this using the final_analyze method
135 unique_ptr<AnalyzeState> state;
136 compression_idx = DConstants::INVALID_INDEX;
137 idx_t best_score = NumericLimits<idx_t>::Maximum();
138 for (idx_t i = 0; i < compression_functions.size(); i++) {
139 if (!compression_functions[i]) {
140 continue;
141 }
142 //! Check if the method type is the forced method (if forced is used)
143 bool forced_method_found = compression_functions[i]->type == forced_method;
144 auto score = compression_functions[i]->final_analyze(*analyze_states[i]);
145
146 //! The finalize method can return this value from final_analyze to indicate it should not be used.
147 if (score == DConstants::INVALID_INDEX) {
148 continue;
149 }
150
151 if (score < best_score || forced_method_found) {
152 compression_idx = i;
153 best_score = score;
154 state = std::move(analyze_states[i]);
155 }
156 //! If we have found the forced method, we're done
157 if (forced_method_found) {
158 break;
159 }
160 }
161 return state;
162}
163
164void ColumnDataCheckpointer::WriteToDisk() {
165 // there were changes or transient segments
166 // we need to rewrite the column segments to disk
167
168 // first we check the current segments
169 // if there are any persistent segments, we will mark their old block ids as modified
170 // since the segments will be rewritten their old on disk data is no longer required
171 auto &block_manager = col_data.GetBlockManager();
172 for (idx_t segment_idx = 0; segment_idx < nodes.size(); segment_idx++) {
173 auto segment = nodes[segment_idx].node.get();
174 if (segment->segment_type == ColumnSegmentType::PERSISTENT) {
175 // persistent segment has updates: mark it as modified and rewrite the block with the merged updates
176 auto block_id = segment->GetBlockId();
177 if (block_id != INVALID_BLOCK) {
178 block_manager.MarkBlockAsModified(block_id);
179 }
180 }
181 }
182
183 // now we need to write our segment
184 // we will first run an analyze step that determines which compression function to use
185 idx_t compression_idx;
186 auto analyze_state = DetectBestCompressionMethod(compression_idx);
187
188 if (!analyze_state) {
189 throw FatalException("No suitable compression/storage method found to store column");
190 }
191
192 // now that we have analyzed the compression functions we can start writing to disk
193 auto best_function = compression_functions[compression_idx];
194 auto compress_state = best_function->init_compression(*this, std::move(analyze_state));
195 ScanSegments(
196 callback: [&](Vector &scan_vector, idx_t count) { best_function->compress(*compress_state, scan_vector, count); });
197 best_function->compress_finalize(*compress_state);
198
199 nodes.clear();
200}
201
202bool ColumnDataCheckpointer::HasChanges() {
203 for (idx_t segment_idx = 0; segment_idx < nodes.size(); segment_idx++) {
204 auto segment = nodes[segment_idx].node.get();
205 if (segment->segment_type == ColumnSegmentType::TRANSIENT) {
206 // transient segment: always need to write to disk
207 return true;
208 } else {
209 // persistent segment; check if there were any updates or deletions in this segment
210 idx_t start_row_idx = segment->start - row_group.start;
211 idx_t end_row_idx = start_row_idx + segment->count;
212 if (col_data.updates && col_data.updates->HasUpdates(start_row_idx, end_row_idx)) {
213 return true;
214 }
215 }
216 }
217 return false;
218}
219
220void ColumnDataCheckpointer::WritePersistentSegments() {
221 // all segments are persistent and there are no updates
222 // we only need to write the metadata
223 for (idx_t segment_idx = 0; segment_idx < nodes.size(); segment_idx++) {
224 auto segment = nodes[segment_idx].node.get();
225 D_ASSERT(segment->segment_type == ColumnSegmentType::PERSISTENT);
226
227 // set up the data pointer directly using the data from the persistent segment
228 DataPointer pointer(segment->stats.statistics.Copy());
229 pointer.block_pointer.block_id = segment->GetBlockId();
230 pointer.block_pointer.offset = segment->GetBlockOffset();
231 pointer.row_start = segment->start;
232 pointer.tuple_count = segment->count;
233 pointer.compression_type = segment->function.get().type;
234
235 // merge the persistent stats into the global column stats
236 state.global_stats->Merge(other: segment->stats.statistics);
237
238 // directly append the current segment to the new tree
239 state.new_tree.AppendSegment(segment: std::move(nodes[segment_idx].node));
240
241 state.data_pointers.push_back(x: std::move(pointer));
242 }
243}
244
245void ColumnDataCheckpointer::Checkpoint(vector<SegmentNode<ColumnSegment>> nodes) {
246 D_ASSERT(!nodes.empty());
247 this->nodes = std::move(nodes);
248 // first check if any of the segments have changes
249 if (!HasChanges()) {
250 // no changes: only need to write the metadata for this column
251 WritePersistentSegments();
252 } else {
253 // there are changes: rewrite the set of columns);
254 WriteToDisk();
255 }
256}
257
258CompressionFunction &ColumnDataCheckpointer::GetCompressionFunction(CompressionType compression_type) {
259 auto &db = GetDatabase();
260 auto &column_type = GetType();
261 auto &config = DBConfig::GetConfig(db);
262 return *config.GetCompressionFunction(type: compression_type, data_type: column_type.InternalType());
263}
264
265} // namespace duckdb
266