1 | #include <Interpreters/TablesStatus.h> |
2 | #include <IO/ReadBuffer.h> |
3 | #include <IO/WriteBuffer.h> |
4 | #include <IO/ReadHelpers.h> |
5 | #include <IO/WriteHelpers.h> |
6 | |
7 | namespace DB |
8 | { |
9 | |
10 | void TableStatus::write(WriteBuffer & out) const |
11 | { |
12 | writeBinary(is_replicated, out); |
13 | if (is_replicated) |
14 | { |
15 | writeVarUInt(absolute_delay, out); |
16 | } |
17 | } |
18 | |
19 | void TableStatus::read(ReadBuffer & in) |
20 | { |
21 | absolute_delay = 0; |
22 | readBinary(is_replicated, in); |
23 | if (is_replicated) |
24 | { |
25 | readVarUInt(absolute_delay, in); |
26 | } |
27 | } |
28 | |
29 | void TablesStatusRequest::write(WriteBuffer & out, UInt64 server_protocol_revision) const |
30 | { |
31 | if (server_protocol_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS) |
32 | throw Exception( |
33 | "Logical error: method TablesStatusRequest::write is called for unsupported server revision" , |
34 | ErrorCodes::LOGICAL_ERROR); |
35 | |
36 | writeVarUInt(tables.size(), out); |
37 | for (const auto & table_name : tables) |
38 | { |
39 | writeBinary(table_name.database, out); |
40 | writeBinary(table_name.table, out); |
41 | } |
42 | } |
43 | |
44 | void TablesStatusRequest::read(ReadBuffer & in, UInt64 client_protocol_revision) |
45 | { |
46 | if (client_protocol_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS) |
47 | throw Exception( |
48 | "method TablesStatusRequest::read is called for unsupported client revision" , |
49 | ErrorCodes::LOGICAL_ERROR); |
50 | |
51 | size_t size = 0; |
52 | readVarUInt(size, in); |
53 | |
54 | if (size > DEFAULT_MAX_STRING_SIZE) |
55 | throw Exception("Too large collection size." , ErrorCodes::TOO_LARGE_ARRAY_SIZE); |
56 | |
57 | for (size_t i = 0; i < size; ++i) |
58 | { |
59 | QualifiedTableName table_name; |
60 | readBinary(table_name.database, in); |
61 | readBinary(table_name.table, in); |
62 | tables.emplace(std::move(table_name)); |
63 | } |
64 | } |
65 | |
66 | void TablesStatusResponse::write(WriteBuffer & out, UInt64 client_protocol_revision) const |
67 | { |
68 | if (client_protocol_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS) |
69 | throw Exception( |
70 | "method TablesStatusResponse::write is called for unsupported client revision" , |
71 | ErrorCodes::LOGICAL_ERROR); |
72 | |
73 | writeVarUInt(table_states_by_id.size(), out); |
74 | for (const auto & kv: table_states_by_id) |
75 | { |
76 | const QualifiedTableName & table_name = kv.first; |
77 | writeBinary(table_name.database, out); |
78 | writeBinary(table_name.table, out); |
79 | |
80 | const TableStatus & status = kv.second; |
81 | status.write(out); |
82 | } |
83 | } |
84 | |
85 | void TablesStatusResponse::read(ReadBuffer & in, UInt64 server_protocol_revision) |
86 | { |
87 | if (server_protocol_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS) |
88 | throw Exception( |
89 | "method TablesStatusResponse::read is called for unsupported server revision" , |
90 | ErrorCodes::LOGICAL_ERROR); |
91 | |
92 | size_t size = 0; |
93 | readVarUInt(size, in); |
94 | |
95 | if (size > DEFAULT_MAX_STRING_SIZE) |
96 | throw Exception("Too large collection size." , ErrorCodes::TOO_LARGE_ARRAY_SIZE); |
97 | |
98 | for (size_t i = 0; i < size; ++i) |
99 | { |
100 | QualifiedTableName table_name; |
101 | readBinary(table_name.database, in); |
102 | readBinary(table_name.table, in); |
103 | |
104 | TableStatus status; |
105 | status.read(in); |
106 | table_states_by_id.emplace(std::move(table_name), std::move(status)); |
107 | } |
108 | } |
109 | |
110 | } |
111 | |