1#pragma once
2
3
4#include <Core/Defines.h>
5#include <DataStreams/BlockIO.h>
6#include <IO/Progress.h>
7#include <Interpreters/CancellationCode.h>
8#include <Interpreters/ClientInfo.h>
9#include <Interpreters/QueryPriorities.h>
10#include <Storages/IStorage_fwd.h>
11#include <Poco/Condition.h>
12#include <Common/CurrentMetrics.h>
13#include <Common/CurrentThread.h>
14#include <Common/MemoryTracker.h>
15#include <Common/ProfileEvents.h>
16#include <Common/Stopwatch.h>
17#include <Common/Throttler.h>
18
19#include <condition_variable>
20#include <list>
21#include <map>
22#include <memory>
23#include <mutex>
24#include <shared_mutex>
25#include <unordered_map>
26
27
28namespace CurrentMetrics
29{
30 extern const Metric Query;
31}
32
33namespace DB
34{
35
/// Types that this header only names through pointers, references or smart pointers.
class IAST;
class Context;
struct Settings;

/// Types that are defined further down in this header, or that are only named as friends.
class ThreadStatus;
class QueryStatus;
class ProcessListEntry;
struct ProcessListForUser;
44
45
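/** This header declares the list of currently executing queries (ProcessList)
  * together with its supporting types.
  * The list also implements a limit on the number of simultaneously running queries.
  */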
49
50/** Information of process list element.
51 * To output in SHOW PROCESSLIST query. Does not contain any complex objects, that do something on copy or destructor.
52 */
53struct QueryStatusInfo
54{
55 String query;
56 double elapsed_seconds;
57 size_t read_rows;
58 size_t read_bytes;
59 size_t total_rows;
60 size_t written_rows;
61 size_t written_bytes;
62 Int64 memory_usage;
63 Int64 peak_memory_usage;
64 ClientInfo client_info;
65 bool is_cancelled;
66
67 /// Optional fields, filled by request
68 std::vector<UInt32> thread_numbers;
69 std::vector<UInt32> os_thread_ids;
70 std::shared_ptr<ProfileEvents::Counters> profile_counters;
71 std::shared_ptr<Settings> query_settings;
72};
73
74/// Query and information about its execution.
75class QueryStatus
76{
77protected:
78 friend class ProcessList;
79 friend class ThreadStatus;
80 friend class CurrentThread;
81 friend class ProcessListEntry;
82
83 String query;
84 ClientInfo client_info;
85
86 /// Is set once when init
87 Context * query_context = nullptr;
88
89 /// Info about all threads involved in query execution
90 ThreadGroupStatusPtr thread_group;
91
92 Stopwatch watch;
93
94 /// Progress of input stream
95 Progress progress_in;
96 /// Progress of output stream
97 Progress progress_out;
98
99 QueryPriorities::Handle priority_handle;
100
101 CurrentMetrics::Increment num_queries_increment{CurrentMetrics::Query};
102
103 size_t max_memory_usage = 0;
104 double memory_tracker_fault_probability = 0.0;
105
106 std::atomic<bool> is_killed { false };
107
108 void setUserProcessList(ProcessListForUser * user_process_list_);
109 /// Be careful using it. For example, queries field of ProcessListForUser could be modified concurrently.
110 const ProcessListForUser * getUserProcessList() const { return user_process_list; }
111
112 mutable std::mutex query_streams_mutex;
113
114 /// Streams with query results, point to BlockIO from executeQuery()
115 /// This declaration is compatible with notes about BlockIO::process_list_entry:
116 /// there are no cyclic dependencies: BlockIO::in,out point to objects inside ProcessListElement (not whole object)
117 BlockInputStreamPtr query_stream_in;
118 BlockOutputStreamPtr query_stream_out;
119
120 enum QueryStreamsStatus
121 {
122 NotInitialized,
123 Initialized,
124 Released
125 };
126
127 QueryStreamsStatus query_streams_status{NotInitialized};
128
129 ProcessListForUser * user_process_list = nullptr;
130
131public:
132
133 QueryStatus(
134 const String & query_,
135 const ClientInfo & client_info_,
136 size_t max_memory_usage,
137 double memory_tracker_fault_probability,
138 QueryPriorities::Handle && priority_handle_);
139
140 ~QueryStatus();
141
142 const ClientInfo & getClientInfo() const
143 {
144 return client_info;
145 }
146
147 ProgressValues getProgressIn() const
148 {
149 return progress_in.getValues();
150 }
151
152 ProgressValues getProgressOut() const
153 {
154 return progress_out.getValues();
155 }
156
157 ThrottlerPtr getUserNetworkThrottler();
158
159 bool updateProgressIn(const Progress & value)
160 {
161 CurrentThread::updateProgressIn(value);
162 progress_in.incrementPiecewiseAtomically(value);
163
164 if (priority_handle)
165 priority_handle->waitIfNeed(std::chrono::seconds(1)); /// NOTE Could make timeout customizable.
166
167 return !is_killed.load(std::memory_order_relaxed);
168 }
169
170 bool updateProgressOut(const Progress & value)
171 {
172 CurrentThread::updateProgressOut(value);
173 progress_out.incrementPiecewiseAtomically(value);
174
175 return !is_killed.load(std::memory_order_relaxed);
176 }
177
178 QueryStatusInfo getInfo(bool get_thread_list = false, bool get_profile_events = false, bool get_settings = false) const;
179
180 Context * tryGetQueryContext() { return query_context; }
181 const Context * tryGetQueryContext() const { return query_context; }
182
183 /// Copies pointers to in/out streams
184 void setQueryStreams(const BlockIO & io);
185
186 /// Frees in/out streams
187 void releaseQueryStreams();
188
189 /// It means that ProcessListEntry still exists, but stream was already destroyed
190 bool streamsAreReleased();
191
192 /// Get query in/out pointers from BlockIO
193 bool tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStreamPtr & out) const;
194
195 CancellationCode cancelQuery(bool kill);
196
197 bool isKilled() const { return is_killed; }
198};
199
200
201/// Data about queries for one user.
202struct ProcessListForUser
203{
204 ProcessListForUser();
205
206 /// query_id -> ProcessListElement(s). There can be multiple queries with the same query_id as long as all queries except one are cancelled.
207 using QueryToElement = std::unordered_map<String, QueryStatus *>;
208 QueryToElement queries;
209
210 ProfileEvents::Counters user_performance_counters{VariableContext::User, &ProfileEvents::global_counters};
211 /// Limit and counter for memory of all simultaneously running queries of single user.
212 MemoryTracker user_memory_tracker{VariableContext::User};
213
214 /// Count network usage for all simultaneously running queries of single user.
215 ThrottlerPtr user_throttler;
216
217 /// Clears MemoryTracker for the user.
218 /// Sometimes it is important to reset the MemoryTracker, because it may accumulate skew
219 /// due to the fact that there are cases when memory can be allocated while processing the query, but released later.
220 /// Clears network bandwidth Throttler, so it will not count periods of inactivity.
221 void resetTrackers()
222 {
223 user_memory_tracker.reset();
224 if (user_throttler)
225 user_throttler.reset();
226 }
227};
228
229
230class ProcessList;
231
232
233/// Keeps iterator to process list and removes element in destructor.
234class ProcessListEntry
235{
236private:
237 using Container = std::list<QueryStatus>;
238
239 ProcessList & parent;
240 Container::iterator it;
241
242public:
243 ProcessListEntry(ProcessList & parent_, Container::iterator it_)
244 : parent(parent_), it(it_) {}
245
246 ~ProcessListEntry();
247
248 QueryStatus * operator->() { return &*it; }
249 const QueryStatus * operator->() const { return &*it; }
250
251 QueryStatus & get() { return *it; }
252 const QueryStatus & get() const { return *it; }
253};
254
255
256class ProcessList
257{
258public:
259 using Element = QueryStatus;
260 using Entry = ProcessListEntry;
261
262 /// list, for iterators not to invalidate. NOTE: could replace with cyclic buffer, but not worth.
263 using Container = std::list<Element>;
264 using Info = std::vector<QueryStatusInfo>;
265 /// User -> queries
266 using UserToQueries = std::unordered_map<String, ProcessListForUser>;
267
268protected:
269 friend class ProcessListEntry;
270
271 mutable std::mutex mutex;
272 mutable std::condition_variable have_space; /// Number of currently running queries has become less than maximum.
273
274 /// List of queries
275 Container processes;
276 size_t max_size; /// 0 means no limit. Otherwise, when limit exceeded, an exception is thrown.
277
278 /// Stores per-user info: queries, statistics and limits
279 UserToQueries user_to_queries;
280
281 /// Stores info about queries grouped by their priority
282 QueryPriorities priorities;
283
284 /// Limit and counter for memory of all simultaneously running queries.
285 MemoryTracker total_memory_tracker{VariableContext::Global};
286
287 /// Limit network bandwidth for all users
288 ThrottlerPtr total_network_throttler;
289
290 /// Call under lock. Finds process with specified current_user and current_query_id.
291 QueryStatus * tryGetProcessListElement(const String & current_query_id, const String & current_user);
292
293public:
294 ProcessList(size_t max_size_ = 0);
295
296 using EntryPtr = std::shared_ptr<ProcessListEntry>;
297
298 /** Register running query. Returns refcounted object, that will remove element from list in destructor.
299 * If too many running queries - wait for not more than specified (see settings) amount of time.
300 * If timeout is passed - throw an exception.
301 * Don't count KILL QUERY queries.
302 */
303 EntryPtr insert(const String & query_, const IAST * ast, Context & query_context);
304
305 /// Number of currently executing queries.
306 size_t size() const { return processes.size(); }
307
308 /// Get current state of process list.
309 Info getInfo(bool get_thread_list = false, bool get_profile_events = false, bool get_settings = false) const;
310
311 void setMaxSize(size_t max_size_)
312 {
313 std::lock_guard lock(mutex);
314 max_size = max_size_;
315 }
316
317 /// Try call cancel() for input and output streams of query with specified id and user
318 CancellationCode sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill = false);
319
320 void killAllQueries();
321};
322
323}
324