1#include "ProtobufRowInputFormat.h"
2
3#if USE_PROTOBUF
4#include <Core/Block.h>
5#include <Formats/FormatFactory.h>
6#include <Formats/FormatSchemaInfo.h>
7#include <Formats/ProtobufSchemas.h>
8#include <Interpreters/Context.h>
9
10
11namespace DB
12{
13
14ProtobufRowInputFormat::ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSchemaInfo & info_)
15 : IRowInputFormat(header_, in_, params_)
16 , data_types(header_.getDataTypes())
17 , reader(in, ProtobufSchemas::instance().getMessageTypeForFormatSchema(info_), header_.getNames())
18{
19}
20
21ProtobufRowInputFormat::~ProtobufRowInputFormat() = default;
22
23bool ProtobufRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & extra)
24{
25 if (!reader.startMessage())
26 return false; // EOF reached, no more messages.
27
28 // Set of columns for which the values were read. The rest will be filled with default values.
29 auto & read_columns = extra.read_columns;
30 read_columns.assign(columns.size(), false);
31
32 // Read values from this message and put them to the columns while it's possible.
33 size_t column_index;
34 while (reader.readColumnIndex(column_index))
35 {
36 bool allow_add_row = !static_cast<bool>(read_columns[column_index]);
37 do
38 {
39 bool row_added;
40 data_types[column_index]->deserializeProtobuf(*columns[column_index], reader, allow_add_row, row_added);
41 if (row_added)
42 {
43 read_columns[column_index] = true;
44 allow_add_row = false;
45 }
46 } while (reader.canReadMoreValues());
47 }
48
49 // Fill non-visited columns with the default values.
50 for (column_index = 0; column_index < read_columns.size(); ++column_index)
51 if (!read_columns[column_index])
52 data_types[column_index]->insertDefaultInto(*columns[column_index]);
53
54 reader.endMessage();
55 return true;
56}
57
58bool ProtobufRowInputFormat::allowSyncAfterError() const
59{
60 return true;
61}
62
63void ProtobufRowInputFormat::syncAfterError()
64{
65 reader.endMessage(true);
66}
67
68void registerInputFormatProcessorProtobuf(FormatFactory & factory)
69{
70 factory.registerInputFormatProcessor("Protobuf", [](
71 ReadBuffer & buf,
72 const Block & sample,
73 IRowInputFormat::Params params,
74 const FormatSettings & settings)
75 {
76 return std::make_shared<ProtobufRowInputFormat>(buf, sample, std::move(params),
77 FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true,
78 settings.schema.is_server, settings.schema.format_schema_path));
79 });
80}
81
82}
83
84#else
85
namespace DB
{

class FormatFactory;

// Stub used when the build has USE_PROTOBUF disabled: the function still exists so callers
// link, but it registers nothing.
void registerInputFormatProcessorProtobuf(FormatFactory & /*factory*/)
{
}

}
91
92#endif
93