1 | #include <sstream> |
---|---|
2 | |
3 | #include <Common/CurrentThread.h> |
4 | #include <Common/Exception.h> |
5 | #include <Common/ThreadProfileEvents.h> |
6 | #include <Common/TaskStatsInfoGetter.h> |
7 | #include <Common/QueryProfiler.h> |
8 | #include <Common/ThreadStatus.h> |
9 | |
10 | #include <Poco/Logger.h> |
11 | #include <common/getThreadNumber.h> |
12 | |
13 | |
14 | namespace DB |
15 | { |
16 | |
17 | |
/// Error codes referenced by this translation unit.
/// They are only declared here (`extern`); their definitions are not part of this file.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    /// NOTE(review): PTHREAD_ERROR is not referenced in the visible part of this file.
    extern const int PTHREAD_ERROR;
}
23 | |
24 | |
/// Per-thread pointer to the ThreadStatus of the calling thread.
/// It starts as nullptr, is set to `this` at the end of the ThreadStatus constructor
/// and is reset to nullptr at the end of the ThreadStatus destructor.
thread_local ThreadStatus * current_thread = nullptr;
26 | |
27 | |
28 | TasksStatsCounters TasksStatsCounters::current() |
29 | { |
30 | TasksStatsCounters res; |
31 | CurrentThread::get().taskstats_getter->getStat(res.stat, CurrentThread::get().os_thread_id); |
32 | return res; |
33 | } |
34 | |
35 | ThreadStatus::ThreadStatus() |
36 | { |
37 | thread_number = getThreadNumber(); |
38 | os_thread_id = TaskStatsInfoGetter::getCurrentTID(); |
39 | |
40 | last_rusage = std::make_unique<RUsageCounters>(); |
41 | last_taskstats = std::make_unique<TasksStatsCounters>(); |
42 | |
43 | memory_tracker.setDescription("(for thread)"); |
44 | log = &Poco::Logger::get("ThreadStatus"); |
45 | |
46 | current_thread = this; |
47 | |
48 | /// NOTE: It is important not to do any non-trivial actions (like updating ProfileEvents or logging) before ThreadStatus is created |
49 | /// Otherwise it could lead to SIGSEGV due to current_thread dereferencing |
50 | } |
51 | |
52 | ThreadStatus::~ThreadStatus() |
53 | { |
54 | try |
55 | { |
56 | if (untracked_memory > 0) |
57 | memory_tracker.alloc(untracked_memory); |
58 | else |
59 | memory_tracker.free(-untracked_memory); |
60 | } |
61 | catch (const DB::Exception &) |
62 | { |
63 | /// It's a minor tracked memory leak here (not the memory itself but it's counter). |
64 | /// We've already allocated a little bit more then the limit and cannot track it in the thread memory tracker or its parent. |
65 | } |
66 | |
67 | if (deleter) |
68 | deleter(); |
69 | current_thread = nullptr; |
70 | } |
71 | |
72 | void ThreadStatus::initPerformanceCounters() |
73 | { |
74 | performance_counters_finalized = false; |
75 | |
76 | /// Clear stats from previous query if a new query is started |
77 | /// TODO: make separate query_thread_performance_counters and thread_performance_counters |
78 | performance_counters.resetCounters(); |
79 | memory_tracker.resetCounters(); |
80 | memory_tracker.setDescription("(for thread)"); |
81 | |
82 | query_start_time_nanoseconds = getCurrentTimeNanoseconds(); |
83 | query_start_time = time(nullptr); |
84 | ++queries_started; |
85 | |
86 | *last_rusage = RUsageCounters::current(query_start_time_nanoseconds); |
87 | |
88 | try |
89 | { |
90 | if (TaskStatsInfoGetter::checkPermissions()) |
91 | { |
92 | if (!taskstats_getter) |
93 | taskstats_getter = std::make_unique<TaskStatsInfoGetter>(); |
94 | |
95 | *last_taskstats = TasksStatsCounters::current(); |
96 | } |
97 | } |
98 | catch (...) |
99 | { |
100 | taskstats_getter.reset(); |
101 | tryLogCurrentException(__PRETTY_FUNCTION__); |
102 | } |
103 | } |
104 | |
105 | void ThreadStatus::updatePerformanceCounters() |
106 | { |
107 | try |
108 | { |
109 | RUsageCounters::updateProfileEvents(*last_rusage, performance_counters); |
110 | if (taskstats_getter) |
111 | TasksStatsCounters::updateProfileEvents(*last_taskstats, performance_counters); |
112 | } |
113 | catch (...) |
114 | { |
115 | tryLogCurrentException(log); |
116 | } |
117 | } |
118 | |
119 | void ThreadStatus::assertState(const std::initializer_list<int> & permitted_states, const char * description) |
120 | { |
121 | for (auto permitted_state : permitted_states) |
122 | { |
123 | if (getCurrentState() == permitted_state) |
124 | return; |
125 | } |
126 | |
127 | std::stringstream ss; |
128 | ss << "Unexpected thread state "<< getCurrentState(); |
129 | if (description) |
130 | ss << ": "<< description; |
131 | throw Exception(ss.str(), ErrorCodes::LOGICAL_ERROR); |
132 | } |
133 | |
134 | void ThreadStatus::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue, |
135 | LogsLevel client_logs_level) |
136 | { |
137 | logs_queue_ptr = logs_queue; |
138 | |
139 | if (!thread_group) |
140 | return; |
141 | |
142 | std::lock_guard lock(thread_group->mutex); |
143 | thread_group->logs_queue_ptr = logs_queue; |
144 | thread_group->client_logs_level = client_logs_level; |
145 | } |
146 | |
147 | } |
148 |