1 | #include <DataTypes/DataTypeString.h> |
2 | #include <DataTypes/DataTypesNumber.h> |
3 | #include <DataTypes/DataTypeDateTime.h> |
4 | #include <Storages/System/StorageSystemZooKeeper.h> |
5 | #include <Parsers/ASTSelectQuery.h> |
6 | #include <Parsers/ASTIdentifier.h> |
7 | #include <Parsers/ASTLiteral.h> |
8 | #include <Parsers/ASTExpressionList.h> |
9 | #include <Parsers/ASTFunction.h> |
10 | #include <Interpreters/Context.h> |
11 | #include <Common/ZooKeeper/ZooKeeper.h> |
12 | #include <Common/typeid_cast.h> |
13 | |
14 | |
15 | namespace DB |
16 | { |
17 | |
18 | namespace ErrorCodes |
19 | { |
20 | extern const int BAD_ARGUMENTS; |
21 | } |
22 | |
23 | |
24 | NamesAndTypesList StorageSystemZooKeeper::getNamesAndTypes() |
25 | { |
26 | return { |
27 | { "name" , std::make_shared<DataTypeString>() }, |
28 | { "value" , std::make_shared<DataTypeString>() }, |
29 | { "czxid" , std::make_shared<DataTypeInt64>() }, |
30 | { "mzxid" , std::make_shared<DataTypeInt64>() }, |
31 | { "ctime" , std::make_shared<DataTypeDateTime>() }, |
32 | { "mtime" , std::make_shared<DataTypeDateTime>() }, |
33 | { "version" , std::make_shared<DataTypeInt32>() }, |
34 | { "cversion" , std::make_shared<DataTypeInt32>() }, |
35 | { "aversion" , std::make_shared<DataTypeInt32>() }, |
36 | { "ephemeralOwner" , std::make_shared<DataTypeInt64>() }, |
37 | { "dataLength" , std::make_shared<DataTypeInt32>() }, |
38 | { "numChildren" , std::make_shared<DataTypeInt32>() }, |
39 | { "pzxid" , std::make_shared<DataTypeInt64>() }, |
40 | { "path" , std::make_shared<DataTypeString>() }, |
41 | }; |
42 | } |
43 | |
44 | |
45 | static bool (const IAST & elem, String & res) |
46 | { |
47 | const auto * function = elem.as<ASTFunction>(); |
48 | if (!function) |
49 | return false; |
50 | |
51 | if (function->name == "and" ) |
52 | { |
53 | for (size_t i = 0; i < function->arguments->children.size(); ++i) |
54 | if (extractPathImpl(*function->arguments->children[i], res)) |
55 | return true; |
56 | |
57 | return false; |
58 | } |
59 | |
60 | if (function->name == "equals" ) |
61 | { |
62 | const auto & args = function->arguments->as<ASTExpressionList &>(); |
63 | const IAST * value; |
64 | |
65 | if (args.children.size() != 2) |
66 | return false; |
67 | |
68 | const ASTIdentifier * ident; |
69 | if ((ident = args.children.at(0)->as<ASTIdentifier>())) |
70 | value = args.children.at(1).get(); |
71 | else if ((ident = args.children.at(1)->as<ASTIdentifier>())) |
72 | value = args.children.at(0).get(); |
73 | else |
74 | return false; |
75 | |
76 | if (ident->name != "path" ) |
77 | return false; |
78 | |
79 | const auto * literal = value->as<ASTLiteral>(); |
80 | if (!literal) |
81 | return false; |
82 | |
83 | if (literal->value.getType() != Field::Types::String) |
84 | return false; |
85 | |
86 | res = literal->value.safeGet<String>(); |
87 | return true; |
88 | } |
89 | |
90 | return false; |
91 | } |
92 | |
93 | |
94 | /** Retrieve from the query a condition of the form `path = 'path'`, from conjunctions in the WHERE clause. |
95 | */ |
96 | static String (const ASTPtr & query) |
97 | { |
98 | const auto & select = query->as<ASTSelectQuery &>(); |
99 | if (!select.where()) |
100 | return "" ; |
101 | |
102 | String res; |
103 | return extractPathImpl(*select.where(), res) ? res : "" ; |
104 | } |
105 | |
106 | |
107 | void StorageSystemZooKeeper::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const |
108 | { |
109 | String path = extractPath(query_info.query); |
110 | if (path.empty()) |
111 | throw Exception("SELECT from system.zookeeper table must contain condition like path = 'path' in WHERE clause." , ErrorCodes::BAD_ARGUMENTS); |
112 | |
113 | zkutil::ZooKeeperPtr zookeeper = context.getZooKeeper(); |
114 | |
115 | /// In all cases except the root, path must not end with a slash. |
116 | String path_corrected = path; |
117 | if (path_corrected != "/" && path_corrected.back() == '/') |
118 | path_corrected.resize(path_corrected.size() - 1); |
119 | |
120 | zkutil::Strings nodes = zookeeper->getChildren(path_corrected); |
121 | |
122 | String path_part = path_corrected; |
123 | if (path_part == "/" ) |
124 | path_part.clear(); |
125 | |
126 | std::vector<std::future<Coordination::GetResponse>> futures; |
127 | futures.reserve(nodes.size()); |
128 | for (const String & node : nodes) |
129 | futures.push_back(zookeeper->asyncTryGet(path_part + '/' + node)); |
130 | |
131 | for (size_t i = 0, size = nodes.size(); i < size; ++i) |
132 | { |
133 | auto res = futures[i].get(); |
134 | if (res.error == Coordination::ZNONODE) |
135 | continue; /// Node was deleted meanwhile. |
136 | |
137 | const Coordination::Stat & stat = res.stat; |
138 | |
139 | size_t col_num = 0; |
140 | res_columns[col_num++]->insert(nodes[i]); |
141 | res_columns[col_num++]->insert(res.data); |
142 | res_columns[col_num++]->insert(stat.czxid); |
143 | res_columns[col_num++]->insert(stat.mzxid); |
144 | res_columns[col_num++]->insert(UInt64(stat.ctime / 1000)); |
145 | res_columns[col_num++]->insert(UInt64(stat.mtime / 1000)); |
146 | res_columns[col_num++]->insert(stat.version); |
147 | res_columns[col_num++]->insert(stat.cversion); |
148 | res_columns[col_num++]->insert(stat.aversion); |
149 | res_columns[col_num++]->insert(stat.ephemeralOwner); |
150 | res_columns[col_num++]->insert(stat.dataLength); |
151 | res_columns[col_num++]->insert(stat.numChildren); |
152 | res_columns[col_num++]->insert(stat.pzxid); |
153 | res_columns[col_num++]->insert(path); /// This is the original path. In order to process the request, condition in WHERE should be triggered. |
154 | } |
155 | } |
156 | |
157 | |
158 | } |
159 | |