| 1 | #include <Formats/FormatFactory.h> |
| 2 | #include "ProtobufRowOutputFormat.h" |
| 3 | |
| 4 | #if USE_PROTOBUF |
| 5 | |
| 6 | #include <Core/Block.h> |
| 7 | #include <Formats/FormatSchemaInfo.h> |
| 8 | #include <Formats/ProtobufSchemas.h> |
| 9 | #include <Interpreters/Context.h> |
| 10 | #include <google/protobuf/descriptor.h> |
| 11 | |
| 12 | |
| 13 | |
| 14 | namespace DB |
| 15 | { |
| 16 | namespace ErrorCodes |
| 17 | { |
| 18 | extern const int NOT_IMPLEMENTED; |
| 19 | extern const int NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD; |
| 20 | } |
| 21 | |
| 22 | |
| 23 | ProtobufRowOutputFormat::ProtobufRowOutputFormat( |
| 24 | WriteBuffer & out_, |
| 25 | const Block & , |
| 26 | FormatFactory::WriteCallback callback, |
| 27 | const FormatSchemaInfo & format_schema) |
| 28 | : IRowOutputFormat(header, out_, callback) |
| 29 | , data_types(header.getDataTypes()) |
| 30 | , writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames()) |
| 31 | { |
| 32 | value_indices.resize(header.columns()); |
| 33 | } |
| 34 | |
| 35 | void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num) |
| 36 | { |
| 37 | writer.startMessage(); |
| 38 | std::fill(value_indices.begin(), value_indices.end(), 0); |
| 39 | size_t column_index; |
| 40 | while (writer.writeField(column_index)) |
| 41 | data_types[column_index]->serializeProtobuf( |
| 42 | *columns[column_index], row_num, writer, value_indices[column_index]); |
| 43 | writer.endMessage(); |
| 44 | } |
| 45 | |
| 46 | |
| 47 | void registerOutputFormatProcessorProtobuf(FormatFactory & factory) |
| 48 | { |
| 49 | factory.registerOutputFormatProcessor( |
| 50 | "Protobuf" , |
| 51 | [](WriteBuffer & buf, |
| 52 | const Block & , |
| 53 | FormatFactory::WriteCallback callback, |
| 54 | const FormatSettings & settings) |
| 55 | { |
| 56 | return std::make_shared<ProtobufRowOutputFormat>(buf, header, std::move(callback), |
| 57 | FormatSchemaInfo(settings.schema.format_schema, "Protobuf" , true, |
| 58 | settings.schema.is_server, settings.schema.format_schema_path)); |
| 59 | }); |
| 60 | } |
| 61 | |
| 62 | } |
| 63 | |
| 64 | #else |
| 65 | |
namespace DB
{

class FormatFactory;

/// Fallback used when USE_PROTOBUF is disabled: the registration function still
/// exists so callers can reference it, but it registers nothing.
void registerOutputFormatProcessorProtobuf(FormatFactory & /*factory*/)
{
}

}
| 71 | |
| 72 | #endif |
| 73 | |