1#ifdef OS_LINUX /// Because of 'sigqueue' functions and RT signals.
2
#include <signal.h>
#include <poll.h>

#include <atomic>
#include <chrono>
#include <mutex>
#include <filesystem>

#include <ext/scope_guard.h>

#include <Storages/System/StorageSystemStackTrace.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <IO/ReadHelpers.h>
#include <Common/PipeFDs.h>
#include <common/getThreadNumber.h>
18
19
20namespace DB
21{
22
namespace ErrorCodes
{
    /// Error codes thrown from this file; they are only declared here (extern), defined elsewhere in the project.
    extern const int CANNOT_MANIPULATE_SIGSET;
    extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
    extern const int CANNOT_SET_SIGNAL_HANDLER;
    extern const int CANNOT_SIGQUEUE;
    extern const int LOGICAL_ERROR;
}
31
32
33namespace
34{
35 const pid_t expected_pid = getpid();
36 const int sig = SIGRTMIN;
37
38 int sequence_num = 0; /// For messages sent via pipe.
39
40 UInt32 thread_number{0};
41 std::optional<StackTrace> stack_trace;
42
43 static constexpr size_t max_query_id_size = 128;
44 char query_id_data[max_query_id_size];
45 size_t query_id_size = 0;
46
47 LazyPipeFDs notification_pipe;
48
49 void signalHandler(int, siginfo_t * info, void * context)
50 {
51 /// In case malicious user is sending signals manually (for unknown reason).
52 /// If we don't check - it may break our synchronization.
53 if (info->si_pid != expected_pid)
54 return;
55
56 /// Signal received too late.
57 if (info->si_value.sival_int != sequence_num)
58 return;
59
60 /// All these methods are signal-safe.
61 const ucontext_t signal_context = *reinterpret_cast<ucontext_t *>(context);
62 stack_trace.emplace(signal_context);
63 thread_number = getThreadNumber();
64
65 StringRef query_id = CurrentThread::getQueryId();
66 query_id_size = std::min(query_id.size, max_query_id_size);
67 memcpy(query_id_data, query_id.data, query_id_size);
68
69 int notification_num = info->si_value.sival_int;
70 ssize_t res = ::write(notification_pipe.fds_rw[1], &notification_num, sizeof(notification_num));
71
72 /// We cannot do anything if write failed.
73 (void)res;
74 }
75
76 /// Wait for data in pipe and read it.
77 bool wait(int timeout_ms)
78 {
79 while (true)
80 {
81 int fd = notification_pipe.fds_rw[0];
82 pollfd poll_fd{fd, POLLIN, 0};
83
84 int poll_res = poll(&poll_fd, 1, timeout_ms);
85 if (poll_res < 0)
86 {
87 if (errno == EINTR)
88 {
89 --timeout_ms; /// Quite a hacky way to update timeout. Just to make sure we avoid infinite waiting.
90 if (timeout_ms == 0)
91 return false;
92 continue;
93 }
94
95 throwFromErrno("Cannot poll pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
96 }
97 if (poll_res == 0)
98 return false;
99
100 int notification_num = 0;
101 ssize_t read_res = ::read(fd, &notification_num, sizeof(notification_num));
102
103 if (read_res < 0)
104 {
105 if (errno == EINTR)
106 continue;
107
108 throwFromErrno("Cannot read from pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
109 }
110
111 if (read_res == sizeof(notification_num))
112 {
113 if (notification_num == sequence_num)
114 return true;
115 else
116 continue; /// Drain delayed notifications.
117 }
118
119 throw Exception("Logical error: read wrong number of bytes from pipe", ErrorCodes::LOGICAL_ERROR);
120 }
121 }
122}
123
124
125StorageSystemStackTrace::StorageSystemStackTrace(const String & name_)
126 : IStorageSystemOneBlock<StorageSystemStackTrace>(name_)
127{
128 notification_pipe.open();
129
130 /// Setup signal handler.
131
132 struct sigaction sa{};
133 sa.sa_sigaction = signalHandler;
134 sa.sa_flags = SA_SIGINFO;
135
136 if (sigemptyset(&sa.sa_mask))
137 throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
138
139 if (sigaddset(&sa.sa_mask, sig))
140 throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
141
142 if (sigaction(sig, &sa, nullptr))
143 throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER);
144}
145
146
147NamesAndTypesList StorageSystemStackTrace::getNamesAndTypes()
148{
149 return
150 {
151 { "thread_number", std::make_shared<DataTypeUInt32>() },
152 { "query_id", std::make_shared<DataTypeString>() },
153 { "trace", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()) }
154 };
155}
156
157
158void StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
159{
160 /// It shouldn't be possible to do concurrent reads from this table.
161 std::lock_guard lock(mutex);
162
163 /// Send a signal to every thread and wait for result.
164 /// We must wait for every thread one by one sequentially,
165 /// because there is a limit on number of queued signals in OS and otherwise signals may get lost.
166 /// Also, non-RT signals are not delivered if previous signal is handled right now (by default; but we use RT signals).
167
168 /// Obviously, results for different threads may be out of sync.
169
170 /// There is no better way to enumerate threads in a process other than looking into procfs.
171
172 std::filesystem::directory_iterator end;
173 for (std::filesystem::directory_iterator it("/proc/self/task"); it != end; ++it)
174 {
175 pid_t tid = parse<pid_t>(it->path().filename());
176
177 sigval sig_value{};
178 sig_value.sival_int = sequence_num;
179 if (0 != ::sigqueue(tid, sig, sig_value))
180 {
181 /// The thread may has been already finished.
182 if (ESRCH == errno)
183 continue;
184
185 throwFromErrno("Cannot send signal with sigqueue", ErrorCodes::CANNOT_SIGQUEUE);
186 }
187
188 /// Just in case we will wait for pipe with timeout. In case signal didn't get processed.
189
190 if (wait(100))
191 {
192 size_t stack_trace_size = stack_trace->getSize();
193 size_t stack_trace_offset = stack_trace->getOffset();
194
195 Array arr;
196 arr.reserve(stack_trace_size - stack_trace_offset);
197 for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
198 arr.emplace_back(reinterpret_cast<intptr_t>(stack_trace->getFrames()[i]));
199
200 res_columns[0]->insert(thread_number);
201 res_columns[1]->insertData(query_id_data, query_id_size);
202 res_columns[2]->insert(arr);
203 }
204 else
205 {
206 /// Cannot obtain a stack trace. But create a record in result nevertheless.
207
208 res_columns[0]->insert(tid); /// TODO Replace all thread numbers to OS thread numbers.
209 res_columns[1]->insertDefault();
210 res_columns[2]->insertDefault();
211 }
212
213 sequence_num = static_cast<int>(static_cast<unsigned>(sequence_num) + 1);
214 }
215}
216
217}
218
219#endif
220