| 1 | #include <Formats/FormatFactory.h> | 
|---|
| 2 | #include "ProtobufRowOutputFormat.h" | 
|---|
| 3 |  | 
|---|
| 4 | #if USE_PROTOBUF | 
|---|
| 5 |  | 
|---|
| 6 | #include <Core/Block.h> | 
|---|
| 7 | #include <Formats/FormatSchemaInfo.h> | 
|---|
| 8 | #include <Formats/ProtobufSchemas.h> | 
|---|
| 9 | #include <Interpreters/Context.h> | 
|---|
| 10 | #include <google/protobuf/descriptor.h> | 
|---|
| 11 |  | 
|---|
| 12 |  | 
|---|
| 13 |  | 
|---|
| 14 | namespace DB | 
|---|
| 15 | { | 
|---|
| 16 | namespace ErrorCodes | 
|---|
| 17 | { | 
|---|
| 18 | extern const int NOT_IMPLEMENTED; | 
|---|
| 19 | extern const int NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD; | 
|---|
| 20 | } | 
|---|
| 21 |  | 
|---|
| 22 |  | 
|---|
| 23 | ProtobufRowOutputFormat::ProtobufRowOutputFormat( | 
|---|
| 24 | WriteBuffer & out_, | 
|---|
| 25 | const Block & , | 
|---|
| 26 | FormatFactory::WriteCallback callback, | 
|---|
| 27 | const FormatSchemaInfo & format_schema) | 
|---|
| 28 | : IRowOutputFormat(header, out_, callback) | 
|---|
| 29 | , data_types(header.getDataTypes()) | 
|---|
| 30 | , writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames()) | 
|---|
| 31 | { | 
|---|
| 32 | value_indices.resize(header.columns()); | 
|---|
| 33 | } | 
|---|
| 34 |  | 
|---|
| 35 | void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num) | 
|---|
| 36 | { | 
|---|
| 37 | writer.startMessage(); | 
|---|
| 38 | std::fill(value_indices.begin(), value_indices.end(), 0); | 
|---|
| 39 | size_t column_index; | 
|---|
| 40 | while (writer.writeField(column_index)) | 
|---|
| 41 | data_types[column_index]->serializeProtobuf( | 
|---|
| 42 | *columns[column_index], row_num, writer, value_indices[column_index]); | 
|---|
| 43 | writer.endMessage(); | 
|---|
| 44 | } | 
|---|
| 45 |  | 
|---|
| 46 |  | 
|---|
| 47 | void registerOutputFormatProcessorProtobuf(FormatFactory & factory) | 
|---|
| 48 | { | 
|---|
| 49 | factory.registerOutputFormatProcessor( | 
|---|
| 50 | "Protobuf", | 
|---|
| 51 | [](WriteBuffer & buf, | 
|---|
| 52 | const Block & , | 
|---|
| 53 | FormatFactory::WriteCallback callback, | 
|---|
| 54 | const FormatSettings & settings) | 
|---|
| 55 | { | 
|---|
| 56 | return std::make_shared<ProtobufRowOutputFormat>(buf, header, std::move(callback), | 
|---|
| 57 | FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true, | 
|---|
| 58 | settings.schema.is_server, settings.schema.format_schema_path)); | 
|---|
| 59 | }); | 
|---|
| 60 | } | 
|---|
| 61 |  | 
|---|
| 62 | } | 
|---|
| 63 |  | 
|---|
| 64 | #else | 
|---|
| 65 |  | 
|---|
| 66 | namespace DB | 
|---|
| 67 | { | 
|---|
| 68 | class FormatFactory; | 
|---|
| 69 | void registerOutputFormatProcessorProtobuf(FormatFactory &) {} | 
|---|
| 70 | } | 
|---|
| 71 |  | 
|---|
| 72 | #endif | 
|---|
| 73 |  | 
|---|