1 | #include <Formats/FormatFactory.h> |
2 | #include "ProtobufRowOutputFormat.h" |
3 | |
4 | #if USE_PROTOBUF |
5 | |
6 | #include <Core/Block.h> |
7 | #include <Formats/FormatSchemaInfo.h> |
8 | #include <Formats/ProtobufSchemas.h> |
9 | #include <Interpreters/Context.h> |
10 | #include <google/protobuf/descriptor.h> |
11 | |
12 | |
13 | |
14 | namespace DB |
15 | { |
16 | namespace ErrorCodes |
17 | { |
18 | extern const int NOT_IMPLEMENTED; |
19 | extern const int NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD; |
20 | } |
21 | |
22 | |
23 | ProtobufRowOutputFormat::ProtobufRowOutputFormat( |
24 | WriteBuffer & out_, |
25 | const Block & , |
26 | FormatFactory::WriteCallback callback, |
27 | const FormatSchemaInfo & format_schema) |
28 | : IRowOutputFormat(header, out_, callback) |
29 | , data_types(header.getDataTypes()) |
30 | , writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames()) |
31 | { |
32 | value_indices.resize(header.columns()); |
33 | } |
34 | |
35 | void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num) |
36 | { |
37 | writer.startMessage(); |
38 | std::fill(value_indices.begin(), value_indices.end(), 0); |
39 | size_t column_index; |
40 | while (writer.writeField(column_index)) |
41 | data_types[column_index]->serializeProtobuf( |
42 | *columns[column_index], row_num, writer, value_indices[column_index]); |
43 | writer.endMessage(); |
44 | } |
45 | |
46 | |
47 | void registerOutputFormatProcessorProtobuf(FormatFactory & factory) |
48 | { |
49 | factory.registerOutputFormatProcessor( |
50 | "Protobuf" , |
51 | [](WriteBuffer & buf, |
52 | const Block & , |
53 | FormatFactory::WriteCallback callback, |
54 | const FormatSettings & settings) |
55 | { |
56 | return std::make_shared<ProtobufRowOutputFormat>(buf, header, std::move(callback), |
57 | FormatSchemaInfo(settings.schema.format_schema, "Protobuf" , true, |
58 | settings.schema.is_server, settings.schema.format_schema_path)); |
59 | }); |
60 | } |
61 | |
62 | } |
63 | |
64 | #else |
65 | |
66 | namespace DB |
67 | { |
68 | class FormatFactory; |
69 | void registerOutputFormatProcessorProtobuf(FormatFactory &) {} |
70 | } |
71 | |
72 | #endif |
73 | |