1 | #include <Common/ThreadStatus.h> |
2 | #include <Common/CurrentThread.h> |
3 | #include <Common/ThreadProfileEvents.h> |
4 | #include <Common/Exception.h> |
5 | #include <Common/QueryProfiler.h> |
6 | #include <Interpreters/Context.h> |
7 | #include <Interpreters/QueryThreadLog.h> |
8 | #include <Interpreters/ProcessList.h> |
9 | |
10 | #if defined(__linux__) |
11 | #include <sys/time.h> |
12 | #include <sys/resource.h> |
13 | |
14 | #include <Common/hasLinuxCapability.h> |
15 | #endif |
16 | |
17 | |
/// Some methods of ThreadStatus and CurrentThread are implemented here to avoid extra linking dependencies in clickhouse_common_io.
/// TODO: This split does not make sense.
20 | |
21 | namespace DB |
22 | { |
23 | |
/// Error codes referenced in this translation unit.
/// Only declarations live here; the definitions are provided elsewhere in the project.
namespace ErrorCodes
{
    /// Used by ThreadStatus::attachQuery for invalid attach requests.
    /// Declared explicitly so this file does not depend on a transitive include for it.
    extern const int LOGICAL_ERROR;
    /// Used when setpriority() fails while applying the os_thread_priority setting.
    extern const int CANNOT_SET_THREAD_PRIORITY;
}
28 | |
29 | |
30 | void ThreadStatus::attachQueryContext(Context & query_context_) |
31 | { |
32 | query_context = &query_context_; |
33 | query_id = query_context->getCurrentQueryId(); |
34 | if (!global_context) |
35 | global_context = &query_context->getGlobalContext(); |
36 | |
37 | if (thread_group) |
38 | { |
39 | std::lock_guard lock(thread_group->mutex); |
40 | thread_group->query_context = query_context; |
41 | if (!thread_group->global_context) |
42 | thread_group->global_context = global_context; |
43 | } |
44 | } |
45 | |
46 | void CurrentThread::defaultThreadDeleter() |
47 | { |
48 | if (unlikely(!current_thread)) |
49 | return; |
50 | current_thread->detachQuery(true, true); |
51 | } |
52 | |
53 | void ThreadStatus::initializeQuery() |
54 | { |
55 | assertState({ThreadState::DetachedFromQuery}, __PRETTY_FUNCTION__); |
56 | |
57 | thread_group = std::make_shared<ThreadGroupStatus>(); |
58 | |
59 | performance_counters.setParent(&thread_group->performance_counters); |
60 | memory_tracker.setParent(&thread_group->memory_tracker); |
61 | thread_group->memory_tracker.setDescription("(for query)" ); |
62 | |
63 | thread_group->thread_numbers.emplace_back(thread_number); |
64 | thread_group->os_thread_ids.emplace_back(os_thread_id); |
65 | thread_group->master_thread_number = thread_number; |
66 | thread_group->master_thread_os_id = os_thread_id; |
67 | |
68 | initPerformanceCounters(); |
69 | thread_state = ThreadState::AttachedToQuery; |
70 | } |
71 | |
72 | void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool check_detached) |
73 | { |
74 | if (thread_state == ThreadState::AttachedToQuery) |
75 | { |
76 | if (check_detached) |
77 | throw Exception("Can't attach query to the thread, it is already attached" , ErrorCodes::LOGICAL_ERROR); |
78 | return; |
79 | } |
80 | |
81 | assertState({ThreadState::DetachedFromQuery}, __PRETTY_FUNCTION__); |
82 | |
83 | if (!thread_group_) |
84 | throw Exception("Attempt to attach to nullptr thread group" , ErrorCodes::LOGICAL_ERROR); |
85 | |
86 | /// Attach current thread to thread group and copy useful information from it |
87 | thread_group = thread_group_; |
88 | |
89 | performance_counters.setParent(&thread_group->performance_counters); |
90 | memory_tracker.setParent(&thread_group->memory_tracker); |
91 | |
92 | { |
93 | std::lock_guard lock(thread_group->mutex); |
94 | |
95 | logs_queue_ptr = thread_group->logs_queue_ptr; |
96 | query_context = thread_group->query_context; |
97 | |
98 | if (!global_context) |
99 | global_context = thread_group->global_context; |
100 | |
101 | /// NOTE: A thread may be attached multiple times if it is reused from a thread pool. |
102 | thread_group->thread_numbers.emplace_back(thread_number); |
103 | thread_group->os_thread_ids.emplace_back(os_thread_id); |
104 | } |
105 | |
106 | if (query_context) |
107 | { |
108 | query_id = query_context->getCurrentQueryId(); |
109 | |
110 | #if defined(__linux__) |
111 | /// Set "nice" value if required. |
112 | Int32 new_os_thread_priority = query_context->getSettingsRef().os_thread_priority; |
113 | if (new_os_thread_priority && hasLinuxCapability(CAP_SYS_NICE)) |
114 | { |
115 | LOG_TRACE(log, "Setting nice to " << new_os_thread_priority); |
116 | |
117 | if (0 != setpriority(PRIO_PROCESS, os_thread_id, new_os_thread_priority)) |
118 | throwFromErrno("Cannot 'setpriority'" , ErrorCodes::CANNOT_SET_THREAD_PRIORITY); |
119 | |
120 | os_thread_priority = new_os_thread_priority; |
121 | } |
122 | #endif |
123 | } |
124 | |
125 | initPerformanceCounters(); |
126 | initQueryProfiler(); |
127 | |
128 | thread_state = ThreadState::AttachedToQuery; |
129 | } |
130 | |
131 | void ThreadStatus::finalizePerformanceCounters() |
132 | { |
133 | if (performance_counters_finalized) |
134 | return; |
135 | |
136 | performance_counters_finalized = true; |
137 | updatePerformanceCounters(); |
138 | |
139 | try |
140 | { |
141 | if (global_context && query_context) |
142 | { |
143 | auto & settings = query_context->getSettingsRef(); |
144 | if (settings.log_queries && settings.log_query_threads) |
145 | if (auto thread_log = global_context->getQueryThreadLog()) |
146 | logToQueryThreadLog(*thread_log); |
147 | } |
148 | } |
149 | catch (...) |
150 | { |
151 | tryLogCurrentException(log); |
152 | } |
153 | } |
154 | |
155 | void ThreadStatus::initQueryProfiler() |
156 | { |
157 | /// query profilers are useless without trace collector |
158 | if (!global_context || !global_context->hasTraceCollector()) |
159 | return; |
160 | |
161 | const auto & settings = query_context->getSettingsRef(); |
162 | |
163 | if (settings.query_profiler_real_time_period_ns > 0) |
164 | query_profiler_real = std::make_unique<QueryProfilerReal>( |
165 | /* thread_id */ os_thread_id, |
166 | /* period */ static_cast<UInt32>(settings.query_profiler_real_time_period_ns)); |
167 | |
168 | if (settings.query_profiler_cpu_time_period_ns > 0) |
169 | query_profiler_cpu = std::make_unique<QueryProfilerCpu>( |
170 | /* thread_id */ os_thread_id, |
171 | /* period */ static_cast<UInt32>(settings.query_profiler_cpu_time_period_ns)); |
172 | } |
173 | |
174 | void ThreadStatus::finalizeQueryProfiler() |
175 | { |
176 | query_profiler_real.reset(); |
177 | query_profiler_cpu.reset(); |
178 | } |
179 | |
180 | void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits) |
181 | { |
182 | if (exit_if_already_detached && thread_state == ThreadState::DetachedFromQuery) |
183 | { |
184 | thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery; |
185 | return; |
186 | } |
187 | |
188 | assertState({ThreadState::AttachedToQuery}, __PRETTY_FUNCTION__); |
189 | |
190 | finalizeQueryProfiler(); |
191 | finalizePerformanceCounters(); |
192 | |
193 | /// Detach from thread group |
194 | performance_counters.setParent(&ProfileEvents::global_counters); |
195 | memory_tracker.reset(); |
196 | |
197 | /// Must reset pointer to thread_group's memory_tracker, because it will be destroyed two lines below. |
198 | memory_tracker.setParent(nullptr); |
199 | |
200 | query_id.clear(); |
201 | query_context = nullptr; |
202 | thread_group.reset(); |
203 | |
204 | thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery; |
205 | |
206 | #if defined(__linux__) |
207 | if (os_thread_priority) |
208 | { |
209 | LOG_TRACE(log, "Resetting nice" ); |
210 | |
211 | if (0 != setpriority(PRIO_PROCESS, os_thread_id, 0)) |
212 | LOG_ERROR(log, "Cannot 'setpriority' back to zero: " << errnoToString(ErrorCodes::CANNOT_SET_THREAD_PRIORITY, errno)); |
213 | |
214 | os_thread_priority = 0; |
215 | } |
216 | #endif |
217 | } |
218 | |
219 | void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log) |
220 | { |
221 | QueryThreadLogElement elem; |
222 | |
223 | elem.event_time = time(nullptr); |
224 | elem.query_start_time = query_start_time; |
225 | elem.query_duration_ms = (getCurrentTimeNanoseconds() - query_start_time_nanoseconds) / 1000000U; |
226 | |
227 | elem.read_rows = progress_in.read_rows.load(std::memory_order_relaxed); |
228 | elem.read_bytes = progress_in.read_bytes.load(std::memory_order_relaxed); |
229 | |
230 | /// TODO: Use written_rows and written_bytes when run time progress is implemented |
231 | elem.written_rows = progress_out.read_rows.load(std::memory_order_relaxed); |
232 | elem.written_bytes = progress_out.read_bytes.load(std::memory_order_relaxed); |
233 | elem.memory_usage = memory_tracker.get(); |
234 | elem.peak_memory_usage = memory_tracker.getPeak(); |
235 | |
236 | elem.thread_name = getThreadName(); |
237 | elem.thread_number = thread_number; |
238 | elem.os_thread_id = os_thread_id; |
239 | |
240 | if (thread_group) |
241 | { |
242 | { |
243 | std::lock_guard lock(thread_group->mutex); |
244 | |
245 | elem.master_thread_number = thread_group->master_thread_number; |
246 | elem.master_os_thread_id = thread_group->master_thread_os_id; |
247 | |
248 | elem.query = thread_group->query; |
249 | } |
250 | } |
251 | |
252 | if (query_context) |
253 | { |
254 | elem.client_info = query_context->getClientInfo(); |
255 | |
256 | if (query_context->getSettingsRef().log_profile_events != 0) |
257 | { |
258 | /// NOTE: Here we are in the same thread, so we can make memcpy() |
259 | elem.profile_counters = std::make_shared<ProfileEvents::Counters>(performance_counters.getPartiallyAtomicSnapshot()); |
260 | } |
261 | } |
262 | |
263 | thread_log.add(elem); |
264 | } |
265 | |
266 | void CurrentThread::initializeQuery() |
267 | { |
268 | if (unlikely(!current_thread)) |
269 | return; |
270 | current_thread->initializeQuery(); |
271 | current_thread->deleter = CurrentThread::defaultThreadDeleter; |
272 | } |
273 | |
274 | void CurrentThread::attachTo(const ThreadGroupStatusPtr & thread_group) |
275 | { |
276 | if (unlikely(!current_thread)) |
277 | return; |
278 | current_thread->attachQuery(thread_group, true); |
279 | current_thread->deleter = CurrentThread::defaultThreadDeleter; |
280 | } |
281 | |
282 | void CurrentThread::attachToIfDetached(const ThreadGroupStatusPtr & thread_group) |
283 | { |
284 | if (unlikely(!current_thread)) |
285 | return; |
286 | current_thread->attachQuery(thread_group, false); |
287 | current_thread->deleter = CurrentThread::defaultThreadDeleter; |
288 | } |
289 | |
290 | void CurrentThread::attachQueryContext(Context & query_context) |
291 | { |
292 | if (unlikely(!current_thread)) |
293 | return; |
294 | current_thread->attachQueryContext(query_context); |
295 | } |
296 | |
297 | void CurrentThread::finalizePerformanceCounters() |
298 | { |
299 | if (unlikely(!current_thread)) |
300 | return; |
301 | current_thread->finalizePerformanceCounters(); |
302 | } |
303 | |
304 | void CurrentThread::detachQuery() |
305 | { |
306 | if (unlikely(!current_thread)) |
307 | return; |
308 | current_thread->detachQuery(false); |
309 | } |
310 | |
311 | void CurrentThread::detachQueryIfNotDetached() |
312 | { |
313 | if (unlikely(!current_thread)) |
314 | return; |
315 | current_thread->detachQuery(true); |
316 | } |
317 | |
318 | |
319 | CurrentThread::QueryScope::QueryScope(Context & query_context) |
320 | { |
321 | CurrentThread::initializeQuery(); |
322 | CurrentThread::attachQueryContext(query_context); |
323 | } |
324 | |
325 | void CurrentThread::QueryScope::logPeakMemoryUsage() |
326 | { |
327 | auto group = CurrentThread::getGroup(); |
328 | if (!group) |
329 | return; |
330 | |
331 | log_peak_memory_usage_in_destructor = false; |
332 | group->memory_tracker.logPeakMemoryUsage(); |
333 | } |
334 | |
335 | CurrentThread::QueryScope::~QueryScope() |
336 | { |
337 | try |
338 | { |
339 | if (log_peak_memory_usage_in_destructor) |
340 | logPeakMemoryUsage(); |
341 | |
342 | CurrentThread::detachQueryIfNotDetached(); |
343 | } |
344 | catch (...) |
345 | { |
346 | tryLogCurrentException("CurrentThread" , __PRETTY_FUNCTION__); |
347 | } |
348 | } |
349 | |
350 | } |
351 | |