1 | #pragma once |
2 | |
3 | #include <unordered_map> |
4 | #include <IO/ReadBuffer.h> |
5 | #include <IO/ReadBufferFromString.h> |
6 | #include <IO/WriteBuffer.h> |
7 | #include <IO/WriteBufferFromString.h> |
8 | #include <IO/ReadHelpers.h> |
9 | #include <IO/Operators.h> |
10 | |
11 | #include <Storages/MergeTree/MergeTreeDataPart.h> |
12 | |
13 | namespace DB |
14 | { |
15 | |
16 | struct ReplicatedMergeTreeQuorumAddedParts |
17 | { |
18 | using PartitionIdToMaxBlock = std::unordered_map<String, Int64>; |
19 | using PartitionIdToPartName = std::unordered_map<String, String>; |
20 | |
21 | PartitionIdToPartName added_parts; |
22 | |
23 | MergeTreeDataFormatVersion format_version; |
24 | |
25 | ReplicatedMergeTreeQuorumAddedParts(const MergeTreeDataFormatVersion format_version_) |
26 | : format_version(format_version_) |
27 | {} |
28 | |
29 | /// Write new parts in buffer with added parts. |
30 | void write(WriteBuffer & out) |
31 | { |
32 | out << "version: " << 2 << '\n'; |
33 | out << "parts count: " << added_parts.size() << '\n'; |
34 | |
35 | for (const auto & part : added_parts) |
36 | out << part.first << '\t' << part.second << '\n'; |
37 | } |
38 | |
39 | PartitionIdToMaxBlock getMaxInsertedBlocks() |
40 | { |
41 | PartitionIdToMaxBlock max_added_blocks; |
42 | |
43 | for (const auto & part : added_parts) |
44 | { |
45 | auto part_info = MergeTreePartInfo::fromPartName(part.second, format_version); |
46 | max_added_blocks[part.first] = part_info.max_block; |
47 | } |
48 | |
49 | return max_added_blocks; |
50 | } |
51 | |
52 | void read(ReadBuffer & in) |
53 | { |
54 | if (checkString("version: " , in)) |
55 | { |
56 | size_t version; |
57 | |
58 | readText(version, in); |
59 | assertChar('\n', in); |
60 | |
61 | if (version == 2) |
62 | added_parts = read_v2(in); |
63 | } |
64 | else |
65 | added_parts = read_v1(in); |
66 | } |
67 | |
68 | /// Read added bloks when node in ZooKeeper supports only one partition. |
69 | PartitionIdToPartName read_v1(ReadBuffer & in) |
70 | { |
71 | PartitionIdToPartName parts_in_quorum; |
72 | |
73 | std::string part_name; |
74 | |
75 | readText(part_name, in); |
76 | |
77 | auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version); |
78 | parts_in_quorum[part_info.partition_id] = part_name; |
79 | |
80 | return parts_in_quorum; |
81 | } |
82 | |
83 | /// Read blocks when node in ZooKeeper suppors multiple partitions. |
84 | PartitionIdToPartName read_v2(ReadBuffer & in) |
85 | { |
86 | assertString("parts count: " , in); |
87 | |
88 | PartitionIdToPartName parts_in_quorum; |
89 | |
90 | uint64_t parts_count; |
91 | readText(parts_count, in); |
92 | assertChar('\n', in); |
93 | |
94 | for (uint64_t i = 0; i < parts_count; ++i) |
95 | { |
96 | std::string partition_id; |
97 | std::string part_name; |
98 | |
99 | readText(partition_id, in); |
100 | assertChar('\t', in); |
101 | readText(part_name, in); |
102 | assertChar('\n', in); |
103 | |
104 | parts_in_quorum[partition_id] = part_name; |
105 | } |
106 | return parts_in_quorum; |
107 | } |
108 | |
109 | void fromString(const std::string & str) |
110 | { |
111 | ReadBufferFromString in(str); |
112 | read(in); |
113 | } |
114 | |
115 | std::string toString() |
116 | { |
117 | WriteBufferFromOwnString out; |
118 | write(out); |
119 | return out.str(); |
120 | } |
121 | |
122 | }; |
123 | |
124 | } |
125 | |