| 1 | #include <sstream> | 
|---|---|
| 2 | |
| 3 | #include <Common/CurrentThread.h> | 
| 4 | #include <Common/Exception.h> | 
| 5 | #include <Common/ThreadProfileEvents.h> | 
| 6 | #include <Common/TaskStatsInfoGetter.h> | 
| 7 | #include <Common/QueryProfiler.h> | 
| 8 | #include <Common/ThreadStatus.h> | 
| 9 | |
| 10 | #include <Poco/Logger.h> | 
| 11 | #include <common/getThreadNumber.h> | 
| 12 | |
| 13 | |
| 14 | namespace DB | 
| 15 | { | 
| 16 | |
| 17 | |
/// Error codes referenced from this translation unit.
/// They are only declared here (extern); the definitions are not part of this file.
namespace ErrorCodes
{
    extern const int PTHREAD_ERROR;
    extern const int LOGICAL_ERROR;
}
| 23 | |
| 24 | |
| 25 | thread_local ThreadStatus * current_thread = nullptr; | 
| 26 | |
| 27 | |
| 28 | TasksStatsCounters TasksStatsCounters::current() | 
| 29 | { | 
| 30 | TasksStatsCounters res; | 
| 31 | CurrentThread::get().taskstats_getter->getStat(res.stat, CurrentThread::get().os_thread_id); | 
| 32 | return res; | 
| 33 | } | 
| 34 | |
| 35 | ThreadStatus::ThreadStatus() | 
| 36 | { | 
| 37 | thread_number = getThreadNumber(); | 
| 38 | os_thread_id = TaskStatsInfoGetter::getCurrentTID(); | 
| 39 | |
| 40 | last_rusage = std::make_unique<RUsageCounters>(); | 
| 41 | last_taskstats = std::make_unique<TasksStatsCounters>(); | 
| 42 | |
| 43 | memory_tracker.setDescription( "(for thread)"); | 
| 44 | log = &Poco::Logger::get( "ThreadStatus"); | 
| 45 | |
| 46 | current_thread = this; | 
| 47 | |
| 48 | /// NOTE: It is important not to do any non-trivial actions (like updating ProfileEvents or logging) before ThreadStatus is created | 
| 49 | /// Otherwise it could lead to SIGSEGV due to current_thread dereferencing | 
| 50 | } | 
| 51 | |
| 52 | ThreadStatus::~ThreadStatus() | 
| 53 | { | 
| 54 | try | 
| 55 | { | 
| 56 | if (untracked_memory > 0) | 
| 57 | memory_tracker.alloc(untracked_memory); | 
| 58 | else | 
| 59 | memory_tracker.free(-untracked_memory); | 
| 60 | } | 
| 61 | catch (const DB::Exception &) | 
| 62 | { | 
| 63 | /// It's a minor tracked memory leak here (not the memory itself but it's counter). | 
| 64 | /// We've already allocated a little bit more then the limit and cannot track it in the thread memory tracker or its parent. | 
| 65 | } | 
| 66 | |
| 67 | if (deleter) | 
| 68 | deleter(); | 
| 69 | current_thread = nullptr; | 
| 70 | } | 
| 71 | |
| 72 | void ThreadStatus::initPerformanceCounters() | 
| 73 | { | 
| 74 | performance_counters_finalized = false; | 
| 75 | |
| 76 | /// Clear stats from previous query if a new query is started | 
| 77 | /// TODO: make separate query_thread_performance_counters and thread_performance_counters | 
| 78 | performance_counters.resetCounters(); | 
| 79 | memory_tracker.resetCounters(); | 
| 80 | memory_tracker.setDescription( "(for thread)"); | 
| 81 | |
| 82 | query_start_time_nanoseconds = getCurrentTimeNanoseconds(); | 
| 83 | query_start_time = time(nullptr); | 
| 84 | ++queries_started; | 
| 85 | |
| 86 | *last_rusage = RUsageCounters::current(query_start_time_nanoseconds); | 
| 87 | |
| 88 | try | 
| 89 | { | 
| 90 | if (TaskStatsInfoGetter::checkPermissions()) | 
| 91 | { | 
| 92 | if (!taskstats_getter) | 
| 93 | taskstats_getter = std::make_unique<TaskStatsInfoGetter>(); | 
| 94 | |
| 95 | *last_taskstats = TasksStatsCounters::current(); | 
| 96 | } | 
| 97 | } | 
| 98 | catch (...) | 
| 99 | { | 
| 100 | taskstats_getter.reset(); | 
| 101 | tryLogCurrentException(__PRETTY_FUNCTION__); | 
| 102 | } | 
| 103 | } | 
| 104 | |
| 105 | void ThreadStatus::updatePerformanceCounters() | 
| 106 | { | 
| 107 | try | 
| 108 | { | 
| 109 | RUsageCounters::updateProfileEvents(*last_rusage, performance_counters); | 
| 110 | if (taskstats_getter) | 
| 111 | TasksStatsCounters::updateProfileEvents(*last_taskstats, performance_counters); | 
| 112 | } | 
| 113 | catch (...) | 
| 114 | { | 
| 115 | tryLogCurrentException(log); | 
| 116 | } | 
| 117 | } | 
| 118 | |
| 119 | void ThreadStatus::assertState(const std::initializer_list<int> & permitted_states, const char * description) | 
| 120 | { | 
| 121 | for (auto permitted_state : permitted_states) | 
| 122 | { | 
| 123 | if (getCurrentState() == permitted_state) | 
| 124 | return; | 
| 125 | } | 
| 126 | |
| 127 | std::stringstream ss; | 
| 128 | ss << "Unexpected thread state "<< getCurrentState(); | 
| 129 | if (description) | 
| 130 | ss << ": "<< description; | 
| 131 | throw Exception(ss.str(), ErrorCodes::LOGICAL_ERROR); | 
| 132 | } | 
| 133 | |
| 134 | void ThreadStatus::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue, | 
| 135 | LogsLevel client_logs_level) | 
| 136 | { | 
| 137 | logs_queue_ptr = logs_queue; | 
| 138 | |
| 139 | if (!thread_group) | 
| 140 | return; | 
| 141 | |
| 142 | std::lock_guard lock(thread_group->mutex); | 
| 143 | thread_group->logs_queue_ptr = logs_queue; | 
| 144 | thread_group->client_logs_level = client_logs_level; | 
| 145 | } | 
| 146 | |
| 147 | } | 
| 148 | 
