1#include <IO/WriteBuffer.h>
2#include <IO/WriteHelpers.h>
3#include <Core/Block.h>
4#include <Processors/Formats/Impl/ODBCDriverBlockOutputFormat.h>
5#include <Formats/FormatFactory.h>
6
7
8namespace DB
9{
10
11ODBCDriverBlockOutputFormat::ODBCDriverBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
12 : IOutputFormat(header_, out_), format_settings(format_settings_)
13{
14}
15
16void ODBCDriverBlockOutputFormat::consume(Chunk chunk)
17{
18 writePrefixIfNot();
19
20 const size_t num_rows = chunk.getNumRows();
21 const size_t num_columns = chunk.getNumColumns();
22 auto & columns = chunk.getColumns();
23 auto & header = getPort(PortKind::Main).getHeader();
24 String text_value;
25
26 for (size_t i = 0; i < num_rows; ++i)
27 {
28 for (size_t j = 0; j < num_columns; ++j)
29 {
30 text_value.resize(0);
31 auto & column = columns[j];
32 auto & type = header.getByPosition(j).type;
33
34 {
35 WriteBufferFromString text_out(text_value);
36 type->serializeAsText(*column, i, text_out, format_settings);
37 }
38
39 writeStringBinary(text_value, out);
40 }
41 }
42}
43
44void ODBCDriverBlockOutputFormat::writePrefix()
45{
46 auto & header = getPort(PortKind::Main).getHeader();
47 const size_t columns = header.columns();
48
49 /// Number of columns.
50 writeVarUInt(columns, out);
51
52 /// Names and types of columns.
53 for (size_t i = 0; i < columns; ++i)
54 {
55 const ColumnWithTypeAndName & col = header.getByPosition(i);
56
57 writeStringBinary(col.name, out);
58 writeStringBinary(col.type->getName(), out);
59 }
60}
61
62void ODBCDriverBlockOutputFormat::finalize()
63{
64 writePrefixIfNot();
65}
66
67void registerOutputFormatProcessorODBCDriver(FormatFactory & factory)
68{
69 factory.registerOutputFormatProcessor("ODBCDriver", [](
70 WriteBuffer & buf,
71 const Block & sample,
72 FormatFactory::WriteCallback,
73 const FormatSettings & format_settings)
74 {
75 return std::make_shared<ODBCDriverBlockOutputFormat>(buf, sample, format_settings);
76 });
77}
78
79}
80