1#include <Storages/MergeTree/SimpleMergeSelector.h>
2#include <Common/interpolate.h>
3
4#include <cmath>
5#include <iostream>
6
7
8namespace DB
9{
10
11namespace
12{
13
14/** Estimates best set of parts to merge within passed alternatives.
15 */
16struct Estimator
17{
18 using Iterator = SimpleMergeSelector::PartsInPartition::const_iterator;
19
20 void consider(Iterator begin, Iterator end, size_t sum_size, size_t size_prev_at_left, const SimpleMergeSelector::Settings & settings)
21 {
22 double current_score = score(end - begin, sum_size, settings.size_fixed_cost_to_add);
23
24 if (settings.enable_heuristic_to_align_parts
25 && size_prev_at_left > sum_size * settings.heuristic_to_align_parts_min_ratio_of_sum_size_to_prev_part)
26 {
27 double difference = std::abs(log2(static_cast<double>(sum_size) / size_prev_at_left));
28 if (difference < settings.heuristic_to_align_parts_max_absolute_difference_in_powers_of_two)
29 current_score *= interpolateLinear(settings.heuristic_to_align_parts_max_score_adjustment, 1,
30 difference / settings.heuristic_to_align_parts_max_absolute_difference_in_powers_of_two);
31 }
32
33 if (settings.enable_heuristic_to_remove_small_parts_at_right)
34 while (end >= begin + 3 && (end - 1)->size < settings.heuristic_to_remove_small_parts_at_right_max_ratio * sum_size)
35 --end;
36
37 if (!min_score || current_score < min_score)
38 {
39 min_score = current_score;
40 best_begin = begin;
41 best_end = end;
42 }
43 }
44
45 SimpleMergeSelector::PartsInPartition getBest()
46 {
47 return SimpleMergeSelector::PartsInPartition(best_begin, best_end);
48 }
49
50 static double score(double count, double sum_size, double sum_size_fixed_cost)
51 {
52 /** Consider we have two alternative ranges of data parts to merge.
53 * Assume time to merge a range is proportional to sum size of its parts.
54 *
55 * Cost of query execution is proportional to total number of data parts in a moment of time.
56 * Let define our target: to minimize average (in time) total number of data parts.
57 *
58 * Let calculate integral of total number of parts, if we are going to do merge of one or another range.
59 * It must be lower, and thus we decide, what range is better to merge.
60 *
61 * The integral is lower iff the following formula is lower:
62 *
63 * sum_size / (count - 1)
64 *
65 * But we have some tunes to prefer longer ranges.
66 */
67 return (sum_size + sum_size_fixed_cost * count) / (count - 1.9);
68 }
69
70 double min_score = 0;
71 Iterator best_begin {};
72 Iterator best_end {};
73};
74
75
76/**
77 * 1 _____
78 * /
79 * 0_____/
80 * ^ ^
81 * min max
82 */
83double mapPiecewiseLinearToUnit(double value, double min, double max)
84{
85 return value <= min ? 0
86 : (value >= max ? 1
87 : ((value - min) / (max - min)));
88}
89
90
91/** Is allowed to merge parts in range with specific properties.
92 */
93bool allow(
94 double sum_size,
95 double max_size,
96 double min_age,
97 double range_size,
98 double partition_size,
99 const SimpleMergeSelector::Settings & settings)
100{
101// std::cerr << "sum_size: " << sum_size << "\n";
102
103 /// Map size to 0..1 using logarithmic scale
104 double size_normalized = mapPiecewiseLinearToUnit(log1p(sum_size), log1p(settings.min_size_to_lower_base), log1p(settings.max_size_to_lower_base));
105
106// std::cerr << "size_normalized: " << size_normalized << "\n";
107
108 /// Calculate boundaries for age
109 double min_age_to_lower_base = interpolateLinear(settings.min_age_to_lower_base_at_min_size, settings.min_age_to_lower_base_at_max_size, size_normalized);
110 double max_age_to_lower_base = interpolateLinear(settings.max_age_to_lower_base_at_min_size, settings.max_age_to_lower_base_at_max_size, size_normalized);
111
112// std::cerr << "min_age_to_lower_base: " << min_age_to_lower_base << "\n";
113// std::cerr << "max_age_to_lower_base: " << max_age_to_lower_base << "\n";
114
115 /// Map age to 0..1
116 double age_normalized = mapPiecewiseLinearToUnit(min_age, min_age_to_lower_base, max_age_to_lower_base);
117
118// std::cerr << "age: " << min_age << "\n";
119// std::cerr << "age_normalized: " << age_normalized << "\n";
120
121 /// Map partition_size to 0..1
122 double num_parts_normalized = mapPiecewiseLinearToUnit(partition_size, settings.min_parts_to_lower_base, settings.max_parts_to_lower_base);
123
124// std::cerr << "partition_size: " << partition_size << "\n";
125// std::cerr << "num_parts_normalized: " << num_parts_normalized << "\n";
126
127 double combined_ratio = std::min(1.0, age_normalized + num_parts_normalized);
128
129// std::cerr << "combined_ratio: " << combined_ratio << "\n";
130
131 double lowered_base = interpolateLinear(settings.base, 2.0, combined_ratio);
132
133// std::cerr << "------- lowered_base: " << lowered_base << "\n";
134
135 return (sum_size + range_size * settings.size_fixed_cost_to_add) / (max_size + settings.size_fixed_cost_to_add) >= lowered_base;
136}
137
138
139void selectWithinPartition(
140 const SimpleMergeSelector::PartsInPartition & parts,
141 const size_t max_total_size_to_merge,
142 Estimator & estimator,
143 const SimpleMergeSelector::Settings & settings)
144{
145 size_t parts_count = parts.size();
146 if (parts_count <= 1)
147 return;
148
149 for (size_t begin = 0; begin < parts_count; ++begin)
150 {
151 /// If too many parts, select only from first, to avoid complexity.
152 if (begin > 1000)
153 break;
154
155 size_t sum_size = parts[begin].size;
156 size_t max_size = parts[begin].size;
157 size_t min_age = parts[begin].age;
158
159 for (size_t end = begin + 2; end <= parts_count; ++end)
160 {
161 if (settings.max_parts_to_merge_at_once && end - begin > settings.max_parts_to_merge_at_once)
162 break;
163
164 size_t cur_size = parts[end - 1].size;
165 size_t cur_age = parts[end - 1].age;
166
167 sum_size += cur_size;
168 max_size = std::max(max_size, cur_size);
169 min_age = std::min(min_age, cur_age);
170
171 if (max_total_size_to_merge && sum_size > max_total_size_to_merge)
172 break;
173
174 if (allow(sum_size, max_size, min_age, end - begin, parts_count, settings))
175 estimator.consider(
176 parts.begin() + begin,
177 parts.begin() + end,
178 sum_size,
179 begin == 0 ? 0 : parts[begin - 1].size,
180 settings);
181 }
182 }
183}
184
185}
186
187
188SimpleMergeSelector::PartsInPartition SimpleMergeSelector::select(
189 const Partitions & partitions,
190 const size_t max_total_size_to_merge)
191{
192 Estimator estimator;
193
194 for (const auto & partition : partitions)
195 selectWithinPartition(partition, max_total_size_to_merge, estimator, settings);
196
197 return estimator.getBest();
198}
199
200}
201