1 | #pragma once |
2 | |
3 | #include <IO/WriteHelpers.h> |
4 | #include <IO/ReadHelpers.h> |
5 | |
6 | #include <Columns/ColumnVector.h> |
7 | #include <Columns/ColumnDecimal.h> |
8 | #include <Columns/ColumnString.h> |
9 | #include <DataTypes/IDataType.h> |
10 | #include <common/StringRef.h> |
11 | #include <Common/assert_cast.h> |
12 | |
13 | #include <AggregateFunctions/IAggregateFunction.h> |
14 | |
15 | |
16 | namespace DB |
17 | { |
18 | |
19 | /** Aggregate functions that store one of passed values. |
20 | * For example: min, max, any, anyLast. |
21 | */ |
22 | |
23 | |
24 | /// For numeric values. |
25 | template <typename T> |
26 | struct SingleValueDataFixed |
27 | { |
28 | private: |
29 | using Self = SingleValueDataFixed; |
30 | using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>; |
31 | |
32 | bool has_value = false; /// We need to remember if at least one value has been passed. This is necessary for AggregateFunctionIf. |
33 | T value; |
34 | |
35 | public: |
36 | bool has() const |
37 | { |
38 | return has_value; |
39 | } |
40 | |
41 | void insertResultInto(IColumn & to) const |
42 | { |
43 | if (has()) |
44 | assert_cast<ColVecType &>(to).getData().push_back(value); |
45 | else |
46 | assert_cast<ColVecType &>(to).insertDefault(); |
47 | } |
48 | |
49 | void write(WriteBuffer & buf, const IDataType & /*data_type*/) const |
50 | { |
51 | writeBinary(has(), buf); |
52 | if (has()) |
53 | writeBinary(value, buf); |
54 | } |
55 | |
56 | void read(ReadBuffer & buf, const IDataType & /*data_type*/, Arena *) |
57 | { |
58 | readBinary(has_value, buf); |
59 | if (has()) |
60 | readBinary(value, buf); |
61 | } |
62 | |
63 | |
64 | void change(const IColumn & column, size_t row_num, Arena *) |
65 | { |
66 | has_value = true; |
67 | value = assert_cast<const ColVecType &>(column).getData()[row_num]; |
68 | } |
69 | |
70 | /// Assuming to.has() |
71 | void change(const Self & to, Arena *) |
72 | { |
73 | has_value = true; |
74 | value = to.value; |
75 | } |
76 | |
77 | bool changeFirstTime(const IColumn & column, size_t row_num, Arena * arena) |
78 | { |
79 | if (!has()) |
80 | { |
81 | change(column, row_num, arena); |
82 | return true; |
83 | } |
84 | else |
85 | return false; |
86 | } |
87 | |
88 | bool changeFirstTime(const Self & to, Arena * arena) |
89 | { |
90 | if (!has() && to.has()) |
91 | { |
92 | change(to, arena); |
93 | return true; |
94 | } |
95 | else |
96 | return false; |
97 | } |
98 | |
99 | bool changeEveryTime(const IColumn & column, size_t row_num, Arena * arena) |
100 | { |
101 | change(column, row_num, arena); |
102 | return true; |
103 | } |
104 | |
105 | bool changeEveryTime(const Self & to, Arena * arena) |
106 | { |
107 | if (to.has()) |
108 | { |
109 | change(to, arena); |
110 | return true; |
111 | } |
112 | else |
113 | return false; |
114 | } |
115 | |
116 | bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena) |
117 | { |
118 | if (!has() || assert_cast<const ColVecType &>(column).getData()[row_num] < value) |
119 | { |
120 | change(column, row_num, arena); |
121 | return true; |
122 | } |
123 | else |
124 | return false; |
125 | } |
126 | |
127 | bool changeIfLess(const Self & to, Arena * arena) |
128 | { |
129 | if (to.has() && (!has() || to.value < value)) |
130 | { |
131 | change(to, arena); |
132 | return true; |
133 | } |
134 | else |
135 | return false; |
136 | } |
137 | |
138 | bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena) |
139 | { |
140 | if (!has() || assert_cast<const ColVecType &>(column).getData()[row_num] > value) |
141 | { |
142 | change(column, row_num, arena); |
143 | return true; |
144 | } |
145 | else |
146 | return false; |
147 | } |
148 | |
149 | bool changeIfGreater(const Self & to, Arena * arena) |
150 | { |
151 | if (to.has() && (!has() || to.value > value)) |
152 | { |
153 | change(to, arena); |
154 | return true; |
155 | } |
156 | else |
157 | return false; |
158 | } |
159 | |
160 | bool isEqualTo(const Self & to) const |
161 | { |
162 | return has() && to.value == value; |
163 | } |
164 | |
165 | bool isEqualTo(const IColumn & column, size_t row_num) const |
166 | { |
167 | return has() && assert_cast<const ColVecType &>(column).getData()[row_num] == value; |
168 | } |
169 | |
170 | static bool allocatesMemoryInArena() |
171 | { |
172 | return false; |
173 | } |
174 | }; |
175 | |
176 | |
177 | /** For strings. Short strings are stored in the object itself, and long strings are allocated separately. |
178 | * NOTE It could also be suitable for arrays of numbers. |
179 | */ |
180 | struct SingleValueDataString |
181 | { |
182 | private: |
183 | using Self = SingleValueDataString; |
184 | |
185 | Int32 size = -1; /// -1 indicates that there is no value. |
186 | Int32 capacity = 0; /// power of two or zero |
187 | char * large_data; |
188 | |
189 | public: |
190 | static constexpr Int32 AUTOMATIC_STORAGE_SIZE = 64; |
191 | static constexpr Int32 MAX_SMALL_STRING_SIZE = AUTOMATIC_STORAGE_SIZE - sizeof(size) - sizeof(capacity) - sizeof(large_data); |
192 | |
193 | private: |
194 | char small_data[MAX_SMALL_STRING_SIZE]; /// Including the terminating zero. |
195 | |
196 | public: |
197 | bool has() const |
198 | { |
199 | return size >= 0; |
200 | } |
201 | |
202 | const char * getData() const |
203 | { |
204 | return size <= MAX_SMALL_STRING_SIZE ? small_data : large_data; |
205 | } |
206 | |
207 | StringRef getStringRef() const |
208 | { |
209 | return StringRef(getData(), size); |
210 | } |
211 | |
212 | void insertResultInto(IColumn & to) const |
213 | { |
214 | if (has()) |
215 | assert_cast<ColumnString &>(to).insertDataWithTerminatingZero(getData(), size); |
216 | else |
217 | assert_cast<ColumnString &>(to).insertDefault(); |
218 | } |
219 | |
220 | void write(WriteBuffer & buf, const IDataType & /*data_type*/) const |
221 | { |
222 | writeBinary(size, buf); |
223 | if (has()) |
224 | buf.write(getData(), size); |
225 | } |
226 | |
227 | void read(ReadBuffer & buf, const IDataType & /*data_type*/, Arena * arena) |
228 | { |
229 | Int32 rhs_size; |
230 | readBinary(rhs_size, buf); |
231 | |
232 | if (rhs_size >= 0) |
233 | { |
234 | if (rhs_size <= MAX_SMALL_STRING_SIZE) |
235 | { |
236 | /// Don't free large_data here. |
237 | |
238 | size = rhs_size; |
239 | |
240 | if (size > 0) |
241 | buf.read(small_data, size); |
242 | } |
243 | else |
244 | { |
245 | if (capacity < rhs_size) |
246 | { |
247 | capacity = static_cast<UInt32>(roundUpToPowerOfTwoOrZero(rhs_size)); |
248 | /// Don't free large_data here. |
249 | large_data = arena->alloc(capacity); |
250 | } |
251 | |
252 | size = rhs_size; |
253 | buf.read(large_data, size); |
254 | } |
255 | } |
256 | else |
257 | { |
258 | /// Don't free large_data here. |
259 | size = rhs_size; |
260 | } |
261 | } |
262 | |
263 | /// Assuming to.has() |
264 | void changeImpl(StringRef value, Arena * arena) |
265 | { |
266 | Int32 value_size = value.size; |
267 | |
268 | if (value_size <= MAX_SMALL_STRING_SIZE) |
269 | { |
270 | /// Don't free large_data here. |
271 | size = value_size; |
272 | |
273 | if (size > 0) |
274 | memcpy(small_data, value.data, size); |
275 | } |
276 | else |
277 | { |
278 | if (capacity < value_size) |
279 | { |
280 | /// Don't free large_data here. |
281 | capacity = roundUpToPowerOfTwoOrZero(value_size); |
282 | large_data = arena->alloc(capacity); |
283 | } |
284 | |
285 | size = value_size; |
286 | memcpy(large_data, value.data, size); |
287 | } |
288 | } |
289 | |
290 | void change(const IColumn & column, size_t row_num, Arena * arena) |
291 | { |
292 | changeImpl(assert_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num), arena); |
293 | } |
294 | |
295 | void change(const Self & to, Arena * arena) |
296 | { |
297 | changeImpl(to.getStringRef(), arena); |
298 | } |
299 | |
300 | bool changeFirstTime(const IColumn & column, size_t row_num, Arena * arena) |
301 | { |
302 | if (!has()) |
303 | { |
304 | change(column, row_num, arena); |
305 | return true; |
306 | } |
307 | else |
308 | return false; |
309 | } |
310 | |
311 | bool changeFirstTime(const Self & to, Arena * arena) |
312 | { |
313 | if (!has() && to.has()) |
314 | { |
315 | change(to, arena); |
316 | return true; |
317 | } |
318 | else |
319 | return false; |
320 | } |
321 | |
322 | bool changeEveryTime(const IColumn & column, size_t row_num, Arena * arena) |
323 | { |
324 | change(column, row_num, arena); |
325 | return true; |
326 | } |
327 | |
328 | bool changeEveryTime(const Self & to, Arena * arena) |
329 | { |
330 | if (to.has()) |
331 | { |
332 | change(to, arena); |
333 | return true; |
334 | } |
335 | else |
336 | return false; |
337 | } |
338 | |
339 | bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena) |
340 | { |
341 | if (!has() || assert_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num) < getStringRef()) |
342 | { |
343 | change(column, row_num, arena); |
344 | return true; |
345 | } |
346 | else |
347 | return false; |
348 | } |
349 | |
350 | bool changeIfLess(const Self & to, Arena * arena) |
351 | { |
352 | if (to.has() && (!has() || to.getStringRef() < getStringRef())) |
353 | { |
354 | change(to, arena); |
355 | return true; |
356 | } |
357 | else |
358 | return false; |
359 | } |
360 | |
361 | bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena) |
362 | { |
363 | if (!has() || assert_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num) > getStringRef()) |
364 | { |
365 | change(column, row_num, arena); |
366 | return true; |
367 | } |
368 | else |
369 | return false; |
370 | } |
371 | |
372 | bool changeIfGreater(const Self & to, Arena * arena) |
373 | { |
374 | if (to.has() && (!has() || to.getStringRef() > getStringRef())) |
375 | { |
376 | change(to, arena); |
377 | return true; |
378 | } |
379 | else |
380 | return false; |
381 | } |
382 | |
383 | bool isEqualTo(const Self & to) const |
384 | { |
385 | return has() && to.getStringRef() == getStringRef(); |
386 | } |
387 | |
388 | bool isEqualTo(const IColumn & column, size_t row_num) const |
389 | { |
390 | return has() && assert_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num) == getStringRef(); |
391 | } |
392 | |
393 | static bool allocatesMemoryInArena() |
394 | { |
395 | return true; |
396 | } |
397 | }; |
398 | |
399 | static_assert( |
400 | sizeof(SingleValueDataString) == SingleValueDataString::AUTOMATIC_STORAGE_SIZE, |
401 | "Incorrect size of SingleValueDataString struct" ); |
402 | |
403 | |
404 | /// For any other value types. |
405 | struct SingleValueDataGeneric |
406 | { |
407 | private: |
408 | using Self = SingleValueDataGeneric; |
409 | |
410 | Field value; |
411 | |
412 | public: |
413 | bool has() const |
414 | { |
415 | return !value.isNull(); |
416 | } |
417 | |
418 | void insertResultInto(IColumn & to) const |
419 | { |
420 | if (has()) |
421 | to.insert(value); |
422 | else |
423 | to.insertDefault(); |
424 | } |
425 | |
426 | void write(WriteBuffer & buf, const IDataType & data_type) const |
427 | { |
428 | if (!value.isNull()) |
429 | { |
430 | writeBinary(true, buf); |
431 | data_type.serializeBinary(value, buf); |
432 | } |
433 | else |
434 | writeBinary(false, buf); |
435 | } |
436 | |
437 | void read(ReadBuffer & buf, const IDataType & data_type, Arena *) |
438 | { |
439 | bool is_not_null; |
440 | readBinary(is_not_null, buf); |
441 | |
442 | if (is_not_null) |
443 | data_type.deserializeBinary(value, buf); |
444 | } |
445 | |
446 | void change(const IColumn & column, size_t row_num, Arena *) |
447 | { |
448 | column.get(row_num, value); |
449 | } |
450 | |
451 | void change(const Self & to, Arena *) |
452 | { |
453 | value = to.value; |
454 | } |
455 | |
456 | bool changeFirstTime(const IColumn & column, size_t row_num, Arena * arena) |
457 | { |
458 | if (!has()) |
459 | { |
460 | change(column, row_num, arena); |
461 | return true; |
462 | } |
463 | else |
464 | return false; |
465 | } |
466 | |
467 | bool changeFirstTime(const Self & to, Arena * arena) |
468 | { |
469 | if (!has() && to.has()) |
470 | { |
471 | change(to, arena); |
472 | return true; |
473 | } |
474 | else |
475 | return false; |
476 | } |
477 | |
478 | bool changeEveryTime(const IColumn & column, size_t row_num, Arena * arena) |
479 | { |
480 | change(column, row_num, arena); |
481 | return true; |
482 | } |
483 | |
484 | bool changeEveryTime(const Self & to, Arena * arena) |
485 | { |
486 | if (to.has()) |
487 | { |
488 | change(to, arena); |
489 | return true; |
490 | } |
491 | else |
492 | return false; |
493 | } |
494 | |
495 | bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena) |
496 | { |
497 | if (!has()) |
498 | { |
499 | change(column, row_num, arena); |
500 | return true; |
501 | } |
502 | else |
503 | { |
504 | Field new_value; |
505 | column.get(row_num, new_value); |
506 | if (new_value < value) |
507 | { |
508 | value = new_value; |
509 | return true; |
510 | } |
511 | else |
512 | return false; |
513 | } |
514 | } |
515 | |
516 | bool changeIfLess(const Self & to, Arena * arena) |
517 | { |
518 | if (to.has() && (!has() || to.value < value)) |
519 | { |
520 | change(to, arena); |
521 | return true; |
522 | } |
523 | else |
524 | return false; |
525 | } |
526 | |
527 | bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena) |
528 | { |
529 | if (!has()) |
530 | { |
531 | change(column, row_num, arena); |
532 | return true; |
533 | } |
534 | else |
535 | { |
536 | Field new_value; |
537 | column.get(row_num, new_value); |
538 | if (new_value > value) |
539 | { |
540 | value = new_value; |
541 | return true; |
542 | } |
543 | else |
544 | return false; |
545 | } |
546 | } |
547 | |
548 | bool changeIfGreater(const Self & to, Arena * arena) |
549 | { |
550 | if (to.has() && (!has() || to.value > value)) |
551 | { |
552 | change(to, arena); |
553 | return true; |
554 | } |
555 | else |
556 | return false; |
557 | } |
558 | |
559 | bool isEqualTo(const IColumn & column, size_t row_num) const |
560 | { |
561 | return has() && value == column[row_num]; |
562 | } |
563 | |
564 | bool isEqualTo(const Self & to) const |
565 | { |
566 | return has() && to.value == value; |
567 | } |
568 | |
569 | static bool allocatesMemoryInArena() |
570 | { |
571 | return false; |
572 | } |
573 | }; |
574 | |
575 | |
/** The aggregate functions min, max, any and anyLast differ only in the condition
  * under which the stored value is replaced by a new one (and, of course, in their names).
  */
