#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>

#include <Core/Field.h>
#include <Core/DecimalComparison.h>
#include <Common/FieldVisitors.h>

#include <utility>
9
10
11namespace DB
12{
13 void readBinary(Array & x, ReadBuffer & buf)
14 {
15 size_t size;
16 UInt8 type;
17 DB::readBinary(type, buf);
18 DB::readBinary(size, buf);
19
20 for (size_t index = 0; index < size; ++index)
21 {
22 switch (type)
23 {
24 case Field::Types::Null:
25 {
26 x.push_back(DB::Field());
27 break;
28 }
29 case Field::Types::UInt64:
30 {
31 UInt64 value;
32 DB::readVarUInt(value, buf);
33 x.push_back(value);
34 break;
35 }
36 case Field::Types::UInt128:
37 {
38 UInt128 value;
39 DB::readBinary(value, buf);
40 x.push_back(value);
41 break;
42 }
43 case Field::Types::Int64:
44 {
45 Int64 value;
46 DB::readVarInt(value, buf);
47 x.push_back(value);
48 break;
49 }
50 case Field::Types::Float64:
51 {
52 Float64 value;
53 DB::readFloatBinary(value, buf);
54 x.push_back(value);
55 break;
56 }
57 case Field::Types::String:
58 {
59 std::string value;
60 DB::readStringBinary(value, buf);
61 x.push_back(value);
62 break;
63 }
64 case Field::Types::Array:
65 {
66 Array value;
67 DB::readBinary(value, buf);
68 x.push_back(value);
69 break;
70 }
71 case Field::Types::Tuple:
72 {
73 Tuple value;
74 DB::readBinary(value, buf);
75 x.push_back(value);
76 break;
77 }
78 case Field::Types::AggregateFunctionState:
79 {
80 AggregateFunctionStateData value;
81 DB::readStringBinary(value.name, buf);
82 DB::readStringBinary(value.data, buf);
83 x.push_back(value);
84 break;
85 }
86 }
87 }
88 }
89
90 void writeBinary(const Array & x, WriteBuffer & buf)
91 {
92 UInt8 type = Field::Types::Null;
93 size_t size = x.size();
94 if (size)
95 type = x.front().getType();
96 DB::writeBinary(type, buf);
97 DB::writeBinary(size, buf);
98
99 for (Array::const_iterator it = x.begin(); it != x.end(); ++it)
100 {
101 switch (type)
102 {
103 case Field::Types::Null: break;
104 case Field::Types::UInt64:
105 {
106 DB::writeVarUInt(get<UInt64>(*it), buf);
107 break;
108 }
109 case Field::Types::UInt128:
110 {
111 DB::writeBinary(get<UInt128>(*it), buf);
112 break;
113 }
114 case Field::Types::Int64:
115 {
116 DB::writeVarInt(get<Int64>(*it), buf);
117 break;
118 }
119 case Field::Types::Float64:
120 {
121 DB::writeFloatBinary(get<Float64>(*it), buf);
122 break;
123 }
124 case Field::Types::String:
125 {
126 DB::writeStringBinary(get<std::string>(*it), buf);
127 break;
128 }
129 case Field::Types::Array:
130 {
131 DB::writeBinary(get<Array>(*it), buf);
132 break;
133 }
134 case Field::Types::Tuple:
135 {
136 DB::writeBinary(get<Tuple>(*it), buf);
137 break;
138 }
139 case Field::Types::AggregateFunctionState:
140 {
141 DB::writeStringBinary(it->get<AggregateFunctionStateData>().name, buf);
142 DB::writeStringBinary(it->get<AggregateFunctionStateData>().data, buf);
143 break;
144 }
145 }
146 }
147 }
148
149 void writeText(const Array & x, WriteBuffer & buf)
150 {
151 DB::String res = applyVisitor(DB::FieldVisitorToString(), DB::Field(x));
152 buf.write(res.data(), res.size());
153 }
154
155 void readBinary(Tuple & x, ReadBuffer & buf)
156 {
157 size_t size;
158 DB::readBinary(size, buf);
159
160 for (size_t index = 0; index < size; ++index)
161 {
162 UInt8 type;
163 DB::readBinary(type, buf);
164
165 switch (type)
166 {
167 case Field::Types::Null:
168 {
169 x.push_back(DB::Field());
170 break;
171 }
172 case Field::Types::UInt64:
173 {
174 UInt64 value;
175 DB::readVarUInt(value, buf);
176 x.push_back(value);
177 break;
178 }
179 case Field::Types::UInt128:
180 {
181 UInt128 value;
182 DB::readBinary(value, buf);
183 x.push_back(value);
184 break;
185 }
186 case Field::Types::Int64:
187 {
188 Int64 value;
189 DB::readVarInt(value, buf);
190 x.push_back(value);
191 break;
192 }
193 case Field::Types::Float64:
194 {
195 Float64 value;
196 DB::readFloatBinary(value, buf);
197 x.push_back(value);
198 break;
199 }
200 case Field::Types::String:
201 {
202 std::string value;
203 DB::readStringBinary(value, buf);
204 x.push_back(value);
205 break;
206 }
207 case Field::Types::Array:
208 {
209 Array value;
210 DB::readBinary(value, buf);
211 x.push_back(value);
212 break;
213 }
214 case Field::Types::Tuple:
215 {
216 Tuple value;
217 DB::readBinary(value, buf);
218 x.push_back(value);
219 break;
220 }
221 case Field::Types::AggregateFunctionState:
222 {
223 AggregateFunctionStateData value;
224 DB::readStringBinary(value.name, buf);
225 DB::readStringBinary(value.data, buf);
226 x.push_back(value);
227 break;
228 }
229 }
230 }
231 }
232
233 void writeBinary(const Tuple & x, WriteBuffer & buf)
234 {
235 const size_t size = x.size();
236 DB::writeBinary(size, buf);
237
238 for (auto it = x.begin(); it != x.end(); ++it)
239 {
240 const UInt8 type = it->getType();
241 DB::writeBinary(type, buf);
242
243 switch (type)
244 {
245 case Field::Types::Null: break;
246 case Field::Types::UInt64:
247 {
248 DB::writeVarUInt(get<UInt64>(*it), buf);
249 break;
250 }
251 case Field::Types::UInt128:
252 {
253 DB::writeBinary(get<UInt128>(*it), buf);
254 break;
255 }
256 case Field::Types::Int64:
257 {
258 DB::writeVarInt(get<Int64>(*it), buf);
259 break;
260 }
261 case Field::Types::Float64:
262 {
263 DB::writeFloatBinary(get<Float64>(*it), buf);
264 break;
265 }
266 case Field::Types::String:
267 {
268 DB::writeStringBinary(get<std::string>(*it), buf);
269 break;
270 }
271 case Field::Types::Array:
272 {
273 DB::writeBinary(get<Array>(*it), buf);
274 break;
275 }
276 case Field::Types::Tuple:
277 {
278 DB::writeBinary(get<Tuple>(*it), buf);
279 break;
280 }
281 case Field::Types::AggregateFunctionState:
282 {
283 DB::writeStringBinary(it->get<AggregateFunctionStateData>().name, buf);
284 DB::writeStringBinary(it->get<AggregateFunctionStateData>().data, buf);
285 break;
286 }
287 }
288 }
289 }
290
291 void writeText(const Tuple & x, WriteBuffer & buf)
292 {
293 writeFieldText(DB::Field(x), buf);
294 }
295
296 void writeFieldText(const Field & x, WriteBuffer & buf)
297 {
298 DB::String res = Field::dispatch(DB::FieldVisitorToString(), x);
299 buf.write(res.data(), res.size());
300 }
301
302
303 template <typename T>
304 static bool decEqual(T x, T y, UInt32 x_scale, UInt32 y_scale)
305 {
306 using Comparator = DecimalComparison<T, T, EqualsOp>;
307 return Comparator::compare(x, y, x_scale, y_scale);
308 }
309
310 template <typename T>
311 static bool decLess(T x, T y, UInt32 x_scale, UInt32 y_scale)
312 {
313 using Comparator = DecimalComparison<T, T, LessOp>;
314 return Comparator::compare(x, y, x_scale, y_scale);
315 }
316
317 template <typename T>
318 static bool decLessOrEqual(T x, T y, UInt32 x_scale, UInt32 y_scale)
319 {
320 using Comparator = DecimalComparison<T, T, LessOrEqualsOp>;
321 return Comparator::compare(x, y, x_scale, y_scale);
322 }
323
324 template <> bool decimalEqual(Decimal32 x, Decimal32 y, UInt32 xs, UInt32 ys) { return decEqual(x, y, xs, ys); }
325 template <> bool decimalLess(Decimal32 x, Decimal32 y, UInt32 xs, UInt32 ys) { return decLess(x, y, xs, ys); }
326 template <> bool decimalLessOrEqual(Decimal32 x, Decimal32 y, UInt32 xs, UInt32 ys) { return decLessOrEqual(x, y, xs, ys); }
327
328 template <> bool decimalEqual(Decimal64 x, Decimal64 y, UInt32 xs, UInt32 ys) { return decEqual(x, y, xs, ys); }
329 template <> bool decimalLess(Decimal64 x, Decimal64 y, UInt32 xs, UInt32 ys) { return decLess(x, y, xs, ys); }
330 template <> bool decimalLessOrEqual(Decimal64 x, Decimal64 y, UInt32 xs, UInt32 ys) { return decLessOrEqual(x, y, xs, ys); }
331
332 template <> bool decimalEqual(Decimal128 x, Decimal128 y, UInt32 xs, UInt32 ys) { return decEqual(x, y, xs, ys); }
333 template <> bool decimalLess(Decimal128 x, Decimal128 y, UInt32 xs, UInt32 ys) { return decLess(x, y, xs, ys); }
334 template <> bool decimalLessOrEqual(Decimal128 x, Decimal128 y, UInt32 xs, UInt32 ys) { return decLessOrEqual(x, y, xs, ys); }
335}
336