1 | #pragma once |
2 | |
3 | #include "IArraySink.h" |
4 | |
5 | #include <Columns/ColumnVector.h> |
6 | #include <Columns/ColumnDecimal.h> |
7 | #include <Columns/ColumnArray.h> |
8 | #include <Columns/ColumnString.h> |
9 | #include <Columns/ColumnFixedString.h> |
10 | #include <Columns/ColumnConst.h> |
11 | #include <Columns/ColumnNullable.h> |
12 | |
13 | #include <Common/typeid_cast.h> |
14 | |
15 | namespace DB::GatherUtils |
16 | { |
17 | |
18 | template <typename T> |
19 | struct NumericArraySource; |
20 | |
21 | struct GenericArraySource; |
22 | |
23 | template <typename ArraySource> |
24 | struct NullableArraySource; |
25 | |
26 | template <typename T> |
27 | struct NumericValueSource; |
28 | |
29 | struct GenericValueSource; |
30 | |
31 | template <typename ArraySource> |
32 | struct NullableValueSource; |
33 | |
34 | template <typename T> |
35 | struct NumericArraySink : public ArraySinkImpl<NumericArraySink<T>> |
36 | { |
37 | using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>; |
38 | using CompatibleArraySource = NumericArraySource<T>; |
39 | using CompatibleValueSource = NumericValueSource<T>; |
40 | |
41 | typename ColVecType::Container & elements; |
42 | typename ColumnArray::Offsets & offsets; |
43 | |
44 | size_t row_num = 0; |
45 | ColumnArray::Offset current_offset = 0; |
46 | |
47 | NumericArraySink(ColumnArray & arr, size_t column_size) |
48 | : elements(typeid_cast<ColVecType &>(arr.getData()).getData()), offsets(arr.getOffsets()) |
49 | { |
50 | offsets.resize(column_size); |
51 | } |
52 | |
53 | void next() |
54 | { |
55 | offsets[row_num] = current_offset; |
56 | ++row_num; |
57 | } |
58 | |
59 | bool isEnd() const |
60 | { |
61 | return row_num == offsets.size(); |
62 | } |
63 | |
64 | size_t rowNum() const |
65 | { |
66 | return row_num; |
67 | } |
68 | |
69 | void reserve(size_t num_elements) |
70 | { |
71 | elements.reserve(num_elements); |
72 | } |
73 | }; |
74 | |
75 | |
76 | struct StringSink |
77 | { |
78 | typename ColumnString::Chars & elements; |
79 | typename ColumnString::Offsets & offsets; |
80 | |
81 | size_t row_num = 0; |
82 | ColumnString::Offset current_offset = 0; |
83 | |
84 | StringSink(ColumnString & col, size_t column_size) |
85 | : elements(col.getChars()), offsets(col.getOffsets()) |
86 | { |
87 | offsets.resize(column_size); |
88 | } |
89 | |
90 | void ALWAYS_INLINE next() |
91 | { |
92 | elements.push_back(0); |
93 | ++current_offset; |
94 | offsets[row_num] = current_offset; |
95 | ++row_num; |
96 | } |
97 | |
98 | bool isEnd() const |
99 | { |
100 | return row_num == offsets.size(); |
101 | } |
102 | |
103 | size_t rowNum() const |
104 | { |
105 | return row_num; |
106 | } |
107 | |
108 | void reserve(size_t num_elements) |
109 | { |
110 | elements.reserve(num_elements); |
111 | } |
112 | }; |
113 | |
114 | |
115 | struct FixedStringSink |
116 | { |
117 | typename ColumnString::Chars & elements; |
118 | size_t string_size; |
119 | |
120 | size_t row_num = 0; |
121 | size_t total_rows; |
122 | ColumnString::Offset current_offset = 0; |
123 | |
124 | FixedStringSink(ColumnFixedString & col, size_t column_size) |
125 | : elements(col.getChars()), string_size(col.getN()), total_rows(column_size) |
126 | { |
127 | elements.resize(column_size * string_size); |
128 | } |
129 | |
130 | void next() |
131 | { |
132 | current_offset += string_size; |
133 | ++row_num; |
134 | } |
135 | |
136 | bool isEnd() const |
137 | { |
138 | return row_num == total_rows; |
139 | } |
140 | |
141 | size_t rowNum() const |
142 | { |
143 | return row_num; |
144 | } |
145 | |
146 | void reserve(size_t num_elements) |
147 | { |
148 | elements.reserve(num_elements); |
149 | } |
150 | }; |
151 | |
152 | |
153 | struct GenericArraySink : public ArraySinkImpl<GenericArraySink> |
154 | { |
155 | using CompatibleArraySource = GenericArraySource; |
156 | using CompatibleValueSource = GenericValueSource; |
157 | |
158 | IColumn & elements; |
159 | ColumnArray::Offsets & offsets; |
160 | |
161 | size_t row_num = 0; |
162 | ColumnArray::Offset current_offset = 0; |
163 | |
164 | GenericArraySink(ColumnArray & arr, size_t column_size) |
165 | : elements(arr.getData()), offsets(arr.getOffsets()) |
166 | { |
167 | offsets.resize(column_size); |
168 | } |
169 | |
170 | void next() |
171 | { |
172 | offsets[row_num] = current_offset; |
173 | ++row_num; |
174 | } |
175 | |
176 | bool isEnd() const |
177 | { |
178 | return row_num == offsets.size(); |
179 | } |
180 | |
181 | size_t rowNum() const |
182 | { |
183 | return row_num; |
184 | } |
185 | |
186 | void reserve(size_t num_elements) |
187 | { |
188 | elements.reserve(num_elements); |
189 | } |
190 | }; |
191 | |
192 | |
193 | template <typename ArraySink> |
194 | struct NullableArraySink : public ArraySink |
195 | { |
196 | using CompatibleArraySource = NullableArraySource<typename ArraySink::CompatibleArraySource>; |
197 | using CompatibleValueSource = NullableValueSource<typename ArraySink::CompatibleValueSource>; |
198 | |
199 | NullMap & null_map; |
200 | |
201 | NullableArraySink(ColumnArray & arr, NullMap & null_map_, size_t column_size) |
202 | : ArraySink(arr, column_size), null_map(null_map_) |
203 | { |
204 | } |
205 | |
206 | void accept(ArraySinkVisitor & visitor) override { visitor.visit(*this); } |
207 | |
208 | void reserve(size_t num_elements) |
209 | { |
210 | ArraySink::reserve(num_elements); |
211 | null_map.reserve(num_elements); |
212 | } |
213 | }; |
214 | |
215 | |
216 | } |
217 | |