1#include "duckdb/execution/window_segment_tree.hpp"
2
3#include "duckdb/common/vector_operations/vector_operations.hpp"
4#include "duckdb/common/algorithm.hpp"
5#include "duckdb/common/helper.hpp"
6
7namespace duckdb {
8
//===--------------------------------------------------------------------===//
// WindowAggregateState
//===--------------------------------------------------------------------===//
12
13WindowAggregateState::WindowAggregateState(AggregateObject aggr, const LogicalType &result_type_p)
14 : aggr(std::move(aggr)), result_type(result_type_p), state(aggr.function.state_size()),
15 statev(Value::POINTER(value: CastPointerToValue(src: state.data()))),
16 statep(Value::POINTER(value: CastPointerToValue(src: state.data()))) {
17 statev.SetVectorType(VectorType::FLAT_VECTOR); // Prevent conversion of results to constants
18}
19
20WindowAggregateState::~WindowAggregateState() {
21}
22
23void WindowAggregateState::AggregateInit() {
24 aggr.function.initialize(state.data());
25}
26
27void WindowAggregateState::AggegateFinal(Vector &result, idx_t rid) {
28 AggregateInputData aggr_input_data(aggr.GetFunctionData(), Allocator::DefaultAllocator());
29 aggr.function.finalize(statev, aggr_input_data, result, 1, rid);
30
31 if (aggr.function.destructor) {
32 aggr.function.destructor(statev, aggr_input_data, 1);
33 }
34}
35
36void WindowAggregateState::Sink(DataChunk &payload_chunk, SelectionVector *filter_sel, idx_t filtered) {
37}
38
39void WindowAggregateState::Finalize() {
40}
41
42void WindowAggregateState::Compute(Vector &result, idx_t rid, idx_t start, idx_t end) {
43}
44
//===--------------------------------------------------------------------===//
// WindowConstantAggregate
//===--------------------------------------------------------------------===//
48
49WindowConstantAggregate::WindowConstantAggregate(AggregateObject aggr, const LogicalType &result_type,
50 const ValidityMask &partition_mask, const idx_t count)
51 : WindowAggregateState(std::move(aggr), result_type), partition(0), row(0) {
52
53 // Locate the partition boundaries
54 idx_t start = 0;
55 if (partition_mask.AllValid()) {
56 partition_offsets.emplace_back(args: 0);
57 } else {
58 idx_t entry_idx;
59 idx_t shift;
60 while (start < count) {
61 partition_mask.GetEntryIndex(row_idx: start, entry_idx, idx_in_entry&: shift);
62
63 // If start is aligned with the start of a block,
64 // and the block is blank, then skip forward one block.
65 const auto block = partition_mask.GetValidityEntry(entry_idx);
66 if (partition_mask.NoneValid(entry: block) && !shift) {
67 start += ValidityMask::BITS_PER_VALUE;
68 continue;
69 }
70
71 // Loop over the block
72 for (; shift < ValidityMask::BITS_PER_VALUE && start < count; ++shift, ++start) {
73 if (partition_mask.RowIsValid(entry: block, idx_in_entry: shift)) {
74 partition_offsets.emplace_back(args&: start);
75 }
76 }
77 }
78 }
79
80 // Initialise the vector for caching the results
81 results = make_uniq<Vector>(args: result_type, args: partition_offsets.size());
82 partition_offsets.emplace_back(args: count);
83
84 // Start the first aggregate
85 AggregateInit();
86}
87
88void WindowConstantAggregate::Sink(DataChunk &payload_chunk, SelectionVector *filter_sel, idx_t filtered) {
89 const auto chunk_begin = row;
90 const auto chunk_end = chunk_begin + payload_chunk.size();
91
92 if (!inputs.ColumnCount() && payload_chunk.ColumnCount()) {
93 inputs.Initialize(allocator&: Allocator::DefaultAllocator(), types: payload_chunk.GetTypes());
94 }
95
96 AggregateInputData aggr_input_data(aggr.GetFunctionData(), Allocator::DefaultAllocator());
97 idx_t begin = 0;
98 idx_t filter_idx = 0;
99 auto partition_end = partition_offsets[partition + 1];
100 while (row < chunk_end) {
101 if (row == partition_end) {
102 AggegateFinal(result&: *results, rid: partition++);
103 AggregateInit();
104 partition_end = partition_offsets[partition + 1];
105 }
106 partition_end = MinValue(a: partition_end, b: chunk_end);
107 auto end = partition_end - chunk_begin;
108
109 inputs.Reset();
110 if (filter_sel) {
111 // Slice to any filtered rows in [begin, end)
112 SelectionVector sel;
113
114 // Find the first value in [begin, end)
115 for (; filter_idx < filtered; ++filter_idx) {
116 auto idx = filter_sel->get_index(idx: filter_idx);
117 if (idx >= begin) {
118 break;
119 }
120 }
121
122 // Find the first value in [end, filtered)
123 sel.Initialize(sel: filter_sel->data() + filter_idx);
124 idx_t nsel = 0;
125 for (; filter_idx < filtered; ++filter_idx, ++nsel) {
126 auto idx = filter_sel->get_index(idx: filter_idx);
127 if (idx >= end) {
128 break;
129 }
130 }
131
132 if (nsel != inputs.size()) {
133 inputs.Slice(other&: payload_chunk, sel, count: nsel);
134 }
135 } else {
136 // Slice to [begin, end)
137 if (begin) {
138 for (idx_t c = 0; c < payload_chunk.ColumnCount(); ++c) {
139 inputs.data[c].Slice(other&: payload_chunk.data[c], offset: begin, end);
140 }
141 } else {
142 inputs.Reference(chunk&: payload_chunk);
143 }
144 inputs.SetCardinality(end - begin);
145 }
146
147 // Aggregate the filtered rows into a single state
148 const auto count = inputs.size();
149 if (aggr.function.simple_update) {
150 aggr.function.simple_update(inputs.data.data(), aggr_input_data, inputs.ColumnCount(), state.data(), count);
151 } else {
152 aggr.function.update(inputs.data.data(), aggr_input_data, inputs.ColumnCount(), statep, count);
153 }
154
155 // Skip filtered rows too!
156 row += end - begin;
157 begin = end;
158 }
159}
160
161void WindowConstantAggregate::Finalize() {
162 AggegateFinal(result&: *results, rid: partition++);
163
164 partition = 0;
165 row = 0;
166}
167
168void WindowConstantAggregate::Compute(Vector &target, idx_t rid, idx_t start, idx_t end) {
169 // Find the partition containing [start, end)
170 while (start < partition_offsets[partition] || partition_offsets[partition + 1] <= start) {
171 ++partition;
172 }
173 D_ASSERT(partition_offsets[partition] <= start);
174 D_ASSERT(partition + 1 < partition_offsets.size());
175 D_ASSERT(end <= partition_offsets[partition + 1]);
176
177 // Copy the value
178 VectorOperations::Copy(source: *results, target, source_count: partition + 1, source_offset: partition, target_offset: rid);
179}
180
//===--------------------------------------------------------------------===//
// WindowSegmentTree
//===--------------------------------------------------------------------===//
184WindowSegmentTree::WindowSegmentTree(AggregateObject aggr_p, const LogicalType &result_type_p, DataChunk *input,
185 const ValidityMask &filter_mask_p, WindowAggregationMode mode_p)
186 : aggr(std::move(aggr_p)), result_type(result_type_p), state(aggr.function.state_size()),
187 statep(Value::POINTER(value: CastPointerToValue(src: state.data()))), frame(0, 0),
188 statev(Value::POINTER(value: CastPointerToValue(src: state.data()))), internal_nodes(0), input_ref(input),
189 filter_mask(filter_mask_p), mode(mode_p) {
190 statep.Flatten(count: input->size());
191 statev.SetVectorType(VectorType::FLAT_VECTOR); // Prevent conversion of results to constants
192
193 if (input_ref && input_ref->ColumnCount() > 0) {
194 filter_sel.Initialize(count: input->size());
195 inputs.Initialize(allocator&: Allocator::DefaultAllocator(), types: input_ref->GetTypes());
196 // if we have a frame-by-frame method, share the single state
197 if (aggr.function.window && UseWindowAPI()) {
198 AggregateInit();
199 inputs.Reference(chunk&: *input_ref);
200 } else {
201 inputs.SetCapacity(*input_ref);
202 if (aggr.function.combine && UseCombineAPI()) {
203 ConstructTree();
204 }
205 }
206 }
207}
208
209WindowSegmentTree::~WindowSegmentTree() {
210 if (!aggr.function.destructor) {
211 // nothing to destroy
212 return;
213 }
214 AggregateInputData aggr_input_data(aggr.GetFunctionData(), Allocator::DefaultAllocator());
215 // call the destructor for all the intermediate states
216 data_ptr_t address_data[STANDARD_VECTOR_SIZE];
217 Vector addresses(LogicalType::POINTER, data_ptr_cast(src: address_data));
218 idx_t count = 0;
219 for (idx_t i = 0; i < internal_nodes; i++) {
220 address_data[count++] = data_ptr_t(levels_flat_native.get() + i * state.size());
221 if (count == STANDARD_VECTOR_SIZE) {
222 aggr.function.destructor(addresses, aggr_input_data, count);
223 count = 0;
224 }
225 }
226 if (count > 0) {
227 aggr.function.destructor(addresses, aggr_input_data, count);
228 }
229
230 if (aggr.function.window && UseWindowAPI()) {
231 aggr.function.destructor(statev, aggr_input_data, 1);
232 }
233}
234
235void WindowSegmentTree::AggregateInit() {
236 aggr.function.initialize(state.data());
237}
238
239void WindowSegmentTree::AggegateFinal(Vector &result, idx_t rid) {
240 AggregateInputData aggr_input_data(aggr.GetFunctionData(), Allocator::DefaultAllocator());
241 aggr.function.finalize(statev, aggr_input_data, result, 1, rid);
242
243 if (aggr.function.destructor) {
244 aggr.function.destructor(statev, aggr_input_data, 1);
245 }
246}
247
248void WindowSegmentTree::ExtractFrame(idx_t begin, idx_t end) {
249 const auto size = end - begin;
250
251 auto &chunk = *input_ref;
252 const auto input_count = input_ref->ColumnCount();
253 inputs.SetCardinality(size);
254 for (idx_t i = 0; i < input_count; ++i) {
255 auto &v = inputs.data[i];
256 auto &vec = chunk.data[i];
257 v.Slice(other&: vec, offset: begin, end);
258 v.Verify(count: size);
259 }
260
261 // Slice to any filtered rows
262 if (!filter_mask.AllValid()) {
263 idx_t filtered = 0;
264 for (idx_t i = begin; i < end; ++i) {
265 if (filter_mask.RowIsValid(row_idx: i)) {
266 filter_sel.set_index(idx: filtered++, loc: i - begin);
267 }
268 }
269 if (filtered != inputs.size()) {
270 inputs.Slice(sel_vector: filter_sel, count: filtered);
271 }
272 }
273}
274
275void WindowSegmentTree::WindowSegmentValue(idx_t l_idx, idx_t begin, idx_t end) {
276 D_ASSERT(begin <= end);
277 if (begin == end || inputs.ColumnCount() == 0) {
278 return;
279 }
280
281 const auto count = end - begin;
282 Vector s(statep, 0, count);
283 if (l_idx == 0) {
284 ExtractFrame(begin, end);
285 AggregateInputData aggr_input_data(aggr.GetFunctionData(), Allocator::DefaultAllocator());
286 D_ASSERT(!inputs.data.empty());
287 aggr.function.update(&inputs.data[0], aggr_input_data, input_ref->ColumnCount(), s, inputs.size());
288 } else {
289 // find out where the states begin
290 data_ptr_t begin_ptr = levels_flat_native.get() + state.size() * (begin + levels_flat_start[l_idx - 1]);
291 // set up a vector of pointers that point towards the set of states
292 Vector v(LogicalType::POINTER, count);
293 auto pdata = FlatVector::GetData<data_ptr_t>(vector&: v);
294 for (idx_t i = 0; i < count; i++) {
295 pdata[i] = begin_ptr + i * state.size();
296 }
297 v.Verify(count);
298 AggregateInputData aggr_input_data(aggr.GetFunctionData(), Allocator::DefaultAllocator());
299 aggr.function.combine(v, s, aggr_input_data, count);
300 }
301}
302
303void WindowSegmentTree::ConstructTree() {
304 D_ASSERT(input_ref);
305 D_ASSERT(inputs.ColumnCount() > 0);
306
307 // compute space required to store internal nodes of segment tree
308 internal_nodes = 0;
309 idx_t level_nodes = input_ref->size();
310 do {
311 level_nodes = (level_nodes + (TREE_FANOUT - 1)) / TREE_FANOUT;
312 internal_nodes += level_nodes;
313 } while (level_nodes > 1);
314 levels_flat_native = make_unsafe_uniq_array<data_t>(n: internal_nodes * state.size());
315 levels_flat_start.push_back(x: 0);
316
317 idx_t levels_flat_offset = 0;
318 idx_t level_current = 0;
319 // level 0 is data itself
320 idx_t level_size;
321 // iterate over the levels of the segment tree
322 while ((level_size = (level_current == 0 ? input_ref->size()
323 : levels_flat_offset - levels_flat_start[level_current - 1])) > 1) {
324 for (idx_t pos = 0; pos < level_size; pos += TREE_FANOUT) {
325 // compute the aggregate for this entry in the segment tree
326 AggregateInit();
327 WindowSegmentValue(l_idx: level_current, begin: pos, end: MinValue(a: level_size, b: pos + TREE_FANOUT));
328
329 memcpy(dest: levels_flat_native.get() + (levels_flat_offset * state.size()), src: state.data(), n: state.size());
330
331 levels_flat_offset++;
332 }
333
334 levels_flat_start.push_back(x: levels_flat_offset);
335 level_current++;
336 }
337
338 // Corner case: single element in the window
339 if (levels_flat_offset == 0) {
340 aggr.function.initialize(levels_flat_native.get());
341 }
342}
343
344void WindowSegmentTree::Compute(Vector &result, idx_t rid, idx_t begin, idx_t end) {
345 D_ASSERT(input_ref);
346
347 // If we have a window function, use that
348 if (aggr.function.window && UseWindowAPI()) {
349 // Frame boundaries
350 auto prev = frame;
351 frame = FrameBounds(begin, end);
352
353 // Extract the range
354 AggregateInputData aggr_input_data(aggr.GetFunctionData(), Allocator::DefaultAllocator());
355 aggr.function.window(input_ref->data.data(), filter_mask, aggr_input_data, inputs.ColumnCount(), state.data(),
356 frame, prev, result, rid, 0);
357 return;
358 }
359
360 AggregateInit();
361
362 // Aggregate everything at once if we can't combine states
363 if (!aggr.function.combine || !UseCombineAPI()) {
364 WindowSegmentValue(l_idx: 0, begin, end);
365 AggegateFinal(result, rid);
366 return;
367 }
368
369 for (idx_t l_idx = 0; l_idx < levels_flat_start.size() + 1; l_idx++) {
370 idx_t parent_begin = begin / TREE_FANOUT;
371 idx_t parent_end = end / TREE_FANOUT;
372 if (parent_begin == parent_end) {
373 WindowSegmentValue(l_idx, begin, end);
374 break;
375 }
376 idx_t group_begin = parent_begin * TREE_FANOUT;
377 if (begin != group_begin) {
378 WindowSegmentValue(l_idx, begin, end: group_begin + TREE_FANOUT);
379 parent_begin++;
380 }
381 idx_t group_end = parent_end * TREE_FANOUT;
382 if (end != group_end) {
383 WindowSegmentValue(l_idx, begin: group_end, end);
384 }
385 begin = parent_begin;
386 end = parent_end;
387 }
388
389 AggegateFinal(result, rid);
390}
391
392} // namespace duckdb
393