1#include "duckdb/execution/window_segment_tree.hpp"
2
3#include "duckdb/common/vector_operations/vector_operations.hpp"
4
5#include <cmath>
6
7using namespace duckdb;
8using namespace std;
9
10WindowSegmentTree::WindowSegmentTree(AggregateFunction &aggregate, TypeId result_type, ChunkCollection *input)
11 : aggregate(aggregate), state(aggregate.state_size()), statep(TypeId::POINTER), result_type(result_type),
12 input_ref(input) {
13 statep.SetCount(STANDARD_VECTOR_SIZE);
14 Value ptr_val = Value::POINTER((idx_t)state.data());
15 statep.Reference(ptr_val);
16 statep.Normalify(STANDARD_VECTOR_SIZE);
17
18 if (input_ref && input_ref->column_count() > 0) {
19 inputs.Initialize(input_ref->types);
20 if (aggregate.combine) {
21 ConstructTree();
22 }
23 }
24}
25
26void WindowSegmentTree::AggregateInit() {
27 aggregate.initialize(state.data());
28}
29
30Value WindowSegmentTree::AggegateFinal() {
31 Vector statev(Value::POINTER((idx_t)state.data()));
32 Vector result(result_type);
33 result.vector_type = VectorType::CONSTANT_VECTOR;
34 ConstantVector::SetNull(result, false);
35 aggregate.finalize(statev, result, 1);
36
37 return result.GetValue(0);
38}
39
40void WindowSegmentTree::WindowSegmentValue(idx_t l_idx, idx_t begin, idx_t end) {
41 assert(begin <= end);
42 if (begin == end) {
43 return;
44 }
45 inputs.SetCardinality(end - begin);
46
47 idx_t start_in_vector = begin % STANDARD_VECTOR_SIZE;
48 Vector s;
49 s.Slice(statep, start_in_vector);
50 if (l_idx == 0) {
51 const auto input_count = input_ref->column_count();
52 auto &chunk = input_ref->GetChunk(begin);
53 for (idx_t i = 0; i < input_count; ++i) {
54 auto &v = inputs.data[i];
55 auto &vec = chunk.data[i];
56 v.Slice(vec, start_in_vector);
57 v.Verify(inputs.size());
58 }
59 aggregate.update(&inputs.data[0], input_count, s, inputs.size());
60 } else {
61 assert(end - begin <= STANDARD_VECTOR_SIZE);
62 data_ptr_t ptr = levels_flat_native.get() + state.size() * (begin + levels_flat_start[l_idx - 1]);
63 Vector v(result_type, ptr);
64 v.Verify(inputs.size());
65 aggregate.combine(v, s, inputs.size());
66 }
67}
68
69void WindowSegmentTree::ConstructTree() {
70 assert(input_ref);
71 assert(inputs.column_count() > 0);
72
73 // compute space required to store internal nodes of segment tree
74 idx_t internal_nodes = 0;
75 idx_t level_nodes = input_ref->count;
76 do {
77 level_nodes = (idx_t)ceil((double)level_nodes / TREE_FANOUT);
78 internal_nodes += level_nodes;
79 } while (level_nodes > 1);
80 levels_flat_native = unique_ptr<data_t[]>(new data_t[internal_nodes * state.size()]);
81 levels_flat_start.push_back(0);
82
83 idx_t levels_flat_offset = 0;
84 idx_t level_current = 0;
85 // level 0 is data itself
86 idx_t level_size;
87 while ((level_size = (level_current == 0 ? input_ref->count
88 : levels_flat_offset - levels_flat_start[level_current - 1])) > 1) {
89 for (idx_t pos = 0; pos < level_size; pos += TREE_FANOUT) {
90 AggregateInit();
91 WindowSegmentValue(level_current, pos, min(level_size, pos + TREE_FANOUT));
92
93 memcpy(levels_flat_native.get() + (levels_flat_offset * state.size()), state.data(), state.size());
94
95 levels_flat_offset++;
96 }
97
98 levels_flat_start.push_back(levels_flat_offset);
99 level_current++;
100 }
101}
102
103Value WindowSegmentTree::Compute(idx_t begin, idx_t end) {
104 assert(input_ref);
105
106 // No arguments, so just count
107 if (inputs.column_count() == 0) {
108 return Value::Numeric(result_type, end - begin);
109 }
110
111 AggregateInit();
112
113 // Aggregate everything at once if we can't combine states
114 if (!aggregate.combine) {
115 WindowSegmentValue(0, begin, end);
116 return AggegateFinal();
117 }
118
119 for (idx_t l_idx = 0; l_idx < levels_flat_start.size() + 1; l_idx++) {
120 idx_t parent_begin = begin / TREE_FANOUT;
121 idx_t parent_end = end / TREE_FANOUT;
122 if (parent_begin == parent_end) {
123 WindowSegmentValue(l_idx, begin, end);
124 return AggegateFinal();
125 }
126 idx_t group_begin = parent_begin * TREE_FANOUT;
127 if (begin != group_begin) {
128 WindowSegmentValue(l_idx, begin, group_begin + TREE_FANOUT);
129 parent_begin++;
130 }
131 idx_t group_end = parent_end * TREE_FANOUT;
132 if (end != group_end) {
133 WindowSegmentValue(l_idx, group_end, end);
134 }
135 begin = parent_begin;
136 end = parent_end;
137 }
138
139 return AggegateFinal();
140}
141