1#include <Interpreters/ProcessList.h>
2#include <Core/Settings.h>
3#include <Interpreters/Context.h>
4#include <Interpreters/DatabaseAndTableWithAlias.h>
5#include <Parsers/ASTSelectWithUnionQuery.h>
6#include <Parsers/ASTSelectQuery.h>
7#include <Parsers/ASTKillQueryQuery.h>
8#include <Common/typeid_cast.h>
9#include <Common/Exception.h>
10#include <Common/CurrentThread.h>
11#include <IO/WriteHelpers.h>
12#include <DataStreams/IBlockInputStream.h>
13#include <common/logger_useful.h>
14#include <chrono>
15
16
/// Current metric declared here and defined elsewhere; the constructor below makes the
/// global (all queries) memory tracker report into it.
namespace CurrentMetrics
{
    extern const Metric MemoryTracking;
}
21
22
23namespace DB
24{
25
/// Error codes thrown from this file (declared extern; defined elsewhere).
/// TOO_MANY_ROWS / TOO_MANY_BYTES were declared here but never used, so they are not declared anymore.
namespace ErrorCodes
{
    extern const int TOO_MANY_SIMULTANEOUS_QUERIES;
    extern const int QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING;
    extern const int LOGICAL_ERROR;
}
34
35
36/// Should we execute the query even if max_concurrent_queries limit is exhausted
37static bool isUnlimitedQuery(const IAST * ast)
38{
39 if (!ast)
40 return false;
41
42 /// It is KILL QUERY
43 if (ast->as<ASTKillQueryQuery>())
44 return true;
45
46 /// It is SELECT FROM system.processes
47 /// NOTE: This is very rough check.
48 /// False negative: USE system; SELECT * FROM processes;
49 /// False positive: SELECT * FROM system.processes CROSS JOIN (SELECT ...)
50
51 if (const auto * ast_selects = ast->as<ASTSelectWithUnionQuery>())
52 {
53 if (!ast_selects->list_of_selects || ast_selects->list_of_selects->children.empty())
54 return false;
55
56 const auto * ast_select = ast_selects->list_of_selects->children[0]->as<ASTSelectQuery>();
57 if (!ast_select)
58 return false;
59
60 if (auto database_and_table = getDatabaseAndTable(*ast_select, 0))
61 return database_and_table->database == "system" && database_and_table->table == "processes";
62
63 return false;
64 }
65
66 return false;
67}
68
69
70ProcessList::ProcessList(size_t max_size_)
71 : max_size(max_size_)
72{
73 total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
74}
75
76
77ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * ast, Context & query_context)
78{
79 EntryPtr res;
80
81 const ClientInfo & client_info = query_context.getClientInfo();
82 const Settings & settings = query_context.getSettingsRef();
83
84 if (client_info.current_query_id.empty())
85 throw Exception("Query id cannot be empty", ErrorCodes::LOGICAL_ERROR);
86
87 bool is_unlimited_query = isUnlimitedQuery(ast);
88
89 {
90 std::unique_lock lock(mutex);
91
92 const auto queue_max_wait_ms = settings.queue_max_wait_ms.totalMilliseconds();
93 if (!is_unlimited_query && max_size && processes.size() >= max_size)
94 {
95 if (queue_max_wait_ms)
96 LOG_WARNING(&Logger::get("ProcessList"), "Too many simultaneous queries, will wait " << queue_max_wait_ms << " ms.");
97 if (!queue_max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(queue_max_wait_ms), [&]{ return processes.size() < max_size; }))
98 throw Exception("Too many simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
99 }
100
101 /** Why we use current user?
102 * Because initial one is passed by client and credentials for it is not verified,
103 * and using initial_user for limits will be insecure.
104 *
105 * Why we use current_query_id?
106 * Because we want to allow distributed queries that will run multiple secondary queries on same server,
107 * like SELECT count() FROM remote('127.0.0.{1,2}', system.numbers)
108 * so they must have different query_ids.
109 */
110
111 {
112 auto user_process_list = user_to_queries.find(client_info.current_user);
113
114 if (user_process_list != user_to_queries.end())
115 {
116 if (!is_unlimited_query && settings.max_concurrent_queries_for_user
117 && user_process_list->second.queries.size() >= settings.max_concurrent_queries_for_user)
118 throw Exception("Too many simultaneous queries for user " + client_info.current_user
119 + ". Current: " + toString(user_process_list->second.queries.size())
120 + ", maximum: " + settings.max_concurrent_queries_for_user.toString(),
121 ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
122
123 auto running_query = user_process_list->second.queries.find(client_info.current_query_id);
124
125 if (running_query != user_process_list->second.queries.end())
126 {
127 if (!settings.replace_running_query)
128 throw Exception("Query with id = " + client_info.current_query_id + " is already running.",
129 ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
130
131 /// Ask queries to cancel. They will check this flag.
132 running_query->second->is_killed.store(true, std::memory_order_relaxed);
133
134 const auto replace_running_query_max_wait_ms = settings.replace_running_query_max_wait_ms.totalMilliseconds();
135 if (!replace_running_query_max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(replace_running_query_max_wait_ms),
136 [&]
137 {
138 running_query = user_process_list->second.queries.find(client_info.current_query_id);
139 if (running_query == user_process_list->second.queries.end())
140 return true;
141 running_query->second->is_killed.store(true, std::memory_order_relaxed);
142 return false;
143 }))
144 {
145 throw Exception("Query with id = " + client_info.current_query_id + " is already running and can't be stopped",
146 ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
147 }
148 }
149 }
150 }
151
152 /// Check other users running query with our query_id
153 for (const auto & user_process_list : user_to_queries)
154 {
155 if (user_process_list.first == client_info.current_user)
156 continue;
157 if (auto running_query = user_process_list.second.queries.find(client_info.current_query_id); running_query != user_process_list.second.queries.end())
158 throw Exception("Query with id = " + client_info.current_query_id + " is already running by user " + user_process_list.first,
159 ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
160 }
161
162 auto process_it = processes.emplace(processes.end(),
163 query_, client_info, settings.max_memory_usage, settings.memory_tracker_fault_probability, priorities.insert(settings.priority));
164
165 res = std::make_shared<Entry>(*this, process_it);
166
167 process_it->query_context = &query_context;
168
169 if (!client_info.current_query_id.empty())
170 {
171 ProcessListForUser & user_process_list = user_to_queries[client_info.current_user];
172 user_process_list.queries.emplace(client_info.current_query_id, &res->get());
173
174 process_it->setUserProcessList(&user_process_list);
175
176 /// Limits are only raised (to be more relaxed) or set to something instead of zero,
177 /// because settings for different queries will interfere each other:
178 /// setting from one query effectively sets values for all other queries.
179
180 /// Track memory usage for all simultaneously running queries.
181 /// You should specify this value in configuration for default profile,
182 /// not for specific users, sessions or queries,
183 /// because this setting is effectively global.
184 total_memory_tracker.setOrRaiseLimit(settings.max_memory_usage_for_all_queries);
185 total_memory_tracker.setDescription("(total)");
186
187 /// Track memory usage for all simultaneously running queries from single user.
188 user_process_list.user_memory_tracker.setParent(&total_memory_tracker);
189 user_process_list.user_memory_tracker.setOrRaiseLimit(settings.max_memory_usage_for_user);
190 user_process_list.user_memory_tracker.setDescription("(for user)");
191
192 /// Actualize thread group info
193 if (auto thread_group = CurrentThread::getGroup())
194 {
195 std::lock_guard lock_thread_group(thread_group->mutex);
196 thread_group->performance_counters.setParent(&user_process_list.user_performance_counters);
197 thread_group->memory_tracker.setParent(&user_process_list.user_memory_tracker);
198 thread_group->query = process_it->query;
199
200 /// Set query-level memory trackers
201 thread_group->memory_tracker.setOrRaiseLimit(process_it->max_memory_usage);
202 thread_group->memory_tracker.setDescription("(for query)");
203 if (process_it->memory_tracker_fault_probability)
204 thread_group->memory_tracker.setFaultProbability(process_it->memory_tracker_fault_probability);
205
206 /// NOTE: Do not set the limit for thread-level memory tracker since it could show unreal values
207 /// since allocation and deallocation could happen in different threads
208
209 process_it->thread_group = std::move(thread_group);
210 }
211
212 if (!user_process_list.user_throttler)
213 {
214 if (settings.max_network_bandwidth_for_user)
215 user_process_list.user_throttler = std::make_shared<Throttler>(settings.max_network_bandwidth_for_user, total_network_throttler);
216 else if (settings.max_network_bandwidth_for_all_users)
217 user_process_list.user_throttler = total_network_throttler;
218 }
219 }
220
221 if (!total_network_throttler && settings.max_network_bandwidth_for_all_users)
222 {
223 total_network_throttler = std::make_shared<Throttler>(settings.max_network_bandwidth_for_all_users);
224 }
225 }
226
227 return res;
228}
229
230
231ProcessListEntry::~ProcessListEntry()
232{
233 /// Destroy all streams to avoid long lock of ProcessList
234 it->releaseQueryStreams();
235
236 std::lock_guard lock(parent.mutex);
237
238 String user = it->getClientInfo().current_user;
239 String query_id = it->getClientInfo().current_query_id;
240
241 const QueryStatus * process_list_element_ptr = &*it;
242
243 /// This removes the memory_tracker of one request.
244 parent.processes.erase(it);
245
246 auto user_process_list_it = parent.user_to_queries.find(user);
247 if (user_process_list_it == parent.user_to_queries.end())
248 {
249 LOG_ERROR(&Logger::get("ProcessList"), "Logical error: cannot find user in ProcessList");
250 std::terminate();
251 }
252
253 ProcessListForUser & user_process_list = user_process_list_it->second;
254
255 bool found = false;
256
257 if (auto running_query = user_process_list.queries.find(query_id); running_query != user_process_list.queries.end())
258 {
259 if (running_query->second == process_list_element_ptr)
260 {
261 user_process_list.queries.erase(running_query->first);
262 found = true;
263 }
264 }
265
266 if (!found)
267 {
268 LOG_ERROR(&Logger::get("ProcessList"), "Logical error: cannot find query by query_id and pointer to ProcessListElement in ProcessListForUser");
269 std::terminate();
270 }
271 parent.have_space.notify_all();
272
273 /// If there are no more queries for the user, then we will reset memory tracker and network throttler.
274 if (user_process_list.queries.empty())
275 user_process_list.resetTrackers();
276
277 /// This removes memory_tracker for all requests. At this time, no other memory_trackers live.
278 if (parent.processes.size() == 0)
279 {
280 /// Reset MemoryTracker, similarly (see above).
281 parent.total_memory_tracker.logPeakMemoryUsage();
282 parent.total_memory_tracker.reset();
283 parent.total_network_throttler.reset();
284 }
285}
286
287
288QueryStatus::QueryStatus(
289 const String & query_,
290 const ClientInfo & client_info_,
291 size_t max_memory_usage_,
292 double memory_tracker_fault_probability_,
293 QueryPriorities::Handle && priority_handle_)
294 :
295 query(query_),
296 client_info(client_info_),
297 priority_handle(std::move(priority_handle_)),
298 num_queries_increment{CurrentMetrics::Query},
299 max_memory_usage(max_memory_usage_),
300 memory_tracker_fault_probability(memory_tracker_fault_probability_)
301{
302}
303
304QueryStatus::~QueryStatus() = default;
305
306void QueryStatus::setQueryStreams(const BlockIO & io)
307{
308 std::lock_guard lock(query_streams_mutex);
309
310 query_stream_in = io.in;
311 query_stream_out = io.out;
312 query_streams_status = QueryStreamsStatus::Initialized;
313}
314
315void QueryStatus::releaseQueryStreams()
316{
317 BlockInputStreamPtr in;
318 BlockOutputStreamPtr out;
319
320 {
321 std::lock_guard lock(query_streams_mutex);
322
323 query_streams_status = QueryStreamsStatus::Released;
324 in = std::move(query_stream_in);
325 out = std::move(query_stream_out);
326 }
327
328 /// Destroy streams outside the mutex lock
329}
330
331bool QueryStatus::streamsAreReleased()
332{
333 std::lock_guard lock(query_streams_mutex);
334
335 return query_streams_status == QueryStreamsStatus::Released;
336}
337
338bool QueryStatus::tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStreamPtr & out) const
339{
340 std::lock_guard lock(query_streams_mutex);
341
342 if (query_streams_status != QueryStreamsStatus::Initialized)
343 return false;
344
345 in = query_stream_in;
346 out = query_stream_out;
347 return true;
348}
349
350CancellationCode QueryStatus::cancelQuery(bool kill)
351{
352 /// Streams are destroyed, and ProcessListElement will be deleted from ProcessList soon. We need wait a little bit
353 if (streamsAreReleased())
354 return CancellationCode::CancelSent;
355
356 BlockInputStreamPtr input_stream;
357 BlockOutputStreamPtr output_stream;
358
359 if (tryGetQueryStreams(input_stream, output_stream))
360 {
361 if (input_stream)
362 {
363 input_stream->cancel(kill);
364 return CancellationCode::CancelSent;
365 }
366 return CancellationCode::CancelCannotBeSent;
367 }
368 /// Query is not even started
369 is_killed.store(true);
370 return CancellationCode::CancelSent;
371}
372
373
374void QueryStatus::setUserProcessList(ProcessListForUser * user_process_list_)
375{
376 user_process_list = user_process_list_;
377}
378
379
380ThrottlerPtr QueryStatus::getUserNetworkThrottler()
381{
382 if (!user_process_list)
383 return {};
384 return user_process_list->user_throttler;
385}
386
387
388QueryStatus * ProcessList::tryGetProcessListElement(const String & current_query_id, const String & current_user)
389{
390 auto user_it = user_to_queries.find(current_user);
391 if (user_it != user_to_queries.end())
392 {
393 const auto & user_queries = user_it->second.queries;
394 auto query_it = user_queries.find(current_query_id);
395
396 if (query_it != user_queries.end())
397 return query_it->second;
398 }
399
400 return nullptr;
401}
402
403
404CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill)
405{
406 std::lock_guard lock(mutex);
407
408 QueryStatus * elem = tryGetProcessListElement(current_query_id, current_user);
409
410 if (!elem)
411 return CancellationCode::NotFound;
412
413 return elem->cancelQuery(kill);
414}
415
416
417void ProcessList::killAllQueries()
418{
419 std::lock_guard lock(mutex);
420
421 for (auto & process : processes)
422 process.cancelQuery(true);
423}
424
425
426QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_events, bool get_settings) const
427{
428 QueryStatusInfo res;
429
430 res.query = query;
431 res.client_info = client_info;
432 res.elapsed_seconds = watch.elapsedSeconds();
433 res.is_cancelled = is_killed.load(std::memory_order_relaxed);
434 res.read_rows = progress_in.read_rows;
435 res.read_bytes = progress_in.read_bytes;
436 res.total_rows = progress_in.total_rows_to_read;
437
438 /// TODO: Use written_rows and written_bytes when real time progress is implemented
439 res.written_rows = progress_out.read_rows;
440 res.written_bytes = progress_out.read_bytes;
441
442 if (thread_group)
443 {
444 res.memory_usage = thread_group->memory_tracker.get();
445 res.peak_memory_usage = thread_group->memory_tracker.getPeak();
446
447 if (get_thread_list)
448 {
449 std::lock_guard lock(thread_group->mutex);
450 res.thread_numbers = thread_group->thread_numbers;
451 res.os_thread_ids = thread_group->os_thread_ids;
452 }
453
454 if (get_profile_events)
455 res.profile_counters = std::make_shared<ProfileEvents::Counters>(thread_group->performance_counters.getPartiallyAtomicSnapshot());
456 }
457
458 if (get_settings && query_context)
459 res.query_settings = std::make_shared<Settings>(query_context->getSettingsRef());
460
461 return res;
462}
463
464
465ProcessList::Info ProcessList::getInfo(bool get_thread_list, bool get_profile_events, bool get_settings) const
466{
467 Info per_query_infos;
468
469 std::lock_guard lock(mutex);
470
471 per_query_infos.reserve(processes.size());
472 for (const auto & process : processes)
473 per_query_infos.emplace_back(process.getInfo(get_thread_list, get_profile_events, get_settings));
474
475 return per_query_infos;
476}
477
478
479ProcessListForUser::ProcessListForUser() = default;
480
481
482}
483