1#include <Interpreters/TablesStatus.h>
2#include <IO/ReadBuffer.h>
3#include <IO/WriteBuffer.h>
4#include <IO/ReadHelpers.h>
5#include <IO/WriteHelpers.h>
6
7namespace DB
8{
9
10void TableStatus::write(WriteBuffer & out) const
11{
12 writeBinary(is_replicated, out);
13 if (is_replicated)
14 {
15 writeVarUInt(absolute_delay, out);
16 }
17}
18
19void TableStatus::read(ReadBuffer & in)
20{
21 absolute_delay = 0;
22 readBinary(is_replicated, in);
23 if (is_replicated)
24 {
25 readVarUInt(absolute_delay, in);
26 }
27}
28
29void TablesStatusRequest::write(WriteBuffer & out, UInt64 server_protocol_revision) const
30{
31 if (server_protocol_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS)
32 throw Exception(
33 "Logical error: method TablesStatusRequest::write is called for unsupported server revision",
34 ErrorCodes::LOGICAL_ERROR);
35
36 writeVarUInt(tables.size(), out);
37 for (const auto & table_name : tables)
38 {
39 writeBinary(table_name.database, out);
40 writeBinary(table_name.table, out);
41 }
42}
43
44void TablesStatusRequest::read(ReadBuffer & in, UInt64 client_protocol_revision)
45{
46 if (client_protocol_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS)
47 throw Exception(
48 "method TablesStatusRequest::read is called for unsupported client revision",
49 ErrorCodes::LOGICAL_ERROR);
50
51 size_t size = 0;
52 readVarUInt(size, in);
53
54 if (size > DEFAULT_MAX_STRING_SIZE)
55 throw Exception("Too large collection size.", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
56
57 for (size_t i = 0; i < size; ++i)
58 {
59 QualifiedTableName table_name;
60 readBinary(table_name.database, in);
61 readBinary(table_name.table, in);
62 tables.emplace(std::move(table_name));
63 }
64}
65
66void TablesStatusResponse::write(WriteBuffer & out, UInt64 client_protocol_revision) const
67{
68 if (client_protocol_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS)
69 throw Exception(
70 "method TablesStatusResponse::write is called for unsupported client revision",
71 ErrorCodes::LOGICAL_ERROR);
72
73 writeVarUInt(table_states_by_id.size(), out);
74 for (const auto & kv: table_states_by_id)
75 {
76 const QualifiedTableName & table_name = kv.first;
77 writeBinary(table_name.database, out);
78 writeBinary(table_name.table, out);
79
80 const TableStatus & status = kv.second;
81 status.write(out);
82 }
83}
84
85void TablesStatusResponse::read(ReadBuffer & in, UInt64 server_protocol_revision)
86{
87 if (server_protocol_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS)
88 throw Exception(
89 "method TablesStatusResponse::read is called for unsupported server revision",
90 ErrorCodes::LOGICAL_ERROR);
91
92 size_t size = 0;
93 readVarUInt(size, in);
94
95 if (size > DEFAULT_MAX_STRING_SIZE)
96 throw Exception("Too large collection size.", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
97
98 for (size_t i = 0; i < size; ++i)
99 {
100 QualifiedTableName table_name;
101 readBinary(table_name.database, in);
102 readBinary(table_name.table, in);
103
104 TableStatus status;
105 status.read(in);
106 table_states_by_id.emplace(std::move(table_name), std::move(status));
107 }
108}
109
110}
111