#include "duckdb/storage/table/column_data_checkpointer.hpp"
#include "duckdb/main/config.hpp"
#include "duckdb/storage/table/update_segment.hpp"
#include "duckdb/storage/data_table.hpp"
#include "duckdb/parser/column_definition.hpp"
#include "duckdb/storage/table/scan_state.hpp"

namespace duckdb {

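// The checkpointer collects all compression functions registered for the column's physical type;
// these are the candidates that are evaluated during the analyze phase further down.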
ColumnDataCheckpointer::ColumnDataCheckpointer(ColumnData &col_data_p, RowGroup &row_group_p,
                                               ColumnCheckpointState &state_p, ColumnCheckpointInfo &checkpoint_info_p)
    : col_data(col_data_p), row_group(row_group_p), state(state_p),
      is_validity(GetType().id() == LogicalTypeId::VALIDITY),
      intermediate(is_validity ? LogicalType::BOOLEAN : GetType(), true, is_validity),
      checkpoint_info(checkpoint_info_p) {
	auto &config = DBConfig::GetConfig(GetDatabase());
	auto functions = config.GetCompressionFunctions(GetType().InternalType());
	for (auto &func : functions) {
		compression_functions.push_back(&func.get());
	}
}

DatabaseInstance &ColumnDataCheckpointer::GetDatabase() {
	return col_data.GetDatabase();
}

const LogicalType &ColumnDataCheckpointer::GetType() const {
	return col_data.type;
}

ColumnData &ColumnDataCheckpointer::GetColumnData() {
	return col_data;
}

RowGroup &ColumnDataCheckpointer::GetRowGroup() {
	return row_group;
}

ColumnCheckpointState &ColumnDataCheckpointer::GetCheckpointState() {
	return state;
}

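// Scans all segments of the column and hands the data to the callback one vector
// (at most STANDARD_VECTOR_SIZE rows) at a time; used by both the analyze and the compress passes.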
void ColumnDataCheckpointer::ScanSegments(const std::function<void(Vector &, idx_t)> &callback) {
	Vector scan_vector(intermediate.GetType(), nullptr);
	for (idx_t segment_idx = 0; segment_idx < nodes.size(); segment_idx++) {
		auto &segment = *nodes[segment_idx].node;
		ColumnScanState scan_state;
		scan_state.current = &segment;
		segment.InitializeScan(scan_state);

		for (idx_t base_row_index = 0; base_row_index < segment.count; base_row_index += STANDARD_VECTOR_SIZE) {
			scan_vector.Reference(intermediate);

			idx_t count = MinValue<idx_t>(segment.count - base_row_index, STANDARD_VECTOR_SIZE);
			scan_state.row_index = segment.start + base_row_index;

			col_data.CheckpointScan(segment, scan_state, row_group.start, count, scan_vector);

			callback(scan_vector, count);
		}
	}
}

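// Called when a compression method is forced, either per column (checkpoint_info.compression_type) or
// globally (config.options.force_compression). Disables all other candidates except the uncompressed
// fallback. Returns the forced type if it is available for this column, and COMPRESSION_AUTO otherwise.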
CompressionType ForceCompression(vector<optional_ptr<CompressionFunction>> &compression_functions,
                                 CompressionType compression_type) {
	// one of the force_compression flags has been set
	// check if this compression method is available
	bool found = false;
	for (idx_t i = 0; i < compression_functions.size(); i++) {
		auto &compression_function = *compression_functions[i];
		if (compression_function.type == compression_type) {
			found = true;
			break;
		}
	}
	if (found) {
		// the forced compression method is available
		// clear all other compression methods
		// except the uncompressed method, so we can fall back on that
		for (idx_t i = 0; i < compression_functions.size(); i++) {
			auto &compression_function = *compression_functions[i];
			if (compression_function.type == CompressionType::COMPRESSION_UNCOMPRESSED) {
				continue;
			}
			if (compression_function.type != compression_type) {
				compression_functions[i] = nullptr;
			}
		}
	}
	return found ? compression_type : CompressionType::COMPRESSION_AUTO;
}

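// Runs the analyze phase: every remaining candidate compression function gets to see all of the data once,
// after which final_analyze produces a score (lower is better). The function with the lowest score wins;
// compression_idx is set to its position and its analyze state is returned.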
unique_ptr<AnalyzeState> ColumnDataCheckpointer::DetectBestCompressionMethod(idx_t &compression_idx) {
	D_ASSERT(!compression_functions.empty());
	auto &config = DBConfig::GetConfig(GetDatabase());
	CompressionType forced_method = CompressionType::COMPRESSION_AUTO;

	auto compression_type = checkpoint_info.compression_type;
	if (compression_type != CompressionType::COMPRESSION_AUTO) {
		forced_method = ForceCompression(compression_functions, compression_type);
	}
	if (compression_type == CompressionType::COMPRESSION_AUTO &&
	    config.options.force_compression != CompressionType::COMPRESSION_AUTO) {
		forced_method = ForceCompression(compression_functions, config.options.force_compression);
	}
	// set up the analyze states for each compression method
	vector<unique_ptr<AnalyzeState>> analyze_states;
	analyze_states.reserve(compression_functions.size());
	for (idx_t i = 0; i < compression_functions.size(); i++) {
		if (!compression_functions[i]) {
			analyze_states.push_back(nullptr);
			continue;
		}
		analyze_states.push_back(compression_functions[i]->init_analyze(col_data, col_data.type.InternalType()));
	}

	// scan over all the segments and run the analyze step
	ScanSegments([&](Vector &scan_vector, idx_t count) {
		for (idx_t i = 0; i < compression_functions.size(); i++) {
			if (!compression_functions[i]) {
				continue;
			}
			auto success = compression_functions[i]->analyze(*analyze_states[i], scan_vector, count);
			if (!success) {
				// could not use this compression function on this data set
				// erase it
				compression_functions[i] = nullptr;
				analyze_states[i].reset();
			}
		}
	});

	// now that we have passed over all the data, we need to figure out the best method
	// we do this using the final_analyze method
	unique_ptr<AnalyzeState> state;
	compression_idx = DConstants::INVALID_INDEX;
	idx_t best_score = NumericLimits<idx_t>::Maximum();
	for (idx_t i = 0; i < compression_functions.size(); i++) {
		if (!compression_functions[i]) {
			continue;
		}
		//! check if the method type is the forced method (if a forced method is used)
		bool forced_method_found = compression_functions[i]->type == forced_method;
		auto score = compression_functions[i]->final_analyze(*analyze_states[i]);

		//! final_analyze can return DConstants::INVALID_INDEX to indicate the method should not be used
		if (score == DConstants::INVALID_INDEX) {
			continue;
		}

		if (score < best_score || forced_method_found) {
			compression_idx = i;
			best_score = score;
			state = std::move(analyze_states[i]);
		}
		//! if we have found the forced method, we're done
		if (forced_method_found) {
			break;
		}
	}
	return state;
}

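// Rewrites the column data: marks the blocks of any existing persistent segments as modified so they can be
// reused, picks the best compression method through the analyze phase, and compresses all data to disk.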
void ColumnDataCheckpointer::WriteToDisk() {
	// there were changes or transient segments
	// we need to rewrite the column segments to disk

	// first we check the current segments
	// if there are any persistent segments, we will mark their old block ids as modified
	// since the segments will be rewritten, their old on-disk data is no longer required
	auto &block_manager = col_data.GetBlockManager();
	for (idx_t segment_idx = 0; segment_idx < nodes.size(); segment_idx++) {
		auto segment = nodes[segment_idx].node.get();
		if (segment->segment_type == ColumnSegmentType::PERSISTENT) {
			// persistent segment has updates: mark it as modified and rewrite the block with the merged updates
			auto block_id = segment->GetBlockId();
			if (block_id != INVALID_BLOCK) {
				block_manager.MarkBlockAsModified(block_id);
			}
		}
	}

	// now we need to write our segment
	// we will first run an analyze step that determines which compression function to use
	idx_t compression_idx;
	auto analyze_state = DetectBestCompressionMethod(compression_idx);

	if (!analyze_state) {
		throw FatalException("No suitable compression/storage method found to store column");
	}

	// now that we have analyzed the compression functions we can start writing to disk
	auto best_function = compression_functions[compression_idx];
	auto compress_state = best_function->init_compression(*this, std::move(analyze_state));
	ScanSegments(
	    [&](Vector &scan_vector, idx_t count) { best_function->compress(*compress_state, scan_vector, count); });
	best_function->compress_finalize(*compress_state);

	nodes.clear();
}

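// Returns true if this column has to be rewritten: either one of the segments is transient (newly appended
// data), or a persistent segment has outstanding updates.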
bool ColumnDataCheckpointer::HasChanges() {
	for (idx_t segment_idx = 0; segment_idx < nodes.size(); segment_idx++) {
		auto segment = nodes[segment_idx].node.get();
		if (segment->segment_type == ColumnSegmentType::TRANSIENT) {
			// transient segment: always need to write to disk
			return true;
		} else {
			// persistent segment: check if there were any updates or deletions in this segment
			idx_t start_row_idx = segment->start - row_group.start;
			idx_t end_row_idx = start_row_idx + segment->count;
			if (col_data.updates && col_data.updates->HasUpdates(start_row_idx, end_row_idx)) {
				return true;
			}
		}
	}
	return false;
}

void ColumnDataCheckpointer::WritePersistentSegments() {
	// all segments are persistent and there are no updates
	// we only need to write the metadata
	for (idx_t segment_idx = 0; segment_idx < nodes.size(); segment_idx++) {
		auto segment = nodes[segment_idx].node.get();
		D_ASSERT(segment->segment_type == ColumnSegmentType::PERSISTENT);

		// set up the data pointer directly using the data from the persistent segment
		DataPointer pointer(segment->stats.statistics.Copy());
		pointer.block_pointer.block_id = segment->GetBlockId();
		pointer.block_pointer.offset = segment->GetBlockOffset();
		pointer.row_start = segment->start;
		pointer.tuple_count = segment->count;
		pointer.compression_type = segment->function.get().type;

		// merge the persistent stats into the global column stats
		state.global_stats->Merge(segment->stats.statistics);

		// directly append the current segment to the new tree
		state.new_tree.AppendSegment(std::move(nodes[segment_idx].node));

		state.data_pointers.push_back(std::move(pointer));
	}
}

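// Entry point of the checkpointer: takes ownership of the column's segments and either rewrites them
// (if anything changed) or only re-emits the metadata of the existing persistent segments.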
void ColumnDataCheckpointer::Checkpoint(vector<SegmentNode<ColumnSegment>> nodes) {
	D_ASSERT(!nodes.empty());
	this->nodes = std::move(nodes);
	// first check if any of the segments have changes
	if (!HasChanges()) {
		// no changes: only need to write the metadata for this column
		WritePersistentSegments();
	} else {
		// there are changes: rewrite the set of column segments
		WriteToDisk();
	}
}

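// Looks up the concrete compression function for the given compression type and this column's physical type.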
CompressionFunction &ColumnDataCheckpointer::GetCompressionFunction(CompressionType compression_type) {
	auto &db = GetDatabase();
	auto &column_type = GetType();
	auto &config = DBConfig::GetConfig(db);
	return *config.GetCompressionFunction(compression_type, column_type.InternalType());
}

} // namespace duckdb