1 | #include "TableFunctionRemote.h" |
2 | |
3 | #include <Storages/getStructureOfRemoteTable.h> |
4 | #include <Storages/StorageDistributed.h> |
5 | #include <Parsers/ASTIdentifier.h> |
6 | #include <Parsers/ASTLiteral.h> |
7 | #include <Parsers/ASTFunction.h> |
8 | #include <Parsers/ASTExpressionList.h> |
9 | #include <Interpreters/evaluateConstantExpression.h> |
10 | #include <Interpreters/Cluster.h> |
11 | #include <Interpreters/Context.h> |
12 | #include <Interpreters/IdentifierSemantic.h> |
13 | #include <Common/typeid_cast.h> |
14 | #include <Common/parseRemoteDescription.h> |
15 | #include <TableFunctions/TableFunctionFactory.h> |
16 | #include <Core/Defines.h> |
17 | #include "registerTableFunctions.h" |
18 | |
19 | |
20 | namespace DB |
21 | { |
22 | |
23 | namespace ErrorCodes |
24 | { |
25 | extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; |
26 | extern const int BAD_ARGUMENTS; |
27 | } |
28 | |
29 | StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const |
30 | { |
31 | ASTs & args_func = ast_function->children; |
32 | |
33 | if (args_func.size() != 1) |
34 | throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
35 | |
36 | ASTs & args = args_func.at(0)->children; |
37 | |
38 | const size_t max_args = is_cluster_function ? 3 : 5; |
39 | if (args.size() < 2 || args.size() > max_args) |
40 | throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
41 | |
42 | String cluster_name; |
43 | String cluster_description; |
44 | String remote_database; |
45 | String remote_table; |
46 | ASTPtr remote_table_function_ptr; |
47 | String username; |
48 | String password; |
49 | |
50 | size_t arg_num = 0; |
51 | |
52 | auto getStringLiteral = [](const IAST & node, const char * description) |
53 | { |
54 | const auto * lit = node.as<ASTLiteral>(); |
55 | if (!lit) |
56 | throw Exception(description + String(" must be string literal (in single quotes)." ), ErrorCodes::BAD_ARGUMENTS); |
57 | |
58 | if (lit->value.getType() != Field::Types::String) |
59 | throw Exception(description + String(" must be string literal (in single quotes)." ), ErrorCodes::BAD_ARGUMENTS); |
60 | |
61 | return safeGet<const String &>(lit->value); |
62 | }; |
63 | |
64 | if (is_cluster_function) |
65 | { |
66 | ASTPtr ast_name = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context); |
67 | cluster_name = ast_name->as<ASTLiteral &>().value.safeGet<const String &>(); |
68 | } |
69 | else |
70 | { |
71 | if (!tryGetIdentifierNameInto(args[arg_num], cluster_name)) |
72 | cluster_description = getStringLiteral(*args[arg_num], "Hosts pattern" ); |
73 | } |
74 | ++arg_num; |
75 | |
76 | args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context); |
77 | |
78 | const auto * function = args[arg_num]->as<ASTFunction>(); |
79 | |
80 | if (function && TableFunctionFactory::instance().isTableFunctionName(function->name)) |
81 | { |
82 | remote_table_function_ptr = args[arg_num]; |
83 | ++arg_num; |
84 | } |
85 | else |
86 | { |
87 | remote_database = args[arg_num]->as<ASTLiteral &>().value.safeGet<String>(); |
88 | |
89 | ++arg_num; |
90 | |
91 | size_t dot = remote_database.find('.'); |
92 | if (dot != String::npos) |
93 | { |
94 | /// NOTE Bad - do not support identifiers in backquotes. |
95 | remote_table = remote_database.substr(dot + 1); |
96 | remote_database = remote_database.substr(0, dot); |
97 | } |
98 | else |
99 | { |
100 | if (arg_num >= args.size()) |
101 | { |
102 | throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
103 | } |
104 | else |
105 | { |
106 | args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context); |
107 | remote_table = args[arg_num]->as<ASTLiteral &>().value.safeGet<String>(); |
108 | ++arg_num; |
109 | } |
110 | } |
111 | } |
112 | |
113 | /// Username and password parameters are prohibited in cluster version of the function |
114 | if (!is_cluster_function) |
115 | { |
116 | if (arg_num < args.size()) |
117 | { |
118 | username = getStringLiteral(*args[arg_num], "Username" ); |
119 | ++arg_num; |
120 | } |
121 | else |
122 | username = "default" ; |
123 | |
124 | if (arg_num < args.size()) |
125 | { |
126 | password = getStringLiteral(*args[arg_num], "Password" ); |
127 | ++arg_num; |
128 | } |
129 | } |
130 | |
131 | if (arg_num < args.size()) |
132 | throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
133 | |
134 | /// ExpressionAnalyzer will be created in InterpreterSelectQuery that will meet these `Identifier` when processing the request. |
135 | /// We need to mark them as the name of the database or table, because the default value is column. |
136 | for (auto ast : args) |
137 | setIdentifierSpecial(ast); |
138 | |
139 | ClusterPtr cluster; |
140 | if (!cluster_name.empty()) |
141 | { |
142 | /// Use an existing cluster from the main config |
143 | cluster = context.getCluster(cluster_name); |
144 | } |
145 | else |
146 | { |
147 | /// Create new cluster from the scratch |
148 | size_t max_addresses = context.getSettingsRef().table_function_remote_max_addresses; |
149 | std::vector<String> shards = parseRemoteDescription(cluster_description, 0, cluster_description.size(), ',', max_addresses); |
150 | |
151 | std::vector<std::vector<String>> names; |
152 | for (size_t i = 0; i < shards.size(); ++i) |
153 | names.push_back(parseRemoteDescription(shards[i], 0, shards[i].size(), '|', max_addresses)); |
154 | |
155 | if (names.empty()) |
156 | throw Exception("Shard list is empty after parsing first argument" , ErrorCodes::BAD_ARGUMENTS); |
157 | |
158 | auto maybe_secure_port = context.getTCPPortSecure(); |
159 | |
160 | /// Check host and port on affiliation allowed hosts. |
161 | for (auto hosts : names) |
162 | { |
163 | for (auto host : hosts) |
164 | { |
165 | size_t colon = host.find(':'); |
166 | if (colon == String::npos) |
167 | context.getRemoteHostFilter().checkHostAndPort(host, toString((secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context.getTCPPort()))); |
168 | else |
169 | context.getRemoteHostFilter().checkHostAndPort(host.substr(0, colon), host.substr(colon + 1)); |
170 | } |
171 | } |
172 | |
173 | cluster = std::make_shared<Cluster>(context.getSettings(), names, username, password, (secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context.getTCPPort()), false, secure); |
174 | } |
175 | |
176 | auto structure_remote_table = getStructureOfRemoteTable(*cluster, remote_database, remote_table, context, remote_table_function_ptr); |
177 | |
178 | StoragePtr res = remote_table_function_ptr |
179 | ? StorageDistributed::createWithOwnCluster( |
180 | table_name, |
181 | structure_remote_table, |
182 | remote_table_function_ptr, |
183 | cluster, |
184 | context) |
185 | : StorageDistributed::createWithOwnCluster( |
186 | table_name, |
187 | structure_remote_table, |
188 | remote_database, |
189 | remote_table, |
190 | cluster, |
191 | context); |
192 | |
193 | res->startup(); |
194 | return res; |
195 | } |
196 | |
197 | |
198 | TableFunctionRemote::TableFunctionRemote(const std::string & name_, bool secure_) |
199 | : name{name_}, secure{secure_} |
200 | { |
201 | is_cluster_function = name == "cluster" ; |
202 | |
203 | std::stringstream ss; |
204 | ss << "Table function '" << name + "' requires from 2 to " << (is_cluster_function ? 3 : 5) << " parameters" |
205 | << ": <addresses pattern or cluster name>, <name of remote database>, <name of remote table>" |
206 | << (is_cluster_function ? "" : ", [username, [password]]." ); |
207 | help_message = ss.str(); |
208 | } |
209 | |
210 | |
211 | void registerTableFunctionRemote(TableFunctionFactory & factory) |
212 | { |
213 | factory.registerFunction("remote" , [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("remote" ); }); |
214 | factory.registerFunction("remoteSecure" , [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("remote" , /* secure = */ true); }); |
215 | factory.registerFunction("cluster" , [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("cluster" ); }); |
216 | } |
217 | |
218 | } |
219 | |