1#include "TraceCollector.h"
2
3#include <Core/Field.h>
4#include <Poco/Logger.h>
5#include <Common/PipeFDs.h>
6#include <Common/StackTrace.h>
7#include <common/logger_useful.h>
8#include <IO/ReadHelpers.h>
9#include <IO/ReadBufferFromFileDescriptor.h>
10#include <IO/WriteHelpers.h>
11#include <IO/WriteBufferFromFileDescriptor.h>
12#include <Common/Exception.h>
13#include <Interpreters/TraceLog.h>
14
15#include <unistd.h>
16#include <fcntl.h>
17
18
19namespace DB
20{
21
22LazyPipeFDs trace_pipe;
23
24namespace ErrorCodes
25{
26 extern const int NULL_POINTER_DEREFERENCE;
27 extern const int THREAD_IS_NOT_JOINABLE;
28}
29
30TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log_)
31 : log(&Poco::Logger::get("TraceCollector"))
32 , trace_log(trace_log_)
33{
34 if (trace_log == nullptr)
35 throw Exception("Invalid trace log pointer passed", ErrorCodes::NULL_POINTER_DEREFERENCE);
36
37 trace_pipe.open();
38
39 /** Turn write end of pipe to non-blocking mode to avoid deadlocks
40 * when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe.
41 */
42 trace_pipe.setNonBlocking();
43 trace_pipe.tryIncreaseSize(1 << 20);
44
45 thread = ThreadFromGlobalPool(&TraceCollector::run, this);
46}
47
48TraceCollector::~TraceCollector()
49{
50 if (!thread.joinable())
51 LOG_ERROR(log, "TraceCollector thread is malformed and cannot be joined");
52 else
53 {
54 TraceCollector::notifyToStop();
55 thread.join();
56 }
57
58 trace_pipe.close();
59}
60
61/**
62 * Sends TraceCollector stop message
63 *
64 * Each sequence of data for TraceCollector thread starts with a boolean flag.
65 * If this flag is true, TraceCollector must stop reading trace_pipe and exit.
66 * This function sends flag with a true value to stop TraceCollector gracefully.
67 *
68 * NOTE: TraceCollector will NOT stop immediately as there may be some data left in the pipe
69 * before stop message.
70 */
71void TraceCollector::notifyToStop()
72{
73 WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1]);
74 writeChar(true, out);
75 out.next();
76}
77
78void TraceCollector::run()
79{
80 ReadBufferFromFileDescriptor in(trace_pipe.fds_rw[0]);
81
82 while (true)
83 {
84 char is_last;
85 readChar(is_last, in);
86 if (is_last)
87 break;
88
89 std::string query_id;
90 readStringBinary(query_id, in);
91
92 UInt8 size = 0;
93 readIntBinary(size, in);
94
95 Array trace;
96 trace.reserve(size);
97
98 for (size_t i = 0; i < size; i++)
99 {
100 uintptr_t addr = 0;
101 readPODBinary(addr, in);
102 trace.emplace_back(UInt64(addr));
103 }
104
105 TimerType timer_type;
106 readPODBinary(timer_type, in);
107
108 UInt32 thread_number;
109 readPODBinary(thread_number, in);
110
111 TraceLogElement element{std::time(nullptr), timer_type, thread_number, query_id, trace};
112 trace_log->add(element);
113 }
114}
115
116}
117