1 | #pragma once |
2 | |
3 | #include <Core/UUID.h> |
4 | #include <Common/UInt128.h> |
5 | #include <common/DayNum.h> |
6 | |
7 | #include "config_formats.h" |
8 | |
9 | #include <memory> |
10 | |
11 | #if USE_PROTOBUF |
12 | |
13 | #include "ProtobufColumnMatcher.h" |
14 | #include <IO/WriteBufferFromString.h> |
15 | #include <boost/noncopyable.hpp> |
16 | #include <Common/PODArray.h> |
17 | |
18 | |
/// Forward declarations of the protobuf descriptor classes that are only
/// used through pointers in this header.
namespace google::protobuf
{
    class Descriptor;
    class FieldDescriptor;
}
27 | |
28 | namespace DB |
29 | { |
/// Read-only pointer to the raw bytes of an aggregate function state.
using ConstAggregateDataPtr = const char *;

/// Only forward-declared here; the shared pointer alias below does not need the full definition.
class IAggregateFunction;
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
33 | |
34 | |
35 | /** Serializes a protobuf, tries to cast types if necessarily. |
36 | */ |
37 | class ProtobufWriter : private boost::noncopyable |
38 | { |
39 | public: |
40 | ProtobufWriter(WriteBuffer & out, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names); |
41 | ~ProtobufWriter(); |
42 | |
43 | /// Should be called at the beginning of writing a message. |
44 | void startMessage(); |
45 | |
46 | /// Should be called at the end of writing a message. |
47 | void endMessage(); |
48 | |
49 | /// Prepares for writing values of a field. |
50 | /// Returns true and sets 'column_index' to the corresponding column's index. |
51 | /// Returns false if there are no more fields to write in the message type (call endMessage() in this case). |
52 | bool writeField(size_t & column_index); |
53 | |
54 | /// Writes a value. This function should be called one or multiple times after writeField(). |
55 | /// Returns false if there are no more place for the values in the protobuf's field. |
56 | /// This can happen if the protobuf's field is not declared as repeated in the protobuf schema. |
57 | bool writeNumber(Int8 value) { return writeValueIfPossible(&IConverter::writeInt8, value); } |
58 | bool writeNumber(UInt8 value) { return writeValueIfPossible(&IConverter::writeUInt8, value); } |
59 | bool writeNumber(Int16 value) { return writeValueIfPossible(&IConverter::writeInt16, value); } |
60 | bool writeNumber(UInt16 value) { return writeValueIfPossible(&IConverter::writeUInt16, value); } |
61 | bool writeNumber(Int32 value) { return writeValueIfPossible(&IConverter::writeInt32, value); } |
62 | bool writeNumber(UInt32 value) { return writeValueIfPossible(&IConverter::writeUInt32, value); } |
63 | bool writeNumber(Int64 value) { return writeValueIfPossible(&IConverter::writeInt64, value); } |
64 | bool writeNumber(UInt64 value) { return writeValueIfPossible(&IConverter::writeUInt64, value); } |
65 | bool writeNumber(UInt128 value) { return writeValueIfPossible(&IConverter::writeUInt128, value); } |
66 | bool writeNumber(Float32 value) { return writeValueIfPossible(&IConverter::writeFloat32, value); } |
67 | bool writeNumber(Float64 value) { return writeValueIfPossible(&IConverter::writeFloat64, value); } |
68 | bool writeString(const StringRef & str) { return writeValueIfPossible(&IConverter::writeString, str); } |
69 | void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> & enum_values) { current_converter->prepareEnumMapping8(enum_values); } |
70 | void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> & enum_values) { current_converter->prepareEnumMapping16(enum_values); } |
71 | bool writeEnum(Int8 value) { return writeValueIfPossible(&IConverter::writeEnum8, value); } |
72 | bool writeEnum(Int16 value) { return writeValueIfPossible(&IConverter::writeEnum16, value); } |
73 | bool writeUUID(const UUID & uuid) { return writeValueIfPossible(&IConverter::writeUUID, uuid); } |
74 | bool writeDate(DayNum date) { return writeValueIfPossible(&IConverter::writeDate, date); } |
75 | bool writeDateTime(time_t tm) { return writeValueIfPossible(&IConverter::writeDateTime, tm); } |
76 | bool writeDateTime64(DateTime64 tm, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDateTime64, tm, scale); } |
77 | bool writeDecimal(Decimal32 decimal, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDecimal32, decimal, scale); } |
78 | bool writeDecimal(Decimal64 decimal, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDecimal64, decimal, scale); } |
79 | bool writeDecimal(const Decimal128 & decimal, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDecimal128, decimal, scale); } |
80 | bool writeAggregateFunction(const AggregateFunctionPtr & function, ConstAggregateDataPtr place) { return writeValueIfPossible(&IConverter::writeAggregateFunction, function, place); } |
81 | |
82 | private: |
83 | class SimpleWriter |
84 | { |
85 | public: |
86 | SimpleWriter(WriteBuffer & out_); |
87 | ~SimpleWriter(); |
88 | |
89 | void startMessage(); |
90 | void endMessage(); |
91 | |
92 | void startNestedMessage(); |
93 | void endNestedMessage(UInt32 field_number, bool is_group, bool skip_if_empty); |
94 | |
95 | void writeInt(UInt32 field_number, Int64 value); |
96 | void writeUInt(UInt32 field_number, UInt64 value); |
97 | void writeSInt(UInt32 field_number, Int64 value); |
98 | template <typename T> |
99 | void writeFixed(UInt32 field_number, T value); |
100 | void writeString(UInt32 field_number, const StringRef & str); |
101 | |
102 | void startRepeatedPack(); |
103 | void addIntToRepeatedPack(Int64 value); |
104 | void addUIntToRepeatedPack(UInt64 value); |
105 | void addSIntToRepeatedPack(Int64 value); |
106 | template <typename T> |
107 | void addFixedToRepeatedPack(T value); |
108 | void endRepeatedPack(UInt32 field_number); |
109 | |
110 | private: |
111 | struct Piece |
112 | { |
113 | size_t start; |
114 | size_t end; |
115 | Piece(size_t start_, size_t end_) : start(start_), end(end_) {} |
116 | Piece() = default; |
117 | }; |
118 | |
119 | struct NestedInfo |
120 | { |
121 | size_t num_pieces_at_start; |
122 | size_t num_bytes_skipped_at_start; |
123 | NestedInfo(size_t num_pieces_at_start_, size_t num_bytes_skipped_at_start_) |
124 | : num_pieces_at_start(num_pieces_at_start_), num_bytes_skipped_at_start(num_bytes_skipped_at_start_) |
125 | { |
126 | } |
127 | }; |
128 | |
129 | WriteBuffer & out; |
130 | PODArray<UInt8> buffer; |
131 | std::vector<Piece> pieces; |
132 | size_t current_piece_start; |
133 | size_t num_bytes_skipped; |
134 | std::vector<NestedInfo> nested_infos; |
135 | }; |
136 | |
137 | class IConverter |
138 | { |
139 | public: |
140 | virtual ~IConverter() = default; |
141 | virtual void writeString(const StringRef &) = 0; |
142 | virtual void writeInt8(Int8) = 0; |
143 | virtual void writeUInt8(UInt8) = 0; |
144 | virtual void writeInt16(Int16) = 0; |
145 | virtual void writeUInt16(UInt16) = 0; |
146 | virtual void writeInt32(Int32) = 0; |
147 | virtual void writeUInt32(UInt32) = 0; |
148 | virtual void writeInt64(Int64) = 0; |
149 | virtual void writeUInt64(UInt64) = 0; |
150 | virtual void writeUInt128(const UInt128 &) = 0; |
151 | virtual void writeFloat32(Float32) = 0; |
152 | virtual void writeFloat64(Float64) = 0; |
153 | virtual void prepareEnumMapping8(const std::vector<std::pair<std::string, Int8>> &) = 0; |
154 | virtual void prepareEnumMapping16(const std::vector<std::pair<std::string, Int16>> &) = 0; |
155 | virtual void writeEnum8(Int8) = 0; |
156 | virtual void writeEnum16(Int16) = 0; |
157 | virtual void writeUUID(const UUID &) = 0; |
158 | virtual void writeDate(DayNum) = 0; |
159 | virtual void writeDateTime(time_t) = 0; |
160 | virtual void writeDateTime64(DateTime64, UInt32 scale) = 0; |
161 | virtual void writeDecimal32(Decimal32, UInt32) = 0; |
162 | virtual void writeDecimal64(Decimal64, UInt32) = 0; |
163 | virtual void writeDecimal128(const Decimal128 &, UInt32) = 0; |
164 | virtual void writeAggregateFunction(const AggregateFunctionPtr &, ConstAggregateDataPtr) = 0; |
165 | }; |
166 | |
167 | class ConverterBaseImpl; |
168 | template <bool skip_null_value> |
169 | class ConverterToString; |
170 | template <int field_type_id, typename ToType, bool skip_null_value, bool pack_repeated> |
171 | class ConverterToNumber; |
172 | template <bool skip_null_value, bool pack_repeated> |
173 | class ConverterToBool; |
174 | template <bool skip_null_value, bool pack_repeated> |
175 | class ConverterToEnum; |
176 | |
177 | struct ColumnMatcherTraits |
178 | { |
179 | struct FieldData |
180 | { |
181 | std::unique_ptr<IConverter> converter; |
182 | bool is_required; |
183 | bool is_repeatable; |
184 | bool should_pack_repeated; |
185 | ProtobufColumnMatcher::Message<ColumnMatcherTraits> * repeatable_container_message; |
186 | }; |
187 | struct MessageData |
188 | { |
189 | UInt32 parent_field_number; |
190 | bool is_group; |
191 | bool is_required; |
192 | ProtobufColumnMatcher::Message<ColumnMatcherTraits> * repeatable_container_message; |
193 | bool need_repeat; |
194 | }; |
195 | }; |
196 | using Message = ProtobufColumnMatcher::Message<ColumnMatcherTraits>; |
197 | using Field = ProtobufColumnMatcher::Field<ColumnMatcherTraits>; |
198 | |
199 | void setTraitsDataAfterMatchingColumns(Message * message); |
200 | |
201 | template <int field_type_id> |
202 | std::unique_ptr<IConverter> createConverter(const google::protobuf::FieldDescriptor * field); |
203 | |
204 | template <typename... Params> |
205 | using WriteValueFunctionPtr = void (IConverter::*)(Params...); |
206 | |
207 | template <typename... Params, typename... Args> |
208 | bool writeValueIfPossible(WriteValueFunctionPtr<Params...> func, Args &&... args) |
209 | { |
210 | if (num_values && !current_field->data.is_repeatable) |
211 | { |
212 | setNestedMessageNeedsRepeat(); |
213 | return false; |
214 | } |
215 | (current_converter->*func)(std::forward<Args>(args)...); |
216 | ++num_values; |
217 | return true; |
218 | } |
219 | |
220 | void setNestedMessageNeedsRepeat(); |
221 | void endWritingField(); |
222 | |
223 | SimpleWriter simple_writer; |
224 | std::unique_ptr<Message> root_message; |
225 | |
226 | Message * current_message; |
227 | size_t current_field_index = 0; |
228 | const Field * current_field = nullptr; |
229 | IConverter * current_converter = nullptr; |
230 | size_t num_values = 0; |
231 | }; |
232 | |
233 | } |
234 | |
235 | #else |
236 | |
237 | namespace DB |
238 | { |
/// Read-only pointer to the raw bytes of an aggregate function state.
using ConstAggregateDataPtr = const char *;

/// Only forward-declared here; the shared pointer alias below does not need the full definition.
class IAggregateFunction;
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
242 | |
243 | class ProtobufWriter |
244 | { |
245 | public: |
246 | bool writeNumber(Int8 /* value */) { return false; } |
247 | bool writeNumber(UInt8 /* value */) { return false; } |
248 | bool writeNumber(Int16 /* value */) { return false; } |
249 | bool writeNumber(UInt16 /* value */) { return false; } |
250 | bool writeNumber(Int32 /* value */) { return false; } |
251 | bool writeNumber(UInt32 /* value */) { return false; } |
252 | bool writeNumber(Int64 /* value */) { return false; } |
253 | bool writeNumber(UInt64 /* value */) { return false; } |
254 | bool writeNumber(UInt128 /* value */) { return false; } |
255 | bool writeNumber(Float32 /* value */) { return false; } |
256 | bool writeNumber(Float64 /* value */) { return false; } |
257 | bool writeString(const StringRef & /* value */) { return false; } |
258 | void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> & /* name_value_pairs */) {} |
259 | void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> & /* name_value_pairs */) {} |
260 | bool writeEnum(Int8 /* value */) { return false; } |
261 | bool writeEnum(Int16 /* value */) { return false; } |
262 | bool writeUUID(const UUID & /* value */) { return false; } |
263 | bool writeDate(DayNum /* date */) { return false; } |
264 | bool writeDateTime(time_t /* tm */) { return false; } |
265 | bool writeDateTime64(DateTime64 /*tm*/, UInt32 /*scale*/) { return false; } |
266 | bool writeDecimal(Decimal32 /* decimal */, UInt32 /* scale */) { return false; } |
267 | bool writeDecimal(Decimal64 /* decimal */, UInt32 /* scale */) { return false; } |
268 | bool writeDecimal(const Decimal128 & /* decimal */, UInt32 /* scale */) { return false; } |
269 | bool writeAggregateFunction(const AggregateFunctionPtr & /* function */, ConstAggregateDataPtr /* place */) { return false; } |
270 | }; |
271 | |
272 | } |
273 | #endif |
274 | |