1#pragma once
2
3#include <Core/UUID.h>
4#include <Common/UInt128.h>
5#include <common/DayNum.h>
6
7#include "config_formats.h"
8
9#include <memory>
10
11#if USE_PROTOBUF
12
13#include "ProtobufColumnMatcher.h"
14#include <IO/WriteBufferFromString.h>
15#include <boost/noncopyable.hpp>
16#include <Common/PODArray.h>
17
18
/// Forward declarations of the protobuf descriptor classes.
/// This header only refers to them through pointers, so their full definitions are not needed here.
namespace google::protobuf
{
class Descriptor;
class FieldDescriptor;
}
27
28namespace DB
29{
/// The aggregate function interface is only forward-declared:
/// this header uses it solely as the pointee of a shared pointer.
class IAggregateFunction;

/// Shared handle to an aggregate function; passed to ProtobufWriter::writeAggregateFunction().
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;

/// Read-only byte pointer passed together with the function to ProtobufWriter::writeAggregateFunction().
using ConstAggregateDataPtr = const char *;
33
34
35/** Serializes a protobuf, tries to cast types if necessarily.
36 */
37class ProtobufWriter : private boost::noncopyable
38{
39public:
40 ProtobufWriter(WriteBuffer & out, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names);
41 ~ProtobufWriter();
42
43 /// Should be called at the beginning of writing a message.
44 void startMessage();
45
46 /// Should be called at the end of writing a message.
47 void endMessage();
48
49 /// Prepares for writing values of a field.
50 /// Returns true and sets 'column_index' to the corresponding column's index.
51 /// Returns false if there are no more fields to write in the message type (call endMessage() in this case).
52 bool writeField(size_t & column_index);
53
54 /// Writes a value. This function should be called one or multiple times after writeField().
55 /// Returns false if there are no more place for the values in the protobuf's field.
56 /// This can happen if the protobuf's field is not declared as repeated in the protobuf schema.
57 bool writeNumber(Int8 value) { return writeValueIfPossible(&IConverter::writeInt8, value); }
58 bool writeNumber(UInt8 value) { return writeValueIfPossible(&IConverter::writeUInt8, value); }
59 bool writeNumber(Int16 value) { return writeValueIfPossible(&IConverter::writeInt16, value); }
60 bool writeNumber(UInt16 value) { return writeValueIfPossible(&IConverter::writeUInt16, value); }
61 bool writeNumber(Int32 value) { return writeValueIfPossible(&IConverter::writeInt32, value); }
62 bool writeNumber(UInt32 value) { return writeValueIfPossible(&IConverter::writeUInt32, value); }
63 bool writeNumber(Int64 value) { return writeValueIfPossible(&IConverter::writeInt64, value); }
64 bool writeNumber(UInt64 value) { return writeValueIfPossible(&IConverter::writeUInt64, value); }
65 bool writeNumber(UInt128 value) { return writeValueIfPossible(&IConverter::writeUInt128, value); }
66 bool writeNumber(Float32 value) { return writeValueIfPossible(&IConverter::writeFloat32, value); }
67 bool writeNumber(Float64 value) { return writeValueIfPossible(&IConverter::writeFloat64, value); }
68 bool writeString(const StringRef & str) { return writeValueIfPossible(&IConverter::writeString, str); }
69 void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> & enum_values) { current_converter->prepareEnumMapping8(enum_values); }
70 void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> & enum_values) { current_converter->prepareEnumMapping16(enum_values); }
71 bool writeEnum(Int8 value) { return writeValueIfPossible(&IConverter::writeEnum8, value); }
72 bool writeEnum(Int16 value) { return writeValueIfPossible(&IConverter::writeEnum16, value); }
73 bool writeUUID(const UUID & uuid) { return writeValueIfPossible(&IConverter::writeUUID, uuid); }
74 bool writeDate(DayNum date) { return writeValueIfPossible(&IConverter::writeDate, date); }
75 bool writeDateTime(time_t tm) { return writeValueIfPossible(&IConverter::writeDateTime, tm); }
76 bool writeDateTime64(DateTime64 tm, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDateTime64, tm, scale); }
77 bool writeDecimal(Decimal32 decimal, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDecimal32, decimal, scale); }
78 bool writeDecimal(Decimal64 decimal, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDecimal64, decimal, scale); }
79 bool writeDecimal(const Decimal128 & decimal, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDecimal128, decimal, scale); }
80 bool writeAggregateFunction(const AggregateFunctionPtr & function, ConstAggregateDataPtr place) { return writeValueIfPossible(&IConverter::writeAggregateFunction, function, place); }
81
82private:
83 class SimpleWriter
84 {
85 public:
86 SimpleWriter(WriteBuffer & out_);
87 ~SimpleWriter();
88
89 void startMessage();
90 void endMessage();
91
92 void startNestedMessage();
93 void endNestedMessage(UInt32 field_number, bool is_group, bool skip_if_empty);
94
95 void writeInt(UInt32 field_number, Int64 value);
96 void writeUInt(UInt32 field_number, UInt64 value);
97 void writeSInt(UInt32 field_number, Int64 value);
98 template <typename T>
99 void writeFixed(UInt32 field_number, T value);
100 void writeString(UInt32 field_number, const StringRef & str);
101
102 void startRepeatedPack();
103 void addIntToRepeatedPack(Int64 value);
104 void addUIntToRepeatedPack(UInt64 value);
105 void addSIntToRepeatedPack(Int64 value);
106 template <typename T>
107 void addFixedToRepeatedPack(T value);
108 void endRepeatedPack(UInt32 field_number);
109
110 private:
111 struct Piece
112 {
113 size_t start;
114 size_t end;
115 Piece(size_t start_, size_t end_) : start(start_), end(end_) {}
116 Piece() = default;
117 };
118
119 struct NestedInfo
120 {
121 size_t num_pieces_at_start;
122 size_t num_bytes_skipped_at_start;
123 NestedInfo(size_t num_pieces_at_start_, size_t num_bytes_skipped_at_start_)
124 : num_pieces_at_start(num_pieces_at_start_), num_bytes_skipped_at_start(num_bytes_skipped_at_start_)
125 {
126 }
127 };
128
129 WriteBuffer & out;
130 PODArray<UInt8> buffer;
131 std::vector<Piece> pieces;
132 size_t current_piece_start;
133 size_t num_bytes_skipped;
134 std::vector<NestedInfo> nested_infos;
135 };
136
137 class IConverter
138 {
139 public:
140 virtual ~IConverter() = default;
141 virtual void writeString(const StringRef &) = 0;
142 virtual void writeInt8(Int8) = 0;
143 virtual void writeUInt8(UInt8) = 0;
144 virtual void writeInt16(Int16) = 0;
145 virtual void writeUInt16(UInt16) = 0;
146 virtual void writeInt32(Int32) = 0;
147 virtual void writeUInt32(UInt32) = 0;
148 virtual void writeInt64(Int64) = 0;
149 virtual void writeUInt64(UInt64) = 0;
150 virtual void writeUInt128(const UInt128 &) = 0;
151 virtual void writeFloat32(Float32) = 0;
152 virtual void writeFloat64(Float64) = 0;
153 virtual void prepareEnumMapping8(const std::vector<std::pair<std::string, Int8>> &) = 0;
154 virtual void prepareEnumMapping16(const std::vector<std::pair<std::string, Int16>> &) = 0;
155 virtual void writeEnum8(Int8) = 0;
156 virtual void writeEnum16(Int16) = 0;
157 virtual void writeUUID(const UUID &) = 0;
158 virtual void writeDate(DayNum) = 0;
159 virtual void writeDateTime(time_t) = 0;
160 virtual void writeDateTime64(DateTime64, UInt32 scale) = 0;
161 virtual void writeDecimal32(Decimal32, UInt32) = 0;
162 virtual void writeDecimal64(Decimal64, UInt32) = 0;
163 virtual void writeDecimal128(const Decimal128 &, UInt32) = 0;
164 virtual void writeAggregateFunction(const AggregateFunctionPtr &, ConstAggregateDataPtr) = 0;
165 };
166
167 class ConverterBaseImpl;
168 template <bool skip_null_value>
169 class ConverterToString;
170 template <int field_type_id, typename ToType, bool skip_null_value, bool pack_repeated>
171 class ConverterToNumber;
172 template <bool skip_null_value, bool pack_repeated>
173 class ConverterToBool;
174 template <bool skip_null_value, bool pack_repeated>
175 class ConverterToEnum;
176
177 struct ColumnMatcherTraits
178 {
179 struct FieldData
180 {
181 std::unique_ptr<IConverter> converter;
182 bool is_required;
183 bool is_repeatable;
184 bool should_pack_repeated;
185 ProtobufColumnMatcher::Message<ColumnMatcherTraits> * repeatable_container_message;
186 };
187 struct MessageData
188 {
189 UInt32 parent_field_number;
190 bool is_group;
191 bool is_required;
192 ProtobufColumnMatcher::Message<ColumnMatcherTraits> * repeatable_container_message;
193 bool need_repeat;
194 };
195 };
196 using Message = ProtobufColumnMatcher::Message<ColumnMatcherTraits>;
197 using Field = ProtobufColumnMatcher::Field<ColumnMatcherTraits>;
198
199 void setTraitsDataAfterMatchingColumns(Message * message);
200
201 template <int field_type_id>
202 std::unique_ptr<IConverter> createConverter(const google::protobuf::FieldDescriptor * field);
203
204 template <typename... Params>
205 using WriteValueFunctionPtr = void (IConverter::*)(Params...);
206
207 template <typename... Params, typename... Args>
208 bool writeValueIfPossible(WriteValueFunctionPtr<Params...> func, Args &&... args)
209 {
210 if (num_values && !current_field->data.is_repeatable)
211 {
212 setNestedMessageNeedsRepeat();
213 return false;
214 }
215 (current_converter->*func)(std::forward<Args>(args)...);
216 ++num_values;
217 return true;
218 }
219
220 void setNestedMessageNeedsRepeat();
221 void endWritingField();
222
223 SimpleWriter simple_writer;
224 std::unique_ptr<Message> root_message;
225
226 Message * current_message;
227 size_t current_field_index = 0;
228 const Field * current_field = nullptr;
229 IConverter * current_converter = nullptr;
230 size_t num_values = 0;
231};
232
233}
234
235#else
236
237namespace DB
238{
/// The aggregate function interface is only forward-declared:
/// this header uses it solely as the pointee of a shared pointer.
class IAggregateFunction;

/// Shared handle to an aggregate function; passed to ProtobufWriter::writeAggregateFunction().
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;

/// Read-only byte pointer passed together with the function to ProtobufWriter::writeAggregateFunction().
using ConstAggregateDataPtr = const char *;
242
243class ProtobufWriter
244{
245public:
246 bool writeNumber(Int8 /* value */) { return false; }
247 bool writeNumber(UInt8 /* value */) { return false; }
248 bool writeNumber(Int16 /* value */) { return false; }
249 bool writeNumber(UInt16 /* value */) { return false; }
250 bool writeNumber(Int32 /* value */) { return false; }
251 bool writeNumber(UInt32 /* value */) { return false; }
252 bool writeNumber(Int64 /* value */) { return false; }
253 bool writeNumber(UInt64 /* value */) { return false; }
254 bool writeNumber(UInt128 /* value */) { return false; }
255 bool writeNumber(Float32 /* value */) { return false; }
256 bool writeNumber(Float64 /* value */) { return false; }
257 bool writeString(const StringRef & /* value */) { return false; }
258 void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> & /* name_value_pairs */) {}
259 void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> & /* name_value_pairs */) {}
260 bool writeEnum(Int8 /* value */) { return false; }
261 bool writeEnum(Int16 /* value */) { return false; }
262 bool writeUUID(const UUID & /* value */) { return false; }
263 bool writeDate(DayNum /* date */) { return false; }
264 bool writeDateTime(time_t /* tm */) { return false; }
265 bool writeDateTime64(DateTime64 /*tm*/, UInt32 /*scale*/) { return false; }
266 bool writeDecimal(Decimal32 /* decimal */, UInt32 /* scale */) { return false; }
267 bool writeDecimal(Decimal64 /* decimal */, UInt32 /* scale */) { return false; }
268 bool writeDecimal(const Decimal128 & /* decimal */, UInt32 /* scale */) { return false; }
269 bool writeAggregateFunction(const AggregateFunctionPtr & /* function */, ConstAggregateDataPtr /* place */) { return false; }
270};
271
272}
273#endif
274