1#pragma once
2
3#include <Columns/ColumnVector.h>
4#include <Columns/ColumnDecimal.h>
5#include <Columns/ColumnArray.h>
6#include <Columns/ColumnString.h>
7#include <Columns/ColumnFixedString.h>
8#include <Columns/ColumnConst.h>
9#include <Columns/ColumnNullable.h>
10
11#include <Common/typeid_cast.h>
12#include <Common/UTF8Helpers.h>
13
14#include "IArraySource.h"
15#include "IValueSource.h"
16#include "Slices.h"
17#include <Functions/FunctionHelpers.h>
18
19
20namespace DB
21{
22
/// Error codes referenced by this header. They are only declared here; the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int ILLEGAL_COLUMN;
    /// Thrown by ConstSource::accept when Base does not derive from the visited source interface.
    extern const int NOT_IMPLEMENTED;
}
27
28namespace GatherUtils
29{
30
31template <typename T>
32struct NumericArraySource : public ArraySourceImpl<NumericArraySource<T>>
33{
34 using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
35 using Slice = NumericArraySlice<T>;
36 using Column = ColumnArray;
37
38 const typename ColVecType::Container & elements;
39 const typename ColumnArray::Offsets & offsets;
40
41 size_t row_num = 0;
42 ColumnArray::Offset prev_offset = 0;
43
44 explicit NumericArraySource(const ColumnArray & arr)
45 : elements(typeid_cast<const ColVecType &>(arr.getData()).getData()), offsets(arr.getOffsets())
46 {
47 }
48
49 void next()
50 {
51 prev_offset = offsets[row_num];
52 ++row_num;
53 }
54
55 bool isEnd() const
56 {
57 return row_num == offsets.size();
58 }
59
60 size_t rowNum() const
61 {
62 return row_num;
63 }
64
65 const typename ColumnArray::Offsets & getOffsets() const override
66 {
67 return offsets;
68 }
69
70 /// Get size for corresponding call or Sink::reserve to reserve memory for elements.
71 size_t getSizeForReserve() const override
72 {
73 return elements.size();
74 }
75
76 size_t getColumnSize() const override
77 {
78 return offsets.size();
79 }
80
81 size_t getElementSize() const
82 {
83 return offsets[row_num] - prev_offset;
84 }
85
86 Slice getWhole() const
87 {
88 return {&elements[prev_offset], offsets[row_num] - prev_offset};
89 }
90
91 Slice getSliceFromLeft(size_t offset) const
92 {
93 size_t elem_size = offsets[row_num] - prev_offset;
94 if (offset >= elem_size)
95 return {&elements[prev_offset], 0};
96 return {&elements[prev_offset + offset], elem_size - offset};
97 }
98
99 Slice getSliceFromLeft(size_t offset, size_t length) const
100 {
101 size_t elem_size = offsets[row_num] - prev_offset;
102 if (offset >= elem_size)
103 return {&elements[prev_offset], 0};
104 return {&elements[prev_offset + offset], std::min(length, elem_size - offset)};
105 }
106
107 Slice getSliceFromRight(size_t offset) const
108 {
109 size_t elem_size = offsets[row_num] - prev_offset;
110 if (offset > elem_size)
111 return {&elements[prev_offset], elem_size};
112 return {&elements[offsets[row_num] - offset], offset};
113 }
114
115 Slice getSliceFromRight(size_t offset, size_t length) const
116 {
117 size_t elem_size = offsets[row_num] - prev_offset;
118 if (offset > elem_size)
119 return {&elements[prev_offset], length + elem_size > offset ? std::min(elem_size, length + elem_size - offset) : 0};
120 return {&elements[offsets[row_num] - offset], std::min(length, offset)};
121 }
122};
123
124template <typename Base>
125struct ConstSource : public Base
126{
127 using Slice = typename Base::Slice;
128 using Column = ColumnConst;
129
130 size_t total_rows;
131 size_t row_num = 0;
132
133 explicit ConstSource(const ColumnConst & col_)
134 : Base(static_cast<const typename Base::Column &>(col_.getDataColumn())), total_rows(col_.size())
135 {
136 }
137
138 template <typename ColumnType>
139 ConstSource(const ColumnType & col_, size_t total_rows_) : Base(col_), total_rows(total_rows_)
140 {
141 }
142
143 template <typename ColumnType>
144 ConstSource(const ColumnType & col_, const NullMap & null_map_, size_t total_rows_) : Base(col_, null_map_), total_rows(total_rows_)
145 {
146 }
147
148 ConstSource(const ConstSource &) = default;
149 virtual ~ConstSource() = default;
150
151 virtual void accept(ArraySourceVisitor & visitor) // override
152 {
153 if constexpr (std::is_base_of<IArraySource, Base>::value)
154 visitor.visit(*this);
155 else
156 throw Exception(
157 "accept(ArraySourceVisitor &) is not implemented for " + demangle(typeid(ConstSource<Base>).name())
158 + " because " + demangle(typeid(Base).name()) + " is not derived from IArraySource", ErrorCodes::NOT_IMPLEMENTED);
159 }
160
161 virtual void accept(ValueSourceVisitor & visitor) // override
162 {
163 if constexpr (std::is_base_of<IValueSource, Base>::value)
164 visitor.visit(*this);
165 else
166 throw Exception(
167 "accept(ValueSourceVisitor &) is not implemented for " + demangle(typeid(ConstSource<Base>).name())
168 + " because " + demangle(typeid(Base).name()) + " is not derived from IValueSource", ErrorCodes::NOT_IMPLEMENTED);
169 }
170
171 void next()
172 {
173 ++row_num;
174 }
175
176 bool isEnd() const
177 {
178 return row_num == total_rows;
179 }
180
181 size_t rowNum() const
182 {
183 return row_num;
184 }
185
186 size_t getSizeForReserve() const
187 {
188 return total_rows * Base::getSizeForReserve();
189 }
190
191 size_t getColumnSize() const
192 {
193 return total_rows;
194 }
195
196 bool isConst() const
197 {
198 return true;
199 }
200};
201
202struct StringSource
203{
204 using Slice = NumericArraySlice<UInt8>;
205 using Column = ColumnString;
206
207 const typename ColumnString::Chars & elements;
208 const typename ColumnString::Offsets & offsets;
209
210 size_t row_num = 0;
211 ColumnString::Offset prev_offset = 0;
212
213 explicit StringSource(const ColumnString & col)
214 : elements(col.getChars()), offsets(col.getOffsets())
215 {
216 }
217
218 void next()
219 {
220 prev_offset = offsets[row_num];
221 ++row_num;
222 }
223
224 bool isEnd() const
225 {
226 return row_num == offsets.size();
227 }
228
229 size_t rowNum() const
230 {
231 return row_num;
232 }
233
234 size_t getSizeForReserve() const
235 {
236 return elements.size();
237 }
238
239 size_t getElementSize() const
240 {
241 return offsets[row_num] - prev_offset;
242 }
243
244 Slice getWhole() const
245 {
246 return {&elements[prev_offset], offsets[row_num] - prev_offset - 1};
247 }
248
249 Slice getSliceFromLeft(size_t offset) const
250 {
251 size_t elem_size = offsets[row_num] - prev_offset - 1;
252 if (offset >= elem_size)
253 return {&elements[prev_offset], 0};
254 return {&elements[prev_offset + offset], elem_size - offset};
255 }
256
257 Slice getSliceFromLeft(size_t offset, size_t length) const
258 {
259 size_t elem_size = offsets[row_num] - prev_offset - 1;
260 if (offset >= elem_size)
261 return {&elements[prev_offset], 0};
262 return {&elements[prev_offset + offset], std::min(length, elem_size - offset)};
263 }
264
265 Slice getSliceFromRight(size_t offset) const
266 {
267 size_t elem_size = offsets[row_num] - prev_offset - 1;
268 if (offset > elem_size)
269 return {&elements[prev_offset], elem_size};
270 return {&elements[prev_offset + elem_size - offset], offset};
271 }
272
273 Slice getSliceFromRight(size_t offset, size_t length) const
274 {
275 size_t elem_size = offsets[row_num] - prev_offset - 1;
276 if (offset > elem_size)
277 return {&elements[prev_offset], length + elem_size > offset ? std::min(elem_size, length + elem_size - offset) : 0};
278 return {&elements[prev_offset + elem_size - offset], std::min(length, offset)};
279 }
280};
281
282
/// Differs from StringSource by having 'offset' and 'length' in code points instead of bytes in getSlice* methods.
284/** NOTE: The behaviour of substring and substringUTF8 is inconsistent when negative offset is greater than string size:
285 * substring:
286 * hello
287 * ^-----^ - offset -10, length 7, result: "he"
288 * substringUTF8:
289 * hello
290 * ^-----^ - offset -10, length 7, result: "hello"
  * This may be subject to change.
292 */
293struct UTF8StringSource : public StringSource
294{
295 using StringSource::StringSource;
296
297 static const ColumnString::Char * skipCodePointsForward(const ColumnString::Char * pos, size_t size, const ColumnString::Char * end)
298 {
299 for (size_t i = 0; i < size && pos < end; ++i)
300 pos += UTF8::seqLength(*pos); /// NOTE pos may become greater than end. It is Ok due to padding in PaddedPODArray.
301 return pos;
302 }
303
304 static const ColumnString::Char * skipCodePointsBackward(const ColumnString::Char * pos, size_t size, const ColumnString::Char * begin)
305 {
306 for (size_t i = 0; i < size && pos > begin; ++i)
307 {
308 --pos;
309 if (pos == begin)
310 break;
311 UTF8::syncBackward(pos, begin);
312 }
313 return pos;
314 }
315
316 Slice getSliceFromLeft(size_t offset) const
317 {
318 auto begin = &elements[prev_offset];
319 auto end = elements.data() + offsets[row_num] - 1;
320 auto res_begin = skipCodePointsForward(begin, offset, end);
321
322 if (res_begin >= end)
323 return {begin, 0};
324
325 return {res_begin, size_t(end - res_begin)};
326 }
327
328 Slice getSliceFromLeft(size_t offset, size_t length) const
329 {
330 auto begin = &elements[prev_offset];
331 auto end = elements.data() + offsets[row_num] - 1;
332 auto res_begin = skipCodePointsForward(begin, offset, end);
333
334 if (res_begin >= end)
335 return {begin, 0};
336
337 auto res_end = skipCodePointsForward(res_begin, length, end);
338
339 if (res_end >= end)
340 return {res_begin, size_t(end - res_begin)};
341
342 return {res_begin, size_t(res_end - res_begin)};
343 }
344
345 Slice getSliceFromRight(size_t offset) const
346 {
347 auto begin = &elements[prev_offset];
348 auto end = elements.data() + offsets[row_num] - 1;
349 auto res_begin = skipCodePointsBackward(end, offset, begin);
350
351 return {res_begin, size_t(end - res_begin)};
352 }
353
354 Slice getSliceFromRight(size_t offset, size_t length) const
355 {
356 auto begin = &elements[prev_offset];
357 auto end = elements.data() + offsets[row_num] - 1;
358 auto res_begin = skipCodePointsBackward(end, offset, begin);
359 auto res_end = skipCodePointsForward(res_begin, length, end);
360
361 if (res_end >= end)
362 return {res_begin, size_t(end - res_begin)};
363
364 return {res_begin, size_t(res_end - res_begin)};
365 }
366};
367
368
369struct FixedStringSource
370{
371 using Slice = NumericArraySlice<UInt8>;
372 using Column = ColumnFixedString;
373
374 const UInt8 * pos;
375 const UInt8 * end;
376 size_t string_size;
377 size_t row_num = 0;
378
379 explicit FixedStringSource(const ColumnFixedString & col)
380 : string_size(col.getN())
381 {
382 const auto & chars = col.getChars();
383 pos = chars.data();
384 end = pos + chars.size();
385 }
386
387 void next()
388 {
389 pos += string_size;
390 ++row_num;
391 }
392
393 bool isEnd() const
394 {
395 return pos == end;
396 }
397
398 size_t rowNum() const
399 {
400 return row_num;
401 }
402
403 size_t getSizeForReserve() const
404 {
405 return end - pos;
406 }
407
408 size_t getElementSize() const
409 {
410 return string_size;
411 }
412
413 Slice getWhole() const
414 {
415 return {pos, string_size};
416 }
417
418 Slice getSliceFromLeft(size_t offset) const
419 {
420 if (offset >= string_size)
421 return {pos, 0};
422 return {pos + offset, string_size - offset};
423 }
424
425 Slice getSliceFromLeft(size_t offset, size_t length) const
426 {
427 if (offset >= string_size)
428 return {pos, 0};
429 return {pos + offset, std::min(length, string_size - offset)};
430 }
431
432 Slice getSliceFromRight(size_t offset) const
433 {
434 if (offset > string_size)
435 return {pos, string_size};
436 return {pos + string_size - offset, offset};
437 }
438
439 Slice getSliceFromRight(size_t offset, size_t length) const
440 {
441 if (offset > string_size)
442 return {pos, length + string_size > offset ? std::min(string_size, length + string_size - offset) : 0};
443 return {pos + string_size - offset, std::min(length, offset)};
444 }
445};
446
447
448struct IStringSource
449{
450 using Slice = NumericArraySlice<UInt8>;
451
452 virtual void next() = 0;
453 virtual bool isEnd() const = 0;
454 virtual size_t getSizeForReserve() const = 0;
455 virtual Slice getWhole() const = 0;
456 virtual ~IStringSource() {}
457};
458
459
460template <typename Impl>
461struct DynamicStringSource final : IStringSource
462{
463 Impl impl;
464
465 explicit DynamicStringSource(const IColumn & col) : impl(static_cast<const typename Impl::Column &>(col)) {}
466
467 void next() override { impl.next(); }
468 bool isEnd() const override { return impl.isEnd(); }
469 size_t getSizeForReserve() const override { return impl.getSizeForReserve(); }
470 Slice getWhole() const override { return impl.getWhole(); }
471};
472
473inline std::unique_ptr<IStringSource> createDynamicStringSource(const IColumn & col)
474{
475 if (checkColumn<ColumnString>(&col))
476 return std::make_unique<DynamicStringSource<StringSource>>(col);
477 if (checkColumn<ColumnFixedString>(&col))
478 return std::make_unique<DynamicStringSource<FixedStringSource>>(col);
479 if (checkColumnConst<ColumnString>(&col))
480 return std::make_unique<DynamicStringSource<ConstSource<StringSource>>>(col);
481 if (checkColumnConst<ColumnFixedString>(&col))
482 return std::make_unique<DynamicStringSource<ConstSource<FixedStringSource>>>(col);
483 throw Exception("Unexpected type of string column: " + col.getName(), ErrorCodes::ILLEGAL_COLUMN);
484}
485
/// A list of owned, type-erased string sources.
using StringSources = std::vector<std::unique_ptr<IStringSource>>;
487
488
489struct GenericArraySource : public ArraySourceImpl<GenericArraySource>
490{
491 using Slice = GenericArraySlice;
492 using Column = ColumnArray;
493
494 const IColumn & elements;
495 const typename ColumnArray::Offsets & offsets;
496
497 size_t row_num = 0;
498 ColumnArray::Offset prev_offset = 0;
499
500 explicit GenericArraySource(const ColumnArray & arr)
501 : elements(arr.getData()), offsets(arr.getOffsets())
502 {
503 }
504
505 void next()
506 {
507 prev_offset = offsets[row_num];
508 ++row_num;
509 }
510
511 bool isEnd() const
512 {
513 return row_num == offsets.size();
514 }
515
516 size_t rowNum() const
517 {
518 return row_num;
519 }
520
521 const typename ColumnArray::Offsets & getOffsets() const override
522 {
523 return offsets;
524 }
525
526 size_t getSizeForReserve() const override
527 {
528 return elements.size();
529 }
530
531 size_t getColumnSize() const override
532 {
533 return elements.size();
534 }
535
536 size_t getElementSize() const
537 {
538 return offsets[row_num] - prev_offset;
539 }
540
541 Slice getWhole() const
542 {
543 return {&elements, prev_offset, offsets[row_num] - prev_offset};
544 }
545
546 Slice getSliceFromLeft(size_t offset) const
547 {
548 size_t elem_size = offsets[row_num] - prev_offset;
549 if (offset >= elem_size)
550 return {&elements, prev_offset, 0};
551 return {&elements, prev_offset + offset, elem_size - offset};
552 }
553
554 Slice getSliceFromLeft(size_t offset, size_t length) const
555 {
556 size_t elem_size = offsets[row_num] - prev_offset;
557 if (offset >= elem_size)
558 return {&elements, prev_offset, 0};
559 return {&elements, prev_offset + offset, std::min(length, elem_size - offset)};
560 }
561
562 Slice getSliceFromRight(size_t offset) const
563 {
564 size_t elem_size = offsets[row_num] - prev_offset;
565 if (offset > elem_size)
566 return {&elements, prev_offset, elem_size};
567 return {&elements, offsets[row_num] - offset, offset};
568 }
569
570 Slice getSliceFromRight(size_t offset, size_t length) const
571 {
572 size_t elem_size = offsets[row_num] - prev_offset;
573 if (offset > elem_size)
574 return {&elements, prev_offset, length + elem_size > offset ? std::min(elem_size, length + elem_size - offset) : 0};
575 return {&elements, offsets[row_num] - offset, std::min(length, offset)};
576 }
577};
578
579
580template <typename ArraySource>
581struct NullableArraySource : public ArraySource
582{
583 using Slice = NullableSlice<typename ArraySource::Slice>;
584 using ArraySource::prev_offset;
585 using ArraySource::row_num;
586 using ArraySource::offsets;
587
588 const NullMap & null_map;
589
590 NullableArraySource(const ColumnArray & arr, const NullMap & null_map_)
591 : ArraySource(arr), null_map(null_map_)
592 {
593 }
594
595 void accept(ArraySourceVisitor & visitor) override { visitor.visit(*this); }
596
597 Slice getWhole() const
598 {
599 Slice slice = ArraySource::getWhole();
600 slice.null_map = &null_map[prev_offset];
601 return slice;
602 }
603
604 Slice getSliceFromLeft(size_t offset) const
605 {
606 Slice slice = ArraySource::getSliceFromLeft(offset);
607 if (offsets[row_num] > prev_offset + offset)
608 slice.null_map = &null_map[prev_offset + offset];
609 else
610 slice.null_map = &null_map[prev_offset];
611 return slice;
612 }
613
614 Slice getSliceFromLeft(size_t offset, size_t length) const
615 {
616 Slice slice = ArraySource::getSliceFromLeft(offset, length);
617 if (offsets[row_num] > prev_offset + offset)
618 slice.null_map = &null_map[prev_offset + offset];
619 else
620 slice.null_map = &null_map[prev_offset];
621 return slice;
622 }
623
624 Slice getSliceFromRight(size_t offset) const
625 {
626 Slice slice = ArraySource::getSliceFromRight(offset);
627 if (offsets[row_num] > prev_offset + offset)
628 slice.null_map = &null_map[offsets[row_num] - offset];
629 else
630 slice.null_map = &null_map[prev_offset];
631 return slice;
632 }
633
634 Slice getSliceFromRight(size_t offset, size_t length) const
635 {
636 Slice slice = ArraySource::getSliceFromRight(offset, length);
637 if (offsets[row_num] > prev_offset + offset)
638 slice.null_map = &null_map[offsets[row_num] - offset];
639 else
640 slice.null_map = &null_map[prev_offset];
641 return slice;
642 }
643
644 bool isNullable() const override
645 {
646 return true;
647 }
648};
649
650
651template <typename T>
652struct NumericValueSource : ValueSourceImpl<NumericValueSource<T>>
653{
654 using Slice = NumericValueSlice<T>;
655 using Column = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
656
657 const T * begin;
658 size_t total_rows;
659 size_t row_num = 0;
660
661 explicit NumericValueSource(const Column & col)
662 {
663 const auto & container = col.getData();
664 begin = container.data();
665 total_rows = container.size();
666 }
667
668 void next()
669 {
670 ++row_num;
671 }
672
673 bool isEnd() const
674 {
675 return row_num == total_rows;
676 }
677
678 size_t rowNum() const
679 {
680 return row_num;
681 }
682
683 size_t getSizeForReserve() const
684 {
685 return total_rows;
686 }
687
688 Slice getWhole() const
689 {
690 Slice slice;
691 slice.value = begin[row_num];
692 return slice;
693 }
694};
695
696struct GenericValueSource : public ValueSourceImpl<GenericValueSource>
697{
698 using Slice = GenericValueSlice;
699
700 const IColumn * column;
701 size_t total_rows;
702 size_t row_num = 0;
703
704 explicit GenericValueSource(const IColumn & col)
705 {
706 column = &col;
707 total_rows = col.size();
708 }
709
710 void next()
711 {
712 ++row_num;
713 }
714
715 bool isEnd() const
716 {
717 return row_num == total_rows;
718 }
719
720 size_t rowNum() const
721 {
722 return row_num;
723 }
724
725 size_t getSizeForReserve() const
726 {
727 return total_rows;
728 }
729
730 Slice getWhole() const
731 {
732 Slice slice;
733 slice.elements = column;
734 slice.position = row_num;
735 return slice;
736 }
737};
738
739template <typename ValueSource>
740struct NullableValueSource : public ValueSource
741{
742 using Slice = NullableSlice<typename ValueSource::Slice>;
743 using ValueSource::row_num;
744
745 const NullMap & null_map;
746
747 template <typename Column>
748 explicit NullableValueSource(const Column & col, const NullMap & null_map_) : ValueSource(col), null_map(null_map_) {}
749
750 void accept(ValueSourceVisitor & visitor) override { visitor.visit(*this); }
751
752 Slice getWhole() const
753 {
754 Slice slice = ValueSource::getWhole();
755 slice.null_map = null_map.data() + row_num;
756 return slice;
757 }
758};
759
760}
761
762}
763