| 1 | #include <Storages/MergeTree/SimpleMergeSelector.h> |
| 2 | #include <Common/interpolate.h> |
| 3 | |
| 4 | #include <cmath> |
| 5 | #include <iostream> |
| 6 | |
| 7 | |
| 8 | namespace DB |
| 9 | { |
| 10 | |
| 11 | namespace |
| 12 | { |
| 13 | |
| 14 | /** Estimates best set of parts to merge within passed alternatives. |
| 15 | */ |
| 16 | struct Estimator |
| 17 | { |
| 18 | using Iterator = SimpleMergeSelector::PartsInPartition::const_iterator; |
| 19 | |
| 20 | void consider(Iterator begin, Iterator end, size_t sum_size, size_t size_prev_at_left, const SimpleMergeSelector::Settings & settings) |
| 21 | { |
| 22 | double current_score = score(end - begin, sum_size, settings.size_fixed_cost_to_add); |
| 23 | |
| 24 | if (settings.enable_heuristic_to_align_parts |
| 25 | && size_prev_at_left > sum_size * settings.heuristic_to_align_parts_min_ratio_of_sum_size_to_prev_part) |
| 26 | { |
| 27 | double difference = std::abs(log2(static_cast<double>(sum_size) / size_prev_at_left)); |
| 28 | if (difference < settings.heuristic_to_align_parts_max_absolute_difference_in_powers_of_two) |
| 29 | current_score *= interpolateLinear(settings.heuristic_to_align_parts_max_score_adjustment, 1, |
| 30 | difference / settings.heuristic_to_align_parts_max_absolute_difference_in_powers_of_two); |
| 31 | } |
| 32 | |
| 33 | if (settings.enable_heuristic_to_remove_small_parts_at_right) |
| 34 | while (end >= begin + 3 && (end - 1)->size < settings.heuristic_to_remove_small_parts_at_right_max_ratio * sum_size) |
| 35 | --end; |
| 36 | |
| 37 | if (!min_score || current_score < min_score) |
| 38 | { |
| 39 | min_score = current_score; |
| 40 | best_begin = begin; |
| 41 | best_end = end; |
| 42 | } |
| 43 | } |
| 44 | |
| 45 | SimpleMergeSelector::PartsInPartition getBest() |
| 46 | { |
| 47 | return SimpleMergeSelector::PartsInPartition(best_begin, best_end); |
| 48 | } |
| 49 | |
| 50 | static double score(double count, double sum_size, double sum_size_fixed_cost) |
| 51 | { |
| 52 | /** Consider we have two alternative ranges of data parts to merge. |
| 53 | * Assume time to merge a range is proportional to sum size of its parts. |
| 54 | * |
| 55 | * Cost of query execution is proportional to total number of data parts in a moment of time. |
| 56 | * Let define our target: to minimize average (in time) total number of data parts. |
| 57 | * |
| 58 | * Let calculate integral of total number of parts, if we are going to do merge of one or another range. |
| 59 | * It must be lower, and thus we decide, what range is better to merge. |
| 60 | * |
| 61 | * The integral is lower iff the following formula is lower: |
| 62 | * |
| 63 | * sum_size / (count - 1) |
| 64 | * |
| 65 | * But we have some tunes to prefer longer ranges. |
| 66 | */ |
| 67 | return (sum_size + sum_size_fixed_cost * count) / (count - 1.9); |
| 68 | } |
| 69 | |
| 70 | double min_score = 0; |
| 71 | Iterator best_begin {}; |
| 72 | Iterator best_end {}; |
| 73 | }; |
| 74 | |
| 75 | |
/** Maps value to the [0, 1] interval with a piecewise-linear ramp:
  * 0 at or below min, 1 at or above max, linear in between.
  *
  * 1         _____
  *          /
  * 0_____/
  *      ^  ^
  *     min max
  */
double mapPiecewiseLinearToUnit(double value, double min, double max)
{
    /// The lower bound is checked first, so it wins when min >= max.
    if (value <= min)
        return 0;

    if (value >= max)
        return 1;

    const double offset = value - min;
    const double width = max - min;
    return offset / width;
}
| 89 | |
| 90 | |
| 91 | /** Is allowed to merge parts in range with specific properties. |
| 92 | */ |
| 93 | bool allow( |
| 94 | double sum_size, |
| 95 | double max_size, |
| 96 | double min_age, |
| 97 | double range_size, |
| 98 | double partition_size, |
| 99 | const SimpleMergeSelector::Settings & settings) |
| 100 | { |
| 101 | // std::cerr << "sum_size: " << sum_size << "\n"; |
| 102 | |
| 103 | /// Map size to 0..1 using logarithmic scale |
| 104 | double size_normalized = mapPiecewiseLinearToUnit(log1p(sum_size), log1p(settings.min_size_to_lower_base), log1p(settings.max_size_to_lower_base)); |
| 105 | |
| 106 | // std::cerr << "size_normalized: " << size_normalized << "\n"; |
| 107 | |
| 108 | /// Calculate boundaries for age |
| 109 | double min_age_to_lower_base = interpolateLinear(settings.min_age_to_lower_base_at_min_size, settings.min_age_to_lower_base_at_max_size, size_normalized); |
| 110 | double max_age_to_lower_base = interpolateLinear(settings.max_age_to_lower_base_at_min_size, settings.max_age_to_lower_base_at_max_size, size_normalized); |
| 111 | |
| 112 | // std::cerr << "min_age_to_lower_base: " << min_age_to_lower_base << "\n"; |
| 113 | // std::cerr << "max_age_to_lower_base: " << max_age_to_lower_base << "\n"; |
| 114 | |
| 115 | /// Map age to 0..1 |
| 116 | double age_normalized = mapPiecewiseLinearToUnit(min_age, min_age_to_lower_base, max_age_to_lower_base); |
| 117 | |
| 118 | // std::cerr << "age: " << min_age << "\n"; |
| 119 | // std::cerr << "age_normalized: " << age_normalized << "\n"; |
| 120 | |
| 121 | /// Map partition_size to 0..1 |
| 122 | double num_parts_normalized = mapPiecewiseLinearToUnit(partition_size, settings.min_parts_to_lower_base, settings.max_parts_to_lower_base); |
| 123 | |
| 124 | // std::cerr << "partition_size: " << partition_size << "\n"; |
| 125 | // std::cerr << "num_parts_normalized: " << num_parts_normalized << "\n"; |
| 126 | |
| 127 | double combined_ratio = std::min(1.0, age_normalized + num_parts_normalized); |
| 128 | |
| 129 | // std::cerr << "combined_ratio: " << combined_ratio << "\n"; |
| 130 | |
| 131 | double lowered_base = interpolateLinear(settings.base, 2.0, combined_ratio); |
| 132 | |
| 133 | // std::cerr << "------- lowered_base: " << lowered_base << "\n"; |
| 134 | |
| 135 | return (sum_size + range_size * settings.size_fixed_cost_to_add) / (max_size + settings.size_fixed_cost_to_add) >= lowered_base; |
| 136 | } |
| 137 | |
| 138 | |
| 139 | void selectWithinPartition( |
| 140 | const SimpleMergeSelector::PartsInPartition & parts, |
| 141 | const size_t max_total_size_to_merge, |
| 142 | Estimator & estimator, |
| 143 | const SimpleMergeSelector::Settings & settings) |
| 144 | { |
| 145 | size_t parts_count = parts.size(); |
| 146 | if (parts_count <= 1) |
| 147 | return; |
| 148 | |
| 149 | for (size_t begin = 0; begin < parts_count; ++begin) |
| 150 | { |
| 151 | /// If too many parts, select only from first, to avoid complexity. |
| 152 | if (begin > 1000) |
| 153 | break; |
| 154 | |
| 155 | size_t sum_size = parts[begin].size; |
| 156 | size_t max_size = parts[begin].size; |
| 157 | size_t min_age = parts[begin].age; |
| 158 | |
| 159 | for (size_t end = begin + 2; end <= parts_count; ++end) |
| 160 | { |
| 161 | if (settings.max_parts_to_merge_at_once && end - begin > settings.max_parts_to_merge_at_once) |
| 162 | break; |
| 163 | |
| 164 | size_t cur_size = parts[end - 1].size; |
| 165 | size_t cur_age = parts[end - 1].age; |
| 166 | |
| 167 | sum_size += cur_size; |
| 168 | max_size = std::max(max_size, cur_size); |
| 169 | min_age = std::min(min_age, cur_age); |
| 170 | |
| 171 | if (max_total_size_to_merge && sum_size > max_total_size_to_merge) |
| 172 | break; |
| 173 | |
| 174 | if (allow(sum_size, max_size, min_age, end - begin, parts_count, settings)) |
| 175 | estimator.consider( |
| 176 | parts.begin() + begin, |
| 177 | parts.begin() + end, |
| 178 | sum_size, |
| 179 | begin == 0 ? 0 : parts[begin - 1].size, |
| 180 | settings); |
| 181 | } |
| 182 | } |
| 183 | } |
| 184 | |
| 185 | } |
| 186 | |
| 187 | |
| 188 | SimpleMergeSelector::PartsInPartition SimpleMergeSelector::select( |
| 189 | const Partitions & partitions, |
| 190 | const size_t max_total_size_to_merge) |
| 191 | { |
| 192 | Estimator estimator; |
| 193 | |
| 194 | for (const auto & partition : partitions) |
| 195 | selectWithinPartition(partition, max_total_size_to_merge, estimator, settings); |
| 196 | |
| 197 | return estimator.getBest(); |
| 198 | } |
| 199 | |
| 200 | } |
| 201 | |