1 | #include "InternalTextLogsRowOutputStream.h" |
2 | #include <Core/Block.h> |
3 | #include <Interpreters/InternalTextLogsQueue.h> |
4 | #include <Common/typeid_cast.h> |
5 | #include <DataTypes/IDataType.h> |
6 | #include <Columns/ColumnsNumber.h> |
7 | #include <Columns/ColumnString.h> |
8 | #include <IO/WriteHelpers.h> |
9 | |
10 | |
11 | namespace DB |
12 | { |
13 | |
14 | Block InternalTextLogsRowOutputStream::() const |
15 | { |
16 | return InternalTextLogsQueue::getSampleBlock(); |
17 | } |
18 | |
19 | void InternalTextLogsRowOutputStream::write(const Block & block) |
20 | { |
21 | auto & array_event_time = typeid_cast<const ColumnUInt32 &>(*block.getByName("event_time" ).column).getData(); |
22 | auto & array_microseconds = typeid_cast<const ColumnUInt32 &>(*block.getByName("event_time_microseconds" ).column).getData(); |
23 | |
24 | auto & column_host_name = typeid_cast<const ColumnString &>(*block.getByName("host_name" ).column); |
25 | auto & column_query_id = typeid_cast<const ColumnString &>(*block.getByName("query_id" ).column); |
26 | |
27 | auto & array_thread_number = typeid_cast<const ColumnUInt32 &>(*block.getByName("thread_number" ).column).getData(); |
28 | auto & array_priority = typeid_cast<const ColumnInt8 &>(*block.getByName("priority" ).column).getData(); |
29 | auto & column_source = typeid_cast<const ColumnString &>(*block.getByName("source" ).column); |
30 | auto & column_text = typeid_cast<const ColumnString &>(*block.getByName("text" ).column); |
31 | |
32 | for (size_t row_num = 0; row_num < block.rows(); ++row_num) |
33 | { |
34 | auto host_name = column_host_name.getDataAt(row_num); |
35 | if (host_name.size) |
36 | { |
37 | writeCString("[" , wb); |
38 | writeString(host_name, wb); |
39 | writeCString("] " , wb); |
40 | } |
41 | |
42 | auto event_time = array_event_time[row_num]; |
43 | writeDateTimeText<'.', ':'>(event_time, wb); |
44 | |
45 | auto microseconds = array_microseconds[row_num]; |
46 | writeChar('.', wb); |
47 | writeChar('0' + ((microseconds / 100000) % 10), wb); |
48 | writeChar('0' + ((microseconds / 10000) % 10), wb); |
49 | writeChar('0' + ((microseconds / 1000) % 10), wb); |
50 | writeChar('0' + ((microseconds / 100) % 10), wb); |
51 | writeChar('0' + ((microseconds / 10) % 10), wb); |
52 | writeChar('0' + ((microseconds / 1) % 10), wb); |
53 | |
54 | auto query_id = column_query_id.getDataAt(row_num); |
55 | if (query_id.size) |
56 | { |
57 | writeCString(" {" , wb); |
58 | writeString(query_id, wb); |
59 | writeCString("}" , wb); |
60 | } |
61 | |
62 | UInt32 thread_number = array_thread_number[row_num]; |
63 | writeCString(" [ " , wb); |
64 | writeIntText(thread_number, wb); |
65 | writeCString(" ] <" , wb); |
66 | |
67 | Int8 priority = array_priority[row_num]; |
68 | writeString(InternalTextLogsQueue::getPriorityName(priority), wb); |
69 | writeCString("> " , wb); |
70 | |
71 | auto source = column_source.getDataAt(row_num); |
72 | writeString(source, wb); |
73 | writeCString(": " , wb); |
74 | |
75 | auto text = column_text.getDataAt(row_num); |
76 | writeString(text, wb); |
77 | |
78 | writeChar('\n', wb); |
79 | } |
80 | } |
81 | |
82 | } |
83 | |