1#pragma once
2
3#include <common/StringRef.h>
4#include <Common/ProfileEvents.h>
5#include <Common/MemoryTracker.h>
6
7#include <Core/SettingsCollection.h>
8
9#include <IO/Progress.h>
10
11#include <memory>
12#include <map>
13#include <mutex>
14#include <shared_mutex>
15#include <functional>
16#include <boost/noncopyable.hpp>
17
18
19namespace Poco
20{
21 class Logger;
22}
23
24
25namespace DB
26{
27
28class Context;
29class QueryStatus;
30class ThreadStatus;
31class QueryProfilerReal;
32class QueryProfilerCpu;
33class QueryThreadLog;
34struct TasksStatsCounters;
35struct RUsageCounters;
36class TaskStatsInfoGetter;
37class InternalTextLogsQueue;
38using InternalTextLogsQueuePtr = std::shared_ptr<InternalTextLogsQueue>;
39using InternalTextLogsQueueWeakPtr = std::weak_ptr<InternalTextLogsQueue>;
40
41
42/** Thread group is a collection of threads dedicated to single task
43 * (query or other process like background merge).
44 *
45 * ProfileEvents (counters) from a thread are propagated to thread group.
46 *
47 * Create via CurrentThread::initializeQuery (for queries) or directly (for various background tasks).
48 * Use via CurrentThread::getGroup.
49 */
50class ThreadGroupStatus
51{
52public:
53 mutable std::mutex mutex;
54
55 ProfileEvents::Counters performance_counters{VariableContext::Process};
56 MemoryTracker memory_tracker{VariableContext::Process};
57
58 Context * query_context = nullptr;
59 Context * global_context = nullptr;
60
61 InternalTextLogsQueueWeakPtr logs_queue_ptr;
62
63 std::vector<UInt32> thread_numbers;
64 std::vector<UInt32> os_thread_ids;
65
66 /// The first thread created this thread group
67 UInt32 master_thread_number = 0;
68 Int32 master_thread_os_id = -1;
69
70 LogsLevel client_logs_level = LogsLevel::none;
71
72 String query;
73};
74
75using ThreadGroupStatusPtr = std::shared_ptr<ThreadGroupStatus>;
76
77
78extern thread_local ThreadStatus * current_thread;
79
80/** Encapsulates all per-thread info (ProfileEvents, MemoryTracker, query_id, query context, etc.).
81 * The object must be created in thread function and destroyed in the same thread before the exit.
82 * It is accessed through thread-local pointer.
83 *
84 * This object should be used only via "CurrentThread", see CurrentThread.h
85 */
86class ThreadStatus : public boost::noncopyable
87{
88public:
89 ThreadStatus();
90 ~ThreadStatus();
91
92 /// Poco's thread number (the same number is used in logs)
93 UInt32 thread_number = 0;
94 /// Linux's PID (or TGID) (the same id is shown by ps util)
95 Int32 os_thread_id = -1;
96 /// Also called "nice" value. If it was changed to non-zero (when attaching query) - will be reset to zero when query is detached.
97 Int32 os_thread_priority = 0;
98
99 /// TODO: merge them into common entity
100 ProfileEvents::Counters performance_counters{VariableContext::Thread};
101 MemoryTracker memory_tracker{VariableContext::Thread};
102 /// Small amount of untracked memory (per thread atomic-less counter)
103 Int64 untracked_memory = 0;
104
105 /// Statistics of read and write rows/bytes
106 Progress progress_in;
107 Progress progress_out;
108
109 using Deleter = std::function<void()>;
110 Deleter deleter;
111
112 ThreadGroupStatusPtr getThreadGroup() const
113 {
114 return thread_group;
115 }
116
117 enum ThreadState
118 {
119 DetachedFromQuery = 0, /// We just created thread or it is a background thread
120 AttachedToQuery, /// Thread executes enqueued query
121 Died, /// Thread does not exist
122 };
123
124 int getCurrentState() const
125 {
126 return thread_state.load(std::memory_order_relaxed);
127 }
128
129 StringRef getQueryId() const
130 {
131 return query_id;
132 }
133
134 /// Starts new query and create new thread group for it, current thread becomes master thread of the query
135 void initializeQuery();
136
137 /// Attaches slave thread to existing thread group
138 void attachQuery(const ThreadGroupStatusPtr & thread_group_, bool check_detached = true);
139
140 InternalTextLogsQueuePtr getInternalTextLogsQueue() const
141 {
142 return thread_state == Died ? nullptr : logs_queue_ptr.lock();
143 }
144
145 void attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue,
146 LogsLevel client_logs_level);
147
148 /// Sets query context for current thread and its thread group
149 /// NOTE: query_context have to be alive until detachQuery() is called
150 void attachQueryContext(Context & query_context);
151
152 /// Update several ProfileEvents counters
153 void updatePerformanceCounters();
154
155 /// Update ProfileEvents and dumps info to system.query_thread_log
156 void finalizePerformanceCounters();
157
158 /// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped
159 void detachQuery(bool exit_if_already_detached = false, bool thread_exits = false);
160
161protected:
162 void initPerformanceCounters();
163
164 void initQueryProfiler();
165
166 void finalizeQueryProfiler();
167
168 void logToQueryThreadLog(QueryThreadLog & thread_log);
169
170 void assertState(const std::initializer_list<int> & permitted_states, const char * description = nullptr);
171
172 ThreadGroupStatusPtr thread_group;
173
174 std::atomic<int> thread_state{ThreadState::DetachedFromQuery};
175
176 /// Is set once
177 Context * global_context = nullptr;
178 /// Use it only from current thread
179 Context * query_context = nullptr;
180
181 String query_id;
182
183 /// A logs queue used by TCPHandler to pass logs to a client
184 InternalTextLogsQueueWeakPtr logs_queue_ptr;
185
186 bool performance_counters_finalized = false;
187 UInt64 query_start_time_nanoseconds = 0;
188 time_t query_start_time = 0;
189 size_t queries_started = 0;
190
191 // CPU and Real time query profilers
192 std::unique_ptr<QueryProfilerReal> query_profiler_real;
193 std::unique_ptr<QueryProfilerCpu> query_profiler_cpu;
194
195 Poco::Logger * log = nullptr;
196
197 friend class CurrentThread;
198 friend struct TasksStatsCounters;
199
200 /// Use ptr not to add extra dependencies in the header
201 std::unique_ptr<RUsageCounters> last_rusage;
202 std::unique_ptr<TasksStatsCounters> last_taskstats;
203
204 /// Set to non-nullptr only if we have enough capabilities.
205 std::unique_ptr<TaskStatsInfoGetter> taskstats_getter;
206};
207
208}
209