1 | #include <Core/Block.h> |
2 | #include <Formats/FormatFactory.h> |
3 | #include <Processors/Formats/Impl/ODBCDriver2BlockOutputFormat.h> |
4 | #include <IO/WriteBuffer.h> |
5 | #include <IO/WriteHelpers.h> |
6 | |
7 | |
8 | #include <Core/iostream_debug_helpers.h> |
9 | #include <DataTypes/DataTypeLowCardinality.h> |
10 | |
11 | |
12 | namespace DB |
13 | { |
14 | ODBCDriver2BlockOutputFormat::ODBCDriver2BlockOutputFormat( |
15 | WriteBuffer & out_, const Block & , const FormatSettings & format_settings_) |
16 | : IOutputFormat(header_, out_), format_settings(format_settings_) |
17 | { |
18 | } |
19 | |
20 | static void writeODBCString(WriteBuffer & out, const std::string & str) |
21 | { |
22 | writeIntBinary(Int32(str.size()), out); |
23 | out.write(str.data(), str.size()); |
24 | } |
25 | |
26 | void ODBCDriver2BlockOutputFormat::writeRow(const Block & , const Columns & columns, size_t row_idx, std::string & buffer) |
27 | { |
28 | size_t num_columns = columns.size(); |
29 | for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) |
30 | { |
31 | buffer.clear(); |
32 | auto & column = columns[column_idx]; |
33 | |
34 | if (column->isNullAt(row_idx)) |
35 | { |
36 | writeIntBinary(Int32(-1), out); |
37 | } |
38 | else |
39 | { |
40 | { |
41 | WriteBufferFromString text_out(buffer); |
42 | header.getByPosition(column_idx).type->serializeAsText(*column, row_idx, text_out, format_settings); |
43 | } |
44 | writeODBCString(out, buffer); |
45 | } |
46 | } |
47 | } |
48 | |
49 | void ODBCDriver2BlockOutputFormat::write(Chunk chunk, PortKind port_kind) |
50 | { |
51 | String text_value; |
52 | auto & = getPort(port_kind).getHeader(); |
53 | auto & columns = chunk.getColumns(); |
54 | const size_t rows = chunk.getNumRows(); |
55 | for (size_t i = 0; i < rows; ++i) |
56 | writeRow(header, columns, i, text_value); |
57 | } |
58 | |
59 | void ODBCDriver2BlockOutputFormat::consume(Chunk chunk) |
60 | { |
61 | writePrefixIfNot(); |
62 | write(std::move(chunk), PortKind::Main); |
63 | } |
64 | |
65 | void ODBCDriver2BlockOutputFormat::consumeTotals(Chunk chunk) |
66 | { |
67 | writePrefixIfNot(); |
68 | write(std::move(chunk), PortKind::Totals); |
69 | } |
70 | |
71 | void ODBCDriver2BlockOutputFormat::finalize() |
72 | { |
73 | writePrefixIfNot(); |
74 | } |
75 | |
76 | void ODBCDriver2BlockOutputFormat::writePrefix() |
77 | { |
78 | auto & = getPort(PortKind::Main).getHeader(); |
79 | const size_t columns = header.columns(); |
80 | |
81 | /// Number of header rows. |
82 | writeIntBinary(Int32(2), out); |
83 | |
84 | /// Names of columns. |
85 | /// Number of columns + 1 for first name column. |
86 | writeIntBinary(Int32(columns + 1), out); |
87 | writeODBCString(out, "name" ); |
88 | for (size_t i = 0; i < columns; ++i) |
89 | { |
90 | const ColumnWithTypeAndName & col = header.getByPosition(i); |
91 | writeODBCString(out, col.name); |
92 | } |
93 | |
94 | /// Types of columns. |
95 | writeIntBinary(Int32(columns + 1), out); |
96 | writeODBCString(out, "type" ); |
97 | for (size_t i = 0; i < columns; ++i) |
98 | { |
99 | auto type = header.getByPosition(i).type; |
100 | if (type->lowCardinality()) |
101 | type = recursiveRemoveLowCardinality(type); |
102 | writeODBCString(out, type->getName()); |
103 | } |
104 | } |
105 | |
106 | |
107 | void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory) |
108 | { |
109 | factory.registerOutputFormatProcessor( |
110 | "ODBCDriver2" , [](WriteBuffer & buf, const Block & sample, FormatFactory::WriteCallback, const FormatSettings & format_settings) |
111 | { |
112 | return std::make_shared<ODBCDriver2BlockOutputFormat>(buf, sample, format_settings); |
113 | }); |
114 | } |
115 | |
116 | } |
117 | |