1#pragma once
2
3#include "IArraySink.h"
4
5#include <Columns/ColumnVector.h>
6#include <Columns/ColumnDecimal.h>
7#include <Columns/ColumnArray.h>
8#include <Columns/ColumnString.h>
9#include <Columns/ColumnFixedString.h>
10#include <Columns/ColumnConst.h>
11#include <Columns/ColumnNullable.h>
12
13#include <Common/typeid_cast.h>
14
15namespace DB::GatherUtils
16{
17
18template <typename T>
19struct NumericArraySource;
20
21struct GenericArraySource;
22
23template <typename ArraySource>
24struct NullableArraySource;
25
26template <typename T>
27struct NumericValueSource;
28
29struct GenericValueSource;
30
31template <typename ArraySource>
32struct NullableValueSource;
33
34template <typename T>
35struct NumericArraySink : public ArraySinkImpl<NumericArraySink<T>>
36{
37 using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
38 using CompatibleArraySource = NumericArraySource<T>;
39 using CompatibleValueSource = NumericValueSource<T>;
40
41 typename ColVecType::Container & elements;
42 typename ColumnArray::Offsets & offsets;
43
44 size_t row_num = 0;
45 ColumnArray::Offset current_offset = 0;
46
47 NumericArraySink(ColumnArray & arr, size_t column_size)
48 : elements(typeid_cast<ColVecType &>(arr.getData()).getData()), offsets(arr.getOffsets())
49 {
50 offsets.resize(column_size);
51 }
52
53 void next()
54 {
55 offsets[row_num] = current_offset;
56 ++row_num;
57 }
58
59 bool isEnd() const
60 {
61 return row_num == offsets.size();
62 }
63
64 size_t rowNum() const
65 {
66 return row_num;
67 }
68
69 void reserve(size_t num_elements)
70 {
71 elements.reserve(num_elements);
72 }
73};
74
75
76struct StringSink
77{
78 typename ColumnString::Chars & elements;
79 typename ColumnString::Offsets & offsets;
80
81 size_t row_num = 0;
82 ColumnString::Offset current_offset = 0;
83
84 StringSink(ColumnString & col, size_t column_size)
85 : elements(col.getChars()), offsets(col.getOffsets())
86 {
87 offsets.resize(column_size);
88 }
89
90 void ALWAYS_INLINE next()
91 {
92 elements.push_back(0);
93 ++current_offset;
94 offsets[row_num] = current_offset;
95 ++row_num;
96 }
97
98 bool isEnd() const
99 {
100 return row_num == offsets.size();
101 }
102
103 size_t rowNum() const
104 {
105 return row_num;
106 }
107
108 void reserve(size_t num_elements)
109 {
110 elements.reserve(num_elements);
111 }
112};
113
114
115struct FixedStringSink
116{
117 typename ColumnString::Chars & elements;
118 size_t string_size;
119
120 size_t row_num = 0;
121 size_t total_rows;
122 ColumnString::Offset current_offset = 0;
123
124 FixedStringSink(ColumnFixedString & col, size_t column_size)
125 : elements(col.getChars()), string_size(col.getN()), total_rows(column_size)
126 {
127 elements.resize(column_size * string_size);
128 }
129
130 void next()
131 {
132 current_offset += string_size;
133 ++row_num;
134 }
135
136 bool isEnd() const
137 {
138 return row_num == total_rows;
139 }
140
141 size_t rowNum() const
142 {
143 return row_num;
144 }
145
146 void reserve(size_t num_elements)
147 {
148 elements.reserve(num_elements);
149 }
150};
151
152
153struct GenericArraySink : public ArraySinkImpl<GenericArraySink>
154{
155 using CompatibleArraySource = GenericArraySource;
156 using CompatibleValueSource = GenericValueSource;
157
158 IColumn & elements;
159 ColumnArray::Offsets & offsets;
160
161 size_t row_num = 0;
162 ColumnArray::Offset current_offset = 0;
163
164 GenericArraySink(ColumnArray & arr, size_t column_size)
165 : elements(arr.getData()), offsets(arr.getOffsets())
166 {
167 offsets.resize(column_size);
168 }
169
170 void next()
171 {
172 offsets[row_num] = current_offset;
173 ++row_num;
174 }
175
176 bool isEnd() const
177 {
178 return row_num == offsets.size();
179 }
180
181 size_t rowNum() const
182 {
183 return row_num;
184 }
185
186 void reserve(size_t num_elements)
187 {
188 elements.reserve(num_elements);
189 }
190};
191
192
193template <typename ArraySink>
194struct NullableArraySink : public ArraySink
195{
196 using CompatibleArraySource = NullableArraySource<typename ArraySink::CompatibleArraySource>;
197 using CompatibleValueSource = NullableValueSource<typename ArraySink::CompatibleValueSource>;
198
199 NullMap & null_map;
200
201 NullableArraySink(ColumnArray & arr, NullMap & null_map_, size_t column_size)
202 : ArraySink(arr, column_size), null_map(null_map_)
203 {
204 }
205
206 void accept(ArraySinkVisitor & visitor) override { visitor.visit(*this); }
207
208 void reserve(size_t num_elements)
209 {
210 ArraySink::reserve(num_elements);
211 null_map.reserve(num_elements);
212 }
213};
214
215
216}
217