1#include <Processors/Formats/Impl/MySQLOutputFormat.h>
2#include <Core/MySQLProtocol.h>
3#include <Interpreters/ProcessList.h>
4#include <Formats/FormatFactory.h>
5#include <Interpreters/Context.h>
6#include <iomanip>
7#include <sstream>
8
9namespace DB
10{
11
12using namespace MySQLProtocol;
13
14
15MySQLOutputFormat::MySQLOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & settings_)
16 : IOutputFormat(header_, out_)
17 , format_settings(settings_)
18{
19}
20
21void MySQLOutputFormat::initialize()
22{
23 if (initialized)
24 return;
25
26 initialized = true;
27 auto & header = getPort(PortKind::Main).getHeader();
28 data_types = header.getDataTypes();
29
30 if (header.columns())
31 {
32 packet_sender->sendPacket(LengthEncodedNumber(header.columns()));
33
34 for (size_t i = 0; i < header.columns(); i++)
35 {
36 const auto & column_name = header.getColumnsWithTypeAndName()[i].name;
37 packet_sender->sendPacket(getColumnDefinition(column_name, data_types[i]->getTypeId()));
38 }
39
40 if (!(context->mysql.client_capabilities & Capability::CLIENT_DEPRECATE_EOF))
41 {
42 packet_sender->sendPacket(EOF_Packet(0, 0));
43 }
44 }
45}
46
47
48void MySQLOutputFormat::consume(Chunk chunk)
49{
50
51 initialize();
52
53 for (size_t i = 0; i < chunk.getNumRows(); i++)
54 {
55 ProtocolText::ResultsetRow row_packet(data_types, chunk.getColumns(), i);
56 packet_sender->sendPacket(row_packet);
57 }
58}
59
60void MySQLOutputFormat::finalize()
61{
62 size_t affected_rows = 0;
63 std::stringstream human_readable_info;
64 if (QueryStatus * process_list_elem = context->getProcessListElement())
65 {
66 CurrentThread::finalizePerformanceCounters();
67 QueryStatusInfo info = process_list_elem->getInfo();
68 affected_rows = info.written_rows;
69 human_readable_info << std::fixed << std::setprecision(3)
70 << "Read " << info.read_rows << " rows, " << formatReadableSizeWithBinarySuffix(info.read_bytes) << " in " << info.elapsed_seconds << " sec., "
71 << static_cast<size_t>(info.read_rows / info.elapsed_seconds) << " rows/sec., "
72 << formatReadableSizeWithBinarySuffix(info.read_bytes / info.elapsed_seconds) << "/sec.";
73 }
74
75 const auto & header = getPort(PortKind::Main).getHeader();
76 if (header.columns() == 0)
77 packet_sender->sendPacket(OK_Packet(0x0, context->mysql.client_capabilities, affected_rows, 0, 0, "", human_readable_info.str()), true);
78 else
79 if (context->mysql.client_capabilities & CLIENT_DEPRECATE_EOF)
80 packet_sender->sendPacket(OK_Packet(0xfe, context->mysql.client_capabilities, affected_rows, 0, 0, "", human_readable_info.str()), true);
81 else
82 packet_sender->sendPacket(EOF_Packet(0, 0), true);
83}
84
85void MySQLOutputFormat::flush()
86{
87 packet_sender->out->next();
88}
89
90void registerOutputFormatProcessorMySQLWrite(FormatFactory & factory)
91{
92 factory.registerOutputFormatProcessor(
93 "MySQLWire",
94 [](WriteBuffer & buf,
95 const Block & sample,
96 FormatFactory::WriteCallback,
97 const FormatSettings & settings) { return std::make_shared<MySQLOutputFormat>(buf, sample, settings); });
98}
99
100}
101