#include <Interpreters/ProcessList.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTKillQueryQuery.h>
#include <Common/typeid_cast.h>
#include <Common/Exception.h>
#include <Common/CurrentThread.h>
#include <IO/WriteHelpers.h>
#include <DataStreams/IBlockInputStream.h>
#include <common/logger_useful.h>
#include <chrono>


namespace CurrentMetrics
{
    extern const Metric MemoryTracking;
}


namespace DB
{

namespace ErrorCodes
{
    extern const int TOO_MANY_SIMULTANEOUS_QUERIES;
    extern const int QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING;
    extern const int LOGICAL_ERROR;
    extern const int TOO_MANY_ROWS;
    extern const int TOO_MANY_BYTES;
}


/// Should we execute the query even if the max_concurrent_queries limit is exhausted?
static bool isUnlimitedQuery(const IAST * ast)
{
    if (!ast)
        return false;

    /// It is KILL QUERY
    if (ast->as<ASTKillQueryQuery>())
        return true;

    /// It is SELECT FROM system.processes
    /// NOTE: This is a very rough check.
    /// False negative: USE system; SELECT * FROM processes;
    /// False positive: SELECT * FROM system.processes CROSS JOIN (SELECT ...)

    if (const auto * ast_selects = ast->as<ASTSelectWithUnionQuery>())
    {
        if (!ast_selects->list_of_selects || ast_selects->list_of_selects->children.empty())
            return false;

        const auto * ast_select = ast_selects->list_of_selects->children[0]->as<ASTSelectQuery>();
        if (!ast_select)
            return false;

        if (auto database_and_table = getDatabaseAndTable(*ast_select, 0))
            return database_and_table->database == "system" && database_and_table->table == "processes";

        return false;
    }

    return false;
}


ProcessList::ProcessList(size_t max_size_)
    : max_size(max_size_)
{
    total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
}


ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * ast, Context & query_context)
{
    EntryPtr res;

    const ClientInfo & client_info = query_context.getClientInfo();
    const Settings & settings = query_context.getSettingsRef();

    if (client_info.current_query_id.empty())
        throw Exception("Query id cannot be empty", ErrorCodes::LOGICAL_ERROR);

    bool is_unlimited_query = isUnlimitedQuery(ast);

    {
        std::unique_lock lock(mutex);

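        /// If the server-wide limit on the number of simultaneously executed queries is reached,
        /// optionally wait up to 'queue_max_wait_ms' for a slot to free up instead of failing right away.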
        const auto queue_max_wait_ms = settings.queue_max_wait_ms.totalMilliseconds();
        if (!is_unlimited_query && max_size && processes.size() >= max_size)
        {
            if (queue_max_wait_ms)
                LOG_WARNING(&Logger::get("ProcessList"), "Too many simultaneous queries, will wait " << queue_max_wait_ms << " ms.");
            if (!queue_max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(queue_max_wait_ms), [&]{ return processes.size() < max_size; }))
                throw Exception("Too many simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
        }

        /** Why do we use the current user?
          * Because the initial user is passed by the client and its credentials are not verified,
          * so using initial_user for limits would be insecure.
          *
          * Why do we use current_query_id?
          * Because we want to allow distributed queries that run multiple secondary queries on the same server,
          * like SELECT count() FROM remote('127.0.0.{1,2}', system.numbers),
          * so they must have different query_ids.
          */

        {
            auto user_process_list = user_to_queries.find(client_info.current_user);

            if (user_process_list != user_to_queries.end())
            {
                if (!is_unlimited_query && settings.max_concurrent_queries_for_user
                    && user_process_list->second.queries.size() >= settings.max_concurrent_queries_for_user)
                    throw Exception("Too many simultaneous queries for user " + client_info.current_user
                        + ". Current: " + toString(user_process_list->second.queries.size())
                        + ", maximum: " + settings.max_concurrent_queries_for_user.toString(),
                        ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);

                auto running_query = user_process_list->second.queries.find(client_info.current_query_id);

                if (running_query != user_process_list->second.queries.end())
                {
                    if (!settings.replace_running_query)
                        throw Exception("Query with id = " + client_info.current_query_id + " is already running.",
                            ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);

                    /// Ask queries to cancel. They will check this flag.
                    running_query->second->is_killed.store(true, std::memory_order_relaxed);

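                    /// The predicate re-finds the entry on each wakeup: the old query may have finished
                    /// and been removed, in which case there is room; if it is still running, keep the kill flag set.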
                    const auto replace_running_query_max_wait_ms = settings.replace_running_query_max_wait_ms.totalMilliseconds();
                    if (!replace_running_query_max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(replace_running_query_max_wait_ms),
                        [&]
                        {
                            running_query = user_process_list->second.queries.find(client_info.current_query_id);
                            if (running_query == user_process_list->second.queries.end())
                                return true;
                            running_query->second->is_killed.store(true, std::memory_order_relaxed);
                            return false;
                        }))
                    {
                        throw Exception("Query with id = " + client_info.current_query_id + " is already running and can't be stopped",
                            ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
                    }
                }
            }
        }

        /// Check if any other user is running a query with the same query_id.
        for (const auto & user_process_list : user_to_queries)
        {
            if (user_process_list.first == client_info.current_user)
                continue;
            if (auto running_query = user_process_list.second.queries.find(client_info.current_query_id); running_query != user_process_list.second.queries.end())
                throw Exception("Query with id = " + client_info.current_query_id + " is already running by user " + user_process_list.first,
                    ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
        }

        auto process_it = processes.emplace(processes.end(),
            query_, client_info, settings.max_memory_usage, settings.memory_tracker_fault_probability, priorities.insert(settings.priority));

        res = std::make_shared<Entry>(*this, process_it);

        process_it->query_context = &query_context;

        if (!client_info.current_query_id.empty())
        {
            ProcessListForUser & user_process_list = user_to_queries[client_info.current_user];
            user_process_list.queries.emplace(client_info.current_query_id, &res->get());

            process_it->setUserProcessList(&user_process_list);

            /// Limits are only raised (to be more relaxed) or set to something instead of zero,
            /// because settings from different queries would otherwise interfere with each other:
            /// a setting from one query would effectively set values for all other queries.

            /// Track memory usage for all simultaneously running queries.
            /// You should specify this value in the configuration of the default profile,
            /// not for specific users, sessions or queries,
            /// because this setting is effectively global.
            total_memory_tracker.setOrRaiseLimit(settings.max_memory_usage_for_all_queries);
            total_memory_tracker.setDescription("(total)");

            /// Track memory usage for all simultaneously running queries from a single user.
            user_process_list.user_memory_tracker.setParent(&total_memory_tracker);
            user_process_list.user_memory_tracker.setOrRaiseLimit(settings.max_memory_usage_for_user);
            user_process_list.user_memory_tracker.setDescription("(for user)");

            /// Actualize thread group info.
            if (auto thread_group = CurrentThread::getGroup())
            {
                std::lock_guard lock_thread_group(thread_group->mutex);
                thread_group->performance_counters.setParent(&user_process_list.user_performance_counters);
                thread_group->memory_tracker.setParent(&user_process_list.user_memory_tracker);
                thread_group->query = process_it->query;

                /// Set query-level memory trackers.
                thread_group->memory_tracker.setOrRaiseLimit(process_it->max_memory_usage);
                thread_group->memory_tracker.setDescription("(for query)");
                if (process_it->memory_tracker_fault_probability)
                    thread_group->memory_tracker.setFaultProbability(process_it->memory_tracker_fault_probability);

                /// NOTE: Do not set a limit on the thread-level memory tracker: it could show unrealistic values,
                /// because allocation and deallocation may happen in different threads.

                process_it->thread_group = std::move(thread_group);
            }

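            /// Network throttlers are created lazily. If only the limit for all users is set,
            /// the user shares the common throttler directly instead of getting a dedicated one.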
            if (!user_process_list.user_throttler)
            {
                if (settings.max_network_bandwidth_for_user)
                    user_process_list.user_throttler = std::make_shared<Throttler>(settings.max_network_bandwidth_for_user, total_network_throttler);
                else if (settings.max_network_bandwidth_for_all_users)
                    user_process_list.user_throttler = total_network_throttler;
            }
        }

        if (!total_network_throttler && settings.max_network_bandwidth_for_all_users)
        {
            total_network_throttler = std::make_shared<Throttler>(settings.max_network_bandwidth_for_all_users);
        }
    }

    return res;
}


ProcessListEntry::~ProcessListEntry()
{
    /// Destroy all streams to avoid a long lock of ProcessList.
    it->releaseQueryStreams();

    std::lock_guard lock(parent.mutex);

    String user = it->getClientInfo().current_user;
    String query_id = it->getClientInfo().current_query_id;

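    /// Remember the pointer before erasing the entry, to check below that we remove
    /// exactly our own query from the per-user set of queries.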
    const QueryStatus * process_list_element_ptr = &*it;

    /// This removes the memory_tracker of one request.
    parent.processes.erase(it);

    auto user_process_list_it = parent.user_to_queries.find(user);
    if (user_process_list_it == parent.user_to_queries.end())
    {
        LOG_ERROR(&Logger::get("ProcessList"), "Logical error: cannot find user in ProcessList");
        std::terminate();
    }

    ProcessListForUser & user_process_list = user_process_list_it->second;

    bool found = false;

    if (auto running_query = user_process_list.queries.find(query_id); running_query != user_process_list.queries.end())
    {
        if (running_query->second == process_list_element_ptr)
        {
            user_process_list.queries.erase(running_query->first);
            found = true;
        }
    }

    if (!found)
    {
        LOG_ERROR(&Logger::get("ProcessList"), "Logical error: cannot find query by query_id and pointer to ProcessListElement in ProcessListForUser");
        std::terminate();
    }
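    /// Wake up threads waiting either for a free slot in the process list
    /// or for a query with the same query_id to finish (see replace_running_query).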
    parent.have_space.notify_all();

    /// If there are no more queries for the user, reset the memory tracker and network throttler.
    if (user_process_list.queries.empty())
        user_process_list.resetTrackers();

    /// This removes the memory_tracker for all requests. At this point, no other memory_trackers are live.
    if (parent.processes.empty())
    {
        /// Reset the total MemoryTracker, similarly to the per-user one above.
        parent.total_memory_tracker.logPeakMemoryUsage();
        parent.total_memory_tracker.reset();
        parent.total_network_throttler.reset();
    }
}


QueryStatus::QueryStatus(
    const String & query_,
    const ClientInfo & client_info_,
    size_t max_memory_usage_,
    double memory_tracker_fault_probability_,
    QueryPriorities::Handle && priority_handle_)
    :
    query(query_),
    client_info(client_info_),
    priority_handle(std::move(priority_handle_)),
    num_queries_increment{CurrentMetrics::Query},
    max_memory_usage(max_memory_usage_),
    memory_tracker_fault_probability(memory_tracker_fault_probability_)
{
}

QueryStatus::~QueryStatus() = default;

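/// Remember the streams associated with the query, so that it can later be cancelled
/// through the input stream or have its streams released on completion.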
void QueryStatus::setQueryStreams(const BlockIO & io)
{
    std::lock_guard lock(query_streams_mutex);

    query_stream_in = io.in;
    query_stream_out = io.out;
    query_streams_status = QueryStreamsStatus::Initialized;
}

void QueryStatus::releaseQueryStreams()
{
    BlockInputStreamPtr in;
    BlockOutputStreamPtr out;

    {
        std::lock_guard lock(query_streams_mutex);

        query_streams_status = QueryStreamsStatus::Released;
        in = std::move(query_stream_in);
        out = std::move(query_stream_out);
    }

    /// Destroy streams outside the mutex lock.
}

bool QueryStatus::streamsAreReleased()
{
    std::lock_guard lock(query_streams_mutex);

    return query_streams_status == QueryStreamsStatus::Released;
}

bool QueryStatus::tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStreamPtr & out) const
{
    std::lock_guard lock(query_streams_mutex);

    if (query_streams_status != QueryStreamsStatus::Initialized)
        return false;

    in = query_stream_in;
    out = query_stream_out;
    return true;
}

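/// Try to deliver a cancellation request to the query. If the streams are already released,
/// the query is about to be removed from the list anyway, so the cancel is considered delivered.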
CancellationCode QueryStatus::cancelQuery(bool kill)
{
    /// Streams are destroyed, and ProcessListElement will be deleted from ProcessList soon. We need to wait a little bit.
    if (streamsAreReleased())
        return CancellationCode::CancelSent;

    BlockInputStreamPtr input_stream;
    BlockOutputStreamPtr output_stream;

    if (tryGetQueryStreams(input_stream, output_stream))
    {
        if (input_stream)
        {
            input_stream->cancel(kill);
            return CancellationCode::CancelSent;
        }
        return CancellationCode::CancelCannotBeSent;
    }
    /// Query has not even started yet.
    is_killed.store(true);
    return CancellationCode::CancelSent;
}


void QueryStatus::setUserProcessList(ProcessListForUser * user_process_list_)
{
    user_process_list = user_process_list_;
}


ThrottlerPtr QueryStatus::getUserNetworkThrottler()
{
    if (!user_process_list)
        return {};
    return user_process_list->user_throttler;
}

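/// NOTE: Expected to be called with ProcessList::mutex held (as in sendCancelToQuery),
/// since user_to_queries is accessed without any additional synchronization.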
QueryStatus * ProcessList::tryGetProcessListElement(const String & current_query_id, const String & current_user)
{
    auto user_it = user_to_queries.find(current_user);
    if (user_it != user_to_queries.end())
    {
        const auto & user_queries = user_it->second.queries;
        auto query_it = user_queries.find(current_query_id);

        if (query_it != user_queries.end())
            return query_it->second;
    }

    return nullptr;
}


CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill)
{
    std::lock_guard lock(mutex);

    QueryStatus * elem = tryGetProcessListElement(current_query_id, current_user);

    if (!elem)
        return CancellationCode::NotFound;

    return elem->cancelQuery(kill);
}


void ProcessList::killAllQueries()
{
    std::lock_guard lock(mutex);

    for (auto & process : processes)
        process.cancelQuery(true);
}

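/// The get_* flags allow the caller to skip the more expensive parts of the snapshot
/// (thread lists, profile event counters and a copy of the query settings).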
QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_events, bool get_settings) const
{
    QueryStatusInfo res;

    res.query = query;
    res.client_info = client_info;
    res.elapsed_seconds = watch.elapsedSeconds();
    res.is_cancelled = is_killed.load(std::memory_order_relaxed);
    res.read_rows = progress_in.read_rows;
    res.read_bytes = progress_in.read_bytes;
    res.total_rows = progress_in.total_rows_to_read;

    /// TODO: Use written_rows and written_bytes when real-time progress is implemented.
    res.written_rows = progress_out.read_rows;
    res.written_bytes = progress_out.read_bytes;

    if (thread_group)
    {
        res.memory_usage = thread_group->memory_tracker.get();
        res.peak_memory_usage = thread_group->memory_tracker.getPeak();

        if (get_thread_list)
        {
            std::lock_guard lock(thread_group->mutex);
            res.thread_numbers = thread_group->thread_numbers;
            res.os_thread_ids = thread_group->os_thread_ids;
        }

        if (get_profile_events)
            res.profile_counters = std::make_shared<ProfileEvents::Counters>(thread_group->performance_counters.getPartiallyAtomicSnapshot());
    }

    if (get_settings && query_context)
        res.query_settings = std::make_shared<Settings>(query_context->getSettingsRef());

    return res;
}


ProcessList::Info ProcessList::getInfo(bool get_thread_list, bool get_profile_events, bool get_settings) const
{
    Info per_query_infos;

    std::lock_guard lock(mutex);

    per_query_infos.reserve(processes.size());
    for (const auto & process : processes)
        per_query_infos.emplace_back(process.getInfo(get_thread_list, get_profile_events, get_settings));

    return per_query_infos;
}


ProcessListForUser::ProcessListForUser() = default;


}