1#include <DataTypes/DataTypesNumber.h>
2#include <Columns/ColumnsNumber.h>
3#include "FunctionArrayMapped.h"
4#include <Functions/FunctionFactory.h>
5
6
7namespace DB
8{
9
10template <bool reverse>
11struct ArraySplitImpl
12{
13 static bool needBoolean() { return true; }
14 static bool needExpression() { return true; }
15 static bool needOneArray() { return false; }
16
17 static DataTypePtr getReturnType(const DataTypePtr & /*expression_return*/, const DataTypePtr & array_element)
18 {
19 return std::make_shared<DataTypeArray>(
20 std::make_shared<DataTypeArray>(array_element)
21 );
22 }
23
24 static ColumnPtr execute(const ColumnArray & array, ColumnPtr mapped)
25 {
26 const ColumnUInt8 * column_cut = typeid_cast<const ColumnUInt8 *>(&*mapped);
27
28 const IColumn::Offsets & in_offsets = array.getOffsets();
29 auto column_offsets_2 = ColumnArray::ColumnOffsets::create();
30 auto column_offsets_1 = ColumnArray::ColumnOffsets::create();
31 IColumn::Offsets & out_offsets_2 = column_offsets_2->getData();
32 IColumn::Offsets & out_offsets_1 = column_offsets_1->getData();
33
34 if (column_cut)
35 {
36 const IColumn::Filter & cut = column_cut->getData();
37
38 size_t pos = 0;
39
40 out_offsets_2.reserve(in_offsets.size()); // assume the actual size to be equal or larger
41 out_offsets_1.reserve(in_offsets.size());
42
43 for (size_t i = 0; i < in_offsets.size(); ++i)
44 {
45 if (pos < in_offsets[i])
46 {
47 pos += !reverse;
48 for (; pos < in_offsets[i] - reverse; ++pos)
49 {
50 if (cut[pos])
51 out_offsets_2.push_back(pos + reverse);
52 }
53 pos += reverse;
54
55 out_offsets_2.push_back(pos);
56 }
57
58 out_offsets_1.push_back(out_offsets_2.size());
59 }
60 }
61 else
62 {
63 auto column_cut_const = checkAndGetColumnConst<ColumnUInt8>(&*mapped);
64
65 if (!column_cut_const)
66 throw Exception("Unexpected type of cut column", ErrorCodes::ILLEGAL_COLUMN);
67
68 if (column_cut_const->getValue<UInt8>())
69 {
70 out_offsets_2.reserve(in_offsets.back());
71 out_offsets_1.reserve(in_offsets.size());
72
73 for (size_t i = 0; i < in_offsets.back(); ++i)
74 out_offsets_2.push_back(i + 1);
75 for (size_t i = 0; i < in_offsets.size(); ++i)
76 out_offsets_1.push_back(in_offsets[i]);
77 }
78 else
79 {
80 size_t pos = 0;
81
82 out_offsets_2.reserve(in_offsets.size());
83 out_offsets_1.reserve(in_offsets.size());
84
85 for (size_t i = 0; i < in_offsets.size(); ++i)
86 {
87 if (pos < in_offsets[i])
88 {
89 pos = in_offsets[i];
90
91 out_offsets_2.push_back(pos);
92 }
93
94 out_offsets_1.push_back(out_offsets_2.size());
95 }
96 }
97 }
98
99 return ColumnArray::create(
100 ColumnArray::create(
101 array.getDataPtr(),
102 std::move(column_offsets_2)
103 ),
104 std::move(column_offsets_1)
105 );
106 }
107};
108
/// Name tag carrying the SQL-visible function name "arraySplit".
struct NameArraySplit
{
    static constexpr auto name = "arraySplit";
};
/// Name tag carrying the SQL-visible function name "arrayReverseSplit".
struct NameArrayReverseSplit
{
    static constexpr auto name = "arrayReverseSplit";
};
111using FunctionArraySplit = FunctionArrayMapped<ArraySplitImpl<false>, NameArraySplit>;
112using FunctionArrayReverseSplit = FunctionArrayMapped<ArraySplitImpl<true>, NameArrayReverseSplit>;
113
114void registerFunctionsArraySplit(FunctionFactory & factory)
115{
116 factory.registerFunction<FunctionArraySplit>();
117 factory.registerFunction<FunctionArrayReverseSplit>();
118}
119
120}
121