1#include <Formats/FormatFactory.h>
2#include "ProtobufRowOutputFormat.h"
3
4#if USE_PROTOBUF
5
6#include <Core/Block.h>
7#include <Formats/FormatSchemaInfo.h>
8#include <Formats/ProtobufSchemas.h>
9#include <Interpreters/Context.h>
10#include <google/protobuf/descriptor.h>
11
12
13
14namespace DB
15{
/// Error codes are only declared here; their definitions are not part of this file.
namespace ErrorCodes
{
    extern const int NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD;
    extern const int NOT_IMPLEMENTED;
}
21
22
23ProtobufRowOutputFormat::ProtobufRowOutputFormat(
24 WriteBuffer & out_,
25 const Block & header,
26 FormatFactory::WriteCallback callback,
27 const FormatSchemaInfo & format_schema)
28 : IRowOutputFormat(header, out_, callback)
29 , data_types(header.getDataTypes())
30 , writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames())
31{
32 value_indices.resize(header.columns());
33}
34
35void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num)
36{
37 writer.startMessage();
38 std::fill(value_indices.begin(), value_indices.end(), 0);
39 size_t column_index;
40 while (writer.writeField(column_index))
41 data_types[column_index]->serializeProtobuf(
42 *columns[column_index], row_num, writer, value_indices[column_index]);
43 writer.endMessage();
44}
45
46
47void registerOutputFormatProcessorProtobuf(FormatFactory & factory)
48{
49 factory.registerOutputFormatProcessor(
50 "Protobuf",
51 [](WriteBuffer & buf,
52 const Block & header,
53 FormatFactory::WriteCallback callback,
54 const FormatSettings & settings)
55 {
56 return std::make_shared<ProtobufRowOutputFormat>(buf, header, std::move(callback),
57 FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true,
58 settings.schema.is_server, settings.schema.format_schema_path));
59 });
60}
61
62}
63
64#else
65
namespace DB
{

class FormatFactory;

/// Stub used when USE_PROTOBUF is off: keeps the symbol available and registers nothing.
void registerOutputFormatProcessorProtobuf(FormatFactory & /*factory*/)
{
}

}
71
72#endif
73