1#include <sstream>
2
3#include <Common/CurrentThread.h>
4#include <Common/Exception.h>
5#include <Common/ThreadProfileEvents.h>
6#include <Common/TaskStatsInfoGetter.h>
7#include <Common/QueryProfiler.h>
8#include <Common/ThreadStatus.h>
9
10#include <Poco/Logger.h>
11#include <common/getThreadNumber.h>
12
13
14namespace DB
15{
16
17
/// Error codes declared for use in this file.
/// They are only declared here (`extern`); the definitions live in another translation unit.
namespace ErrorCodes
{
 extern const int LOGICAL_ERROR;
 extern const int PTHREAD_ERROR;
}
23
24
/// Per-thread pointer to a ThreadStatus object; nullptr until one is constructed on the thread.
/// The ThreadStatus constructor stores `this` here and the destructor resets it to nullptr.
thread_local ThreadStatus * current_thread = nullptr;
26
27
28TasksStatsCounters TasksStatsCounters::current()
29{
30 TasksStatsCounters res;
31 CurrentThread::get().taskstats_getter->getStat(res.stat, CurrentThread::get().os_thread_id);
32 return res;
33}
34
/// Initializes the per-thread state and registers this object as the current thread's status.
/// Steps, in order: record the thread number and OS thread id, allocate the "last seen"
/// rusage/taskstats holders, label the memory tracker, obtain the logger, and finally publish
/// `this` through the thread-local `current_thread` pointer.
ThreadStatus::ThreadStatus()
{
 /// Identifiers of the thread this object is being constructed on.
 thread_number = getThreadNumber();
 os_thread_id = TaskStatsInfoGetter::getCurrentTID();

 /// Holders for the previously observed counter values (default-constructed here).
 last_rusage = std::make_unique<RUsageCounters>();
 last_taskstats = std::make_unique<TasksStatsCounters>();

 memory_tracker.setDescription("(for thread)");
 log = &Poco::Logger::get("ThreadStatus");

 /// Publish this object as the status of the current thread.
 current_thread = this;

 /// NOTE: It is important not to do any non-trivial actions (like updating ProfileEvents or logging) before ThreadStatus is created
 /// Otherwise it could lead to SIGSEGV due to current_thread dereferencing
}
51
52ThreadStatus::~ThreadStatus()
53{
54 try
55 {
56 if (untracked_memory > 0)
57 memory_tracker.alloc(untracked_memory);
58 else
59 memory_tracker.free(-untracked_memory);
60 }
61 catch (const DB::Exception &)
62 {
63 /// It's a minor tracked memory leak here (not the memory itself but it's counter).
64 /// We've already allocated a little bit more then the limit and cannot track it in the thread memory tracker or its parent.
65 }
66
67 if (deleter)
68 deleter();
69 current_thread = nullptr;
70}
71
72void ThreadStatus::initPerformanceCounters()
73{
74 performance_counters_finalized = false;
75
76 /// Clear stats from previous query if a new query is started
77 /// TODO: make separate query_thread_performance_counters and thread_performance_counters
78 performance_counters.resetCounters();
79 memory_tracker.resetCounters();
80 memory_tracker.setDescription("(for thread)");
81
82 query_start_time_nanoseconds = getCurrentTimeNanoseconds();
83 query_start_time = time(nullptr);
84 ++queries_started;
85
86 *last_rusage = RUsageCounters::current(query_start_time_nanoseconds);
87
88 try
89 {
90 if (TaskStatsInfoGetter::checkPermissions())
91 {
92 if (!taskstats_getter)
93 taskstats_getter = std::make_unique<TaskStatsInfoGetter>();
94
95 *last_taskstats = TasksStatsCounters::current();
96 }
97 }
98 catch (...)
99 {
100 taskstats_getter.reset();
101 tryLogCurrentException(__PRETTY_FUNCTION__);
102 }
103}
104
105void ThreadStatus::updatePerformanceCounters()
106{
107 try
108 {
109 RUsageCounters::updateProfileEvents(*last_rusage, performance_counters);
110 if (taskstats_getter)
111 TasksStatsCounters::updateProfileEvents(*last_taskstats, performance_counters);
112 }
113 catch (...)
114 {
115 tryLogCurrentException(log);
116 }
117}
118
119void ThreadStatus::assertState(const std::initializer_list<int> & permitted_states, const char * description)
120{
121 for (auto permitted_state : permitted_states)
122 {
123 if (getCurrentState() == permitted_state)
124 return;
125 }
126
127 std::stringstream ss;
128 ss << "Unexpected thread state " << getCurrentState();
129 if (description)
130 ss << ": " << description;
131 throw Exception(ss.str(), ErrorCodes::LOGICAL_ERROR);
132}
133
134void ThreadStatus::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue,
135 LogsLevel client_logs_level)
136{
137 logs_queue_ptr = logs_queue;
138
139 if (!thread_group)
140 return;
141
142 std::lock_guard lock(thread_group->mutex);
143 thread_group->logs_queue_ptr = logs_queue;
144 thread_group->client_logs_level = client_logs_level;
145}
146
147}
148