| 1 | #include "InternalTextLogsRowOutputStream.h" |
| 2 | #include <Core/Block.h> |
| 3 | #include <Interpreters/InternalTextLogsQueue.h> |
| 4 | #include <Common/typeid_cast.h> |
| 5 | #include <DataTypes/IDataType.h> |
| 6 | #include <Columns/ColumnsNumber.h> |
| 7 | #include <Columns/ColumnString.h> |
| 8 | #include <IO/WriteHelpers.h> |
| 9 | |
| 10 | |
| 11 | namespace DB |
| 12 | { |
| 13 | |
| 14 | Block InternalTextLogsRowOutputStream::() const |
| 15 | { |
| 16 | return InternalTextLogsQueue::getSampleBlock(); |
| 17 | } |
| 18 | |
| 19 | void InternalTextLogsRowOutputStream::write(const Block & block) |
| 20 | { |
| 21 | auto & array_event_time = typeid_cast<const ColumnUInt32 &>(*block.getByName("event_time" ).column).getData(); |
| 22 | auto & array_microseconds = typeid_cast<const ColumnUInt32 &>(*block.getByName("event_time_microseconds" ).column).getData(); |
| 23 | |
| 24 | auto & column_host_name = typeid_cast<const ColumnString &>(*block.getByName("host_name" ).column); |
| 25 | auto & column_query_id = typeid_cast<const ColumnString &>(*block.getByName("query_id" ).column); |
| 26 | |
| 27 | auto & array_thread_number = typeid_cast<const ColumnUInt32 &>(*block.getByName("thread_number" ).column).getData(); |
| 28 | auto & array_priority = typeid_cast<const ColumnInt8 &>(*block.getByName("priority" ).column).getData(); |
| 29 | auto & column_source = typeid_cast<const ColumnString &>(*block.getByName("source" ).column); |
| 30 | auto & column_text = typeid_cast<const ColumnString &>(*block.getByName("text" ).column); |
| 31 | |
| 32 | for (size_t row_num = 0; row_num < block.rows(); ++row_num) |
| 33 | { |
| 34 | auto host_name = column_host_name.getDataAt(row_num); |
| 35 | if (host_name.size) |
| 36 | { |
| 37 | writeCString("[" , wb); |
| 38 | writeString(host_name, wb); |
| 39 | writeCString("] " , wb); |
| 40 | } |
| 41 | |
| 42 | auto event_time = array_event_time[row_num]; |
| 43 | writeDateTimeText<'.', ':'>(event_time, wb); |
| 44 | |
| 45 | auto microseconds = array_microseconds[row_num]; |
| 46 | writeChar('.', wb); |
| 47 | writeChar('0' + ((microseconds / 100000) % 10), wb); |
| 48 | writeChar('0' + ((microseconds / 10000) % 10), wb); |
| 49 | writeChar('0' + ((microseconds / 1000) % 10), wb); |
| 50 | writeChar('0' + ((microseconds / 100) % 10), wb); |
| 51 | writeChar('0' + ((microseconds / 10) % 10), wb); |
| 52 | writeChar('0' + ((microseconds / 1) % 10), wb); |
| 53 | |
| 54 | auto query_id = column_query_id.getDataAt(row_num); |
| 55 | if (query_id.size) |
| 56 | { |
| 57 | writeCString(" {" , wb); |
| 58 | writeString(query_id, wb); |
| 59 | writeCString("}" , wb); |
| 60 | } |
| 61 | |
| 62 | UInt32 thread_number = array_thread_number[row_num]; |
| 63 | writeCString(" [ " , wb); |
| 64 | writeIntText(thread_number, wb); |
| 65 | writeCString(" ] <" , wb); |
| 66 | |
| 67 | Int8 priority = array_priority[row_num]; |
| 68 | writeString(InternalTextLogsQueue::getPriorityName(priority), wb); |
| 69 | writeCString("> " , wb); |
| 70 | |
| 71 | auto source = column_source.getDataAt(row_num); |
| 72 | writeString(source, wb); |
| 73 | writeCString(": " , wb); |
| 74 | |
| 75 | auto text = column_text.getDataAt(row_num); |
| 76 | writeString(text, wb); |
| 77 | |
| 78 | writeChar('\n', wb); |
| 79 | } |
| 80 | } |
| 81 | |
| 82 | } |
| 83 | |