1 | #pragma once |
2 | |
3 | #include <AggregateFunctions/IAggregateFunction.h> |
4 | |
5 | #include <Columns/IColumn.h> |
6 | #include <Common/PODArray.h> |
7 | |
8 | #include <Core/Field.h> |
9 | |
10 | #include <IO/ReadBufferFromString.h> |
11 | #include <IO/WriteBuffer.h> |
12 | #include <IO/WriteHelpers.h> |
13 | |
14 | #include <Functions/FunctionHelpers.h> |
15 | |
16 | namespace DB |
17 | { |
18 | |
/// Forward declaration: this header only needs pointers/references to Arena.
class Arena;

/// Shared ownership of an arena that cannot be modified through this pointer.
using ConstArenaPtr = std::shared_ptr<const Arena>;
/// Shared ownership of an arena that may be allocated from / modified.
using ArenaPtr = std::shared_ptr<Arena>;
/// A list of read-only arenas (identical to std::vector<ConstArenaPtr>).
using ConstArenas = std::vector<std::shared_ptr<const Arena>>;
23 | |
24 | |
25 | /** Column of states of aggregate functions. |
26 | * Presented as an array of pointers to the states of aggregate functions (data). |
27 | * The states themselves are stored in one of the pools (arenas). |
28 | * |
29 | * It can be in two variants: |
30 | * |
31 | * 1. Own its values - that is, be responsible for destroying them. |
32 | * The column consists of the values "assigned to it" after the aggregation is performed (see Aggregator, convertToBlocks function), |
33 | * or from values created by itself (see `insert` method). |
34 | * In this case, `src` will be `nullptr`, and the column itself will be destroyed (call `IAggregateFunction::destroy`) |
35 | * states of aggregate functions in the destructor. |
36 | * |
37 | * 2. Do not own its values, but use values taken from another ColumnAggregateFunction column. |
38 | * For example, this is a column obtained by permutation/filtering or other transformations from another column. |
39 | * In this case, `src` will be `shared ptr` to the source column. Destruction of values will be handled by this source column. |
40 | * |
41 | * This solution is somewhat limited: |
42 | * - the variant in which the column contains a part of "it's own" and a part of "another's" values is not supported; |
43 | * - the option of having multiple source columns is not supported, which may be necessary for a more optimal merge of the two columns. |
44 | * |
45 | * These restrictions can be removed if you add an array of flags or even refcount, |
46 | * specifying which individual values should be destroyed and which ones should not. |
47 | * Clearly, this method would have a substantially non-zero price. |
48 | */ |
49 | class ColumnAggregateFunction final : public COWHelper<IColumn, ColumnAggregateFunction> |
50 | { |
51 | public: |
52 | using Container = PaddedPODArray<AggregateDataPtr>; |
53 | |
54 | private: |
55 | friend class COWHelper<IColumn, ColumnAggregateFunction>; |
56 | |
57 | /// Arenas used by function states that are created elsewhere. We own these |
58 | /// arenas in the sense of extending their lifetime, but do not modify them. |
59 | /// Even reading these arenas is unsafe, because they may be shared with |
60 | /// other data blocks and modified by other threads concurrently. |
61 | ConstArenas foreign_arenas; |
62 | |
63 | /// Arena for allocating the internals of function states created by current |
64 | /// column (e.g., when inserting new states). |
65 | ArenaPtr my_arena; |
66 | |
67 | /// Used for destroying states and for finalization of values. |
68 | AggregateFunctionPtr func; |
69 | |
70 | /// Source column. Used (holds source from destruction), |
71 | /// if this column has been constructed from another and uses all or part of its values. |
72 | ColumnPtr src; |
73 | |
74 | /// Array of pointers to aggregation states, that are placed in arenas. |
75 | Container data; |
76 | |
77 | ColumnAggregateFunction() {} |
78 | |
79 | /// Create a new column that has another column as a source. |
80 | MutablePtr createView() const; |
81 | |
82 | /// If we have another column as a source (owner of data), copy all data to ourself and reset source. |
83 | /// This is needed before inserting new elements, because we must own these elements (to destroy them in destructor), |
84 | /// but ownership of different elements cannot be mixed by different columns. |
85 | void ensureOwnership(); |
86 | |
87 | ColumnAggregateFunction(const AggregateFunctionPtr & func_) |
88 | : func(func_) |
89 | { |
90 | } |
91 | |
92 | ColumnAggregateFunction(const AggregateFunctionPtr & func_, |
93 | const ConstArenas & arenas_) |
94 | : foreign_arenas(arenas_), func(func_) |
95 | { |
96 | } |
97 | |
98 | |
99 | ColumnAggregateFunction(const ColumnAggregateFunction & src_); |
100 | |
101 | String getTypeString() const; |
102 | |
103 | public: |
104 | ~ColumnAggregateFunction() override; |
105 | |
106 | void set(const AggregateFunctionPtr & func_) |
107 | { |
108 | func = func_; |
109 | } |
110 | |
111 | AggregateFunctionPtr getAggregateFunction() { return func; } |
112 | AggregateFunctionPtr getAggregateFunction() const { return func; } |
113 | |
114 | /// Take shared ownership of Arena, that holds memory for states of aggregate functions. |
115 | void addArena(ConstArenaPtr arena_); |
116 | |
117 | /** Transform column with states of aggregate functions to column with final result values. |
118 | */ |
119 | MutableColumnPtr convertToValues() const; |
120 | |
121 | std::string getName() const override { return "AggregateFunction(" + func->getName() + ")" ; } |
122 | const char * getFamilyName() const override { return "AggregateFunction" ; } |
123 | |
124 | bool tryFinalizeAggregateFunction(MutableColumnPtr* res_) const; |
125 | MutableColumnPtr predictValues(Block & block, const ColumnNumbers & arguments, const Context & context) const; |
126 | |
127 | size_t size() const override |
128 | { |
129 | return getData().size(); |
130 | } |
131 | |
132 | MutableColumnPtr cloneEmpty() const override; |
133 | |
134 | Field operator[](size_t n) const override; |
135 | |
136 | void get(size_t n, Field & res) const override; |
137 | |
138 | StringRef getDataAt(size_t n) const override; |
139 | |
140 | void insertData(const char * pos, size_t length) override; |
141 | |
142 | void insertFrom(const IColumn & from, size_t n) override; |
143 | |
144 | void insertFrom(ConstAggregateDataPtr place); |
145 | |
146 | /// Merge state at last row with specified state in another column. |
147 | void insertMergeFrom(ConstAggregateDataPtr place); |
148 | |
149 | void insertMergeFrom(const IColumn & from, size_t n); |
150 | |
151 | Arena & createOrGetArena(); |
152 | |
153 | void insert(const Field & x) override; |
154 | |
155 | void insertDefault() override; |
156 | |
157 | StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override; |
158 | |
159 | const char * deserializeAndInsertFromArena(const char * pos) override; |
160 | |
161 | void updateHashWithValue(size_t n, SipHash & hash) const override; |
162 | |
163 | size_t byteSize() const override; |
164 | |
165 | size_t allocatedBytes() const override; |
166 | |
167 | void protect() override; |
168 | |
169 | void insertRangeFrom(const IColumn & from, size_t start, size_t length) override; |
170 | |
171 | void popBack(size_t n) override; |
172 | |
173 | ColumnPtr filter(const Filter & filter, ssize_t result_size_hint) const override; |
174 | |
175 | ColumnPtr permute(const Permutation & perm, size_t limit) const override; |
176 | |
177 | ColumnPtr index(const IColumn & indexes, size_t limit) const override; |
178 | |
179 | template <typename Type> |
180 | ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const; |
181 | |
182 | ColumnPtr replicate(const Offsets & offsets) const override; |
183 | |
184 | MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override; |
185 | |
186 | void gather(ColumnGathererStream & gatherer_stream) override; |
187 | |
188 | int compareAt(size_t, size_t, const IColumn &, int) const override |
189 | { |
190 | return 0; |
191 | } |
192 | |
193 | void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override; |
194 | |
195 | /** More efficient manipulation methods */ |
196 | Container & getData() |
197 | { |
198 | return data; |
199 | } |
200 | |
201 | const Container & getData() const |
202 | { |
203 | return data; |
204 | } |
205 | |
206 | void getExtremes(Field & min, Field & max) const override; |
207 | }; |
208 | |
209 | |
210 | } |
211 | |