1#pragma once
2
3#include <unordered_map>
4#include <IO/ReadBuffer.h>
5#include <IO/ReadBufferFromString.h>
6#include <IO/WriteBuffer.h>
7#include <IO/WriteBufferFromString.h>
8#include <IO/ReadHelpers.h>
9#include <IO/Operators.h>
10
11#include <Storages/MergeTree/MergeTreeDataPart.h>
12
13namespace DB
14{
15
16struct ReplicatedMergeTreeQuorumAddedParts
17{
18 using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
19 using PartitionIdToPartName = std::unordered_map<String, String>;
20
21 PartitionIdToPartName added_parts;
22
23 MergeTreeDataFormatVersion format_version;
24
25 ReplicatedMergeTreeQuorumAddedParts(const MergeTreeDataFormatVersion format_version_)
26 : format_version(format_version_)
27 {}
28
29 /// Write new parts in buffer with added parts.
30 void write(WriteBuffer & out)
31 {
32 out << "version: " << 2 << '\n';
33 out << "parts count: " << added_parts.size() << '\n';
34
35 for (const auto & part : added_parts)
36 out << part.first << '\t' << part.second << '\n';
37 }
38
39 PartitionIdToMaxBlock getMaxInsertedBlocks()
40 {
41 PartitionIdToMaxBlock max_added_blocks;
42
43 for (const auto & part : added_parts)
44 {
45 auto part_info = MergeTreePartInfo::fromPartName(part.second, format_version);
46 max_added_blocks[part.first] = part_info.max_block;
47 }
48
49 return max_added_blocks;
50 }
51
52 void read(ReadBuffer & in)
53 {
54 if (checkString("version: ", in))
55 {
56 size_t version;
57
58 readText(version, in);
59 assertChar('\n', in);
60
61 if (version == 2)
62 added_parts = read_v2(in);
63 }
64 else
65 added_parts = read_v1(in);
66 }
67
68 /// Read added bloks when node in ZooKeeper supports only one partition.
69 PartitionIdToPartName read_v1(ReadBuffer & in)
70 {
71 PartitionIdToPartName parts_in_quorum;
72
73 std::string part_name;
74
75 readText(part_name, in);
76
77 auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
78 parts_in_quorum[part_info.partition_id] = part_name;
79
80 return parts_in_quorum;
81 }
82
83 /// Read blocks when node in ZooKeeper suppors multiple partitions.
84 PartitionIdToPartName read_v2(ReadBuffer & in)
85 {
86 assertString("parts count: ", in);
87
88 PartitionIdToPartName parts_in_quorum;
89
90 uint64_t parts_count;
91 readText(parts_count, in);
92 assertChar('\n', in);
93
94 for (uint64_t i = 0; i < parts_count; ++i)
95 {
96 std::string partition_id;
97 std::string part_name;
98
99 readText(partition_id, in);
100 assertChar('\t', in);
101 readText(part_name, in);
102 assertChar('\n', in);
103
104 parts_in_quorum[partition_id] = part_name;
105 }
106 return parts_in_quorum;
107 }
108
109 void fromString(const std::string & str)
110 {
111 ReadBufferFromString in(str);
112 read(in);
113 }
114
115 std::string toString()
116 {
117 WriteBufferFromOwnString out;
118 write(out);
119 return out.str();
120 }
121
122};
123
124}
125