1 | #pragma once |
2 | |
3 | #include <common/DayNum.h> |
4 | #include <Common/PODArray.h> |
5 | #include <Common/UInt128.h> |
6 | #include <Core/UUID.h> |
7 | |
8 | #include "config_formats.h" |
9 | #if USE_PROTOBUF |
10 | |
11 | #include <boost/noncopyable.hpp> |
12 | #include "ProtobufColumnMatcher.h" |
13 | #include <IO/ReadBuffer.h> |
14 | #include <memory> |
15 | |
/// Forward declarations of the protobuf descriptor types that this header refers to only by pointer
/// (the ProtobufReader constructor takes a Descriptor, createConverter takes a FieldDescriptor).
namespace google
{
namespace protobuf
{
    class Descriptor;
    class FieldDescriptor;
}
}
23 | |
24 | namespace DB |
25 | { |
26 | class Arena; |
27 | class IAggregateFunction; |
28 | class ReadBuffer; |
29 | using AggregateDataPtr = char *; |
30 | using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>; |
31 | |
32 | |
33 | /** Deserializes a protobuf, tries to cast data types if necessarily. |
34 | */ |
35 | class ProtobufReader : private boost::noncopyable |
36 | { |
37 | public: |
38 | ProtobufReader(ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names); |
39 | ~ProtobufReader(); |
40 | |
41 | /// Should be called when we start reading a new message. |
42 | bool startMessage(); |
43 | |
44 | /// Ends reading a message. |
45 | void endMessage(bool ignore_errors = false); |
46 | |
47 | /// Reads the column index. |
48 | /// The function returns false if there are no more columns to read (call endMessage() in this case). |
49 | bool readColumnIndex(size_t & column_index); |
50 | |
51 | /// Reads a value which should be put to column at index received with readColumnIndex(). |
52 | /// The function returns false if there are no more values to read now (call readColumnIndex() in this case). |
53 | bool readNumber(Int8 & value) { return current_converter->readInt8(value); } |
54 | bool readNumber(UInt8 & value) { return current_converter->readUInt8(value); } |
55 | bool readNumber(Int16 & value) { return current_converter->readInt16(value); } |
56 | bool readNumber(UInt16 & value) { return current_converter->readUInt16(value); } |
57 | bool readNumber(Int32 & value) { return current_converter->readInt32(value); } |
58 | bool readNumber(UInt32 & value) { return current_converter->readUInt32(value); } |
59 | bool readNumber(Int64 & value) { return current_converter->readInt64(value); } |
60 | bool readNumber(UInt64 & value) { return current_converter->readUInt64(value); } |
61 | bool readNumber(UInt128 & value) { return current_converter->readUInt128(value); } |
62 | bool readNumber(Float32 & value) { return current_converter->readFloat32(value); } |
63 | bool readNumber(Float64 & value) { return current_converter->readFloat64(value); } |
64 | |
65 | bool readStringInto(PaddedPODArray<UInt8> & str) { return current_converter->readStringInto(str); } |
66 | |
67 | void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> & name_value_pairs) { current_converter->prepareEnumMapping8(name_value_pairs); } |
68 | void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> & name_value_pairs) { current_converter->prepareEnumMapping16(name_value_pairs); } |
69 | bool readEnum(Int8 & value) { return current_converter->readEnum8(value); } |
70 | bool readEnum(Int16 & value) { return current_converter->readEnum16(value); } |
71 | |
72 | bool readUUID(UUID & uuid) { return current_converter->readUUID(uuid); } |
73 | bool readDate(DayNum & date) { return current_converter->readDate(date); } |
74 | bool readDateTime(time_t & tm) { return current_converter->readDateTime(tm); } |
75 | bool readDateTime64(DateTime64 & tm, UInt32 scale) { return current_converter->readDateTime64(tm, scale); } |
76 | |
77 | bool readDecimal(Decimal32 & decimal, UInt32 precision, UInt32 scale) { return current_converter->readDecimal32(decimal, precision, scale); } |
78 | bool readDecimal(Decimal64 & decimal, UInt32 precision, UInt32 scale) { return current_converter->readDecimal64(decimal, precision, scale); } |
79 | bool readDecimal(Decimal128 & decimal, UInt32 precision, UInt32 scale) { return current_converter->readDecimal128(decimal, precision, scale); } |
80 | |
81 | bool readAggregateFunction(const AggregateFunctionPtr & function, AggregateDataPtr place, Arena & arena) { return current_converter->readAggregateFunction(function, place, arena); } |
82 | |
83 | /// Call it after calling one of the read*() function to determine if there are more values available for reading. |
84 | bool ALWAYS_INLINE canReadMoreValues() const { return simple_reader.canReadMoreValues(); } |
85 | |
86 | private: |
87 | class SimpleReader |
88 | { |
89 | public: |
90 | SimpleReader(ReadBuffer & in_); |
91 | bool startMessage(); |
92 | void endMessage(bool ignore_errors); |
93 | void startNestedMessage(); |
94 | void endNestedMessage(); |
95 | bool readFieldNumber(UInt32 & field_number); |
96 | bool readInt(Int64 & value); |
97 | bool readSInt(Int64 & value); |
98 | bool readUInt(UInt64 & value); |
99 | template<typename T> bool readFixed(T & value); |
100 | bool readStringInto(PaddedPODArray<UInt8> & str); |
101 | |
102 | bool ALWAYS_INLINE canReadMoreValues() const { return cursor < field_end; } |
103 | |
104 | private: |
105 | void readBinary(void * data, size_t size); |
106 | void ignore(UInt64 num_bytes); |
107 | void moveCursorBackward(UInt64 num_bytes); |
108 | |
109 | UInt64 ALWAYS_INLINE readVarint() |
110 | { |
111 | char c; |
112 | in.readStrict(c); |
113 | UInt64 first_byte = static_cast<UInt8>(c); |
114 | ++cursor; |
115 | if (likely(!(c & 0x80))) |
116 | return first_byte; |
117 | return continueReadingVarint(first_byte); |
118 | } |
119 | |
120 | UInt64 continueReadingVarint(UInt64 first_byte); |
121 | void ignoreVarint(); |
122 | void ignoreGroup(); |
123 | |
124 | ReadBuffer & in; |
125 | UInt64 cursor; |
126 | size_t current_message_level; |
127 | UInt64 current_message_end; |
128 | std::vector<UInt64> parent_message_ends; |
129 | UInt64 field_end; |
130 | UInt64 last_string_pos; |
131 | }; |
132 | |
133 | class IConverter |
134 | { |
135 | public: |
136 | virtual ~IConverter() = default; |
137 | virtual bool readStringInto(PaddedPODArray<UInt8> &) = 0; |
138 | virtual bool readInt8(Int8&) = 0; |
139 | virtual bool readUInt8(UInt8 &) = 0; |
140 | virtual bool readInt16(Int16 &) = 0; |
141 | virtual bool readUInt16(UInt16 &) = 0; |
142 | virtual bool readInt32(Int32 &) = 0; |
143 | virtual bool readUInt32(UInt32 &) = 0; |
144 | virtual bool readInt64(Int64 &) = 0; |
145 | virtual bool readUInt64(UInt64 &) = 0; |
146 | virtual bool readUInt128(UInt128 &) = 0; |
147 | virtual bool readFloat32(Float32 &) = 0; |
148 | virtual bool readFloat64(Float64 &) = 0; |
149 | virtual void prepareEnumMapping8(const std::vector<std::pair<std::string, Int8>> &) = 0; |
150 | virtual void prepareEnumMapping16(const std::vector<std::pair<std::string, Int16>> &) = 0; |
151 | virtual bool readEnum8(Int8 &) = 0; |
152 | virtual bool readEnum16(Int16 &) = 0; |
153 | virtual bool readUUID(UUID &) = 0; |
154 | virtual bool readDate(DayNum &) = 0; |
155 | virtual bool readDateTime(time_t &) = 0; |
156 | virtual bool readDateTime64(DateTime64 &, UInt32) = 0; |
157 | virtual bool readDecimal32(Decimal32 &, UInt32, UInt32) = 0; |
158 | virtual bool readDecimal64(Decimal64 &, UInt32, UInt32) = 0; |
159 | virtual bool readDecimal128(Decimal128 &, UInt32, UInt32) = 0; |
160 | virtual bool readAggregateFunction(const AggregateFunctionPtr &, AggregateDataPtr, Arena &) = 0; |
161 | }; |
162 | |
163 | class ConverterBaseImpl; |
164 | class ConverterFromString; |
165 | template<int field_type_id, typename FromType> class ConverterFromNumber; |
166 | class ConverterFromBool; |
167 | class ; |
168 | |
169 | struct ColumnMatcherTraits |
170 | { |
171 | struct FieldData |
172 | { |
173 | std::unique_ptr<IConverter> converter; |
174 | }; |
175 | struct MessageData |
176 | { |
177 | std::unordered_map<UInt32, const ProtobufColumnMatcher::Field<ColumnMatcherTraits>*> field_number_to_field_map; |
178 | }; |
179 | }; |
180 | using Message = ProtobufColumnMatcher::Message<ColumnMatcherTraits>; |
181 | using Field = ProtobufColumnMatcher::Field<ColumnMatcherTraits>; |
182 | |
183 | void setTraitsDataAfterMatchingColumns(Message * message); |
184 | |
185 | template <int field_type_id> |
186 | std::unique_ptr<IConverter> createConverter(const google::protobuf::FieldDescriptor * field); |
187 | |
188 | SimpleReader simple_reader; |
189 | std::unique_ptr<Message> root_message; |
190 | Message* current_message = nullptr; |
191 | size_t current_field_index = 0; |
192 | IConverter* current_converter = nullptr; |
193 | }; |
194 | |
195 | } |
196 | |
197 | #else |
198 | |
199 | namespace DB |
200 | { |
201 | class Arena; |
202 | class IAggregateFunction; |
203 | class ReadBuffer; |
204 | using AggregateDataPtr = char *; |
205 | using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>; |
206 | |
207 | class ProtobufReader |
208 | { |
209 | public: |
210 | bool startMessage() { return false; } |
211 | void endMessage() {} |
212 | bool readColumnIndex(size_t &) { return false; } |
213 | bool readNumber(Int8 &) { return false; } |
214 | bool readNumber(UInt8 &) { return false; } |
215 | bool readNumber(Int16 &) { return false; } |
216 | bool readNumber(UInt16 &) { return false; } |
217 | bool readNumber(Int32 &) { return false; } |
218 | bool readNumber(UInt32 &) { return false; } |
219 | bool readNumber(Int64 &) { return false; } |
220 | bool readNumber(UInt64 &) { return false; } |
221 | bool readNumber(UInt128 &) { return false; } |
222 | bool readNumber(Float32 &) { return false; } |
223 | bool readNumber(Float64 &) { return false; } |
224 | bool readStringInto(PaddedPODArray<UInt8> &) { return false; } |
225 | void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> &) {} |
226 | void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> &) {} |
227 | bool readEnum(Int8 &) { return false; } |
228 | bool readEnum(Int16 &) { return false; } |
229 | bool readUUID(UUID &) { return false; } |
230 | bool readDate(DayNum &) { return false; } |
231 | bool readDateTime(time_t &) { return false; } |
232 | bool readDateTime64(DateTime64 & /*tm*/, UInt32 /*scale*/) { return false; } |
233 | bool readDecimal(Decimal32 &, UInt32, UInt32) { return false; } |
234 | bool readDecimal(Decimal64 &, UInt32, UInt32) { return false; } |
235 | bool readDecimal(Decimal128 &, UInt32, UInt32) { return false; } |
236 | bool readAggregateFunction(const AggregateFunctionPtr &, AggregateDataPtr, Arena &) { return false; } |
237 | bool canReadMoreValues() const { return false; } |
238 | }; |
239 | |
240 | } |
241 | #endif |
242 | |