1 | #include "ProtobufRowInputFormat.h" |
2 | |
3 | #if USE_PROTOBUF |
4 | #include <Core/Block.h> |
5 | #include <Formats/FormatFactory.h> |
6 | #include <Formats/FormatSchemaInfo.h> |
7 | #include <Formats/ProtobufSchemas.h> |
8 | #include <Interpreters/Context.h> |
9 | |
10 | |
11 | namespace DB |
12 | { |
13 | |
14 | ProtobufRowInputFormat::ProtobufRowInputFormat(ReadBuffer & in_, const Block & , Params params_, const FormatSchemaInfo & info_) |
15 | : IRowInputFormat(header_, in_, params_) |
16 | , data_types(header_.getDataTypes()) |
17 | , reader(in, ProtobufSchemas::instance().getMessageTypeForFormatSchema(info_), header_.getNames()) |
18 | { |
19 | } |
20 | |
21 | ProtobufRowInputFormat::~ProtobufRowInputFormat() = default; |
22 | |
23 | bool ProtobufRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ) |
24 | { |
25 | if (!reader.startMessage()) |
26 | return false; // EOF reached, no more messages. |
27 | |
28 | // Set of columns for which the values were read. The rest will be filled with default values. |
29 | auto & read_columns = extra.read_columns; |
30 | read_columns.assign(columns.size(), false); |
31 | |
32 | // Read values from this message and put them to the columns while it's possible. |
33 | size_t column_index; |
34 | while (reader.readColumnIndex(column_index)) |
35 | { |
36 | bool allow_add_row = !static_cast<bool>(read_columns[column_index]); |
37 | do |
38 | { |
39 | bool row_added; |
40 | data_types[column_index]->deserializeProtobuf(*columns[column_index], reader, allow_add_row, row_added); |
41 | if (row_added) |
42 | { |
43 | read_columns[column_index] = true; |
44 | allow_add_row = false; |
45 | } |
46 | } while (reader.canReadMoreValues()); |
47 | } |
48 | |
49 | // Fill non-visited columns with the default values. |
50 | for (column_index = 0; column_index < read_columns.size(); ++column_index) |
51 | if (!read_columns[column_index]) |
52 | data_types[column_index]->insertDefaultInto(*columns[column_index]); |
53 | |
54 | reader.endMessage(); |
55 | return true; |
56 | } |
57 | |
58 | bool ProtobufRowInputFormat::allowSyncAfterError() const |
59 | { |
60 | return true; |
61 | } |
62 | |
63 | void ProtobufRowInputFormat::syncAfterError() |
64 | { |
65 | reader.endMessage(true); |
66 | } |
67 | |
68 | void registerInputFormatProcessorProtobuf(FormatFactory & factory) |
69 | { |
70 | factory.registerInputFormatProcessor("Protobuf" , []( |
71 | ReadBuffer & buf, |
72 | const Block & sample, |
73 | IRowInputFormat::Params params, |
74 | const FormatSettings & settings) |
75 | { |
76 | return std::make_shared<ProtobufRowInputFormat>(buf, sample, std::move(params), |
77 | FormatSchemaInfo(settings.schema.format_schema, "Protobuf" , true, |
78 | settings.schema.is_server, settings.schema.format_schema_path)); |
79 | }); |
80 | } |
81 | |
82 | } |
83 | |
84 | #else |
85 | |
namespace DB
{

class FormatFactory;

/// Stub compiled in the #else branch of the USE_PROTOBUF check: registers nothing.
void registerInputFormatProcessorProtobuf(FormatFactory & /*factory*/)
{
}

}
91 | |
92 | #endif |
93 | |