1#pragma once
2
3#include <common/DayNum.h>
4#include <Common/PODArray.h>
5#include <Common/UInt128.h>
6#include <Core/UUID.h>
7
8#include "config_formats.h"
9#if USE_PROTOBUF
10
11#include <boost/noncopyable.hpp>
12#include "ProtobufColumnMatcher.h"
13#include <IO/ReadBuffer.h>
14#include <memory>
15
16namespace google
17{
18namespace protobuf
19{
20 class Descriptor;
21}
22}
23
24namespace DB
25{
26class Arena;
27class IAggregateFunction;
28class ReadBuffer;
29using AggregateDataPtr = char *;
30using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
31
32
33/** Deserializes a protobuf, tries to cast data types if necessarily.
34 */
35class ProtobufReader : private boost::noncopyable
36{
37public:
38 ProtobufReader(ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names);
39 ~ProtobufReader();
40
41 /// Should be called when we start reading a new message.
42 bool startMessage();
43
44 /// Ends reading a message.
45 void endMessage(bool ignore_errors = false);
46
47 /// Reads the column index.
48 /// The function returns false if there are no more columns to read (call endMessage() in this case).
49 bool readColumnIndex(size_t & column_index);
50
51 /// Reads a value which should be put to column at index received with readColumnIndex().
52 /// The function returns false if there are no more values to read now (call readColumnIndex() in this case).
53 bool readNumber(Int8 & value) { return current_converter->readInt8(value); }
54 bool readNumber(UInt8 & value) { return current_converter->readUInt8(value); }
55 bool readNumber(Int16 & value) { return current_converter->readInt16(value); }
56 bool readNumber(UInt16 & value) { return current_converter->readUInt16(value); }
57 bool readNumber(Int32 & value) { return current_converter->readInt32(value); }
58 bool readNumber(UInt32 & value) { return current_converter->readUInt32(value); }
59 bool readNumber(Int64 & value) { return current_converter->readInt64(value); }
60 bool readNumber(UInt64 & value) { return current_converter->readUInt64(value); }
61 bool readNumber(UInt128 & value) { return current_converter->readUInt128(value); }
62 bool readNumber(Float32 & value) { return current_converter->readFloat32(value); }
63 bool readNumber(Float64 & value) { return current_converter->readFloat64(value); }
64
65 bool readStringInto(PaddedPODArray<UInt8> & str) { return current_converter->readStringInto(str); }
66
67 void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> & name_value_pairs) { current_converter->prepareEnumMapping8(name_value_pairs); }
68 void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> & name_value_pairs) { current_converter->prepareEnumMapping16(name_value_pairs); }
69 bool readEnum(Int8 & value) { return current_converter->readEnum8(value); }
70 bool readEnum(Int16 & value) { return current_converter->readEnum16(value); }
71
72 bool readUUID(UUID & uuid) { return current_converter->readUUID(uuid); }
73 bool readDate(DayNum & date) { return current_converter->readDate(date); }
74 bool readDateTime(time_t & tm) { return current_converter->readDateTime(tm); }
75 bool readDateTime64(DateTime64 & tm, UInt32 scale) { return current_converter->readDateTime64(tm, scale); }
76
77 bool readDecimal(Decimal32 & decimal, UInt32 precision, UInt32 scale) { return current_converter->readDecimal32(decimal, precision, scale); }
78 bool readDecimal(Decimal64 & decimal, UInt32 precision, UInt32 scale) { return current_converter->readDecimal64(decimal, precision, scale); }
79 bool readDecimal(Decimal128 & decimal, UInt32 precision, UInt32 scale) { return current_converter->readDecimal128(decimal, precision, scale); }
80
81 bool readAggregateFunction(const AggregateFunctionPtr & function, AggregateDataPtr place, Arena & arena) { return current_converter->readAggregateFunction(function, place, arena); }
82
83 /// Call it after calling one of the read*() function to determine if there are more values available for reading.
84 bool ALWAYS_INLINE canReadMoreValues() const { return simple_reader.canReadMoreValues(); }
85
86private:
87 class SimpleReader
88 {
89 public:
90 SimpleReader(ReadBuffer & in_);
91 bool startMessage();
92 void endMessage(bool ignore_errors);
93 void startNestedMessage();
94 void endNestedMessage();
95 bool readFieldNumber(UInt32 & field_number);
96 bool readInt(Int64 & value);
97 bool readSInt(Int64 & value);
98 bool readUInt(UInt64 & value);
99 template<typename T> bool readFixed(T & value);
100 bool readStringInto(PaddedPODArray<UInt8> & str);
101
102 bool ALWAYS_INLINE canReadMoreValues() const { return cursor < field_end; }
103
104 private:
105 void readBinary(void * data, size_t size);
106 void ignore(UInt64 num_bytes);
107 void moveCursorBackward(UInt64 num_bytes);
108
109 UInt64 ALWAYS_INLINE readVarint()
110 {
111 char c;
112 in.readStrict(c);
113 UInt64 first_byte = static_cast<UInt8>(c);
114 ++cursor;
115 if (likely(!(c & 0x80)))
116 return first_byte;
117 return continueReadingVarint(first_byte);
118 }
119
120 UInt64 continueReadingVarint(UInt64 first_byte);
121 void ignoreVarint();
122 void ignoreGroup();
123
124 ReadBuffer & in;
125 UInt64 cursor;
126 size_t current_message_level;
127 UInt64 current_message_end;
128 std::vector<UInt64> parent_message_ends;
129 UInt64 field_end;
130 UInt64 last_string_pos;
131 };
132
133 class IConverter
134 {
135 public:
136 virtual ~IConverter() = default;
137 virtual bool readStringInto(PaddedPODArray<UInt8> &) = 0;
138 virtual bool readInt8(Int8&) = 0;
139 virtual bool readUInt8(UInt8 &) = 0;
140 virtual bool readInt16(Int16 &) = 0;
141 virtual bool readUInt16(UInt16 &) = 0;
142 virtual bool readInt32(Int32 &) = 0;
143 virtual bool readUInt32(UInt32 &) = 0;
144 virtual bool readInt64(Int64 &) = 0;
145 virtual bool readUInt64(UInt64 &) = 0;
146 virtual bool readUInt128(UInt128 &) = 0;
147 virtual bool readFloat32(Float32 &) = 0;
148 virtual bool readFloat64(Float64 &) = 0;
149 virtual void prepareEnumMapping8(const std::vector<std::pair<std::string, Int8>> &) = 0;
150 virtual void prepareEnumMapping16(const std::vector<std::pair<std::string, Int16>> &) = 0;
151 virtual bool readEnum8(Int8 &) = 0;
152 virtual bool readEnum16(Int16 &) = 0;
153 virtual bool readUUID(UUID &) = 0;
154 virtual bool readDate(DayNum &) = 0;
155 virtual bool readDateTime(time_t &) = 0;
156 virtual bool readDateTime64(DateTime64 &, UInt32) = 0;
157 virtual bool readDecimal32(Decimal32 &, UInt32, UInt32) = 0;
158 virtual bool readDecimal64(Decimal64 &, UInt32, UInt32) = 0;
159 virtual bool readDecimal128(Decimal128 &, UInt32, UInt32) = 0;
160 virtual bool readAggregateFunction(const AggregateFunctionPtr &, AggregateDataPtr, Arena &) = 0;
161 };
162
163 class ConverterBaseImpl;
164 class ConverterFromString;
165 template<int field_type_id, typename FromType> class ConverterFromNumber;
166 class ConverterFromBool;
167 class ConverterFromEnum;
168
169 struct ColumnMatcherTraits
170 {
171 struct FieldData
172 {
173 std::unique_ptr<IConverter> converter;
174 };
175 struct MessageData
176 {
177 std::unordered_map<UInt32, const ProtobufColumnMatcher::Field<ColumnMatcherTraits>*> field_number_to_field_map;
178 };
179 };
180 using Message = ProtobufColumnMatcher::Message<ColumnMatcherTraits>;
181 using Field = ProtobufColumnMatcher::Field<ColumnMatcherTraits>;
182
183 void setTraitsDataAfterMatchingColumns(Message * message);
184
185 template <int field_type_id>
186 std::unique_ptr<IConverter> createConverter(const google::protobuf::FieldDescriptor * field);
187
188 SimpleReader simple_reader;
189 std::unique_ptr<Message> root_message;
190 Message* current_message = nullptr;
191 size_t current_field_index = 0;
192 IConverter* current_converter = nullptr;
193};
194
195}
196
197#else
198
199namespace DB
200{
201class Arena;
202class IAggregateFunction;
203class ReadBuffer;
204using AggregateDataPtr = char *;
205using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
206
207class ProtobufReader
208{
209public:
210 bool startMessage() { return false; }
211 void endMessage() {}
212 bool readColumnIndex(size_t &) { return false; }
213 bool readNumber(Int8 &) { return false; }
214 bool readNumber(UInt8 &) { return false; }
215 bool readNumber(Int16 &) { return false; }
216 bool readNumber(UInt16 &) { return false; }
217 bool readNumber(Int32 &) { return false; }
218 bool readNumber(UInt32 &) { return false; }
219 bool readNumber(Int64 &) { return false; }
220 bool readNumber(UInt64 &) { return false; }
221 bool readNumber(UInt128 &) { return false; }
222 bool readNumber(Float32 &) { return false; }
223 bool readNumber(Float64 &) { return false; }
224 bool readStringInto(PaddedPODArray<UInt8> &) { return false; }
225 void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> &) {}
226 void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> &) {}
227 bool readEnum(Int8 &) { return false; }
228 bool readEnum(Int16 &) { return false; }
229 bool readUUID(UUID &) { return false; }
230 bool readDate(DayNum &) { return false; }
231 bool readDateTime(time_t &) { return false; }
232 bool readDateTime64(DateTime64 & /*tm*/, UInt32 /*scale*/) { return false; }
233 bool readDecimal(Decimal32 &, UInt32, UInt32) { return false; }
234 bool readDecimal(Decimal64 &, UInt32, UInt32) { return false; }
235 bool readDecimal(Decimal128 &, UInt32, UInt32) { return false; }
236 bool readAggregateFunction(const AggregateFunctionPtr &, AggregateDataPtr, Arena &) { return false; }
237 bool canReadMoreValues() const { return false; }
238};
239
240}
241#endif
242