1#include <list>
2#include <iostream>
3#include <IO/ReadBufferFromFileDescriptor.h>
4#include <IO/Operators.h>
5#include <Storages/MergeTree/SimpleMergeSelector.h>
6#include <Storages/MergeTree/LevelMergeSelector.h>
7#include <Common/formatReadable.h>
8
9
10/** This program tests merge-selecting algorithm.
11 * Usage:
12clickhouse-client --query="
13 SELECT bytes, now() - modification_time, level, name
14 FROM system.parts
15 WHERE table = 'visits' AND active AND partition = '201610'" | ./merge_selector2
16 */
17
18int main(int, char **)
19{
20 using namespace DB;
21
22 IMergeSelector::Partitions partitions(1);
23 IMergeSelector::PartsInPartition & parts = partitions.back();
24
25/* SimpleMergeSelector::Settings settings;
26 SimpleMergeSelector selector(settings);*/
27
28 LevelMergeSelector::Settings settings;
29 LevelMergeSelector selector(settings);
30
31 ReadBufferFromFileDescriptor in(STDIN_FILENO);
32
33 size_t sum_parts_size = 0;
34
35 std::list<std::string> part_names;
36
37 while (!in.eof())
38 {
39 part_names.emplace_back();
40 IMergeSelector::Part part;
41 in >> part.size >> "\t" >> part.age >> "\t" >> part.level >> "\t" >> part_names.back() >> "\n";
42 part.data = part_names.back().data();
43// part.level = 0;
44 parts.emplace_back(part);
45 sum_parts_size += part.size;
46 }
47
48 size_t total_size_merged = 0;
49 size_t sum_size_written = sum_parts_size;
50 size_t num_merges = 1;
51 size_t age_passed = 0;
52
53 while (parts.size() > 1)
54 {
55 IMergeSelector::PartsInPartition selected_parts = selector.select(partitions, 100ULL * 1024 * 1024 * 1024);
56
57 if (selected_parts.empty())
58 {
59 ++age_passed;
60 for (auto & part : parts)
61 ++part.age;
62
63 if (age_passed > 60 * 86400)
64 break;
65
66 if (age_passed % 86400 == 0)
67 std::cout << ".";
68
69 continue;
70 }
71 std::cout << "Time passed: " << age_passed << '\n';
72
73 size_t sum_merged_size = 0;
74 size_t start_index = 0;
75 size_t max_level = 0;
76 bool in_range = false;
77
78 for (size_t i = 0, size = parts.size(); i < size; ++i)
79 {
80 if (parts[i].data == selected_parts.front().data)
81 {
82 std::cout << "\033[1;31m";
83 in_range = true;
84 start_index = i;
85 }
86
87 std::cout << (parts[i].size / 1024) << "_" << parts[i].level;
88 if (in_range)
89 {
90 sum_merged_size += parts[i].size;
91 if (parts[i].level > max_level)
92 max_level = parts[i].level;
93 }
94
95 if (parts[i].data == selected_parts.back().data)
96 {
97 in_range = false;
98 std::cout << "\033[0m";
99 }
100
101 std::cout << " ";
102 }
103
104 parts[start_index].size = sum_merged_size;
105 parts[start_index].level = max_level + 1;
106 parts[start_index].age = 0;
107 parts.erase(parts.begin() + start_index + 1, parts.begin() + start_index + selected_parts.size());
108
109 std::cout << '\n';
110
111 sum_size_written += sum_merged_size;
112 total_size_merged += sum_merged_size;
113
114 ++num_merges;
115
116 double time_to_merge = sum_merged_size / (1048576 * 10.0);
117
118 age_passed += time_to_merge;
119 for (auto & part : parts)
120 part.age += time_to_merge;
121
122 std::cout << "Time passed: " << age_passed << ", num parts: " << parts.size()
123 << ", merged " << selected_parts.size() << " parts, " << formatReadableSizeWithBinarySuffix(sum_merged_size)
124 << ", total written: " << formatReadableSizeWithBinarySuffix(total_size_merged) << '\n';
125 }
126
127 std::cout << std::fixed << std::setprecision(2)
128 << "Write amplification: " << static_cast<double>(sum_size_written) / sum_parts_size << "\n"
129 << "Num parts: " << part_names.size() << "\n"
130 << "Num merges: " << num_merges << "\n"
131 << "Tree depth: " << parts.front().level << "\n"
132 ;
133
134 return 0;
135}
136