1#include "OwnSplitChannel.h"
2
3#include <iostream>
4#include <Core/Block.h>
5#include <Interpreters/InternalTextLogsQueue.h>
6#include <Interpreters/TextLog.h>
7#include <sys/time.h>
8#include <Poco/Message.h>
9#include <Common/CurrentThread.h>
10#include <Common/DNSResolver.h>
11#include <common/getThreadNumber.h>
12#include <Common/SensitiveDataMasker.h>
13
14namespace DB
15{
16void OwnSplitChannel::log(const Poco::Message & msg)
17{
18 auto logs_queue = CurrentThread::getInternalTextLogsQueue();
19
20 if (channels.empty() && (logs_queue == nullptr || msg.getPriority() > logs_queue->max_priority))
21 return;
22
23 if (auto masker = SensitiveDataMasker::getInstance())
24 {
25 auto message_text = msg.getText();
26 auto matches = masker->wipeSensitiveData(message_text);
27 if (matches > 0)
28 {
29 logSplit({msg, message_text}); // we will continue with the copy of original message with text modified
30 return;
31 }
32
33 }
34
35 logSplit(msg);
36}
37
38
39void OwnSplitChannel::logSplit(const Poco::Message & msg)
40{
41 ExtendedLogMessage msg_ext = ExtendedLogMessage::getFrom(msg);
42
43 /// Log data to child channels
44 for (auto & channel : channels)
45 {
46 if (channel.second)
47 channel.second->logExtended(msg_ext); // extended child
48 else
49 channel.first->log(msg); // ordinary child
50 }
51
52 auto logs_queue = CurrentThread::getInternalTextLogsQueue();
53
54 /// Log to "TCP queue" if message is not too noisy
55 if (logs_queue && msg.getPriority() <= logs_queue->max_priority)
56 {
57 MutableColumns columns = InternalTextLogsQueue::getSampleColumns();
58
59 size_t i = 0;
60 columns[i++]->insert(msg_ext.time_seconds);
61 columns[i++]->insert(msg_ext.time_microseconds);
62 columns[i++]->insert(DNSResolver::instance().getHostName());
63 columns[i++]->insert(msg_ext.query_id);
64 columns[i++]->insert(msg_ext.thread_number);
65 columns[i++]->insert(Int64(msg.getPriority()));
66 columns[i++]->insert(msg.getSource());
67 columns[i++]->insert(msg.getText());
68
69 logs_queue->emplace(std::move(columns));
70 }
71
72
73 /// Also log to system.text_log table
74 TextLogElement elem;
75
76 elem.event_time = msg_ext.time_seconds;
77 elem.microseconds = msg_ext.time_microseconds;
78
79 elem.thread_name = getThreadName();
80 elem.thread_number = msg_ext.thread_number;
81
82 if (CurrentThread::isInitialized())
83 elem.os_thread_id = CurrentThread::get().os_thread_id;
84 else
85 elem.os_thread_id = 0;
86
87 elem.query_id = msg_ext.query_id;
88
89 elem.message = msg.getText();
90 elem.logger_name = msg.getSource();
91 elem.level = msg.getPriority();
92
93 if (msg.getSourceFile() != nullptr)
94 elem.source_file = msg.getSourceFile();
95
96 elem.source_line = msg.getSourceLine();
97
98 std::lock_guard<std::mutex> lock(text_log_mutex);
99 if (auto log = text_log.lock())
100 log->add(elem);
101}
102
103
104void OwnSplitChannel::addChannel(Poco::AutoPtr<Poco::Channel> channel)
105{
106 channels.emplace_back(std::move(channel), dynamic_cast<ExtendedLogChannel *>(channel.get()));
107}
108
109void OwnSplitChannel::addTextLog(std::shared_ptr<DB::TextLog> log)
110{
111 std::lock_guard<std::mutex> lock(text_log_mutex);
112 text_log = log;
113}
114
115}
116