1#pragma once
2
#include <algorithm>

#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>

#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>

#include <Columns/ColumnArray.h>
#include <Columns/ColumnVector.h>

#include <Common/FieldVisitors.h>
#include <Common/assert_cast.h>
#include <Interpreters/convertFieldToType.h>

#include <AggregateFunctions/IAggregateFunction.h>
17
18#define AGGREGATE_FUNCTION_GROUP_ARRAY_INSERT_AT_MAX_SIZE 0xFFFFFF
19
20
21namespace DB
22{
23
namespace ErrorCodes
{
    /// Error codes raised by groupArrayInsertAt. These are declarations only (extern, no initializer);
    /// the definitions live in another translation unit.
    extern const int CANNOT_CONVERT_TYPE;
    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int TOO_LARGE_ARRAY_SIZE;
}
31
32
/** Aggregate function that takes two arguments, a value and a position,
  * and builds an array in which each value is located at its corresponding position.
  *
  * If more than one value was inserted at the same position, any one of them is stored (the first one, in the single-threaded case).
  * If no value was inserted at some position, the default value is substituted.
  *
  * The aggregate function also accepts optional parameters:
  * - the default value to substitute;
  * - the length to which result arrays are resized (if you want results of the same length for all aggregation keys).
  *
  * If you want to pass the length, the default value must also be given.
  */
45
46
47/// Generic case (inefficient).
48struct AggregateFunctionGroupArrayInsertAtDataGeneric
49{
50 Array value; /// TODO Add MemoryTracker
51};
52
53
54class AggregateFunctionGroupArrayInsertAtGeneric final
55 : public IAggregateFunctionDataHelper<AggregateFunctionGroupArrayInsertAtDataGeneric, AggregateFunctionGroupArrayInsertAtGeneric>
56{
57private:
58 DataTypePtr & type;
59 Field default_value;
60 UInt64 length_to_resize = 0; /// zero means - do not do resizing.
61
62public:
63 AggregateFunctionGroupArrayInsertAtGeneric(const DataTypes & arguments, const Array & params)
64 : IAggregateFunctionDataHelper<AggregateFunctionGroupArrayInsertAtDataGeneric, AggregateFunctionGroupArrayInsertAtGeneric>(arguments, params)
65 , type(argument_types[0])
66 {
67 if (!params.empty())
68 {
69 if (params.size() > 2)
70 throw Exception("Aggregate function " + getName() + " requires at most two parameters.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
71
72 default_value = params[0];
73
74 if (params.size() == 2)
75 {
76 length_to_resize = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), params[1]);
77 if (length_to_resize > AGGREGATE_FUNCTION_GROUP_ARRAY_INSERT_AT_MAX_SIZE)
78 throw Exception("Too large array size", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
79 }
80 }
81
82 if (!isUnsignedInteger(arguments[1]))
83 throw Exception("Second argument of aggregate function " + getName() + " must be integer.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
84
85 if (default_value.isNull())
86 default_value = type->getDefault();
87 else
88 {
89 Field converted = convertFieldToType(default_value, *type);
90 if (converted.isNull())
91 throw Exception("Cannot convert parameter of aggregate function " + getName() + " (" + applyVisitor(FieldVisitorToString(), default_value) + ")"
92 " to type " + type->getName() + " to be used as default value in array", ErrorCodes::CANNOT_CONVERT_TYPE);
93
94 default_value = converted;
95 }
96 }
97
98 String getName() const override { return "groupArrayInsertAt"; }
99
100 DataTypePtr getReturnType() const override
101 {
102 return std::make_shared<DataTypeArray>(type);
103 }
104
105 void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
106 {
107 /// TODO Do positions need to be 1-based for this function?
108 size_t position = columns[1]->getUInt(row_num);
109
110 /// If position is larger than size to which array will be cutted - simply ignore value.
111 if (length_to_resize && position >= length_to_resize)
112 return;
113
114 if (position >= AGGREGATE_FUNCTION_GROUP_ARRAY_INSERT_AT_MAX_SIZE)
115 throw Exception("Too large array size: position argument (" + toString(position) + ")"
116 " is greater or equals to limit (" + toString(AGGREGATE_FUNCTION_GROUP_ARRAY_INSERT_AT_MAX_SIZE) + ")",
117 ErrorCodes::TOO_LARGE_ARRAY_SIZE);
118
119 Array & arr = data(place).value;
120
121 if (arr.size() <= position)
122 arr.resize(position + 1);
123 else if (!arr[position].isNull())
124 return; /// Element was already inserted to the specified position.
125
126 columns[0]->get(row_num, arr[position]);
127 }
128
129 void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
130 {
131 Array & arr_lhs = data(place).value;
132 const Array & arr_rhs = data(rhs).value;
133
134 if (arr_lhs.size() < arr_rhs.size())
135 arr_lhs.resize(arr_rhs.size());
136
137 for (size_t i = 0, size = arr_rhs.size(); i < size; ++i)
138 if (arr_lhs[i].isNull() && !arr_rhs[i].isNull())
139 arr_lhs[i] = arr_rhs[i];
140 }
141
142 void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
143 {
144 const Array & arr = data(place).value;
145 size_t size = arr.size();
146 writeVarUInt(size, buf);
147
148 for (const Field & elem : arr)
149 {
150 if (elem.isNull())
151 {
152 writeBinary(UInt8(1), buf);
153 }
154 else
155 {
156 writeBinary(UInt8(0), buf);
157 type->serializeBinary(elem, buf);
158 }
159 }
160 }
161
162 void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
163 {
164 size_t size = 0;
165 readVarUInt(size, buf);
166
167 if (size > AGGREGATE_FUNCTION_GROUP_ARRAY_INSERT_AT_MAX_SIZE)
168 throw Exception("Too large array size", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
169
170 Array & arr = data(place).value;
171
172 arr.resize(size);
173 for (size_t i = 0; i < size; ++i)
174 {
175 UInt8 is_null = 0;
176 readBinary(is_null, buf);
177 if (!is_null)
178 type->deserializeBinary(arr[i], buf);
179 }
180 }
181
182 void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
183 {
184 ColumnArray & to_array = assert_cast<ColumnArray &>(to);
185 IColumn & to_data = to_array.getData();
186 ColumnArray::Offsets & to_offsets = to_array.getOffsets();
187
188 const Array & arr = data(place).value;
189
190 for (const Field & elem : arr)
191 {
192 if (!elem.isNull())
193 to_data.insert(elem);
194 else
195 to_data.insert(default_value);
196 }
197
198 size_t result_array_size = length_to_resize ? length_to_resize : arr.size();
199
200 /// Pad array if need.
201 for (size_t i = arr.size(); i < result_array_size; ++i)
202 to_data.insert(default_value);
203
204 to_offsets.push_back(to_offsets.back() + result_array_size);
205 }
206};
207
208
209#undef AGGREGATE_FUNCTION_GROUP_ARRAY_INSERT_AT_MAX_SIZE
210
211}
212