1#include <Columns/ColumnString.h>
2#include <DataTypes/DataTypeString.h>
3#include <DataTypes/DataTypesNumber.h>
4#include <DataTypes/DataTypeDateTime.h>
5#include <DataStreams/OneBlockInputStream.h>
6#include <Storages/System/StorageSystemReplicas.h>
7#include <Storages/StorageReplicatedMergeTree.h>
8#include <Storages/VirtualColumnUtils.h>
9#include <Common/typeid_cast.h>
10#include <Databases/IDatabase.h>
11
12
13namespace DB
14{
15
16
17StorageSystemReplicas::StorageSystemReplicas(const std::string & name_)
18 : name(name_)
19{
20 setColumns(ColumnsDescription({
21 { "database", std::make_shared<DataTypeString>() },
22 { "table", std::make_shared<DataTypeString>() },
23 { "engine", std::make_shared<DataTypeString>() },
24 { "is_leader", std::make_shared<DataTypeUInt8>() },
25 { "can_become_leader", std::make_shared<DataTypeUInt8>() },
26 { "is_readonly", std::make_shared<DataTypeUInt8>() },
27 { "is_session_expired", std::make_shared<DataTypeUInt8>() },
28 { "future_parts", std::make_shared<DataTypeUInt32>() },
29 { "parts_to_check", std::make_shared<DataTypeUInt32>() },
30 { "zookeeper_path", std::make_shared<DataTypeString>() },
31 { "replica_name", std::make_shared<DataTypeString>() },
32 { "replica_path", std::make_shared<DataTypeString>() },
33 { "columns_version", std::make_shared<DataTypeInt32>() },
34 { "queue_size", std::make_shared<DataTypeUInt32>() },
35 { "inserts_in_queue", std::make_shared<DataTypeUInt32>() },
36 { "merges_in_queue", std::make_shared<DataTypeUInt32>() },
37 { "part_mutations_in_queue", std::make_shared<DataTypeUInt32>() },
38 { "queue_oldest_time", std::make_shared<DataTypeDateTime>() },
39 { "inserts_oldest_time", std::make_shared<DataTypeDateTime>() },
40 { "merges_oldest_time", std::make_shared<DataTypeDateTime>() },
41 { "part_mutations_oldest_time", std::make_shared<DataTypeDateTime>() },
42 { "oldest_part_to_get", std::make_shared<DataTypeString>() },
43 { "oldest_part_to_merge_to", std::make_shared<DataTypeString>() },
44 { "oldest_part_to_mutate_to", std::make_shared<DataTypeString>() },
45 { "log_max_index", std::make_shared<DataTypeUInt64>() },
46 { "log_pointer", std::make_shared<DataTypeUInt64>() },
47 { "last_queue_update", std::make_shared<DataTypeDateTime>() },
48 { "absolute_delay", std::make_shared<DataTypeUInt64>() },
49 { "total_replicas", std::make_shared<DataTypeUInt8>() },
50 { "active_replicas", std::make_shared<DataTypeUInt8>() },
51 }));
52}
53
54
55BlockInputStreams StorageSystemReplicas::read(
56 const Names & column_names,
57 const SelectQueryInfo & query_info,
58 const Context & context,
59 QueryProcessingStage::Enum /*processed_stage*/,
60 const size_t /*max_block_size*/,
61 const unsigned /*num_streams*/)
62{
63 check(column_names);
64
65 /// We collect a set of replicated tables.
66 std::map<String, std::map<String, StoragePtr>> replicated_tables;
67 for (const auto & db : context.getDatabases())
68 {
69 /// Lazy database can not contain replicated tables
70 if (db.second->getEngineName() == "Lazy")
71 continue;
72 if (context.hasDatabaseAccessRights(db.first))
73 {
74 for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
75 if (dynamic_cast<const StorageReplicatedMergeTree *>(iterator->table().get()))
76 replicated_tables[db.first][iterator->name()] = iterator->table();
77 }
78 }
79
80
81 /// Do you need columns that require a ZooKeeper request to compute.
82 bool with_zk_fields = false;
83 for (const auto & column_name : column_names)
84 {
85 if ( column_name == "log_max_index"
86 || column_name == "log_pointer"
87 || column_name == "total_replicas"
88 || column_name == "active_replicas")
89 {
90 with_zk_fields = true;
91 break;
92 }
93 }
94
95 MutableColumnPtr col_database_mut = ColumnString::create();
96 MutableColumnPtr col_table_mut = ColumnString::create();
97 MutableColumnPtr col_engine_mut = ColumnString::create();
98
99 for (auto & db : replicated_tables)
100 {
101 for (auto & table : db.second)
102 {
103 col_database_mut->insert(db.first);
104 col_table_mut->insert(table.first);
105 col_engine_mut->insert(table.second->getName());
106 }
107 }
108
109 ColumnPtr col_database = std::move(col_database_mut);
110 ColumnPtr col_table = std::move(col_table_mut);
111 ColumnPtr col_engine = std::move(col_engine_mut);
112
113 /// Determine what tables are needed by the conditions in the query.
114 {
115 Block filtered_block
116 {
117 { col_database, std::make_shared<DataTypeString>(), "database" },
118 { col_table, std::make_shared<DataTypeString>(), "table" },
119 { col_engine, std::make_shared<DataTypeString>(), "engine" },
120 };
121
122 VirtualColumnUtils::filterBlockWithQuery(query_info.query, filtered_block, context);
123
124 if (!filtered_block.rows())
125 return BlockInputStreams();
126
127 col_database = filtered_block.getByName("database").column;
128 col_table = filtered_block.getByName("table").column;
129 col_engine = filtered_block.getByName("engine").column;
130 }
131
132 MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
133
134 for (size_t i = 0, size = col_database->size(); i < size; ++i)
135 {
136 StorageReplicatedMergeTree::Status status;
137 dynamic_cast<StorageReplicatedMergeTree &>(
138 *replicated_tables
139 [(*col_database)[i].safeGet<const String &>()]
140 [(*col_table)[i].safeGet<const String &>()]).getStatus(status, with_zk_fields);
141
142 size_t col_num = 3;
143 res_columns[col_num++]->insert(status.is_leader);
144 res_columns[col_num++]->insert(status.can_become_leader);
145 res_columns[col_num++]->insert(status.is_readonly);
146 res_columns[col_num++]->insert(status.is_session_expired);
147 res_columns[col_num++]->insert(status.queue.future_parts);
148 res_columns[col_num++]->insert(status.parts_to_check);
149 res_columns[col_num++]->insert(status.zookeeper_path);
150 res_columns[col_num++]->insert(status.replica_name);
151 res_columns[col_num++]->insert(status.replica_path);
152 res_columns[col_num++]->insert(status.columns_version);
153 res_columns[col_num++]->insert(status.queue.queue_size);
154 res_columns[col_num++]->insert(status.queue.inserts_in_queue);
155 res_columns[col_num++]->insert(status.queue.merges_in_queue);
156 res_columns[col_num++]->insert(status.queue.part_mutations_in_queue);
157 res_columns[col_num++]->insert(status.queue.queue_oldest_time);
158 res_columns[col_num++]->insert(status.queue.inserts_oldest_time);
159 res_columns[col_num++]->insert(status.queue.merges_oldest_time);
160 res_columns[col_num++]->insert(status.queue.part_mutations_oldest_time);
161 res_columns[col_num++]->insert(status.queue.oldest_part_to_get);
162 res_columns[col_num++]->insert(status.queue.oldest_part_to_merge_to);
163 res_columns[col_num++]->insert(status.queue.oldest_part_to_mutate_to);
164 res_columns[col_num++]->insert(status.log_max_index);
165 res_columns[col_num++]->insert(status.log_pointer);
166 res_columns[col_num++]->insert(status.queue.last_queue_update);
167 res_columns[col_num++]->insert(status.absolute_delay);
168 res_columns[col_num++]->insert(status.total_replicas);
169 res_columns[col_num++]->insert(status.active_replicas);
170 }
171
172 Block res = getSampleBlock().cloneEmpty();
173 size_t col_num = 0;
174 res.getByPosition(col_num++).column = col_database;
175 res.getByPosition(col_num++).column = col_table;
176 res.getByPosition(col_num++).column = col_engine;
177 size_t num_columns = res.columns();
178 while (col_num < num_columns)
179 {
180 res.getByPosition(col_num).column = std::move(res_columns[col_num]);
181 ++col_num;
182 }
183
184 return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(res));
185}
186
187
188}
189