1 | #include "duckdb/storage/table/column_data_checkpointer.hpp" |
2 | #include "duckdb/main/config.hpp" |
3 | #include "duckdb/storage/table/update_segment.hpp" |
4 | #include "duckdb/storage/data_table.hpp" |
5 | #include "duckdb/parser/column_definition.hpp" |
6 | #include "duckdb/storage/table/scan_state.hpp" |
7 | |
8 | namespace duckdb { |
9 | |
10 | ColumnDataCheckpointer::ColumnDataCheckpointer(ColumnData &col_data_p, RowGroup &row_group_p, |
11 | ColumnCheckpointState &state_p, ColumnCheckpointInfo &checkpoint_info_p) |
12 | : col_data(col_data_p), row_group(row_group_p), state(state_p), |
13 | is_validity(GetType().id() == LogicalTypeId::VALIDITY), |
14 | intermediate(is_validity ? LogicalType::BOOLEAN : GetType(), true, is_validity), |
15 | checkpoint_info(checkpoint_info_p) { |
16 | auto &config = DBConfig::GetConfig(db&: GetDatabase()); |
17 | auto functions = config.GetCompressionFunctions(data_type: GetType().InternalType()); |
18 | for (auto &func : functions) { |
19 | compression_functions.push_back(x: &func.get()); |
20 | } |
21 | } |
22 | |
//! Returns the database instance the checkpointed column belongs to.
DatabaseInstance &ColumnDataCheckpointer::GetDatabase() {
	return col_data.GetDatabase();
}
26 | |
//! Returns the logical type of the column being checkpointed.
const LogicalType &ColumnDataCheckpointer::GetType() const {
	return col_data.type;
}
30 | |
//! Returns the column data being checkpointed.
ColumnData &ColumnDataCheckpointer::GetColumnData() {
	return col_data;
}
34 | |
//! Returns the row group this column belongs to.
RowGroup &ColumnDataCheckpointer::GetRowGroup() {
	return row_group;
}
38 | |
//! Returns the checkpoint state that accumulates the result (new segment
//! tree, data pointers and global statistics) of this checkpoint.
ColumnCheckpointState &ColumnDataCheckpointer::GetCheckpointState() {
	return state;
}
42 | |
43 | void ColumnDataCheckpointer::ScanSegments(const std::function<void(Vector &, idx_t)> &callback) { |
44 | Vector scan_vector(intermediate.GetType(), nullptr); |
45 | for (idx_t segment_idx = 0; segment_idx < nodes.size(); segment_idx++) { |
46 | auto &segment = *nodes[segment_idx].node; |
47 | ColumnScanState scan_state; |
48 | scan_state.current = &segment; |
49 | segment.InitializeScan(state&: scan_state); |
50 | |
51 | for (idx_t base_row_index = 0; base_row_index < segment.count; base_row_index += STANDARD_VECTOR_SIZE) { |
52 | scan_vector.Reference(other&: intermediate); |
53 | |
54 | idx_t count = MinValue<idx_t>(a: segment.count - base_row_index, STANDARD_VECTOR_SIZE); |
55 | scan_state.row_index = segment.start + base_row_index; |
56 | |
57 | col_data.CheckpointScan(segment, state&: scan_state, row_group_start: row_group.start, count, scan_vector); |
58 | |
59 | callback(scan_vector, count); |
60 | } |
61 | } |
62 | } |
63 | |
//! Applies a forced compression method: if the requested method is available
//! for this column, all other candidates (except uncompressed, kept as a
//! fallback) are cleared from the list. Returns the forced type if it was
//! found, COMPRESSION_AUTO otherwise.
CompressionType ForceCompression(vector<optional_ptr<CompressionFunction>> &compression_functions,
                                 CompressionType compression_type) {
	// One of the force_compression flags has been set
	// check if this compression method is available
	bool found = false;
	for (idx_t i = 0; i < compression_functions.size(); i++) {
		auto &compression_function = *compression_functions[i];
		if (compression_function.type == compression_type) {
			found = true;
			break;
		}
	}
	if (found) {
		// the force_compression method is available
		// clear all other compression methods
		// except the uncompressed method, so we can fall back on that
		for (idx_t i = 0; i < compression_functions.size(); i++) {
			auto &compression_function = *compression_functions[i];
			if (compression_function.type == CompressionType::COMPRESSION_UNCOMPRESSED) {
				continue;
			}
			if (compression_function.type != compression_type) {
				compression_functions[i] = nullptr;
			}
		}
	}
	return found ? compression_type : CompressionType::COMPRESSION_AUTO;
}
92 | |
93 | unique_ptr<AnalyzeState> ColumnDataCheckpointer::DetectBestCompressionMethod(idx_t &compression_idx) { |
94 | D_ASSERT(!compression_functions.empty()); |
95 | auto &config = DBConfig::GetConfig(db&: GetDatabase()); |
96 | CompressionType forced_method = CompressionType::COMPRESSION_AUTO; |
97 | |
98 | auto compression_type = checkpoint_info.compression_type; |
99 | if (compression_type != CompressionType::COMPRESSION_AUTO) { |
100 | forced_method = ForceCompression(compression_functions, compression_type); |
101 | } |
102 | if (compression_type == CompressionType::COMPRESSION_AUTO && |
103 | config.options.force_compression != CompressionType::COMPRESSION_AUTO) { |
104 | forced_method = ForceCompression(compression_functions, compression_type: config.options.force_compression); |
105 | } |
106 | // set up the analyze states for each compression method |
107 | vector<unique_ptr<AnalyzeState>> analyze_states; |
108 | analyze_states.reserve(n: compression_functions.size()); |
109 | for (idx_t i = 0; i < compression_functions.size(); i++) { |
110 | if (!compression_functions[i]) { |
111 | analyze_states.push_back(x: nullptr); |
112 | continue; |
113 | } |
114 | analyze_states.push_back(x: compression_functions[i]->init_analyze(col_data, col_data.type.InternalType())); |
115 | } |
116 | |
117 | // scan over all the segments and run the analyze step |
118 | ScanSegments(callback: [&](Vector &scan_vector, idx_t count) { |
119 | for (idx_t i = 0; i < compression_functions.size(); i++) { |
120 | if (!compression_functions[i]) { |
121 | continue; |
122 | } |
123 | auto success = compression_functions[i]->analyze(*analyze_states[i], scan_vector, count); |
124 | if (!success) { |
125 | // could not use this compression function on this data set |
126 | // erase it |
127 | compression_functions[i] = nullptr; |
128 | analyze_states[i].reset(); |
129 | } |
130 | } |
131 | }); |
132 | |
133 | // now that we have passed over all the data, we need to figure out the best method |
134 | // we do this using the final_analyze method |
135 | unique_ptr<AnalyzeState> state; |
136 | compression_idx = DConstants::INVALID_INDEX; |
137 | idx_t best_score = NumericLimits<idx_t>::Maximum(); |
138 | for (idx_t i = 0; i < compression_functions.size(); i++) { |
139 | if (!compression_functions[i]) { |
140 | continue; |
141 | } |
142 | //! Check if the method type is the forced method (if forced is used) |
143 | bool forced_method_found = compression_functions[i]->type == forced_method; |
144 | auto score = compression_functions[i]->final_analyze(*analyze_states[i]); |
145 | |
146 | //! The finalize method can return this value from final_analyze to indicate it should not be used. |
147 | if (score == DConstants::INVALID_INDEX) { |
148 | continue; |
149 | } |
150 | |
151 | if (score < best_score || forced_method_found) { |
152 | compression_idx = i; |
153 | best_score = score; |
154 | state = std::move(analyze_states[i]); |
155 | } |
156 | //! If we have found the forced method, we're done |
157 | if (forced_method_found) { |
158 | break; |
159 | } |
160 | } |
161 | return state; |
162 | } |
163 | |
164 | void ColumnDataCheckpointer::WriteToDisk() { |
165 | // there were changes or transient segments |
166 | // we need to rewrite the column segments to disk |
167 | |
168 | // first we check the current segments |
169 | // if there are any persistent segments, we will mark their old block ids as modified |
170 | // since the segments will be rewritten their old on disk data is no longer required |
171 | auto &block_manager = col_data.GetBlockManager(); |
172 | for (idx_t segment_idx = 0; segment_idx < nodes.size(); segment_idx++) { |
173 | auto segment = nodes[segment_idx].node.get(); |
174 | if (segment->segment_type == ColumnSegmentType::PERSISTENT) { |
175 | // persistent segment has updates: mark it as modified and rewrite the block with the merged updates |
176 | auto block_id = segment->GetBlockId(); |
177 | if (block_id != INVALID_BLOCK) { |
178 | block_manager.MarkBlockAsModified(block_id); |
179 | } |
180 | } |
181 | } |
182 | |
183 | // now we need to write our segment |
184 | // we will first run an analyze step that determines which compression function to use |
185 | idx_t compression_idx; |
186 | auto analyze_state = DetectBestCompressionMethod(compression_idx); |
187 | |
188 | if (!analyze_state) { |
189 | throw FatalException("No suitable compression/storage method found to store column" ); |
190 | } |
191 | |
192 | // now that we have analyzed the compression functions we can start writing to disk |
193 | auto best_function = compression_functions[compression_idx]; |
194 | auto compress_state = best_function->init_compression(*this, std::move(analyze_state)); |
195 | ScanSegments( |
196 | callback: [&](Vector &scan_vector, idx_t count) { best_function->compress(*compress_state, scan_vector, count); }); |
197 | best_function->compress_finalize(*compress_state); |
198 | |
199 | nodes.clear(); |
200 | } |
201 | |
202 | bool ColumnDataCheckpointer::HasChanges() { |
203 | for (idx_t segment_idx = 0; segment_idx < nodes.size(); segment_idx++) { |
204 | auto segment = nodes[segment_idx].node.get(); |
205 | if (segment->segment_type == ColumnSegmentType::TRANSIENT) { |
206 | // transient segment: always need to write to disk |
207 | return true; |
208 | } else { |
209 | // persistent segment; check if there were any updates or deletions in this segment |
210 | idx_t start_row_idx = segment->start - row_group.start; |
211 | idx_t end_row_idx = start_row_idx + segment->count; |
212 | if (col_data.updates && col_data.updates->HasUpdates(start_row_idx, end_row_idx)) { |
213 | return true; |
214 | } |
215 | } |
216 | } |
217 | return false; |
218 | } |
219 | |
220 | void ColumnDataCheckpointer::WritePersistentSegments() { |
221 | // all segments are persistent and there are no updates |
222 | // we only need to write the metadata |
223 | for (idx_t segment_idx = 0; segment_idx < nodes.size(); segment_idx++) { |
224 | auto segment = nodes[segment_idx].node.get(); |
225 | D_ASSERT(segment->segment_type == ColumnSegmentType::PERSISTENT); |
226 | |
227 | // set up the data pointer directly using the data from the persistent segment |
228 | DataPointer pointer(segment->stats.statistics.Copy()); |
229 | pointer.block_pointer.block_id = segment->GetBlockId(); |
230 | pointer.block_pointer.offset = segment->GetBlockOffset(); |
231 | pointer.row_start = segment->start; |
232 | pointer.tuple_count = segment->count; |
233 | pointer.compression_type = segment->function.get().type; |
234 | |
235 | // merge the persistent stats into the global column stats |
236 | state.global_stats->Merge(other: segment->stats.statistics); |
237 | |
238 | // directly append the current segment to the new tree |
239 | state.new_tree.AppendSegment(segment: std::move(nodes[segment_idx].node)); |
240 | |
241 | state.data_pointers.push_back(x: std::move(pointer)); |
242 | } |
243 | } |
244 | |
//! Entry point: checkpoints the given segments of this column. Chooses the
//! metadata-only fast path when no segment has changes, otherwise rewrites
//! the column data to disk.
void ColumnDataCheckpointer::Checkpoint(vector<SegmentNode<ColumnSegment>> nodes) {
	D_ASSERT(!nodes.empty());
	this->nodes = std::move(nodes);
	// first check if any of the segments have changes
	if (!HasChanges()) {
		// no changes: only need to write the metadata for this column
		WritePersistentSegments();
	} else {
		// there are changes: rewrite the set of columns
		WriteToDisk();
	}
}
257 | |
258 | CompressionFunction &ColumnDataCheckpointer::GetCompressionFunction(CompressionType compression_type) { |
259 | auto &db = GetDatabase(); |
260 | auto &column_type = GetType(); |
261 | auto &config = DBConfig::GetConfig(db); |
262 | return *config.GetCompressionFunction(type: compression_type, data_type: column_type.InternalType()); |
263 | } |
264 | |
265 | } // namespace duckdb |
266 | |