1#include <DataTypes/DataTypesNumber.h>
2#include <DataTypes/DataTypesDecimal.h>
3#include <Columns/ColumnsNumber.h>
4#include <Columns/ColumnDecimal.h>
5#include <Functions/array/FunctionArrayMapped.h>
6#include <Functions/FunctionFactory.h>
7
8
9namespace DB
10{
/// arrayCompact(['a', 'a', 'b', 'b', 'a']) = ['a', 'b', 'a'] - removes consecutive duplicate elements from an array.
/// Error codes used by this file; `extern` means the constants are defined in another translation unit.
/// NOTE(review): ILLEGAL_COLUMN does not appear to be referenced in the visible code - confirm before removing.
namespace ErrorCodes
{
    extern const int ILLEGAL_COLUMN;
}
16
17struct ArrayCompactImpl
18{
19 static bool useDefaultImplementationForConstants() { return true; }
20 static bool needBoolean() { return false; }
21 static bool needExpression() { return false; }
22 static bool needOneArray() { return false; }
23
24 static DataTypePtr getReturnType(const DataTypePtr & nested_type, const DataTypePtr &)
25 {
26 return std::make_shared<DataTypeArray>(nested_type);
27 }
28
29 template <typename T>
30 static bool executeType(const ColumnPtr & mapped, const ColumnArray & array, ColumnPtr & res_ptr)
31 {
32 using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
33
34 const ColVecType * src_values_column = checkAndGetColumn<ColVecType>(mapped.get());
35
36 if (!src_values_column)
37 return false;
38
39 const IColumn::Offsets & src_offsets = array.getOffsets();
40 const typename ColVecType::Container & src_values = src_values_column->getData();
41
42 typename ColVecType::MutablePtr res_values_column;
43 if constexpr (IsDecimalNumber<T>)
44 res_values_column = ColVecType::create(src_values.size(), src_values.getScale());
45 else
46 res_values_column = ColVecType::create(src_values.size());
47
48 typename ColVecType::Container & res_values = res_values_column->getData();
49 size_t src_offsets_size = src_offsets.size();
50 auto res_offsets_column = ColumnArray::ColumnOffsets::create(src_offsets_size);
51 IColumn::Offsets & res_offsets = res_offsets_column->getData();
52
53 size_t res_pos = 0;
54 size_t src_pos = 0;
55
56 for (size_t i = 0; i < src_offsets_size; ++i)
57 {
58 auto src_offset = src_offsets[i];
59
60 /// If array is not empty.
61 if (src_pos < src_offset)
62 {
63 /// Insert first element unconditionally.
64 res_values[res_pos] = src_values[src_pos];
65
66 /// For the rest of elements, insert if the element is different from the previous.
67 ++src_pos;
68 ++res_pos;
69 for (; src_pos < src_offset; ++src_pos)
70 {
71 if (src_values[src_pos] != src_values[src_pos - 1])
72 {
73 res_values[res_pos] = src_values[src_pos];
74 ++res_pos;
75 }
76 }
77 }
78 res_offsets[i] = res_pos;
79 }
80 res_values.resize(res_pos);
81
82 res_ptr = ColumnArray::create(std::move(res_values_column), std::move(res_offsets_column));
83 return true;
84 }
85
86 static void executeGeneric(const ColumnPtr & mapped, const ColumnArray & array, ColumnPtr & res_ptr)
87 {
88 const IColumn::Offsets & src_offsets = array.getOffsets();
89
90 auto res_values_column = mapped->cloneEmpty();
91 res_values_column->reserve(mapped->size());
92
93 size_t src_offsets_size = src_offsets.size();
94 auto res_offsets_column = ColumnArray::ColumnOffsets::create(src_offsets_size);
95 IColumn::Offsets & res_offsets = res_offsets_column->getData();
96
97 size_t res_pos = 0;
98 size_t src_pos = 0;
99
100 for (size_t i = 0; i < src_offsets_size; ++i)
101 {
102 auto src_offset = src_offsets[i];
103
104 /// If array is not empty.
105 if (src_pos < src_offset)
106 {
107 /// Insert first element unconditionally.
108 res_values_column->insertFrom(*mapped, src_pos);
109
110 /// For the rest of elements, insert if the element is different from the previous.
111 ++src_pos;
112 ++res_pos;
113 for (; src_pos < src_offset; ++src_pos)
114 {
115 if (mapped->compareAt(src_pos - 1, src_pos, *mapped, 1))
116 {
117 res_values_column->insertFrom(*mapped, src_pos);
118 ++res_pos;
119 }
120 }
121 }
122 res_offsets[i] = res_pos;
123 }
124
125 res_ptr = ColumnArray::create(std::move(res_values_column), std::move(res_offsets_column));
126 }
127
128 static ColumnPtr execute(const ColumnArray & array, ColumnPtr mapped)
129 {
130 ColumnPtr res;
131
132 if (!(executeType< UInt8 >(mapped, array, res) ||
133 executeType< UInt16>(mapped, array, res) ||
134 executeType< UInt32>(mapped, array, res) ||
135 executeType< UInt64>(mapped, array, res) ||
136 executeType< Int8 >(mapped, array, res) ||
137 executeType< Int16 >(mapped, array, res) ||
138 executeType< Int32 >(mapped, array, res) ||
139 executeType< Int64 >(mapped, array, res) ||
140 executeType<Float32>(mapped, array, res) ||
141 executeType<Float64>(mapped, array, res)) ||
142 executeType<Decimal32>(mapped, array, res) ||
143 executeType<Decimal64>(mapped, array, res) ||
144 executeType<Decimal128>(mapped, array, res))
145 {
146 executeGeneric(mapped, array, res);
147 }
148 return res;
149 }
150};
151
152struct NameArrayCompact { static constexpr auto name = "arrayCompact"; };
153using FunctionArrayCompact = FunctionArrayMapped<ArrayCompactImpl, NameArrayCompact>;
154
155void registerFunctionArrayCompact(FunctionFactory & factory)
156{
157 factory.registerFunction<FunctionArrayCompact>();
158}
159
160}
161
162