1 | #include "TraceCollector.h" |
2 | |
3 | #include <Core/Field.h> |
4 | #include <Poco/Logger.h> |
5 | #include <Common/PipeFDs.h> |
6 | #include <Common/StackTrace.h> |
7 | #include <common/logger_useful.h> |
8 | #include <IO/ReadHelpers.h> |
9 | #include <IO/ReadBufferFromFileDescriptor.h> |
10 | #include <IO/WriteHelpers.h> |
11 | #include <IO/WriteBufferFromFileDescriptor.h> |
12 | #include <Common/Exception.h> |
13 | #include <Interpreters/TraceLog.h> |
14 | |
15 | #include <unistd.h> |
16 | #include <fcntl.h> |
17 | |
18 | |
19 | namespace DB |
20 | { |
21 | |
22 | LazyPipeFDs trace_pipe; |
23 | |
24 | namespace ErrorCodes |
25 | { |
26 | extern const int NULL_POINTER_DEREFERENCE; |
27 | extern const int THREAD_IS_NOT_JOINABLE; |
28 | } |
29 | |
30 | TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log_) |
31 | : log(&Poco::Logger::get("TraceCollector" )) |
32 | , trace_log(trace_log_) |
33 | { |
34 | if (trace_log == nullptr) |
35 | throw Exception("Invalid trace log pointer passed" , ErrorCodes::NULL_POINTER_DEREFERENCE); |
36 | |
37 | trace_pipe.open(); |
38 | |
39 | /** Turn write end of pipe to non-blocking mode to avoid deadlocks |
40 | * when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe. |
41 | */ |
42 | trace_pipe.setNonBlocking(); |
43 | trace_pipe.tryIncreaseSize(1 << 20); |
44 | |
45 | thread = ThreadFromGlobalPool(&TraceCollector::run, this); |
46 | } |
47 | |
48 | TraceCollector::~TraceCollector() |
49 | { |
50 | if (!thread.joinable()) |
51 | LOG_ERROR(log, "TraceCollector thread is malformed and cannot be joined" ); |
52 | else |
53 | { |
54 | TraceCollector::notifyToStop(); |
55 | thread.join(); |
56 | } |
57 | |
58 | trace_pipe.close(); |
59 | } |
60 | |
61 | /** |
62 | * Sends TraceCollector stop message |
63 | * |
64 | * Each sequence of data for TraceCollector thread starts with a boolean flag. |
65 | * If this flag is true, TraceCollector must stop reading trace_pipe and exit. |
66 | * This function sends flag with a true value to stop TraceCollector gracefully. |
67 | * |
68 | * NOTE: TraceCollector will NOT stop immediately as there may be some data left in the pipe |
69 | * before stop message. |
70 | */ |
71 | void TraceCollector::notifyToStop() |
72 | { |
73 | WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1]); |
74 | writeChar(true, out); |
75 | out.next(); |
76 | } |
77 | |
78 | void TraceCollector::run() |
79 | { |
80 | ReadBufferFromFileDescriptor in(trace_pipe.fds_rw[0]); |
81 | |
82 | while (true) |
83 | { |
84 | char is_last; |
85 | readChar(is_last, in); |
86 | if (is_last) |
87 | break; |
88 | |
89 | std::string query_id; |
90 | readStringBinary(query_id, in); |
91 | |
92 | UInt8 size = 0; |
93 | readIntBinary(size, in); |
94 | |
95 | Array trace; |
96 | trace.reserve(size); |
97 | |
98 | for (size_t i = 0; i < size; i++) |
99 | { |
100 | uintptr_t addr = 0; |
101 | readPODBinary(addr, in); |
102 | trace.emplace_back(UInt64(addr)); |
103 | } |
104 | |
105 | TimerType timer_type; |
106 | readPODBinary(timer_type, in); |
107 | |
108 | UInt32 thread_number; |
109 | readPODBinary(thread_number, in); |
110 | |
111 | TraceLogElement element{std::time(nullptr), timer_type, thread_number, query_id, trace}; |
112 | trace_log->add(element); |
113 | } |
114 | } |
115 | |
116 | } |
117 | |