1#pragma once
2
3#include <IO/WriteHelpers.h>
4#include <IO/ReadHelpers.h>
5
6#include <Columns/ColumnVector.h>
7#include <Columns/ColumnDecimal.h>
8#include <Columns/ColumnString.h>
9#include <DataTypes/IDataType.h>
10#include <common/StringRef.h>
11#include <Common/assert_cast.h>
12
13#include <AggregateFunctions/IAggregateFunction.h>
14
15
16namespace DB
17{
18
/** Aggregate functions that store one of the passed values.
 * For example: min, max, any, anyLast, anyHeavy.
 */
22
23
24/// For numeric values.
25template <typename T>
26struct SingleValueDataFixed
27{
28private:
29 using Self = SingleValueDataFixed;
30 using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
31
32 bool has_value = false; /// We need to remember if at least one value has been passed. This is necessary for AggregateFunctionIf.
33 T value;
34
35public:
36 bool has() const
37 {
38 return has_value;
39 }
40
41 void insertResultInto(IColumn & to) const
42 {
43 if (has())
44 assert_cast<ColVecType &>(to).getData().push_back(value);
45 else
46 assert_cast<ColVecType &>(to).insertDefault();
47 }
48
49 void write(WriteBuffer & buf, const IDataType & /*data_type*/) const
50 {
51 writeBinary(has(), buf);
52 if (has())
53 writeBinary(value, buf);
54 }
55
56 void read(ReadBuffer & buf, const IDataType & /*data_type*/, Arena *)
57 {
58 readBinary(has_value, buf);
59 if (has())
60 readBinary(value, buf);
61 }
62
63
64 void change(const IColumn & column, size_t row_num, Arena *)
65 {
66 has_value = true;
67 value = assert_cast<const ColVecType &>(column).getData()[row_num];
68 }
69
70 /// Assuming to.has()
71 void change(const Self & to, Arena *)
72 {
73 has_value = true;
74 value = to.value;
75 }
76
77 bool changeFirstTime(const IColumn & column, size_t row_num, Arena * arena)
78 {
79 if (!has())
80 {
81 change(column, row_num, arena);
82 return true;
83 }
84 else
85 return false;
86 }
87
88 bool changeFirstTime(const Self & to, Arena * arena)
89 {
90 if (!has() && to.has())
91 {
92 change(to, arena);
93 return true;
94 }
95 else
96 return false;
97 }
98
99 bool changeEveryTime(const IColumn & column, size_t row_num, Arena * arena)
100 {
101 change(column, row_num, arena);
102 return true;
103 }
104
105 bool changeEveryTime(const Self & to, Arena * arena)
106 {
107 if (to.has())
108 {
109 change(to, arena);
110 return true;
111 }
112 else
113 return false;
114 }
115
116 bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena)
117 {
118 if (!has() || assert_cast<const ColVecType &>(column).getData()[row_num] < value)
119 {
120 change(column, row_num, arena);
121 return true;
122 }
123 else
124 return false;
125 }
126
127 bool changeIfLess(const Self & to, Arena * arena)
128 {
129 if (to.has() && (!has() || to.value < value))
130 {
131 change(to, arena);
132 return true;
133 }
134 else
135 return false;
136 }
137
138 bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena)
139 {
140 if (!has() || assert_cast<const ColVecType &>(column).getData()[row_num] > value)
141 {
142 change(column, row_num, arena);
143 return true;
144 }
145 else
146 return false;
147 }
148
149 bool changeIfGreater(const Self & to, Arena * arena)
150 {
151 if (to.has() && (!has() || to.value > value))
152 {
153 change(to, arena);
154 return true;
155 }
156 else
157 return false;
158 }
159
160 bool isEqualTo(const Self & to) const
161 {
162 return has() && to.value == value;
163 }
164
165 bool isEqualTo(const IColumn & column, size_t row_num) const
166 {
167 return has() && assert_cast<const ColVecType &>(column).getData()[row_num] == value;
168 }
169
170 static bool allocatesMemoryInArena()
171 {
172 return false;
173 }
174};
175
176
177/** For strings. Short strings are stored in the object itself, and long strings are allocated separately.
178 * NOTE It could also be suitable for arrays of numbers.
179 */
180struct SingleValueDataString
181{
182private:
183 using Self = SingleValueDataString;
184
185 Int32 size = -1; /// -1 indicates that there is no value.
186 Int32 capacity = 0; /// power of two or zero
187 char * large_data;
188
189public:
190 static constexpr Int32 AUTOMATIC_STORAGE_SIZE = 64;
191 static constexpr Int32 MAX_SMALL_STRING_SIZE = AUTOMATIC_STORAGE_SIZE - sizeof(size) - sizeof(capacity) - sizeof(large_data);
192
193private:
194 char small_data[MAX_SMALL_STRING_SIZE]; /// Including the terminating zero.
195
196public:
197 bool has() const
198 {
199 return size >= 0;
200 }
201
202 const char * getData() const
203 {
204 return size <= MAX_SMALL_STRING_SIZE ? small_data : large_data;
205 }
206
207 StringRef getStringRef() const
208 {
209 return StringRef(getData(), size);
210 }
211
212 void insertResultInto(IColumn & to) const
213 {
214 if (has())
215 assert_cast<ColumnString &>(to).insertDataWithTerminatingZero(getData(), size);
216 else
217 assert_cast<ColumnString &>(to).insertDefault();
218 }
219
220 void write(WriteBuffer & buf, const IDataType & /*data_type*/) const
221 {
222 writeBinary(size, buf);
223 if (has())
224 buf.write(getData(), size);
225 }
226
227 void read(ReadBuffer & buf, const IDataType & /*data_type*/, Arena * arena)
228 {
229 Int32 rhs_size;
230 readBinary(rhs_size, buf);
231
232 if (rhs_size >= 0)
233 {
234 if (rhs_size <= MAX_SMALL_STRING_SIZE)
235 {
236 /// Don't free large_data here.
237
238 size = rhs_size;
239
240 if (size > 0)
241 buf.read(small_data, size);
242 }
243 else
244 {
245 if (capacity < rhs_size)
246 {
247 capacity = static_cast<UInt32>(roundUpToPowerOfTwoOrZero(rhs_size));
248 /// Don't free large_data here.
249 large_data = arena->alloc(capacity);
250 }
251
252 size = rhs_size;
253 buf.read(large_data, size);
254 }
255 }
256 else
257 {
258 /// Don't free large_data here.
259 size = rhs_size;
260 }
261 }
262
263 /// Assuming to.has()
264 void changeImpl(StringRef value, Arena * arena)
265 {
266 Int32 value_size = value.size;
267
268 if (value_size <= MAX_SMALL_STRING_SIZE)
269 {
270 /// Don't free large_data here.
271 size = value_size;
272
273 if (size > 0)
274 memcpy(small_data, value.data, size);
275 }
276 else
277 {
278 if (capacity < value_size)
279 {
280 /// Don't free large_data here.
281 capacity = roundUpToPowerOfTwoOrZero(value_size);
282 large_data = arena->alloc(capacity);
283 }
284
285 size = value_size;
286 memcpy(large_data, value.data, size);
287 }
288 }
289
290 void change(const IColumn & column, size_t row_num, Arena * arena)
291 {
292 changeImpl(assert_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num), arena);
293 }
294
295 void change(const Self & to, Arena * arena)
296 {
297 changeImpl(to.getStringRef(), arena);
298 }
299
300 bool changeFirstTime(const IColumn & column, size_t row_num, Arena * arena)
301 {
302 if (!has())
303 {
304 change(column, row_num, arena);
305 return true;
306 }
307 else
308 return false;
309 }
310
311 bool changeFirstTime(const Self & to, Arena * arena)
312 {
313 if (!has() && to.has())
314 {
315 change(to, arena);
316 return true;
317 }
318 else
319 return false;
320 }
321
322 bool changeEveryTime(const IColumn & column, size_t row_num, Arena * arena)
323 {
324 change(column, row_num, arena);
325 return true;
326 }
327
328 bool changeEveryTime(const Self & to, Arena * arena)
329 {
330 if (to.has())
331 {
332 change(to, arena);
333 return true;
334 }
335 else
336 return false;
337 }
338
339 bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena)
340 {
341 if (!has() || assert_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num) < getStringRef())
342 {
343 change(column, row_num, arena);
344 return true;
345 }
346 else
347 return false;
348 }
349
350 bool changeIfLess(const Self & to, Arena * arena)
351 {
352 if (to.has() && (!has() || to.getStringRef() < getStringRef()))
353 {
354 change(to, arena);
355 return true;
356 }
357 else
358 return false;
359 }
360
361 bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena)
362 {
363 if (!has() || assert_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num) > getStringRef())
364 {
365 change(column, row_num, arena);
366 return true;
367 }
368 else
369 return false;
370 }
371
372 bool changeIfGreater(const Self & to, Arena * arena)
373 {
374 if (to.has() && (!has() || to.getStringRef() > getStringRef()))
375 {
376 change(to, arena);
377 return true;
378 }
379 else
380 return false;
381 }
382
383 bool isEqualTo(const Self & to) const
384 {
385 return has() && to.getStringRef() == getStringRef();
386 }
387
388 bool isEqualTo(const IColumn & column, size_t row_num) const
389 {
390 return has() && assert_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num) == getStringRef();
391 }
392
393 static bool allocatesMemoryInArena()
394 {
395 return true;
396 }
397};
398
399static_assert(
400 sizeof(SingleValueDataString) == SingleValueDataString::AUTOMATIC_STORAGE_SIZE,
401 "Incorrect size of SingleValueDataString struct");
402
403
404/// For any other value types.
405struct SingleValueDataGeneric
406{
407private:
408 using Self = SingleValueDataGeneric;
409
410 Field value;
411
412public:
413 bool has() const
414 {
415 return !value.isNull();
416 }
417
418 void insertResultInto(IColumn & to) const
419 {
420 if (has())
421 to.insert(value);
422 else
423 to.insertDefault();
424 }
425
426 void write(WriteBuffer & buf, const IDataType & data_type) const
427 {
428 if (!value.isNull())
429 {
430 writeBinary(true, buf);
431 data_type.serializeBinary(value, buf);
432 }
433 else
434 writeBinary(false, buf);
435 }
436
437 void read(ReadBuffer & buf, const IDataType & data_type, Arena *)
438 {
439 bool is_not_null;
440 readBinary(is_not_null, buf);
441
442 if (is_not_null)
443 data_type.deserializeBinary(value, buf);
444 }
445
446 void change(const IColumn & column, size_t row_num, Arena *)
447 {
448 column.get(row_num, value);
449 }
450
451 void change(const Self & to, Arena *)
452 {
453 value = to.value;
454 }
455
456 bool changeFirstTime(const IColumn & column, size_t row_num, Arena * arena)
457 {
458 if (!has())
459 {
460 change(column, row_num, arena);
461 return true;
462 }
463 else
464 return false;
465 }
466
467 bool changeFirstTime(const Self & to, Arena * arena)
468 {
469 if (!has() && to.has())
470 {
471 change(to, arena);
472 return true;
473 }
474 else
475 return false;
476 }
477
478 bool changeEveryTime(const IColumn & column, size_t row_num, Arena * arena)
479 {
480 change(column, row_num, arena);
481 return true;
482 }
483
484 bool changeEveryTime(const Self & to, Arena * arena)
485 {
486 if (to.has())
487 {
488 change(to, arena);
489 return true;
490 }
491 else
492 return false;
493 }
494
495 bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena)
496 {
497 if (!has())
498 {
499 change(column, row_num, arena);
500 return true;
501 }
502 else
503 {
504 Field new_value;
505 column.get(row_num, new_value);
506 if (new_value < value)
507 {
508 value = new_value;
509 return true;
510 }
511 else
512 return false;
513 }
514 }
515
516 bool changeIfLess(const Self & to, Arena * arena)
517 {
518 if (to.has() && (!has() || to.value < value))
519 {
520 change(to, arena);
521 return true;
522 }
523 else
524 return false;
525 }
526
527 bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena)
528 {
529 if (!has())
530 {
531 change(column, row_num, arena);
532 return true;
533 }
534 else
535 {
536 Field new_value;
537 column.get(row_num, new_value);
538 if (new_value > value)
539 {
540 value = new_value;
541 return true;
542 }
543 else
544 return false;
545 }
546 }
547
548 bool changeIfGreater(const Self & to, Arena * arena)
549 {
550 if (to.has() && (!has() || to.value > value))
551 {
552 change(to, arena);
553 return true;
554 }
555 else
556 return false;
557 }
558
559 bool isEqualTo(const IColumn & column, size_t row_num) const
560 {
561 return has() && value == column[row_num];
562 }
563
564 bool isEqualTo(const Self & to) const
565 {
566 return has() && to.value == value;
567 }
568
569 static bool allocatesMemoryInArena()
570 {
571 return false;
572 }
573};
574
575
/** The aggregate functions min, max, any and anyLast differ only in the condition
 * under which the stored value is replaced by a new one (and, of course, in their names).
 */
