| 1 | #pragma once |
| 2 | |
| 3 | #include <Core/UUID.h> |
| 4 | #include <Common/UInt128.h> |
| 5 | #include <common/DayNum.h> |
| 6 | |
| 7 | #include "config_formats.h" |
| 8 | |
| 9 | #include <memory> |
| 10 | |
| 11 | #if USE_PROTOBUF |
| 12 | |
| 13 | #include "ProtobufColumnMatcher.h" |
| 14 | #include <IO/WriteBufferFromString.h> |
| 15 | #include <boost/noncopyable.hpp> |
| 16 | #include <Common/PODArray.h> |
| 17 | |
| 18 | |
/// Forward declarations of the protobuf descriptor types.
/// This header refers to them only through pointers, so the incomplete types are sufficient here.
namespace google { namespace protobuf {

class FieldDescriptor;
class Descriptor;

} }
| 27 | |
| 28 | namespace DB |
| 29 | { |
| 30 | class IAggregateFunction; |
| 31 | using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>; |
| 32 | using ConstAggregateDataPtr = const char *; |
| 33 | |
| 34 | |
| 35 | /** Serializes a protobuf, tries to cast types if necessarily. |
| 36 | */ |
| 37 | class ProtobufWriter : private boost::noncopyable |
| 38 | { |
| 39 | public: |
| 40 | ProtobufWriter(WriteBuffer & out, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names); |
| 41 | ~ProtobufWriter(); |
| 42 | |
| 43 | /// Should be called at the beginning of writing a message. |
| 44 | void startMessage(); |
| 45 | |
| 46 | /// Should be called at the end of writing a message. |
| 47 | void endMessage(); |
| 48 | |
| 49 | /// Prepares for writing values of a field. |
| 50 | /// Returns true and sets 'column_index' to the corresponding column's index. |
| 51 | /// Returns false if there are no more fields to write in the message type (call endMessage() in this case). |
| 52 | bool writeField(size_t & column_index); |
| 53 | |
| 54 | /// Writes a value. This function should be called one or multiple times after writeField(). |
| 55 | /// Returns false if there are no more place for the values in the protobuf's field. |
| 56 | /// This can happen if the protobuf's field is not declared as repeated in the protobuf schema. |
| 57 | bool writeNumber(Int8 value) { return writeValueIfPossible(&IConverter::writeInt8, value); } |
| 58 | bool writeNumber(UInt8 value) { return writeValueIfPossible(&IConverter::writeUInt8, value); } |
| 59 | bool writeNumber(Int16 value) { return writeValueIfPossible(&IConverter::writeInt16, value); } |
| 60 | bool writeNumber(UInt16 value) { return writeValueIfPossible(&IConverter::writeUInt16, value); } |
| 61 | bool writeNumber(Int32 value) { return writeValueIfPossible(&IConverter::writeInt32, value); } |
| 62 | bool writeNumber(UInt32 value) { return writeValueIfPossible(&IConverter::writeUInt32, value); } |
| 63 | bool writeNumber(Int64 value) { return writeValueIfPossible(&IConverter::writeInt64, value); } |
| 64 | bool writeNumber(UInt64 value) { return writeValueIfPossible(&IConverter::writeUInt64, value); } |
| 65 | bool writeNumber(UInt128 value) { return writeValueIfPossible(&IConverter::writeUInt128, value); } |
| 66 | bool writeNumber(Float32 value) { return writeValueIfPossible(&IConverter::writeFloat32, value); } |
| 67 | bool writeNumber(Float64 value) { return writeValueIfPossible(&IConverter::writeFloat64, value); } |
| 68 | bool writeString(const StringRef & str) { return writeValueIfPossible(&IConverter::writeString, str); } |
| 69 | void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> & enum_values) { current_converter->prepareEnumMapping8(enum_values); } |
| 70 | void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> & enum_values) { current_converter->prepareEnumMapping16(enum_values); } |
| 71 | bool writeEnum(Int8 value) { return writeValueIfPossible(&IConverter::writeEnum8, value); } |
| 72 | bool writeEnum(Int16 value) { return writeValueIfPossible(&IConverter::writeEnum16, value); } |
| 73 | bool writeUUID(const UUID & uuid) { return writeValueIfPossible(&IConverter::writeUUID, uuid); } |
| 74 | bool writeDate(DayNum date) { return writeValueIfPossible(&IConverter::writeDate, date); } |
| 75 | bool writeDateTime(time_t tm) { return writeValueIfPossible(&IConverter::writeDateTime, tm); } |
| 76 | bool writeDateTime64(DateTime64 tm, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDateTime64, tm, scale); } |
| 77 | bool writeDecimal(Decimal32 decimal, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDecimal32, decimal, scale); } |
| 78 | bool writeDecimal(Decimal64 decimal, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDecimal64, decimal, scale); } |
| 79 | bool writeDecimal(const Decimal128 & decimal, UInt32 scale) { return writeValueIfPossible(&IConverter::writeDecimal128, decimal, scale); } |
| 80 | bool writeAggregateFunction(const AggregateFunctionPtr & function, ConstAggregateDataPtr place) { return writeValueIfPossible(&IConverter::writeAggregateFunction, function, place); } |
| 81 | |
| 82 | private: |
| 83 | class SimpleWriter |
| 84 | { |
| 85 | public: |
| 86 | SimpleWriter(WriteBuffer & out_); |
| 87 | ~SimpleWriter(); |
| 88 | |
| 89 | void startMessage(); |
| 90 | void endMessage(); |
| 91 | |
| 92 | void startNestedMessage(); |
| 93 | void endNestedMessage(UInt32 field_number, bool is_group, bool skip_if_empty); |
| 94 | |
| 95 | void writeInt(UInt32 field_number, Int64 value); |
| 96 | void writeUInt(UInt32 field_number, UInt64 value); |
| 97 | void writeSInt(UInt32 field_number, Int64 value); |
| 98 | template <typename T> |
| 99 | void writeFixed(UInt32 field_number, T value); |
| 100 | void writeString(UInt32 field_number, const StringRef & str); |
| 101 | |
| 102 | void startRepeatedPack(); |
| 103 | void addIntToRepeatedPack(Int64 value); |
| 104 | void addUIntToRepeatedPack(UInt64 value); |
| 105 | void addSIntToRepeatedPack(Int64 value); |
| 106 | template <typename T> |
| 107 | void addFixedToRepeatedPack(T value); |
| 108 | void endRepeatedPack(UInt32 field_number); |
| 109 | |
| 110 | private: |
| 111 | struct Piece |
| 112 | { |
| 113 | size_t start; |
| 114 | size_t end; |
| 115 | Piece(size_t start_, size_t end_) : start(start_), end(end_) {} |
| 116 | Piece() = default; |
| 117 | }; |
| 118 | |
| 119 | struct NestedInfo |
| 120 | { |
| 121 | size_t num_pieces_at_start; |
| 122 | size_t num_bytes_skipped_at_start; |
| 123 | NestedInfo(size_t num_pieces_at_start_, size_t num_bytes_skipped_at_start_) |
| 124 | : num_pieces_at_start(num_pieces_at_start_), num_bytes_skipped_at_start(num_bytes_skipped_at_start_) |
| 125 | { |
| 126 | } |
| 127 | }; |
| 128 | |
| 129 | WriteBuffer & out; |
| 130 | PODArray<UInt8> buffer; |
| 131 | std::vector<Piece> pieces; |
| 132 | size_t current_piece_start; |
| 133 | size_t num_bytes_skipped; |
| 134 | std::vector<NestedInfo> nested_infos; |
| 135 | }; |
| 136 | |
| 137 | class IConverter |
| 138 | { |
| 139 | public: |
| 140 | virtual ~IConverter() = default; |
| 141 | virtual void writeString(const StringRef &) = 0; |
| 142 | virtual void writeInt8(Int8) = 0; |
| 143 | virtual void writeUInt8(UInt8) = 0; |
| 144 | virtual void writeInt16(Int16) = 0; |
| 145 | virtual void writeUInt16(UInt16) = 0; |
| 146 | virtual void writeInt32(Int32) = 0; |
| 147 | virtual void writeUInt32(UInt32) = 0; |
| 148 | virtual void writeInt64(Int64) = 0; |
| 149 | virtual void writeUInt64(UInt64) = 0; |
| 150 | virtual void writeUInt128(const UInt128 &) = 0; |
| 151 | virtual void writeFloat32(Float32) = 0; |
| 152 | virtual void writeFloat64(Float64) = 0; |
| 153 | virtual void prepareEnumMapping8(const std::vector<std::pair<std::string, Int8>> &) = 0; |
| 154 | virtual void prepareEnumMapping16(const std::vector<std::pair<std::string, Int16>> &) = 0; |
| 155 | virtual void writeEnum8(Int8) = 0; |
| 156 | virtual void writeEnum16(Int16) = 0; |
| 157 | virtual void writeUUID(const UUID &) = 0; |
| 158 | virtual void writeDate(DayNum) = 0; |
| 159 | virtual void writeDateTime(time_t) = 0; |
| 160 | virtual void writeDateTime64(DateTime64, UInt32 scale) = 0; |
| 161 | virtual void writeDecimal32(Decimal32, UInt32) = 0; |
| 162 | virtual void writeDecimal64(Decimal64, UInt32) = 0; |
| 163 | virtual void writeDecimal128(const Decimal128 &, UInt32) = 0; |
| 164 | virtual void writeAggregateFunction(const AggregateFunctionPtr &, ConstAggregateDataPtr) = 0; |
| 165 | }; |
| 166 | |
| 167 | class ConverterBaseImpl; |
| 168 | template <bool skip_null_value> |
| 169 | class ConverterToString; |
| 170 | template <int field_type_id, typename ToType, bool skip_null_value, bool pack_repeated> |
| 171 | class ConverterToNumber; |
| 172 | template <bool skip_null_value, bool pack_repeated> |
| 173 | class ConverterToBool; |
| 174 | template <bool skip_null_value, bool pack_repeated> |
| 175 | class ConverterToEnum; |
| 176 | |
| 177 | struct ColumnMatcherTraits |
| 178 | { |
| 179 | struct FieldData |
| 180 | { |
| 181 | std::unique_ptr<IConverter> converter; |
| 182 | bool is_required; |
| 183 | bool is_repeatable; |
| 184 | bool should_pack_repeated; |
| 185 | ProtobufColumnMatcher::Message<ColumnMatcherTraits> * repeatable_container_message; |
| 186 | }; |
| 187 | struct MessageData |
| 188 | { |
| 189 | UInt32 parent_field_number; |
| 190 | bool is_group; |
| 191 | bool is_required; |
| 192 | ProtobufColumnMatcher::Message<ColumnMatcherTraits> * repeatable_container_message; |
| 193 | bool need_repeat; |
| 194 | }; |
| 195 | }; |
| 196 | using Message = ProtobufColumnMatcher::Message<ColumnMatcherTraits>; |
| 197 | using Field = ProtobufColumnMatcher::Field<ColumnMatcherTraits>; |
| 198 | |
| 199 | void setTraitsDataAfterMatchingColumns(Message * message); |
| 200 | |
| 201 | template <int field_type_id> |
| 202 | std::unique_ptr<IConverter> createConverter(const google::protobuf::FieldDescriptor * field); |
| 203 | |
| 204 | template <typename... Params> |
| 205 | using WriteValueFunctionPtr = void (IConverter::*)(Params...); |
| 206 | |
| 207 | template <typename... Params, typename... Args> |
| 208 | bool writeValueIfPossible(WriteValueFunctionPtr<Params...> func, Args &&... args) |
| 209 | { |
| 210 | if (num_values && !current_field->data.is_repeatable) |
| 211 | { |
| 212 | setNestedMessageNeedsRepeat(); |
| 213 | return false; |
| 214 | } |
| 215 | (current_converter->*func)(std::forward<Args>(args)...); |
| 216 | ++num_values; |
| 217 | return true; |
| 218 | } |
| 219 | |
| 220 | void setNestedMessageNeedsRepeat(); |
| 221 | void endWritingField(); |
| 222 | |
| 223 | SimpleWriter simple_writer; |
| 224 | std::unique_ptr<Message> root_message; |
| 225 | |
| 226 | Message * current_message; |
| 227 | size_t current_field_index = 0; |
| 228 | const Field * current_field = nullptr; |
| 229 | IConverter * current_converter = nullptr; |
| 230 | size_t num_values = 0; |
| 231 | }; |
| 232 | |
| 233 | } |
| 234 | |
| 235 | #else |
| 236 | |
| 237 | namespace DB |
| 238 | { |
| 239 | class IAggregateFunction; |
| 240 | using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>; |
| 241 | using ConstAggregateDataPtr = const char *; |
| 242 | |
| 243 | class ProtobufWriter |
| 244 | { |
| 245 | public: |
| 246 | bool writeNumber(Int8 /* value */) { return false; } |
| 247 | bool writeNumber(UInt8 /* value */) { return false; } |
| 248 | bool writeNumber(Int16 /* value */) { return false; } |
| 249 | bool writeNumber(UInt16 /* value */) { return false; } |
| 250 | bool writeNumber(Int32 /* value */) { return false; } |
| 251 | bool writeNumber(UInt32 /* value */) { return false; } |
| 252 | bool writeNumber(Int64 /* value */) { return false; } |
| 253 | bool writeNumber(UInt64 /* value */) { return false; } |
| 254 | bool writeNumber(UInt128 /* value */) { return false; } |
| 255 | bool writeNumber(Float32 /* value */) { return false; } |
| 256 | bool writeNumber(Float64 /* value */) { return false; } |
| 257 | bool writeString(const StringRef & /* value */) { return false; } |
| 258 | void prepareEnumMapping(const std::vector<std::pair<std::string, Int8>> & /* name_value_pairs */) {} |
| 259 | void prepareEnumMapping(const std::vector<std::pair<std::string, Int16>> & /* name_value_pairs */) {} |
| 260 | bool writeEnum(Int8 /* value */) { return false; } |
| 261 | bool writeEnum(Int16 /* value */) { return false; } |
| 262 | bool writeUUID(const UUID & /* value */) { return false; } |
| 263 | bool writeDate(DayNum /* date */) { return false; } |
| 264 | bool writeDateTime(time_t /* tm */) { return false; } |
| 265 | bool writeDateTime64(DateTime64 /*tm*/, UInt32 /*scale*/) { return false; } |
| 266 | bool writeDecimal(Decimal32 /* decimal */, UInt32 /* scale */) { return false; } |
| 267 | bool writeDecimal(Decimal64 /* decimal */, UInt32 /* scale */) { return false; } |
| 268 | bool writeDecimal(const Decimal128 & /* decimal */, UInt32 /* scale */) { return false; } |
| 269 | bool writeAggregateFunction(const AggregateFunctionPtr & /* function */, ConstAggregateDataPtr /* place */) { return false; } |
| 270 | }; |
| 271 | |
| 272 | } |
| 273 | #endif |
| 274 | |