1 | #include <Processors/Formats/Impl/MySQLOutputFormat.h> |
2 | #include <Core/MySQLProtocol.h> |
3 | #include <Interpreters/ProcessList.h> |
4 | #include <Formats/FormatFactory.h> |
5 | #include <Interpreters/Context.h> |
6 | #include <iomanip> |
7 | #include <sstream> |
8 | |
9 | namespace DB |
10 | { |
11 | |
12 | using namespace MySQLProtocol; |
13 | |
14 | |
15 | MySQLOutputFormat::MySQLOutputFormat(WriteBuffer & out_, const Block & , const FormatSettings & settings_) |
16 | : IOutputFormat(header_, out_) |
17 | , format_settings(settings_) |
18 | { |
19 | } |
20 | |
21 | void MySQLOutputFormat::initialize() |
22 | { |
23 | if (initialized) |
24 | return; |
25 | |
26 | initialized = true; |
27 | auto & = getPort(PortKind::Main).getHeader(); |
28 | data_types = header.getDataTypes(); |
29 | |
30 | if (header.columns()) |
31 | { |
32 | packet_sender->sendPacket(LengthEncodedNumber(header.columns())); |
33 | |
34 | for (size_t i = 0; i < header.columns(); i++) |
35 | { |
36 | const auto & column_name = header.getColumnsWithTypeAndName()[i].name; |
37 | packet_sender->sendPacket(getColumnDefinition(column_name, data_types[i]->getTypeId())); |
38 | } |
39 | |
40 | if (!(context->mysql.client_capabilities & Capability::CLIENT_DEPRECATE_EOF)) |
41 | { |
42 | packet_sender->sendPacket(EOF_Packet(0, 0)); |
43 | } |
44 | } |
45 | } |
46 | |
47 | |
48 | void MySQLOutputFormat::consume(Chunk chunk) |
49 | { |
50 | |
51 | initialize(); |
52 | |
53 | for (size_t i = 0; i < chunk.getNumRows(); i++) |
54 | { |
55 | ProtocolText::ResultsetRow row_packet(data_types, chunk.getColumns(), i); |
56 | packet_sender->sendPacket(row_packet); |
57 | } |
58 | } |
59 | |
60 | void MySQLOutputFormat::finalize() |
61 | { |
62 | size_t affected_rows = 0; |
63 | std::stringstream human_readable_info; |
64 | if (QueryStatus * process_list_elem = context->getProcessListElement()) |
65 | { |
66 | CurrentThread::finalizePerformanceCounters(); |
67 | QueryStatusInfo info = process_list_elem->getInfo(); |
68 | affected_rows = info.written_rows; |
69 | human_readable_info << std::fixed << std::setprecision(3) |
70 | << "Read " << info.read_rows << " rows, " << formatReadableSizeWithBinarySuffix(info.read_bytes) << " in " << info.elapsed_seconds << " sec., " |
71 | << static_cast<size_t>(info.read_rows / info.elapsed_seconds) << " rows/sec., " |
72 | << formatReadableSizeWithBinarySuffix(info.read_bytes / info.elapsed_seconds) << "/sec." ; |
73 | } |
74 | |
75 | const auto & = getPort(PortKind::Main).getHeader(); |
76 | if (header.columns() == 0) |
77 | packet_sender->sendPacket(OK_Packet(0x0, context->mysql.client_capabilities, affected_rows, 0, 0, "" , human_readable_info.str()), true); |
78 | else |
79 | if (context->mysql.client_capabilities & CLIENT_DEPRECATE_EOF) |
80 | packet_sender->sendPacket(OK_Packet(0xfe, context->mysql.client_capabilities, affected_rows, 0, 0, "" , human_readable_info.str()), true); |
81 | else |
82 | packet_sender->sendPacket(EOF_Packet(0, 0), true); |
83 | } |
84 | |
85 | void MySQLOutputFormat::flush() |
86 | { |
87 | packet_sender->out->next(); |
88 | } |
89 | |
90 | void registerOutputFormatProcessorMySQLWrite(FormatFactory & factory) |
91 | { |
92 | factory.registerOutputFormatProcessor( |
93 | "MySQLWire" , |
94 | [](WriteBuffer & buf, |
95 | const Block & sample, |
96 | FormatFactory::WriteCallback, |
97 | const FormatSettings & settings) { return std::make_shared<MySQLOutputFormat>(buf, sample, settings); }); |
98 | } |
99 | |
100 | } |
101 | |