580
581template <typename Data>
582struct AggregateFunctionMinData : Data
583{
584 using Self = AggregateFunctionMinData;
585
586 bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeIfLess(column, row_num, arena); }
587 bool changeIfBetter(const Self & to, Arena * arena) { return this->changeIfLess(to, arena); }
588
589 static const char * name() { return "min"; }
590};
591
592template <typename Data>
593struct AggregateFunctionMaxData : Data
594{
595 using Self = AggregateFunctionMaxData;
596
597 bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeIfGreater(column, row_num, arena); }
598 bool changeIfBetter(const Self & to, Arena * arena) { return this->changeIfGreater(to, arena); }
599
600 static const char * name() { return "max"; }
601};
602
603template <typename Data>
604struct AggregateFunctionAnyData : Data
605{
606 using Self = AggregateFunctionAnyData;
607
608 bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeFirstTime(column, row_num, arena); }
609 bool changeIfBetter(const Self & to, Arena * arena) { return this->changeFirstTime(to, arena); }
610
611 static const char * name() { return "any"; }
612};
613
614template <typename Data>
615struct AggregateFunctionAnyLastData : Data
616{
617 using Self = AggregateFunctionAnyLastData;
618
619 bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeEveryTime(column, row_num, arena); }
620 bool changeIfBetter(const Self & to, Arena * arena) { return this->changeEveryTime(to, arena); }
621
622 static const char * name() { return "anyLast"; }
623};
624
625
626/** Implement 'heavy hitters' algorithm.
627 * Selects most frequent value if its frequency is more than 50% in each thread of execution.
628 * Otherwise, selects some arbitrary value.
629 * http://www.cs.umd.edu/~samir/498/karp.pdf
630 */
631template <typename Data>
632struct AggregateFunctionAnyHeavyData : Data
633{
634 size_t counter = 0;
635
636 using Self = AggregateFunctionAnyHeavyData;
637
638 bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena)
639 {
640 if (this->isEqualTo(column, row_num))
641 {
642 ++counter;
643 }
644 else
645 {
646 if (counter == 0)
647 {
648 this->change(column, row_num, arena);
649 ++counter;
650 return true;
651 }
652 else
653 --counter;
654 }
655 return false;
656 }
657
658 bool changeIfBetter(const Self & to, Arena * arena)
659 {
660 if (this->isEqualTo(to))
661 {
662 counter += to.counter;
663 }
664 else
665 {
666 if ((!this->has() && to.has()) || counter < to.counter)
667 {
668 this->change(to, arena);
669 return true;
670 }
671 else
672 counter -= to.counter;
673 }
674 return false;
675 }
676
677 void write(WriteBuffer & buf, const IDataType & data_type) const
678 {
679 Data::write(buf, data_type);
680 writeBinary(counter, buf);
681 }
682
683 void read(ReadBuffer & buf, const IDataType & data_type, Arena * arena)
684 {
685 Data::read(buf, data_type, arena);
686 readBinary(counter, buf);
687 }
688
689 static const char * name() { return "anyHeavy"; }
690};
691
692
693template <typename Data>
694class AggregateFunctionsSingleValue final : public IAggregateFunctionDataHelper<Data, AggregateFunctionsSingleValue<Data>>
695{
696private:
697 DataTypePtr & type;
698
699public:
700 AggregateFunctionsSingleValue(const DataTypePtr & type_)
701 : IAggregateFunctionDataHelper<Data, AggregateFunctionsSingleValue<Data>>({type_}, {})
702 , type(this->argument_types[0])
703 {
704 if (StringRef(Data::name()) == StringRef("min")
705 || StringRef(Data::name()) == StringRef("max"))
706 {
707 if (!type->isComparable())
708 throw Exception("Illegal type " + type->getName() + " of argument of aggregate function " + getName()
709 + " because the values of that data type are not comparable", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
710 }
711 }
712
713 String getName() const override { return Data::name(); }
714
715 DataTypePtr getReturnType() const override
716 {
717 return type;
718 }
719
720 void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
721 {
722 this->data(place).changeIfBetter(*columns[0], row_num, arena);
723 }
724
725 void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
726 {
727 this->data(place).changeIfBetter(this->data(rhs), arena);
728 }
729
730 void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
731 {
732 this->data(place).write(buf, *type.get());
733 }
734
735 void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
736 {
737 this->data(place).read(buf, *type.get(), arena);
738 }
739
740 bool allocatesMemoryInArena() const override
741 {
742 return Data::allocatesMemoryInArena();
743 }
744
745 void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
746 {
747 this->data(place).insertResultInto(to);
748 }
749};
750
751}
752