580 | |
581 | template <typename Data> |
582 | struct AggregateFunctionMinData : Data |
583 | { |
584 | using Self = AggregateFunctionMinData; |
585 | |
586 | bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeIfLess(column, row_num, arena); } |
587 | bool changeIfBetter(const Self & to, Arena * arena) { return this->changeIfLess(to, arena); } |
588 | |
589 | static const char * name() { return "min" ; } |
590 | }; |
591 | |
592 | template <typename Data> |
593 | struct AggregateFunctionMaxData : Data |
594 | { |
595 | using Self = AggregateFunctionMaxData; |
596 | |
597 | bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeIfGreater(column, row_num, arena); } |
598 | bool changeIfBetter(const Self & to, Arena * arena) { return this->changeIfGreater(to, arena); } |
599 | |
600 | static const char * name() { return "max" ; } |
601 | }; |
602 | |
603 | template <typename Data> |
604 | struct AggregateFunctionAnyData : Data |
605 | { |
606 | using Self = AggregateFunctionAnyData; |
607 | |
608 | bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeFirstTime(column, row_num, arena); } |
609 | bool changeIfBetter(const Self & to, Arena * arena) { return this->changeFirstTime(to, arena); } |
610 | |
611 | static const char * name() { return "any" ; } |
612 | }; |
613 | |
614 | template <typename Data> |
615 | struct AggregateFunctionAnyLastData : Data |
616 | { |
617 | using Self = AggregateFunctionAnyLastData; |
618 | |
619 | bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeEveryTime(column, row_num, arena); } |
620 | bool changeIfBetter(const Self & to, Arena * arena) { return this->changeEveryTime(to, arena); } |
621 | |
622 | static const char * name() { return "anyLast" ; } |
623 | }; |
624 | |
625 | |
626 | /** Implement 'heavy hitters' algorithm. |
627 | * Selects most frequent value if its frequency is more than 50% in each thread of execution. |
628 | * Otherwise, selects some arbitrary value. |
629 | * http://www.cs.umd.edu/~samir/498/karp.pdf |
630 | */ |
631 | template <typename Data> |
632 | struct AggregateFunctionAnyHeavyData : Data |
633 | { |
634 | size_t counter = 0; |
635 | |
636 | using Self = AggregateFunctionAnyHeavyData; |
637 | |
638 | bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) |
639 | { |
640 | if (this->isEqualTo(column, row_num)) |
641 | { |
642 | ++counter; |
643 | } |
644 | else |
645 | { |
646 | if (counter == 0) |
647 | { |
648 | this->change(column, row_num, arena); |
649 | ++counter; |
650 | return true; |
651 | } |
652 | else |
653 | --counter; |
654 | } |
655 | return false; |
656 | } |
657 | |
658 | bool changeIfBetter(const Self & to, Arena * arena) |
659 | { |
660 | if (this->isEqualTo(to)) |
661 | { |
662 | counter += to.counter; |
663 | } |
664 | else |
665 | { |
666 | if ((!this->has() && to.has()) || counter < to.counter) |
667 | { |
668 | this->change(to, arena); |
669 | return true; |
670 | } |
671 | else |
672 | counter -= to.counter; |
673 | } |
674 | return false; |
675 | } |
676 | |
677 | void write(WriteBuffer & buf, const IDataType & data_type) const |
678 | { |
679 | Data::write(buf, data_type); |
680 | writeBinary(counter, buf); |
681 | } |
682 | |
683 | void read(ReadBuffer & buf, const IDataType & data_type, Arena * arena) |
684 | { |
685 | Data::read(buf, data_type, arena); |
686 | readBinary(counter, buf); |
687 | } |
688 | |
689 | static const char * name() { return "anyHeavy" ; } |
690 | }; |
691 | |
692 | |
693 | template <typename Data> |
694 | class AggregateFunctionsSingleValue final : public IAggregateFunctionDataHelper<Data, AggregateFunctionsSingleValue<Data>> |
695 | { |
696 | private: |
697 | DataTypePtr & type; |
698 | |
699 | public: |
700 | AggregateFunctionsSingleValue(const DataTypePtr & type_) |
701 | : IAggregateFunctionDataHelper<Data, AggregateFunctionsSingleValue<Data>>({type_}, {}) |
702 | , type(this->argument_types[0]) |
703 | { |
704 | if (StringRef(Data::name()) == StringRef("min" ) |
705 | || StringRef(Data::name()) == StringRef("max" )) |
706 | { |
707 | if (!type->isComparable()) |
708 | throw Exception("Illegal type " + type->getName() + " of argument of aggregate function " + getName() |
709 | + " because the values of that data type are not comparable" , ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); |
710 | } |
711 | } |
712 | |
713 | String getName() const override { return Data::name(); } |
714 | |
715 | DataTypePtr getReturnType() const override |
716 | { |
717 | return type; |
718 | } |
719 | |
720 | void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override |
721 | { |
722 | this->data(place).changeIfBetter(*columns[0], row_num, arena); |
723 | } |
724 | |
725 | void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override |
726 | { |
727 | this->data(place).changeIfBetter(this->data(rhs), arena); |
728 | } |
729 | |
730 | void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override |
731 | { |
732 | this->data(place).write(buf, *type.get()); |
733 | } |
734 | |
735 | void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override |
736 | { |
737 | this->data(place).read(buf, *type.get(), arena); |
738 | } |
739 | |
740 | bool allocatesMemoryInArena() const override |
741 | { |
742 | return Data::allocatesMemoryInArena(); |
743 | } |
744 | |
745 | void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override |
746 | { |
747 | this->data(place).insertResultInto(to); |
748 | } |
749 | }; |
750 | |
751 | } |
752 | |