1#include <Common/ThreadStatus.h>
2#include <Common/CurrentThread.h>
3#include <Common/ThreadProfileEvents.h>
4#include <Common/Exception.h>
5#include <Common/QueryProfiler.h>
6#include <Interpreters/Context.h>
7#include <Interpreters/QueryThreadLog.h>
8#include <Interpreters/ProcessList.h>
9
10#if defined(__linux__)
11#include <sys/time.h>
12#include <sys/resource.h>
13
14#include <Common/hasLinuxCapability.h>
15#endif
16
17
18/// Implement some methods of ThreadStatus and CurrentThread here to avoid extra linking dependencies in clickhouse_common_io
19/// TODO It doesn't make sense.
20
21namespace DB
22{
23
24namespace ErrorCodes
25{
26 extern const int CANNOT_SET_THREAD_PRIORITY;
27}
28
29
30void ThreadStatus::attachQueryContext(Context & query_context_)
31{
32 query_context = &query_context_;
33 query_id = query_context->getCurrentQueryId();
34 if (!global_context)
35 global_context = &query_context->getGlobalContext();
36
37 if (thread_group)
38 {
39 std::lock_guard lock(thread_group->mutex);
40 thread_group->query_context = query_context;
41 if (!thread_group->global_context)
42 thread_group->global_context = global_context;
43 }
44}
45
46void CurrentThread::defaultThreadDeleter()
47{
48 if (unlikely(!current_thread))
49 return;
50 current_thread->detachQuery(true, true);
51}
52
53void ThreadStatus::initializeQuery()
54{
55 assertState({ThreadState::DetachedFromQuery}, __PRETTY_FUNCTION__);
56
57 thread_group = std::make_shared<ThreadGroupStatus>();
58
59 performance_counters.setParent(&thread_group->performance_counters);
60 memory_tracker.setParent(&thread_group->memory_tracker);
61 thread_group->memory_tracker.setDescription("(for query)");
62
63 thread_group->thread_numbers.emplace_back(thread_number);
64 thread_group->os_thread_ids.emplace_back(os_thread_id);
65 thread_group->master_thread_number = thread_number;
66 thread_group->master_thread_os_id = os_thread_id;
67
68 initPerformanceCounters();
69 thread_state = ThreadState::AttachedToQuery;
70}
71
72void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool check_detached)
73{
74 if (thread_state == ThreadState::AttachedToQuery)
75 {
76 if (check_detached)
77 throw Exception("Can't attach query to the thread, it is already attached", ErrorCodes::LOGICAL_ERROR);
78 return;
79 }
80
81 assertState({ThreadState::DetachedFromQuery}, __PRETTY_FUNCTION__);
82
83 if (!thread_group_)
84 throw Exception("Attempt to attach to nullptr thread group", ErrorCodes::LOGICAL_ERROR);
85
86 /// Attach current thread to thread group and copy useful information from it
87 thread_group = thread_group_;
88
89 performance_counters.setParent(&thread_group->performance_counters);
90 memory_tracker.setParent(&thread_group->memory_tracker);
91
92 {
93 std::lock_guard lock(thread_group->mutex);
94
95 logs_queue_ptr = thread_group->logs_queue_ptr;
96 query_context = thread_group->query_context;
97
98 if (!global_context)
99 global_context = thread_group->global_context;
100
101 /// NOTE: A thread may be attached multiple times if it is reused from a thread pool.
102 thread_group->thread_numbers.emplace_back(thread_number);
103 thread_group->os_thread_ids.emplace_back(os_thread_id);
104 }
105
106 if (query_context)
107 {
108 query_id = query_context->getCurrentQueryId();
109
110#if defined(__linux__)
111 /// Set "nice" value if required.
112 Int32 new_os_thread_priority = query_context->getSettingsRef().os_thread_priority;
113 if (new_os_thread_priority && hasLinuxCapability(CAP_SYS_NICE))
114 {
115 LOG_TRACE(log, "Setting nice to " << new_os_thread_priority);
116
117 if (0 != setpriority(PRIO_PROCESS, os_thread_id, new_os_thread_priority))
118 throwFromErrno("Cannot 'setpriority'", ErrorCodes::CANNOT_SET_THREAD_PRIORITY);
119
120 os_thread_priority = new_os_thread_priority;
121 }
122#endif
123 }
124
125 initPerformanceCounters();
126 initQueryProfiler();
127
128 thread_state = ThreadState::AttachedToQuery;
129}
130
131void ThreadStatus::finalizePerformanceCounters()
132{
133 if (performance_counters_finalized)
134 return;
135
136 performance_counters_finalized = true;
137 updatePerformanceCounters();
138
139 try
140 {
141 if (global_context && query_context)
142 {
143 auto & settings = query_context->getSettingsRef();
144 if (settings.log_queries && settings.log_query_threads)
145 if (auto thread_log = global_context->getQueryThreadLog())
146 logToQueryThreadLog(*thread_log);
147 }
148 }
149 catch (...)
150 {
151 tryLogCurrentException(log);
152 }
153}
154
155void ThreadStatus::initQueryProfiler()
156{
157 /// query profilers are useless without trace collector
158 if (!global_context || !global_context->hasTraceCollector())
159 return;
160
161 const auto & settings = query_context->getSettingsRef();
162
163 if (settings.query_profiler_real_time_period_ns > 0)
164 query_profiler_real = std::make_unique<QueryProfilerReal>(
165 /* thread_id */ os_thread_id,
166 /* period */ static_cast<UInt32>(settings.query_profiler_real_time_period_ns));
167
168 if (settings.query_profiler_cpu_time_period_ns > 0)
169 query_profiler_cpu = std::make_unique<QueryProfilerCpu>(
170 /* thread_id */ os_thread_id,
171 /* period */ static_cast<UInt32>(settings.query_profiler_cpu_time_period_ns));
172}
173
174void ThreadStatus::finalizeQueryProfiler()
175{
176 query_profiler_real.reset();
177 query_profiler_cpu.reset();
178}
179
180void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
181{
182 if (exit_if_already_detached && thread_state == ThreadState::DetachedFromQuery)
183 {
184 thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery;
185 return;
186 }
187
188 assertState({ThreadState::AttachedToQuery}, __PRETTY_FUNCTION__);
189
190 finalizeQueryProfiler();
191 finalizePerformanceCounters();
192
193 /// Detach from thread group
194 performance_counters.setParent(&ProfileEvents::global_counters);
195 memory_tracker.reset();
196
197 /// Must reset pointer to thread_group's memory_tracker, because it will be destroyed two lines below.
198 memory_tracker.setParent(nullptr);
199
200 query_id.clear();
201 query_context = nullptr;
202 thread_group.reset();
203
204 thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery;
205
206#if defined(__linux__)
207 if (os_thread_priority)
208 {
209 LOG_TRACE(log, "Resetting nice");
210
211 if (0 != setpriority(PRIO_PROCESS, os_thread_id, 0))
212 LOG_ERROR(log, "Cannot 'setpriority' back to zero: " << errnoToString(ErrorCodes::CANNOT_SET_THREAD_PRIORITY, errno));
213
214 os_thread_priority = 0;
215 }
216#endif
217}
218
219void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log)
220{
221 QueryThreadLogElement elem;
222
223 elem.event_time = time(nullptr);
224 elem.query_start_time = query_start_time;
225 elem.query_duration_ms = (getCurrentTimeNanoseconds() - query_start_time_nanoseconds) / 1000000U;
226
227 elem.read_rows = progress_in.read_rows.load(std::memory_order_relaxed);
228 elem.read_bytes = progress_in.read_bytes.load(std::memory_order_relaxed);
229
230 /// TODO: Use written_rows and written_bytes when run time progress is implemented
231 elem.written_rows = progress_out.read_rows.load(std::memory_order_relaxed);
232 elem.written_bytes = progress_out.read_bytes.load(std::memory_order_relaxed);
233 elem.memory_usage = memory_tracker.get();
234 elem.peak_memory_usage = memory_tracker.getPeak();
235
236 elem.thread_name = getThreadName();
237 elem.thread_number = thread_number;
238 elem.os_thread_id = os_thread_id;
239
240 if (thread_group)
241 {
242 {
243 std::lock_guard lock(thread_group->mutex);
244
245 elem.master_thread_number = thread_group->master_thread_number;
246 elem.master_os_thread_id = thread_group->master_thread_os_id;
247
248 elem.query = thread_group->query;
249 }
250 }
251
252 if (query_context)
253 {
254 elem.client_info = query_context->getClientInfo();
255
256 if (query_context->getSettingsRef().log_profile_events != 0)
257 {
258 /// NOTE: Here we are in the same thread, so we can make memcpy()
259 elem.profile_counters = std::make_shared<ProfileEvents::Counters>(performance_counters.getPartiallyAtomicSnapshot());
260 }
261 }
262
263 thread_log.add(elem);
264}
265
266void CurrentThread::initializeQuery()
267{
268 if (unlikely(!current_thread))
269 return;
270 current_thread->initializeQuery();
271 current_thread->deleter = CurrentThread::defaultThreadDeleter;
272}
273
274void CurrentThread::attachTo(const ThreadGroupStatusPtr & thread_group)
275{
276 if (unlikely(!current_thread))
277 return;
278 current_thread->attachQuery(thread_group, true);
279 current_thread->deleter = CurrentThread::defaultThreadDeleter;
280}
281
282void CurrentThread::attachToIfDetached(const ThreadGroupStatusPtr & thread_group)
283{
284 if (unlikely(!current_thread))
285 return;
286 current_thread->attachQuery(thread_group, false);
287 current_thread->deleter = CurrentThread::defaultThreadDeleter;
288}
289
290void CurrentThread::attachQueryContext(Context & query_context)
291{
292 if (unlikely(!current_thread))
293 return;
294 current_thread->attachQueryContext(query_context);
295}
296
297void CurrentThread::finalizePerformanceCounters()
298{
299 if (unlikely(!current_thread))
300 return;
301 current_thread->finalizePerformanceCounters();
302}
303
304void CurrentThread::detachQuery()
305{
306 if (unlikely(!current_thread))
307 return;
308 current_thread->detachQuery(false);
309}
310
311void CurrentThread::detachQueryIfNotDetached()
312{
313 if (unlikely(!current_thread))
314 return;
315 current_thread->detachQuery(true);
316}
317
318
319CurrentThread::QueryScope::QueryScope(Context & query_context)
320{
321 CurrentThread::initializeQuery();
322 CurrentThread::attachQueryContext(query_context);
323}
324
325void CurrentThread::QueryScope::logPeakMemoryUsage()
326{
327 auto group = CurrentThread::getGroup();
328 if (!group)
329 return;
330
331 log_peak_memory_usage_in_destructor = false;
332 group->memory_tracker.logPeakMemoryUsage();
333}
334
335CurrentThread::QueryScope::~QueryScope()
336{
337 try
338 {
339 if (log_peak_memory_usage_in_destructor)
340 logPeakMemoryUsage();
341
342 CurrentThread::detachQueryIfNotDetached();
343 }
344 catch (...)
345 {
346 tryLogCurrentException("CurrentThread", __PRETTY_FUNCTION__);
347 }
348}
349
350}
351