1#include <Interpreters/InterpreterSystemQuery.h>
2#include <Common/DNSResolver.h>
3#include <Common/ActionLock.h>
4#include "config_core.h"
5#include <Common/typeid_cast.h>
6#include <Common/getNumberOfPhysicalCPUCores.h>
7#include <Common/ThreadPool.h>
8#include <Common/escapeForFileName.h>
9#include <Interpreters/Context.h>
10#include <Interpreters/ExternalDictionariesLoader.h>
11#include <Interpreters/EmbeddedDictionaries.h>
12#include <Interpreters/ActionLocksManager.h>
13#include <Interpreters/InterpreterDropQuery.h>
14#include <Interpreters/InterpreterCreateQuery.h>
15#include <Interpreters/QueryLog.h>
16#include <Interpreters/DDLWorker.h>
17#include <Interpreters/PartLog.h>
18#include <Interpreters/QueryThreadLog.h>
19#include <Interpreters/TraceLog.h>
20#include <Interpreters/TextLog.h>
21#include <Interpreters/MetricLog.h>
22#include <Databases/IDatabase.h>
23#include <Storages/StorageDistributed.h>
24#include <Storages/StorageReplicatedMergeTree.h>
25#include <Storages/StorageFactory.h>
26#include <Parsers/ASTSystemQuery.h>
27#include <Parsers/ASTDropQuery.h>
28#include <Parsers/ASTCreateQuery.h>
29#include <csignal>
30#include <algorithm>
31#include "InterpreterSystemQuery.h"
32
33
34namespace DB
35{
36
37
38namespace ErrorCodes
39{
40 extern const int BAD_ARGUMENTS;
41 extern const int CANNOT_KILL;
42 extern const int NOT_IMPLEMENTED;
43 extern const int TIMEOUT_EXCEEDED;
44}
45
46
47namespace ActionLocks
48{
49 extern StorageActionBlockType PartsMerge;
50 extern StorageActionBlockType PartsFetch;
51 extern StorageActionBlockType PartsSend;
52 extern StorageActionBlockType ReplicationQueue;
53 extern StorageActionBlockType DistributedSend;
54 extern StorageActionBlockType PartsTTLMerge;
55 extern StorageActionBlockType PartsMove;
56}
57
58
59namespace
60{
61
62ExecutionStatus getOverallExecutionStatusOfCommands()
63{
64 return ExecutionStatus(0);
65}
66
67/// Consequently tries to execute all commands and generates final exception message for failed commands
68template <typename Callable, typename ... Callables>
69ExecutionStatus getOverallExecutionStatusOfCommands(Callable && command, Callables && ... commands)
70{
71 ExecutionStatus status_head(0);
72 try
73 {
74 command();
75 }
76 catch (...)
77 {
78 status_head = ExecutionStatus::fromCurrentException();
79 }
80
81 ExecutionStatus status_tail = getOverallExecutionStatusOfCommands(std::forward<Callables>(commands)...);
82
83 auto res_status = status_head.code != 0 ? status_head.code : status_tail.code;
84 auto res_message = status_head.message + (status_tail.message.empty() ? "" : ("\n" + status_tail.message));
85
86 return ExecutionStatus(res_status, res_message);
87}
88
89/// Consequently tries to execute all commands and throws exception with info about failed commands
90template <typename ... Callables>
91void executeCommandsAndThrowIfError(Callables && ... commands)
92{
93 auto status = getOverallExecutionStatusOfCommands(std::forward<Callables>(commands)...);
94 if (status.code != 0)
95 throw Exception(status.message, status.code);
96}
97
98
99/// Implements SYSTEM [START|STOP] <something action from ActionLocks>
100void startStopAction(Context & context, ASTSystemQuery & query, StorageActionBlockType action_type, bool start)
101{
102 auto manager = context.getActionLocksManager();
103 manager->cleanExpired();
104
105 if (!query.table.empty())
106 {
107 String database = !query.database.empty() ? query.database : context.getCurrentDatabase();
108
109 if (start)
110 manager->remove(database, query.table, action_type);
111 else
112 manager->add(database, query.table, action_type);
113 }
114 else
115 {
116 if (start)
117 manager->remove(action_type);
118 else
119 manager->add(action_type);
120 }
121}
122}
123
124
125InterpreterSystemQuery::InterpreterSystemQuery(const ASTPtr & query_ptr_, Context & context_)
126 : query_ptr(query_ptr_->clone()), context(context_), log(&Poco::Logger::get("InterpreterSystemQuery"))
127{
128}
129
130
131BlockIO InterpreterSystemQuery::execute()
132{
133 auto & query = query_ptr->as<ASTSystemQuery &>();
134
135 if (!query.cluster.empty())
136 return executeDDLQueryOnCluster(query_ptr, context, {query.database});
137
138 using Type = ASTSystemQuery::Type;
139
140 /// Use global context with fresh system profile settings
141 Context system_context = context.getGlobalContext();
142 system_context.setSetting("profile", context.getSystemProfileName());
143
144 /// Make canonical query for simpler processing
145 if (!query.table.empty() && query.database.empty())
146 query.database = context.getCurrentDatabase();
147
148 if (!query.target_dictionary.empty() && !query.database.empty())
149 query.target_dictionary = query.database + "." + query.target_dictionary;
150
151 switch (query.type)
152 {
153 case Type::SHUTDOWN:
154 if (kill(0, SIGTERM))
155 throwFromErrno("System call kill(0, SIGTERM) failed", ErrorCodes::CANNOT_KILL);
156 break;
157 case Type::KILL:
158 if (kill(0, SIGKILL))
159 throwFromErrno("System call kill(0, SIGKILL) failed", ErrorCodes::CANNOT_KILL);
160 break;
161 case Type::DROP_DNS_CACHE:
162 DNSResolver::instance().dropCache();
163 /// Reinitialize clusters to update their resolved_addresses
164 system_context.reloadClusterConfig();
165 break;
166 case Type::DROP_MARK_CACHE:
167 system_context.dropMarkCache();
168 break;
169 case Type::DROP_UNCOMPRESSED_CACHE:
170 system_context.dropUncompressedCache();
171 break;
172#if USE_EMBEDDED_COMPILER
173 case Type::DROP_COMPILED_EXPRESSION_CACHE:
174 system_context.dropCompiledExpressionCache();
175 break;
176#endif
177 case Type::RELOAD_DICTIONARY:
178 system_context.getExternalDictionariesLoader().loadOrReload(query.target_dictionary);
179 break;
180 case Type::RELOAD_DICTIONARIES:
181 executeCommandsAndThrowIfError(
182 [&] () { system_context.getExternalDictionariesLoader().reloadAllTriedToLoad(); },
183 [&] () { system_context.getEmbeddedDictionaries().reload(); }
184 );
185 break;
186 case Type::RELOAD_EMBEDDED_DICTIONARIES:
187 system_context.getEmbeddedDictionaries().reload();
188 break;
189 case Type::RELOAD_CONFIG:
190 system_context.reloadConfig();
191 break;
192 case Type::STOP_MERGES:
193 startStopAction(context, query, ActionLocks::PartsMerge, false);
194 break;
195 case Type::START_MERGES:
196 startStopAction(context, query, ActionLocks::PartsMerge, true);
197 break;
198 case Type::STOP_TTL_MERGES:
199 startStopAction(context, query, ActionLocks::PartsTTLMerge, false);
200 break;
201 case Type::START_TTL_MERGES:
202 startStopAction(context, query, ActionLocks::PartsTTLMerge, true);
203 break;
204 case Type::STOP_MOVES:
205 startStopAction(context, query, ActionLocks::PartsMove, false);
206 break;
207 case Type::START_MOVES:
208 startStopAction(context, query, ActionLocks::PartsMove, true);
209 break;
210 case Type::STOP_FETCHES:
211 startStopAction(context, query, ActionLocks::PartsFetch, false);
212 break;
213 case Type::START_FETCHES:
214 startStopAction(context, query, ActionLocks::PartsFetch, true);
215 break;
216 case Type::STOP_REPLICATED_SENDS:
217 startStopAction(context, query, ActionLocks::PartsSend, false);
218 break;
219 case Type::START_REPLICATED_SENDS:
220 startStopAction(context, query, ActionLocks::PartsSend, true);
221 break;
222 case Type::STOP_REPLICATION_QUEUES:
223 startStopAction(context, query, ActionLocks::ReplicationQueue, false);
224 break;
225 case Type::START_REPLICATION_QUEUES:
226 startStopAction(context, query, ActionLocks::ReplicationQueue, true);
227 break;
228 case Type::STOP_DISTRIBUTED_SENDS:
229 startStopAction(context, query, ActionLocks::DistributedSend, false);
230 break;
231 case Type::START_DISTRIBUTED_SENDS:
232 startStopAction(context, query, ActionLocks::DistributedSend, true);
233 break;
234 case Type::SYNC_REPLICA:
235 syncReplica(query);
236 break;
237 case Type::FLUSH_DISTRIBUTED:
238 flushDistributed(query);
239 break;
240 case Type::RESTART_REPLICAS:
241 restartReplicas(system_context);
242 break;
243 case Type::RESTART_REPLICA:
244 if (!tryRestartReplica(query.database, query.table, system_context))
245 throw Exception("There is no " + query.database + "." + query.table + " replicated table",
246 ErrorCodes::BAD_ARGUMENTS);
247 break;
248 case Type::FLUSH_LOGS:
249 executeCommandsAndThrowIfError(
250 [&] () { if (auto query_log = context.getQueryLog()) query_log->flush(); },
251 [&] () { if (auto part_log = context.getPartLog("")) part_log->flush(); },
252 [&] () { if (auto query_thread_log = context.getQueryThreadLog()) query_thread_log->flush(); },
253 [&] () { if (auto trace_log = context.getTraceLog()) trace_log->flush(); },
254 [&] () { if (auto text_log = context.getTextLog()) text_log->flush(); },
255 [&] () { if (auto metric_log = context.getMetricLog()) metric_log->flush(); }
256 );
257 break;
258 case Type::STOP_LISTEN_QUERIES:
259 case Type::START_LISTEN_QUERIES:
260 throw Exception(String(ASTSystemQuery::typeToString(query.type)) + " is not supported yet", ErrorCodes::NOT_IMPLEMENTED);
261 default:
262 throw Exception("Unknown type of SYSTEM query", ErrorCodes::BAD_ARGUMENTS);
263 }
264
265 return BlockIO();
266}
267
268
269StoragePtr InterpreterSystemQuery::tryRestartReplica(const String & database_name, const String & table_name, Context & system_context)
270{
271 auto database = system_context.getDatabase(database_name);
272 auto table_ddl_guard = system_context.getDDLGuard(database_name, table_name);
273 ASTPtr create_ast;
274
275 /// Detach actions
276 {
277 auto table = system_context.tryGetTable(database_name, table_name);
278
279 if (!table || !dynamic_cast<const StorageReplicatedMergeTree *>(table.get()))
280 return nullptr;
281
282 table->shutdown();
283
284 /// If table was already dropped by anyone, an exception will be thrown
285 auto table_lock = table->lockExclusively(context.getCurrentQueryId());
286 create_ast = database->getCreateTableQuery(system_context, table_name);
287
288 database->detachTable(table_name);
289 }
290
291 /// Attach actions
292 {
293 /// getCreateTableQuery must return canonical CREATE query representation, there are no need for AST postprocessing
294 auto & create = create_ast->as<ASTCreateQuery &>();
295 create.attach = true;
296
297 auto columns = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, system_context);
298 auto constraints = InterpreterCreateQuery::getConstraintsDescription(create.columns_list->constraints);
299
300 StoragePtr table = StorageFactory::instance().get(create,
301 database->getTableDataPath(create),
302 table_name,
303 database_name,
304 system_context,
305 system_context.getGlobalContext(),
306 columns,
307 constraints,
308 create.attach,
309 false);
310
311 database->createTable(system_context, table_name, table, create_ast);
312
313 table->startup();
314 return table;
315 }
316}
317
318void InterpreterSystemQuery::restartReplicas(Context & system_context)
319{
320 std::vector<std::pair<String, String>> replica_names;
321
322 for (auto & elem : system_context.getDatabases())
323 {
324 DatabasePtr & database = elem.second;
325 const String & database_name = elem.first;
326
327 for (auto iterator = database->getTablesIterator(system_context); iterator->isValid(); iterator->next())
328 {
329 if (dynamic_cast<const StorageReplicatedMergeTree *>(iterator->table().get()))
330 replica_names.emplace_back(database_name, iterator->name());
331 }
332 }
333
334 if (replica_names.empty())
335 return;
336
337 ThreadPool pool(std::min(size_t(getNumberOfPhysicalCPUCores()), replica_names.size()));
338 for (auto & table : replica_names)
339 pool.scheduleOrThrowOnError([&]() { tryRestartReplica(table.first, table.second, system_context); });
340 pool.wait();
341}
342
343void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query)
344{
345 String database_name = !query.database.empty() ? query.database : context.getCurrentDatabase();
346 const String & table_name = query.table;
347
348 StoragePtr table = context.getTable(database_name, table_name);
349
350 if (auto storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
351 {
352 LOG_TRACE(log, "Synchronizing entries in replica's queue with table's log and waiting for it to become empty");
353 if (!storage_replicated->waitForShrinkingQueueSize(0, context.getSettingsRef().receive_timeout.totalMilliseconds()))
354 {
355 LOG_ERROR(log, "SYNC REPLICA " + database_name + "." + table_name + ": Timed out!");
356 throw Exception(
357 "SYNC REPLICA " + database_name + "." + table_name + ": command timed out! "
358 "See the 'receive_timeout' setting", ErrorCodes::TIMEOUT_EXCEEDED);
359 }
360 LOG_TRACE(log, "SYNC REPLICA " + database_name + "." + table_name + ": OK");
361 }
362 else
363 throw Exception("Table " + database_name + "." + table_name + " is not replicated", ErrorCodes::BAD_ARGUMENTS);
364}
365
366void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query)
367{
368 String database_name = !query.database.empty() ? query.database : context.getCurrentDatabase();
369 String & table_name = query.table;
370
371 if (auto storage_distributed = dynamic_cast<StorageDistributed *>(context.getTable(database_name, table_name).get()))
372 storage_distributed->flushClusterNodesAllData();
373 else
374 throw Exception("Table " + database_name + "." + table_name + " is not distributed", ErrorCodes::BAD_ARGUMENTS);
375}
376
377
378}
379