1#pragma once
2
3#include <Core/Types.h>
4#include <Common/FieldVisitors.h>
5#include "Sources.h"
6#include "Sinks.h"
7#include <Core/AccurateComparison.h>
8#include <ext/range.h>
9
10
/// Error codes used by the helpers in this header. Only declared here (extern, no initializer).
namespace DB
{
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
}
15
16namespace DB::GatherUtils
17{
18
19/// Methods to copy Slice to Sink, overloaded for various combinations of types.
20
21template <typename T>
22void writeSlice(const NumericArraySlice<T> & slice, NumericArraySink<T> & sink)
23{
24 sink.elements.resize(sink.current_offset + slice.size);
25 memcpySmallAllowReadWriteOverflow15(&sink.elements[sink.current_offset], slice.data, slice.size * sizeof(T));
26 sink.current_offset += slice.size;
27}
28
29template <typename T, typename U>
30void writeSlice(const NumericArraySlice<T> & slice, NumericArraySink<U> & sink)
31{
32 sink.elements.resize(sink.current_offset + slice.size);
33 for (size_t i = 0; i < slice.size; ++i)
34 {
35 sink.elements[sink.current_offset] = static_cast<U>(slice.data[i]);
36 ++sink.current_offset;
37 }
38}
39
40inline ALWAYS_INLINE void writeSlice(const StringSource::Slice & slice, StringSink & sink)
41{
42 sink.elements.resize(sink.current_offset + slice.size);
43 memcpySmallAllowReadWriteOverflow15(&sink.elements[sink.current_offset], slice.data, slice.size);
44 sink.current_offset += slice.size;
45}
46
47inline ALWAYS_INLINE void writeSlice(const StringSource::Slice & slice, FixedStringSink & sink)
48{
49 memcpySmallAllowReadWriteOverflow15(&sink.elements[sink.current_offset], slice.data, slice.size);
50}
51
52/// Assuming same types of underlying columns for slice and sink if (ArraySlice, ArraySink) is (GenericArraySlice, GenericArraySink).
53inline ALWAYS_INLINE void writeSlice(const GenericArraySlice & slice, GenericArraySink & sink)
54{
55 if (slice.elements->structureEquals(sink.elements))
56 {
57 sink.elements.insertRangeFrom(*slice.elements, slice.begin, slice.size);
58 sink.current_offset += slice.size;
59 }
60 else
61 throw Exception("Function writeSlice expect same column types for GenericArraySlice and GenericArraySink.",
62 ErrorCodes::LOGICAL_ERROR);
63}
64
65template <typename T>
66inline ALWAYS_INLINE void writeSlice(const GenericArraySlice & slice, NumericArraySink<T> & sink)
67{
68 sink.elements.resize(sink.current_offset + slice.size);
69 for (size_t i = 0; i < slice.size; ++i)
70 {
71 Field field;
72 slice.elements->get(slice.begin + i, field);
73 sink.elements.push_back(applyVisitor(FieldVisitorConvertToNumber<T>(), field));
74 }
75 sink.current_offset += slice.size;
76}
77
78template <typename T>
79inline ALWAYS_INLINE void writeSlice(const NumericArraySlice<T> & slice, GenericArraySink & sink)
80{
81 for (size_t i = 0; i < slice.size; ++i)
82 {
83 if constexpr (IsDecimalNumber<T>)
84 {
85 DecimalField field(T(slice.data[i]), 0); /// TODO: Decimal scale
86 sink.elements.insert(field);
87 }
88 else
89 {
90 Field field = T(slice.data[i]);
91 sink.elements.insert(field);
92 }
93 }
94 sink.current_offset += slice.size;
95}
96
97template <typename Slice, typename ArraySink>
98inline ALWAYS_INLINE void writeSlice(const NullableSlice<Slice> & slice, NullableArraySink<ArraySink> & sink)
99{
100 sink.null_map.resize(sink.current_offset + slice.size);
101
102 if (slice.size == 1) /// Always true for ValueSlice.
103 sink.null_map[sink.current_offset] = *slice.null_map;
104 else
105 memcpySmallAllowReadWriteOverflow15(&sink.null_map[sink.current_offset], slice.null_map, slice.size * sizeof(UInt8));
106
107 writeSlice(static_cast<const Slice &>(slice), static_cast<ArraySink &>(sink));
108}
109
110template <typename Slice, typename ArraySink>
111inline ALWAYS_INLINE void writeSlice(const Slice & slice, NullableArraySink<ArraySink> & sink)
112{
113 sink.null_map.resize(sink.current_offset + slice.size);
114
115 if (slice.size == 1) /// Always true for ValueSlice.
116 sink.null_map[sink.current_offset] = 0;
117 else if (slice.size)
118 memset(&sink.null_map[sink.current_offset], 0, slice.size * sizeof(UInt8));
119
120 writeSlice(slice, static_cast<ArraySink &>(sink));
121}
122
123
124template <typename T, typename U>
125void writeSlice(const NumericValueSlice<T> & slice, NumericArraySink<U> & sink)
126{
127 sink.elements.resize(sink.current_offset + 1);
128 sink.elements[sink.current_offset] = slice.value;
129 ++sink.current_offset;
130}
131
132/// Assuming same types of underlying columns for slice and sink if (ArraySlice, ArraySink) is (GenericValueSlice, GenericArraySink).
133inline ALWAYS_INLINE void writeSlice(const GenericValueSlice & slice, GenericArraySink & sink)
134{
135 if (slice.elements->structureEquals(sink.elements))
136 {
137 sink.elements.insertFrom(*slice.elements, slice.position);
138 ++sink.current_offset;
139 }
140 else
141 throw Exception("Function writeSlice expect same column types for GenericValueSlice and GenericArraySink.",
142 ErrorCodes::LOGICAL_ERROR);
143}
144
145template <typename T>
146inline ALWAYS_INLINE void writeSlice(const GenericValueSlice & slice, NumericArraySink<T> & sink)
147{
148 sink.elements.resize(sink.current_offset + 1);
149
150 Field field;
151 slice.elements->get(slice.position, field);
152 sink.elements.push_back(applyVisitor(FieldVisitorConvertToNumber<T>(), field));
153 ++sink.current_offset;
154}
155
156template <typename T>
157inline ALWAYS_INLINE void writeSlice(const NumericValueSlice<T> & slice, GenericArraySink & sink)
158{
159 Field field = T(slice.value);
160 sink.elements.insert(field);
161 ++sink.current_offset;
162}
163
164
165
166template <typename SourceA, typename SourceB, typename Sink>
167void NO_INLINE concat(SourceA && src_a, SourceB && src_b, Sink && sink)
168{
169 sink.reserve(src_a.getSizeForReserve() + src_b.getSizeForReserve());
170
171 while (!src_a.isEnd())
172 {
173 writeSlice(src_a.getWhole(), sink);
174 writeSlice(src_b.getWhole(), sink);
175
176 sink.next();
177 src_a.next();
178 src_b.next();
179 }
180}
181
182template <typename Source, typename Sink>
183void concat(const std::vector<std::unique_ptr<IArraySource>> & array_sources, Sink && sink)
184{
185 size_t sources_num = array_sources.size();
186 std::vector<char> is_const(sources_num);
187
188 auto checkAndGetSizeToReserve = [] (auto source, IArraySource * array_source)
189 {
190 if (source == nullptr)
191 throw Exception("Concat function expected " + demangle(typeid(Source).name()) + " or "
192 + demangle(typeid(ConstSource<Source>).name()) + " but got "
193 + demangle(typeid(*array_source).name()), ErrorCodes::LOGICAL_ERROR);
194 return source->getSizeForReserve();
195 };
196
197 size_t size_to_reserve = 0;
198 for (auto i : ext::range(0, sources_num))
199 {
200 auto & source = array_sources[i];
201 is_const[i] = source->isConst();
202 if (is_const[i])
203 size_to_reserve += checkAndGetSizeToReserve(typeid_cast<ConstSource<Source> *>(source.get()), source.get());
204 else
205 size_to_reserve += checkAndGetSizeToReserve(typeid_cast<Source *>(source.get()), source.get());
206 }
207
208 sink.reserve(size_to_reserve);
209
210 auto writeNext = [& sink] (auto source)
211 {
212 writeSlice(source->getWhole(), sink);
213 source->next();
214 };
215
216 while (!sink.isEnd())
217 {
218 for (auto i : ext::range(0, sources_num))
219 {
220 auto & source = array_sources[i];
221 if (is_const[i])
222 writeNext(static_cast<ConstSource<Source> *>(source.get()));
223 else
224 writeNext(static_cast<Source *>(source.get()));
225 }
226 sink.next();
227 }
228}
229
230template <typename Sink>
231void NO_INLINE concat(StringSources & sources, Sink && sink)
232{
233 while (!sink.isEnd())
234 {
235 for (auto & source : sources)
236 {
237 writeSlice(source->getWhole(), sink);
238 source->next();
239 }
240 sink.next();
241 }
242}
243
244
245template <typename Source, typename Sink>
246void NO_INLINE sliceFromLeftConstantOffsetUnbounded(Source && src, Sink && sink, size_t offset)
247{
248 while (!src.isEnd())
249 {
250 writeSlice(src.getSliceFromLeft(offset), sink);
251 sink.next();
252 src.next();
253 }
254}
255
256template <typename Source, typename Sink>
257void NO_INLINE sliceFromLeftConstantOffsetBounded(Source && src, Sink && sink, size_t offset, ssize_t length)
258{
259 while (!src.isEnd())
260 {
261 ssize_t size = length;
262 if (size < 0)
263 size += static_cast<ssize_t>(src.getElementSize()) - offset;
264
265 if (size > 0)
266 writeSlice(src.getSliceFromLeft(offset, size), sink);
267
268 sink.next();
269 src.next();
270 }
271}
272
273template <typename Source, typename Sink>
274void NO_INLINE sliceFromRightConstantOffsetUnbounded(Source && src, Sink && sink, size_t offset)
275{
276 while (!src.isEnd())
277 {
278 writeSlice(src.getSliceFromRight(offset), sink);
279 sink.next();
280 src.next();
281 }
282}
283
284template <typename Source, typename Sink>
285void NO_INLINE sliceFromRightConstantOffsetBounded(Source && src, Sink && sink, size_t offset, ssize_t length)
286{
287 while (!src.isEnd())
288 {
289 ssize_t size = length;
290 if (size < 0)
291 size += static_cast<ssize_t>(src.getElementSize()) - offset;
292
293 if (size > 0)
294 writeSlice(src.getSliceFromRight(offset, size), sink);
295
296 sink.next();
297 src.next();
298 }
299}
300
301template <typename Source, typename Sink>
302void NO_INLINE sliceDynamicOffsetUnbounded(Source && src, Sink && sink, const IColumn & offset_column)
303{
304 const bool is_null = offset_column.onlyNull();
305 const auto * nullable = typeid_cast<const ColumnNullable *>(&offset_column);
306 const ColumnUInt8::Container * null_map = nullable ? &nullable->getNullMapData() : nullptr;
307 const IColumn * nested_column = nullable ? &nullable->getNestedColumn() : &offset_column;
308
309 while (!src.isEnd())
310 {
311 auto row_num = src.rowNum();
312 bool has_offset = !is_null && !(null_map && (*null_map)[row_num]);
313 Int64 offset = has_offset ? nested_column->getInt(row_num) : 1;
314
315 if (offset != 0)
316 {
317 typename std::decay_t<Source>::Slice slice;
318
319 if (offset > 0)
320 slice = src.getSliceFromLeft(offset - 1);
321 else
322 slice = src.getSliceFromRight(-offset);
323
324 writeSlice(slice, sink);
325 }
326
327 sink.next();
328 src.next();
329 }
330}
331
332template <typename Source, typename Sink>
333void NO_INLINE sliceDynamicOffsetBounded(Source && src, Sink && sink, const IColumn & offset_column, const IColumn & length_column)
334{
335 const bool is_offset_null = offset_column.onlyNull();
336 const auto * offset_nullable = typeid_cast<const ColumnNullable *>(&offset_column);
337 const ColumnUInt8::Container * offset_null_map = offset_nullable ? &offset_nullable->getNullMapData() : nullptr;
338 const IColumn * offset_nested_column = offset_nullable ? &offset_nullable->getNestedColumn() : &offset_column;
339
340 const bool is_length_null = length_column.onlyNull();
341 const auto * length_nullable = typeid_cast<const ColumnNullable *>(&length_column);
342 const ColumnUInt8::Container * length_null_map = length_nullable ? &length_nullable->getNullMapData() : nullptr;
343 const IColumn * length_nested_column = length_nullable ? &length_nullable->getNestedColumn() : &length_column;
344
345 while (!src.isEnd())
346 {
347 size_t row_num = src.rowNum();
348 bool has_offset = !is_offset_null && !(offset_null_map && (*offset_null_map)[row_num]);
349 bool has_length = !is_length_null && !(length_null_map && (*length_null_map)[row_num]);
350 Int64 offset = has_offset ? offset_nested_column->getInt(row_num) : 1;
351 Int64 size = has_length ? length_nested_column->getInt(row_num) : static_cast<Int64>(src.getElementSize());
352
353 if (size < 0)
354 size += offset > 0 ? static_cast<Int64>(src.getElementSize()) - (offset - 1) : -offset;
355
356 if (offset != 0 && size > 0)
357 {
358 typename std::decay_t<Source>::Slice slice;
359
360 if (offset > 0)
361 slice = src.getSliceFromLeft(offset - 1, size);
362 else
363 slice = src.getSliceFromRight(-offset, size);
364
365 writeSlice(slice, sink);
366 }
367
368 sink.next();
369 src.next();
370 }
371}
372
373
374template <typename SourceA, typename SourceB, typename Sink>
375void NO_INLINE conditional(SourceA && src_a, SourceB && src_b, Sink && sink, const PaddedPODArray<UInt8> & condition)
376{
377 sink.reserve(std::max(src_a.getSizeForReserve(), src_b.getSizeForReserve()));
378
379 const UInt8 * cond_pos = condition.data();
380 const UInt8 * cond_end = cond_pos + condition.size();
381
382 while (cond_pos < cond_end)
383 {
384 if (*cond_pos)
385 writeSlice(src_a.getWhole(), sink);
386 else
387 writeSlice(src_b.getWhole(), sink);
388
389 ++cond_pos;
390 src_a.next();
391 src_b.next();
392 sink.next();
393 }
394}
395
396
397/// Methods to check if first array has elements from second array, overloaded for various combinations of types.
398
399template <bool all, typename FirstSliceType, typename SecondSliceType,
400 bool (*isEqual)(const FirstSliceType &, const SecondSliceType &, size_t, size_t)>
401bool sliceHasImpl(const FirstSliceType & first, const SecondSliceType & second,
402 const UInt8 * first_null_map, const UInt8 * second_null_map)
403{
404 const bool has_first_null_map = first_null_map != nullptr;
405 const bool has_second_null_map = second_null_map != nullptr;
406
407 for (size_t i = 0; i < second.size; ++i)
408 {
409 bool has = false;
410 for (size_t j = 0; j < first.size && !has; ++j)
411 {
412 const bool is_first_null = has_first_null_map && first_null_map[j];
413 const bool is_second_null = has_second_null_map && second_null_map[i];
414
415 if (is_first_null && is_second_null)
416 has = true;
417
418 if (!is_first_null && !is_second_null && isEqual(first, second, j, i))
419 has = true;
420 }
421
422 if (has && !all)
423 return true;
424
425 if (!has && all)
426 return false;
427
428 }
429
430 return all;
431}
432
433template <typename T, typename U>
434bool sliceEqualElements(const NumericArraySlice<T> & first [[maybe_unused]],
435 const NumericArraySlice<U> & second [[maybe_unused]],
436 size_t first_ind [[maybe_unused]],
437 size_t second_ind [[maybe_unused]])
438{
439 /// TODO: Decimal scale
440 if constexpr (IsDecimalNumber<T> && IsDecimalNumber<U>)
441 return accurate::equalsOp(typename T::NativeType(first.data[first_ind]), typename U::NativeType(second.data[second_ind]));
442 else if constexpr (IsDecimalNumber<T> || IsDecimalNumber<U>)
443 return false;
444 else
445 return accurate::equalsOp(first.data[first_ind], second.data[second_ind]);
446}
447
448template <typename T>
449bool sliceEqualElements(const NumericArraySlice<T> &, const GenericArraySlice &, size_t, size_t)
450{
451 return false;
452}
453
454template <typename U>
455bool sliceEqualElements(const GenericArraySlice &, const NumericArraySlice<U> &, size_t, size_t)
456{
457 return false;
458}
459
460inline ALWAYS_INLINE bool sliceEqualElements(const GenericArraySlice & first, const GenericArraySlice & second, size_t first_ind, size_t second_ind)
461{
462 return first.elements->compareAt(first_ind + first.begin, second_ind + second.begin, *second.elements, -1) == 0;
463}
464
465template <bool all, typename T, typename U>
466bool sliceHas(const NumericArraySlice<T> & first, const NumericArraySlice<U> & second)
467{
468 auto impl = sliceHasImpl<all, NumericArraySlice<T>, NumericArraySlice<U>, sliceEqualElements<T, U>>;
469 return impl(first, second, nullptr, nullptr);
470}
471
472template <bool all>
473bool sliceHas(const GenericArraySlice & first, const GenericArraySlice & second)
474{
475 /// Generic arrays should have the same type in order to use column.compareAt(...)
476 if (!first.elements->structureEquals(*second.elements))
477 return false;
478
479 auto impl = sliceHasImpl<all, GenericArraySlice, GenericArraySlice, sliceEqualElements>;
480 return impl(first, second, nullptr, nullptr);
481}
482
483template <bool all, typename U>
484bool sliceHas(const GenericArraySlice & /*first*/, const NumericArraySlice<U> & /*second*/)
485{
486 return false;
487}
488
489template <bool all, typename T>
490bool sliceHas(const NumericArraySlice<T> & /*first*/, const GenericArraySlice & /*second*/)
491{
492 return false;
493}
494
495template <bool all, typename FirstArraySlice, typename SecondArraySlice>
496bool sliceHas(const FirstArraySlice & first, NullableSlice<SecondArraySlice> & second)
497{
498 auto impl = sliceHasImpl<all, FirstArraySlice, SecondArraySlice, sliceEqualElements<FirstArraySlice, SecondArraySlice>>;
499 return impl(first, second, nullptr, second.null_map);
500}
501
502template <bool all, typename FirstArraySlice, typename SecondArraySlice>
503bool sliceHas(const NullableSlice<FirstArraySlice> & first, SecondArraySlice & second)
504{
505 auto impl = sliceHasImpl<all, FirstArraySlice, SecondArraySlice, sliceEqualElements<FirstArraySlice, SecondArraySlice>>;
506 return impl(first, second, first.null_map, nullptr);
507}
508
509template <bool all, typename FirstArraySlice, typename SecondArraySlice>
510bool sliceHas(const NullableSlice<FirstArraySlice> & first, NullableSlice<SecondArraySlice> & second)
511{
512 auto impl = sliceHasImpl<all, FirstArraySlice, SecondArraySlice, sliceEqualElements<FirstArraySlice, SecondArraySlice>>;
513 return impl(first, second, first.null_map, second.null_map);
514}
515
516template <bool all, typename FirstSource, typename SecondSource>
517void NO_INLINE arrayAllAny(FirstSource && first, SecondSource && second, ColumnUInt8 & result)
518{
519 auto size = result.size();
520 auto & data = result.getData();
521 for (auto row : ext::range(0, size))
522 {
523 data[row] = static_cast<UInt8>(sliceHas<all>(first.getWhole(), second.getWhole()) ? 1 : 0);
524 first.next();
525 second.next();
526 }
527}
528
529template <typename ArraySource, typename ValueSource, typename Sink>
530void resizeDynamicSize(ArraySource && array_source, ValueSource && value_source, Sink && sink, const IColumn & size_column)
531{
532 const auto * size_nullable = typeid_cast<const ColumnNullable *>(&size_column);
533 const NullMap * size_null_map = size_nullable ? &size_nullable->getNullMapData() : nullptr;
534 const IColumn * size_nested_column = size_nullable ? &size_nullable->getNestedColumn() : &size_column;
535
536 while (!sink.isEnd())
537 {
538 size_t row_num = array_source.rowNum();
539 bool has_size = !size_null_map || (*size_null_map)[row_num];
540
541 if (has_size)
542 {
543 auto size = size_nested_column->getInt(row_num);
544 auto array_size = array_source.getElementSize();
545
546 if (size >= 0)
547 {
548 auto length = static_cast<size_t>(size);
549 if (array_size <= length)
550 {
551 writeSlice(array_source.getWhole(), sink);
552 for (size_t i = array_size; i < length; ++i)
553 writeSlice(value_source.getWhole(), sink);
554 }
555 else
556 writeSlice(array_source.getSliceFromLeft(0, length), sink);
557 }
558 else
559 {
560 auto length = static_cast<size_t>(-size);
561 if (array_size <= length)
562 {
563 for (size_t i = array_size; i < length; ++i)
564 writeSlice(value_source.getWhole(), sink);
565 writeSlice(array_source.getWhole(), sink);
566 }
567 else
568 writeSlice(array_source.getSliceFromRight(length, length), sink);
569 }
570 }
571 else
572 writeSlice(array_source.getWhole(), sink);
573
574 value_source.next();
575 array_source.next();
576 sink.next();
577 }
578}
579
/// Resizes every array of `array_source` to the same constant `size` and writes it to `sink`.
/// For a non-negative size N the array is truncated to its first N elements, or padded on the right
/// with the current value of `value_source` until it has N elements.
/// For a negative size -N the array is truncated to its last N elements, or padded on the left.
template <typename ArraySource, typename ValueSource, typename Sink>
void resizeConstantSize(ArraySource && array_source, ValueSource && value_source, Sink && sink, const ssize_t size)
{
    /// Direction and target length do not depend on the row, so they are computed once.
    const bool anchor_left = size >= 0;
    const size_t length = anchor_left ? static_cast<size_t>(size) : static_cast<size_t>(-size);

    for (; !sink.isEnd(); sink.next())
    {
        const auto array_size = array_source.getElementSize();

        if (array_size > length)
        {
            /// Too long: keep only `length` elements from the anchored side.
            if (anchor_left)
                writeSlice(array_source.getSliceFromLeft(0, length), sink);
            else
                writeSlice(array_source.getSliceFromRight(length, length), sink);
        }
        else if (anchor_left)
        {
            /// Keep the array and pad on the right.
            writeSlice(array_source.getWhole(), sink);
            for (size_t i = array_size; i < length; ++i)
                writeSlice(value_source.getWhole(), sink);
        }
        else
        {
            /// Pad on the left, then keep the array.
            for (size_t i = array_size; i < length; ++i)
                writeSlice(value_source.getWhole(), sink);
            writeSlice(array_source.getWhole(), sink);
        }

        value_source.next();
        array_source.next();
    }
}
617
618}
619