1#include "InternalTextLogsRowOutputStream.h"
2#include <Core/Block.h>
3#include <Interpreters/InternalTextLogsQueue.h>
4#include <Common/typeid_cast.h>
5#include <DataTypes/IDataType.h>
6#include <Columns/ColumnsNumber.h>
7#include <Columns/ColumnString.h>
8#include <IO/WriteHelpers.h>
9
10
11namespace DB
12{
13
14Block InternalTextLogsRowOutputStream::getHeader() const
15{
16 return InternalTextLogsQueue::getSampleBlock();
17}
18
19void InternalTextLogsRowOutputStream::write(const Block & block)
20{
21 auto & array_event_time = typeid_cast<const ColumnUInt32 &>(*block.getByName("event_time").column).getData();
22 auto & array_microseconds = typeid_cast<const ColumnUInt32 &>(*block.getByName("event_time_microseconds").column).getData();
23
24 auto & column_host_name = typeid_cast<const ColumnString &>(*block.getByName("host_name").column);
25 auto & column_query_id = typeid_cast<const ColumnString &>(*block.getByName("query_id").column);
26
27 auto & array_thread_number = typeid_cast<const ColumnUInt32 &>(*block.getByName("thread_number").column).getData();
28 auto & array_priority = typeid_cast<const ColumnInt8 &>(*block.getByName("priority").column).getData();
29 auto & column_source = typeid_cast<const ColumnString &>(*block.getByName("source").column);
30 auto & column_text = typeid_cast<const ColumnString &>(*block.getByName("text").column);
31
32 for (size_t row_num = 0; row_num < block.rows(); ++row_num)
33 {
34 auto host_name = column_host_name.getDataAt(row_num);
35 if (host_name.size)
36 {
37 writeCString("[", wb);
38 writeString(host_name, wb);
39 writeCString("] ", wb);
40 }
41
42 auto event_time = array_event_time[row_num];
43 writeDateTimeText<'.', ':'>(event_time, wb);
44
45 auto microseconds = array_microseconds[row_num];
46 writeChar('.', wb);
47 writeChar('0' + ((microseconds / 100000) % 10), wb);
48 writeChar('0' + ((microseconds / 10000) % 10), wb);
49 writeChar('0' + ((microseconds / 1000) % 10), wb);
50 writeChar('0' + ((microseconds / 100) % 10), wb);
51 writeChar('0' + ((microseconds / 10) % 10), wb);
52 writeChar('0' + ((microseconds / 1) % 10), wb);
53
54 auto query_id = column_query_id.getDataAt(row_num);
55 if (query_id.size)
56 {
57 writeCString(" {", wb);
58 writeString(query_id, wb);
59 writeCString("}", wb);
60 }
61
62 UInt32 thread_number = array_thread_number[row_num];
63 writeCString(" [ ", wb);
64 writeIntText(thread_number, wb);
65 writeCString(" ] <", wb);
66
67 Int8 priority = array_priority[row_num];
68 writeString(InternalTextLogsQueue::getPriorityName(priority), wb);
69 writeCString("> ", wb);
70
71 auto source = column_source.getDataAt(row_num);
72 writeString(source, wb);
73 writeCString(": ", wb);
74
75 auto text = column_text.getDataAt(row_num);
76 writeString(text, wb);
77
78 writeChar('\n', wb);
79 }
80}
81
82}
83