1#include <Core/Block.h>
2#include <Formats/FormatFactory.h>
3#include <Processors/Formats/Impl/ODBCDriver2BlockOutputFormat.h>
4#include <IO/WriteBuffer.h>
5#include <IO/WriteHelpers.h>
6
7
8#include <Core/iostream_debug_helpers.h>
9#include <DataTypes/DataTypeLowCardinality.h>
10
11
12namespace DB
13{
14ODBCDriver2BlockOutputFormat::ODBCDriver2BlockOutputFormat(
15 WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
16 : IOutputFormat(header_, out_), format_settings(format_settings_)
17{
18}
19
20static void writeODBCString(WriteBuffer & out, const std::string & str)
21{
22 writeIntBinary(Int32(str.size()), out);
23 out.write(str.data(), str.size());
24}
25
26void ODBCDriver2BlockOutputFormat::writeRow(const Block & header, const Columns & columns, size_t row_idx, std::string & buffer)
27{
28 size_t num_columns = columns.size();
29 for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
30 {
31 buffer.clear();
32 auto & column = columns[column_idx];
33
34 if (column->isNullAt(row_idx))
35 {
36 writeIntBinary(Int32(-1), out);
37 }
38 else
39 {
40 {
41 WriteBufferFromString text_out(buffer);
42 header.getByPosition(column_idx).type->serializeAsText(*column, row_idx, text_out, format_settings);
43 }
44 writeODBCString(out, buffer);
45 }
46 }
47}
48
49void ODBCDriver2BlockOutputFormat::write(Chunk chunk, PortKind port_kind)
50{
51 String text_value;
52 auto & header = getPort(port_kind).getHeader();
53 auto & columns = chunk.getColumns();
54 const size_t rows = chunk.getNumRows();
55 for (size_t i = 0; i < rows; ++i)
56 writeRow(header, columns, i, text_value);
57}
58
59void ODBCDriver2BlockOutputFormat::consume(Chunk chunk)
60{
61 writePrefixIfNot();
62 write(std::move(chunk), PortKind::Main);
63}
64
65void ODBCDriver2BlockOutputFormat::consumeTotals(Chunk chunk)
66{
67 writePrefixIfNot();
68 write(std::move(chunk), PortKind::Totals);
69}
70
71void ODBCDriver2BlockOutputFormat::finalize()
72{
73 writePrefixIfNot();
74}
75
76void ODBCDriver2BlockOutputFormat::writePrefix()
77{
78 auto & header = getPort(PortKind::Main).getHeader();
79 const size_t columns = header.columns();
80
81 /// Number of header rows.
82 writeIntBinary(Int32(2), out);
83
84 /// Names of columns.
85 /// Number of columns + 1 for first name column.
86 writeIntBinary(Int32(columns + 1), out);
87 writeODBCString(out, "name");
88 for (size_t i = 0; i < columns; ++i)
89 {
90 const ColumnWithTypeAndName & col = header.getByPosition(i);
91 writeODBCString(out, col.name);
92 }
93
94 /// Types of columns.
95 writeIntBinary(Int32(columns + 1), out);
96 writeODBCString(out, "type");
97 for (size_t i = 0; i < columns; ++i)
98 {
99 auto type = header.getByPosition(i).type;
100 if (type->lowCardinality())
101 type = recursiveRemoveLowCardinality(type);
102 writeODBCString(out, type->getName());
103 }
104}
105
106
107void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory)
108{
109 factory.registerOutputFormatProcessor(
110 "ODBCDriver2", [](WriteBuffer & buf, const Block & sample, FormatFactory::WriteCallback, const FormatSettings & format_settings)
111 {
112 return std::make_shared<ODBCDriver2BlockOutputFormat>(buf, sample, format_settings);
113 });
114}
115
116}
117