1 | #include <Interpreters/InterpreterSystemQuery.h> |
2 | #include <Common/DNSResolver.h> |
3 | #include <Common/ActionLock.h> |
4 | #include "config_core.h" |
5 | #include <Common/typeid_cast.h> |
6 | #include <Common/getNumberOfPhysicalCPUCores.h> |
7 | #include <Common/ThreadPool.h> |
8 | #include <Common/escapeForFileName.h> |
9 | #include <Interpreters/Context.h> |
10 | #include <Interpreters/ExternalDictionariesLoader.h> |
11 | #include <Interpreters/EmbeddedDictionaries.h> |
12 | #include <Interpreters/ActionLocksManager.h> |
13 | #include <Interpreters/InterpreterDropQuery.h> |
14 | #include <Interpreters/InterpreterCreateQuery.h> |
15 | #include <Interpreters/QueryLog.h> |
16 | #include <Interpreters/DDLWorker.h> |
17 | #include <Interpreters/PartLog.h> |
18 | #include <Interpreters/QueryThreadLog.h> |
19 | #include <Interpreters/TraceLog.h> |
20 | #include <Interpreters/TextLog.h> |
21 | #include <Interpreters/MetricLog.h> |
22 | #include <Databases/IDatabase.h> |
23 | #include <Storages/StorageDistributed.h> |
24 | #include <Storages/StorageReplicatedMergeTree.h> |
25 | #include <Storages/StorageFactory.h> |
26 | #include <Parsers/ASTSystemQuery.h> |
27 | #include <Parsers/ASTDropQuery.h> |
28 | #include <Parsers/ASTCreateQuery.h> |
29 | #include <csignal> |
30 | #include <algorithm> |
31 | #include "InterpreterSystemQuery.h" |
32 | |
33 | |
34 | namespace DB |
35 | { |
36 | |
37 | |
/// Error codes thrown by the SYSTEM query interpreter in this file (defined elsewhere).
namespace ErrorCodes
{
    /// Unknown SYSTEM query type; missing table for RESTART REPLICA; table is not replicated / not distributed.
    extern const int BAD_ARGUMENTS;
    /// kill(0, ...) failed for SHUTDOWN / KILL.
    extern const int CANNOT_KILL;
    /// STOP_LISTEN_QUERIES / START_LISTEN_QUERIES.
    extern const int NOT_IMPLEMENTED;
    /// SYNC REPLICA did not finish within receive_timeout.
    extern const int TIMEOUT_EXCEEDED;
}
45 | |
46 | |
47 | namespace ActionLocks |
48 | { |
49 | extern StorageActionBlockType PartsMerge; |
50 | extern StorageActionBlockType PartsFetch; |
51 | extern StorageActionBlockType PartsSend; |
52 | extern StorageActionBlockType ReplicationQueue; |
53 | extern StorageActionBlockType DistributedSend; |
54 | extern StorageActionBlockType PartsTTLMerge; |
55 | extern StorageActionBlockType PartsMove; |
56 | } |
57 | |
58 | |
59 | namespace |
60 | { |
61 | |
62 | ExecutionStatus getOverallExecutionStatusOfCommands() |
63 | { |
64 | return ExecutionStatus(0); |
65 | } |
66 | |
67 | /// Consequently tries to execute all commands and generates final exception message for failed commands |
68 | template <typename Callable, typename ... Callables> |
69 | ExecutionStatus getOverallExecutionStatusOfCommands(Callable && command, Callables && ... commands) |
70 | { |
71 | ExecutionStatus status_head(0); |
72 | try |
73 | { |
74 | command(); |
75 | } |
76 | catch (...) |
77 | { |
78 | status_head = ExecutionStatus::fromCurrentException(); |
79 | } |
80 | |
81 | ExecutionStatus status_tail = getOverallExecutionStatusOfCommands(std::forward<Callables>(commands)...); |
82 | |
83 | auto res_status = status_head.code != 0 ? status_head.code : status_tail.code; |
84 | auto res_message = status_head.message + (status_tail.message.empty() ? "" : ("\n" + status_tail.message)); |
85 | |
86 | return ExecutionStatus(res_status, res_message); |
87 | } |
88 | |
89 | /// Consequently tries to execute all commands and throws exception with info about failed commands |
90 | template <typename ... Callables> |
91 | void executeCommandsAndThrowIfError(Callables && ... commands) |
92 | { |
93 | auto status = getOverallExecutionStatusOfCommands(std::forward<Callables>(commands)...); |
94 | if (status.code != 0) |
95 | throw Exception(status.message, status.code); |
96 | } |
97 | |
98 | |
99 | /// Implements SYSTEM [START|STOP] <something action from ActionLocks> |
100 | void startStopAction(Context & context, ASTSystemQuery & query, StorageActionBlockType action_type, bool start) |
101 | { |
102 | auto manager = context.getActionLocksManager(); |
103 | manager->cleanExpired(); |
104 | |
105 | if (!query.table.empty()) |
106 | { |
107 | String database = !query.database.empty() ? query.database : context.getCurrentDatabase(); |
108 | |
109 | if (start) |
110 | manager->remove(database, query.table, action_type); |
111 | else |
112 | manager->add(database, query.table, action_type); |
113 | } |
114 | else |
115 | { |
116 | if (start) |
117 | manager->remove(action_type); |
118 | else |
119 | manager->add(action_type); |
120 | } |
121 | } |
122 | } |
123 | |
124 | |
125 | InterpreterSystemQuery::InterpreterSystemQuery(const ASTPtr & query_ptr_, Context & context_) |
126 | : query_ptr(query_ptr_->clone()), context(context_), log(&Poco::Logger::get("InterpreterSystemQuery" )) |
127 | { |
128 | } |
129 | |
130 | |
131 | BlockIO InterpreterSystemQuery::execute() |
132 | { |
133 | auto & query = query_ptr->as<ASTSystemQuery &>(); |
134 | |
135 | if (!query.cluster.empty()) |
136 | return executeDDLQueryOnCluster(query_ptr, context, {query.database}); |
137 | |
138 | using Type = ASTSystemQuery::Type; |
139 | |
140 | /// Use global context with fresh system profile settings |
141 | Context system_context = context.getGlobalContext(); |
142 | system_context.setSetting("profile" , context.getSystemProfileName()); |
143 | |
144 | /// Make canonical query for simpler processing |
145 | if (!query.table.empty() && query.database.empty()) |
146 | query.database = context.getCurrentDatabase(); |
147 | |
148 | if (!query.target_dictionary.empty() && !query.database.empty()) |
149 | query.target_dictionary = query.database + "." + query.target_dictionary; |
150 | |
151 | switch (query.type) |
152 | { |
153 | case Type::SHUTDOWN: |
154 | if (kill(0, SIGTERM)) |
155 | throwFromErrno("System call kill(0, SIGTERM) failed" , ErrorCodes::CANNOT_KILL); |
156 | break; |
157 | case Type::KILL: |
158 | if (kill(0, SIGKILL)) |
159 | throwFromErrno("System call kill(0, SIGKILL) failed" , ErrorCodes::CANNOT_KILL); |
160 | break; |
161 | case Type::DROP_DNS_CACHE: |
162 | DNSResolver::instance().dropCache(); |
163 | /// Reinitialize clusters to update their resolved_addresses |
164 | system_context.reloadClusterConfig(); |
165 | break; |
166 | case Type::DROP_MARK_CACHE: |
167 | system_context.dropMarkCache(); |
168 | break; |
169 | case Type::DROP_UNCOMPRESSED_CACHE: |
170 | system_context.dropUncompressedCache(); |
171 | break; |
172 | #if USE_EMBEDDED_COMPILER |
173 | case Type::DROP_COMPILED_EXPRESSION_CACHE: |
174 | system_context.dropCompiledExpressionCache(); |
175 | break; |
176 | #endif |
177 | case Type::RELOAD_DICTIONARY: |
178 | system_context.getExternalDictionariesLoader().loadOrReload(query.target_dictionary); |
179 | break; |
180 | case Type::RELOAD_DICTIONARIES: |
181 | executeCommandsAndThrowIfError( |
182 | [&] () { system_context.getExternalDictionariesLoader().reloadAllTriedToLoad(); }, |
183 | [&] () { system_context.getEmbeddedDictionaries().reload(); } |
184 | ); |
185 | break; |
186 | case Type::RELOAD_EMBEDDED_DICTIONARIES: |
187 | system_context.getEmbeddedDictionaries().reload(); |
188 | break; |
189 | case Type::RELOAD_CONFIG: |
190 | system_context.reloadConfig(); |
191 | break; |
192 | case Type::STOP_MERGES: |
193 | startStopAction(context, query, ActionLocks::PartsMerge, false); |
194 | break; |
195 | case Type::START_MERGES: |
196 | startStopAction(context, query, ActionLocks::PartsMerge, true); |
197 | break; |
198 | case Type::STOP_TTL_MERGES: |
199 | startStopAction(context, query, ActionLocks::PartsTTLMerge, false); |
200 | break; |
201 | case Type::START_TTL_MERGES: |
202 | startStopAction(context, query, ActionLocks::PartsTTLMerge, true); |
203 | break; |
204 | case Type::STOP_MOVES: |
205 | startStopAction(context, query, ActionLocks::PartsMove, false); |
206 | break; |
207 | case Type::START_MOVES: |
208 | startStopAction(context, query, ActionLocks::PartsMove, true); |
209 | break; |
210 | case Type::STOP_FETCHES: |
211 | startStopAction(context, query, ActionLocks::PartsFetch, false); |
212 | break; |
213 | case Type::START_FETCHES: |
214 | startStopAction(context, query, ActionLocks::PartsFetch, true); |
215 | break; |
216 | case Type::STOP_REPLICATED_SENDS: |
217 | startStopAction(context, query, ActionLocks::PartsSend, false); |
218 | break; |
219 | case Type::START_REPLICATED_SENDS: |
220 | startStopAction(context, query, ActionLocks::PartsSend, true); |
221 | break; |
222 | case Type::STOP_REPLICATION_QUEUES: |
223 | startStopAction(context, query, ActionLocks::ReplicationQueue, false); |
224 | break; |
225 | case Type::START_REPLICATION_QUEUES: |
226 | startStopAction(context, query, ActionLocks::ReplicationQueue, true); |
227 | break; |
228 | case Type::STOP_DISTRIBUTED_SENDS: |
229 | startStopAction(context, query, ActionLocks::DistributedSend, false); |
230 | break; |
231 | case Type::START_DISTRIBUTED_SENDS: |
232 | startStopAction(context, query, ActionLocks::DistributedSend, true); |
233 | break; |
234 | case Type::SYNC_REPLICA: |
235 | syncReplica(query); |
236 | break; |
237 | case Type::FLUSH_DISTRIBUTED: |
238 | flushDistributed(query); |
239 | break; |
240 | case Type::RESTART_REPLICAS: |
241 | restartReplicas(system_context); |
242 | break; |
243 | case Type::RESTART_REPLICA: |
244 | if (!tryRestartReplica(query.database, query.table, system_context)) |
245 | throw Exception("There is no " + query.database + "." + query.table + " replicated table" , |
246 | ErrorCodes::BAD_ARGUMENTS); |
247 | break; |
248 | case Type::FLUSH_LOGS: |
249 | executeCommandsAndThrowIfError( |
250 | [&] () { if (auto query_log = context.getQueryLog()) query_log->flush(); }, |
251 | [&] () { if (auto part_log = context.getPartLog("" )) part_log->flush(); }, |
252 | [&] () { if (auto query_thread_log = context.getQueryThreadLog()) query_thread_log->flush(); }, |
253 | [&] () { if (auto trace_log = context.getTraceLog()) trace_log->flush(); }, |
254 | [&] () { if (auto text_log = context.getTextLog()) text_log->flush(); }, |
255 | [&] () { if (auto metric_log = context.getMetricLog()) metric_log->flush(); } |
256 | ); |
257 | break; |
258 | case Type::STOP_LISTEN_QUERIES: |
259 | case Type::START_LISTEN_QUERIES: |
260 | throw Exception(String(ASTSystemQuery::typeToString(query.type)) + " is not supported yet" , ErrorCodes::NOT_IMPLEMENTED); |
261 | default: |
262 | throw Exception("Unknown type of SYSTEM query" , ErrorCodes::BAD_ARGUMENTS); |
263 | } |
264 | |
265 | return BlockIO(); |
266 | } |
267 | |
268 | |
/// Restarts one replicated table by detaching it and attaching it again from its stored CREATE query.
/// Returns the newly attached storage, or nullptr if the table does not exist or is not a
/// StorageReplicatedMergeTree. There is no try/catch here, so exceptions from either phase propagate to the caller.
StoragePtr InterpreterSystemQuery::tryRestartReplica(const String & database_name, const String & table_name, Context & system_context)
{
    auto database = system_context.getDatabase(database_name);
    /// Held until the function returns, so the detach and the re-attach below happen under the same DDL guard.
    auto table_ddl_guard = system_context.getDDLGuard(database_name, table_name);
    ASTPtr create_ast;

    /// Detach actions
    {
        auto table = system_context.tryGetTable(database_name, table_name);

        if (!table || !dynamic_cast<const StorageReplicatedMergeTree *>(table.get()))
            return nullptr;

        /// The table is shut down before the exclusive lock is taken
        /// (presumably so its background activity releases its locks first — confirm).
        table->shutdown();

        /// If table was already dropped by anyone, an exception will be thrown
        /// Note: the lock is taken with the query id of the member `context`, not `system_context`.
        auto table_lock = table->lockExclusively(context.getCurrentQueryId());
        /// Remember the table definition before detaching, so the table can be re-created below.
        create_ast = database->getCreateTableQuery(system_context, table_name);

        database->detachTable(table_name);
    }

    /// Attach actions
    /// NOTE(review): the table is already detached at this point. If anything below throws, nothing in this
    /// function re-attaches it — confirm this is acceptable.
    {
        /// getCreateTableQuery must return a canonical CREATE query representation, so there is no need for AST postprocessing
        auto & create = create_ast->as<ASTCreateQuery &>();
        create.attach = true;

        auto columns = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, system_context);
        auto constraints = InterpreterCreateQuery::getConstraintsDescription(create.columns_list->constraints);

        StoragePtr table = StorageFactory::instance().get(create,
            database->getTableDataPath(create),
            table_name,
            database_name,
            system_context,
            system_context.getGlobalContext(),
            columns,
            constraints,
            create.attach,
            false);

        database->createTable(system_context, table_name, table, create_ast);

        table->startup();
        return table;
    }
}
317 | |
318 | void InterpreterSystemQuery::restartReplicas(Context & system_context) |
319 | { |
320 | std::vector<std::pair<String, String>> replica_names; |
321 | |
322 | for (auto & elem : system_context.getDatabases()) |
323 | { |
324 | DatabasePtr & database = elem.second; |
325 | const String & database_name = elem.first; |
326 | |
327 | for (auto iterator = database->getTablesIterator(system_context); iterator->isValid(); iterator->next()) |
328 | { |
329 | if (dynamic_cast<const StorageReplicatedMergeTree *>(iterator->table().get())) |
330 | replica_names.emplace_back(database_name, iterator->name()); |
331 | } |
332 | } |
333 | |
334 | if (replica_names.empty()) |
335 | return; |
336 | |
337 | ThreadPool pool(std::min(size_t(getNumberOfPhysicalCPUCores()), replica_names.size())); |
338 | for (auto & table : replica_names) |
339 | pool.scheduleOrThrowOnError([&]() { tryRestartReplica(table.first, table.second, system_context); }); |
340 | pool.wait(); |
341 | } |
342 | |
343 | void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query) |
344 | { |
345 | String database_name = !query.database.empty() ? query.database : context.getCurrentDatabase(); |
346 | const String & table_name = query.table; |
347 | |
348 | StoragePtr table = context.getTable(database_name, table_name); |
349 | |
350 | if (auto storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get())) |
351 | { |
352 | LOG_TRACE(log, "Synchronizing entries in replica's queue with table's log and waiting for it to become empty" ); |
353 | if (!storage_replicated->waitForShrinkingQueueSize(0, context.getSettingsRef().receive_timeout.totalMilliseconds())) |
354 | { |
355 | LOG_ERROR(log, "SYNC REPLICA " + database_name + "." + table_name + ": Timed out!" ); |
356 | throw Exception( |
357 | "SYNC REPLICA " + database_name + "." + table_name + ": command timed out! " |
358 | "See the 'receive_timeout' setting" , ErrorCodes::TIMEOUT_EXCEEDED); |
359 | } |
360 | LOG_TRACE(log, "SYNC REPLICA " + database_name + "." + table_name + ": OK" ); |
361 | } |
362 | else |
363 | throw Exception("Table " + database_name + "." + table_name + " is not replicated" , ErrorCodes::BAD_ARGUMENTS); |
364 | } |
365 | |
366 | void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query) |
367 | { |
368 | String database_name = !query.database.empty() ? query.database : context.getCurrentDatabase(); |
369 | String & table_name = query.table; |
370 | |
371 | if (auto storage_distributed = dynamic_cast<StorageDistributed *>(context.getTable(database_name, table_name).get())) |
372 | storage_distributed->flushClusterNodesAllData(); |
373 | else |
374 | throw Exception("Table " + database_name + "." + table_name + " is not distributed" , ErrorCodes::BAD_ARGUMENTS); |
375 | } |
376 | |
377 | |
378 | } |
379 | |