1#include <Functions/IFunctionImpl.h>
2#include <Functions/FunctionFactory.h>
3#include <Functions/FunctionHelpers.h>
4#include <DataTypes/DataTypeArray.h>
5#include <Columns/ColumnArray.h>
6#include <Columns/ColumnNullable.h>
7#include <Columns/ColumnString.h>
8#include <Columns/ColumnFixedString.h>
9#include <Common/typeid_cast.h>
10#include <Common/assert_cast.h>
11
12
13namespace DB
14{
15
/// Error codes referenced by this translation unit; they are only declared here (extern).
namespace ErrorCodes
{
    extern const int ILLEGAL_COLUMN;            /// Thrown by executeImpl for an unsupported argument column or null map.
    extern const int ILLEGAL_TYPE_OF_ARGUMENT;  /// Thrown by getReturnTypeImpl when the argument is not an array.
    extern const int LOGICAL_ERROR;             /// NOTE(review): not used in the visible part of this file.
}
22
23
24class FunctionArrayReverse : public IFunction
25{
26public:
27 static constexpr auto name = "arrayReverse";
28 static FunctionPtr create(const Context &) { return std::make_shared<FunctionArrayReverse>(); }
29
30 String getName() const override { return name; }
31
32 size_t getNumberOfArguments() const override { return 1; }
33 bool useDefaultImplementationForConstants() const override { return true; }
34
35 DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
36 {
37 const DataTypeArray * array_type = checkAndGetDataType<DataTypeArray>(arguments[0].get());
38 if (!array_type)
39 throw Exception("Argument for function " + getName() + " must be array.",
40 ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
41
42 return arguments[0];
43 }
44
45 void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t) override;
46
47private:
48 template <typename T>
49 bool executeNumber(const IColumn & src_data, const ColumnArray::Offsets & src_offsets, IColumn & res_data);
50
51 bool executeFixedString(const IColumn & src_data, const ColumnArray::Offsets & src_offsets, IColumn & res_data);
52 bool executeString(const IColumn & src_data, const ColumnArray::Offsets & src_array_offsets, IColumn & res_data);
53 bool executeGeneric(const IColumn & src_data, const ColumnArray::Offsets & src_array_offsets, IColumn & res_data);
54};
55
56
57void FunctionArrayReverse::executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t)
58{
59 const ColumnArray * array = checkAndGetColumn<ColumnArray>(block.getByPosition(arguments[0]).column.get());
60 if (!array)
61 throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName() + " of first argument of function " + getName(),
62 ErrorCodes::ILLEGAL_COLUMN);
63
64 auto res_ptr = array->cloneEmpty();
65 ColumnArray & res = assert_cast<ColumnArray &>(*res_ptr);
66 res.getOffsetsPtr() = array->getOffsetsPtr();
67
68 const IColumn & src_data = array->getData();
69 const ColumnArray::Offsets & offsets = array->getOffsets();
70
71 IColumn & res_data = res.getData();
72
73 const ColumnNullable * src_nullable_col = typeid_cast<const ColumnNullable *>(&src_data);
74 ColumnNullable * res_nullable_col = typeid_cast<ColumnNullable *>(&res_data);
75
76 const IColumn * src_inner_col = src_nullable_col ? &src_nullable_col->getNestedColumn() : &src_data;
77 IColumn * res_inner_col = res_nullable_col ? &res_nullable_col->getNestedColumn() : &res_data;
78
79 false
80 || executeNumber<UInt8>(*src_inner_col, offsets, *res_inner_col)
81 || executeNumber<UInt16>(*src_inner_col, offsets, *res_inner_col)
82 || executeNumber<UInt32>(*src_inner_col, offsets, *res_inner_col)
83 || executeNumber<UInt64>(*src_inner_col, offsets, *res_inner_col)
84 || executeNumber<Int8>(*src_inner_col, offsets, *res_inner_col)
85 || executeNumber<Int16>(*src_inner_col, offsets, *res_inner_col)
86 || executeNumber<Int32>(*src_inner_col, offsets, *res_inner_col)
87 || executeNumber<Int64>(*src_inner_col, offsets, *res_inner_col)
88 || executeNumber<Float32>(*src_inner_col, offsets, *res_inner_col)
89 || executeNumber<Float64>(*src_inner_col, offsets, *res_inner_col)
90 || executeString(*src_inner_col, offsets, *res_inner_col)
91 || executeFixedString(*src_inner_col, offsets, *res_inner_col)
92 || executeGeneric(*src_inner_col, offsets, *res_inner_col);
93
94 if (src_nullable_col)
95 if (!executeNumber<UInt8>(src_nullable_col->getNullMapColumn(), offsets, res_nullable_col->getNullMapColumn()))
96 throw Exception("Illegal column " + src_nullable_col->getNullMapColumn().getName()
97 + " of null map of the first argument of function " + getName(),
98 ErrorCodes::ILLEGAL_COLUMN);
99
100 block.getByPosition(result).column = std::move(res_ptr);
101}
102
103
104bool FunctionArrayReverse::executeGeneric(const IColumn & src_data, const ColumnArray::Offsets & src_offsets, IColumn & res_data)
105{
106 size_t size = src_offsets.size();
107 res_data.reserve(size);
108
109 ColumnArray::Offset src_prev_offset = 0;
110 for (size_t i = 0; i < size; ++i)
111 {
112 ssize_t src_index = src_offsets[i] - 1;
113
114 while (src_index >= ssize_t(src_prev_offset))
115 {
116 res_data.insertFrom(src_data, src_index);
117 --src_index;
118 }
119
120 src_prev_offset = src_offsets[i];
121 }
122
123 return true;
124}
125
126template <typename T>
127bool FunctionArrayReverse::executeNumber(const IColumn & src_data, const ColumnArray::Offsets & src_offsets, IColumn & res_data)
128{
129 if (const ColumnVector<T> * src_data_concrete = checkAndGetColumn<ColumnVector<T>>(&src_data))
130 {
131 const PaddedPODArray<T> & src_vec = src_data_concrete->getData();
132 PaddedPODArray<T> & res_vec = typeid_cast<ColumnVector<T> &>(res_data).getData();
133 res_vec.resize(src_data.size());
134
135 size_t size = src_offsets.size();
136 ColumnArray::Offset src_prev_offset = 0;
137
138 for (size_t i = 0; i < size; ++i)
139 {
140 const auto * src = &src_vec[src_prev_offset];
141 const auto * src_end = &src_vec[src_offsets[i]];
142
143 if (src == src_end)
144 continue;
145
146 auto dst = &res_vec[src_offsets[i] - 1];
147
148 while (src < src_end)
149 {
150 *dst = *src;
151 ++src;
152 --dst;
153 }
154
155 src_prev_offset = src_offsets[i];
156 }
157
158 return true;
159 }
160 else
161 return false;
162}
163
164bool FunctionArrayReverse::executeFixedString(const IColumn & src_data, const ColumnArray::Offsets & src_offsets, IColumn & res_data)
165{
166 if (const ColumnFixedString * src_data_concrete = checkAndGetColumn<ColumnFixedString>(&src_data))
167 {
168 const size_t n = src_data_concrete->getN();
169 const ColumnFixedString::Chars & src_data_chars = src_data_concrete->getChars();
170 ColumnFixedString::Chars & res_chars = typeid_cast<ColumnFixedString &>(res_data).getChars();
171 size_t size = src_offsets.size();
172 res_chars.resize(src_data_chars.size());
173
174 ColumnArray::Offset src_prev_offset = 0;
175
176 for (size_t i = 0; i < size; ++i)
177 {
178 const UInt8 * src = &src_data_chars[src_prev_offset * n];
179 const UInt8 * src_end = &src_data_chars[src_offsets[i] * n];
180
181 if (src == src_end)
182 continue;
183
184 UInt8 * dst = &res_chars[src_offsets[i] * n - n];
185
186 while (src < src_end)
187 {
188 /// NOTE: memcpySmallAllowReadWriteOverflow15 doesn't work correctly here.
189 memcpy(dst, src, n);
190 src += n;
191 dst -= n;
192 }
193
194 src_prev_offset = src_offsets[i];
195 }
196 return true;
197 }
198 else
199 return false;
200}
201
202bool FunctionArrayReverse::executeString(const IColumn & src_data, const ColumnArray::Offsets & src_array_offsets, IColumn & res_data)
203{
204 if (const ColumnString * src_data_concrete = checkAndGetColumn<ColumnString>(&src_data))
205 {
206 const ColumnString::Offsets & src_string_offsets = src_data_concrete->getOffsets();
207 ColumnString::Offsets & res_string_offsets = typeid_cast<ColumnString &>(res_data).getOffsets();
208
209 const ColumnString::Chars & src_data_chars = src_data_concrete->getChars();
210 ColumnString::Chars & res_chars = typeid_cast<ColumnString &>(res_data).getChars();
211
212 size_t size = src_array_offsets.size();
213 res_string_offsets.resize(src_string_offsets.size());
214 res_chars.resize(src_data_chars.size());
215
216 ColumnArray::Offset src_array_prev_offset = 0;
217 ColumnString::Offset res_string_prev_offset = 0;
218
219 for (size_t i = 0; i < size; ++i)
220 {
221 if (src_array_offsets[i] != src_array_prev_offset)
222 {
223 size_t array_size = src_array_offsets[i] - src_array_prev_offset;
224
225 for (size_t j = 0; j < array_size; ++j)
226 {
227 size_t j_reversed = array_size - j - 1;
228
229 auto src_pos = src_string_offsets[src_array_prev_offset + j_reversed - 1];
230 size_t string_size = src_string_offsets[src_array_prev_offset + j_reversed] - src_pos;
231
232 memcpySmallAllowReadWriteOverflow15(&res_chars[res_string_prev_offset], &src_data_chars[src_pos], string_size);
233
234 res_string_prev_offset += string_size;
235 res_string_offsets[src_array_prev_offset + j] = res_string_prev_offset;
236 }
237 }
238
239 src_array_prev_offset = src_array_offsets[i];
240 }
241
242 return true;
243 }
244 else
245 return false;
246}
247
248
249void registerFunctionArrayReverse(FunctionFactory & factory)
250{
251 factory.registerFunction<FunctionArrayReverse>();
252}
253
254}
255