1#include "duckdb/execution/operator/aggregate/physical_window.hpp"
2
3#include "duckdb/common/types/chunk_collection.hpp"
4#include "duckdb/common/vector_operations/vector_operations.hpp"
5#include "duckdb/execution/expression_executor.hpp"
6#include "duckdb/execution/window_segment_tree.hpp"
7#include "duckdb/planner/expression/bound_reference_expression.hpp"
8#include "duckdb/planner/expression/bound_window_expression.hpp"
9
10#include <cmath>
11
12using namespace duckdb;
13using namespace std;
14
15//! The operator state of the window
16class PhysicalWindowOperatorState : public PhysicalOperatorState {
17public:
18 PhysicalWindowOperatorState(PhysicalOperator *child) : PhysicalOperatorState(child), position(0) {
19 }
20
21 idx_t position;
22 ChunkCollection tuples;
23 ChunkCollection window_results;
24};
25
26// this implements a sorted window functions variant
27PhysicalWindow::PhysicalWindow(LogicalOperator &op, vector<unique_ptr<Expression>> select_list,
28 PhysicalOperatorType type)
29 : PhysicalOperator(type, op.types), select_list(std::move(select_list)) {
30}
31
32static bool EqualsSubset(vector<Value> &a, vector<Value> &b, idx_t start, idx_t end) {
33 assert(start <= end);
34 for (idx_t i = start; i < end; i++) {
35 if (a[i] != b[i]) {
36 return false;
37 }
38 }
39 return true;
40}
41
42static idx_t BinarySearchRightmost(ChunkCollection &input, vector<Value> row, idx_t l, idx_t r, idx_t comp_cols) {
43 if (comp_cols == 0) {
44 return r - 1;
45 }
46 while (l < r) {
47 idx_t m = floor((l + r) / 2);
48 bool less_than_equals = true;
49 for (idx_t i = 0; i < comp_cols; i++) {
50 if (input.GetRow(m)[i] > row[i]) {
51 less_than_equals = false;
52 break;
53 }
54 }
55 if (less_than_equals) {
56 l = m + 1;
57 } else {
58 r = m;
59 }
60 }
61 return l - 1;
62}
63
64static void MaterializeExpressions(ClientContext &context, Expression **exprs, idx_t expr_count, ChunkCollection &input,
65 ChunkCollection &output, bool scalar = false) {
66 if (expr_count == 0) {
67 return;
68 }
69
70 vector<TypeId> types;
71 ExpressionExecutor executor;
72 for (idx_t expr_idx = 0; expr_idx < expr_count; ++expr_idx) {
73 types.push_back(exprs[expr_idx]->return_type);
74 executor.AddExpression(*exprs[expr_idx]);
75 }
76
77 for (idx_t i = 0; i < input.chunks.size(); i++) {
78 DataChunk chunk;
79 chunk.Initialize(types);
80
81 executor.Execute(*input.chunks[i], chunk);
82
83 chunk.Verify();
84 output.Append(chunk);
85
86 if (scalar) {
87 break;
88 }
89 }
90}
91
92static void MaterializeExpression(ClientContext &context, Expression *expr, ChunkCollection &input,
93 ChunkCollection &output, bool scalar = false) {
94 MaterializeExpressions(context, &expr, 1, input, output, scalar);
95}
96
97static void SortCollectionForWindow(ClientContext &context, BoundWindowExpression *wexpr, ChunkCollection &input,
98 ChunkCollection &output, ChunkCollection &sort_collection) {
99 vector<TypeId> sort_types;
100 vector<OrderType> orders;
101 ExpressionExecutor executor;
102
103 // we sort by both 1) partition by expression list and 2) order by expressions
104 for (idx_t prt_idx = 0; prt_idx < wexpr->partitions.size(); prt_idx++) {
105 auto &pexpr = wexpr->partitions[prt_idx];
106 sort_types.push_back(pexpr->return_type);
107 orders.push_back(OrderType::ASCENDING);
108 executor.AddExpression(*pexpr);
109 }
110
111 for (idx_t ord_idx = 0; ord_idx < wexpr->orders.size(); ord_idx++) {
112 auto &oexpr = wexpr->orders[ord_idx].expression;
113 sort_types.push_back(oexpr->return_type);
114 orders.push_back(wexpr->orders[ord_idx].type);
115 executor.AddExpression(*oexpr);
116 }
117
118 assert(sort_types.size() > 0);
119
120 // create a chunkcollection for the results of the expressions in the window definitions
121 for (idx_t i = 0; i < input.chunks.size(); i++) {
122 DataChunk sort_chunk;
123 sort_chunk.Initialize(sort_types);
124
125 executor.Execute(*input.chunks[i], sort_chunk);
126
127 sort_chunk.Verify();
128 sort_collection.Append(sort_chunk);
129 }
130
131 assert(input.count == sort_collection.count);
132
133 auto sorted_vector = unique_ptr<idx_t[]>(new idx_t[input.count]);
134 sort_collection.Sort(orders, sorted_vector.get());
135
136 input.Reorder(sorted_vector.get());
137 output.Reorder(sorted_vector.get());
138 sort_collection.Reorder(sorted_vector.get());
139}
140
141struct WindowBoundariesState {
142 idx_t partition_start = 0;
143 idx_t partition_end = 0;
144 idx_t peer_start = 0;
145 idx_t peer_end = 0;
146 int64_t window_start = -1;
147 int64_t window_end = -1;
148 bool is_same_partition = false;
149 bool is_peer = false;
150 vector<Value> row_prev;
151};
152
153static bool WindowNeedsRank(BoundWindowExpression *wexpr) {
154 return wexpr->type == ExpressionType::WINDOW_PERCENT_RANK || wexpr->type == ExpressionType::WINDOW_RANK ||
155 wexpr->type == ExpressionType::WINDOW_RANK_DENSE || wexpr->type == ExpressionType::WINDOW_CUME_DIST;
156}
157
158static void UpdateWindowBoundaries(BoundWindowExpression *wexpr, ChunkCollection &input, idx_t input_size,
159 idx_t row_idx, ChunkCollection &boundary_start_collection,
160 ChunkCollection &boundary_end_collection, WindowBoundariesState &bounds) {
161
162 if (input.column_count() > 0) {
163 vector<Value> row_cur = input.GetRow(row_idx);
164 idx_t sort_col_count = wexpr->partitions.size() + wexpr->orders.size();
165
166 // determine partition and peer group boundaries to ultimately figure out window size
167 bounds.is_same_partition = EqualsSubset(bounds.row_prev, row_cur, 0, wexpr->partitions.size());
168 bounds.is_peer = bounds.is_same_partition &&
169 EqualsSubset(bounds.row_prev, row_cur, wexpr->partitions.size(), sort_col_count);
170 bounds.row_prev = row_cur;
171
172 // when the partition changes, recompute the boundaries
173 if (!bounds.is_same_partition || row_idx == 0) { // special case for first row, need to init
174 bounds.partition_start = row_idx;
175 bounds.peer_start = row_idx;
176
177 // find end of partition
178 bounds.partition_end =
179 BinarySearchRightmost(input, row_cur, bounds.partition_start, input.count, wexpr->partitions.size()) +
180 1;
181
182 } else if (!bounds.is_peer) {
183 bounds.peer_start = row_idx;
184 }
185
186 if (wexpr->end == WindowBoundary::CURRENT_ROW_RANGE || wexpr->type == ExpressionType::WINDOW_CUME_DIST) {
187 bounds.peer_end = BinarySearchRightmost(input, row_cur, row_idx, bounds.partition_end, sort_col_count) + 1;
188 }
189 } else {
190 bounds.is_same_partition = 0;
191 bounds.is_peer = true;
192 bounds.partition_end = input_size;
193 bounds.peer_end = bounds.partition_end;
194 }
195
196 // determine window boundaries depending on the type of expression
197 bounds.window_start = -1;
198 bounds.window_end = -1;
199
200 switch (wexpr->start) {
201 case WindowBoundary::UNBOUNDED_PRECEDING:
202 bounds.window_start = bounds.partition_start;
203 break;
204 case WindowBoundary::CURRENT_ROW_ROWS:
205 bounds.window_start = row_idx;
206 break;
207 case WindowBoundary::CURRENT_ROW_RANGE:
208 bounds.window_start = bounds.peer_start;
209 break;
210 case WindowBoundary::UNBOUNDED_FOLLOWING:
211 assert(0); // disallowed
212 break;
213 case WindowBoundary::EXPR_PRECEDING: {
214 assert(boundary_start_collection.column_count() > 0);
215 bounds.window_start =
216 (int64_t)row_idx -
217 boundary_start_collection.GetValue(0, wexpr->start_expr->IsScalar() ? 0 : row_idx).GetValue<int64_t>();
218 break;
219 }
220 case WindowBoundary::EXPR_FOLLOWING: {
221 assert(boundary_start_collection.column_count() > 0);
222 bounds.window_start =
223 row_idx +
224 boundary_start_collection.GetValue(0, wexpr->start_expr->IsScalar() ? 0 : row_idx).GetValue<int64_t>();
225 break;
226 }
227
228 default:
229 throw NotImplementedException("Unsupported boundary");
230 }
231
232 switch (wexpr->end) {
233 case WindowBoundary::UNBOUNDED_PRECEDING:
234 assert(0); // disallowed
235 break;
236 case WindowBoundary::CURRENT_ROW_ROWS:
237 bounds.window_end = row_idx + 1;
238 break;
239 case WindowBoundary::CURRENT_ROW_RANGE:
240 bounds.window_end = bounds.peer_end;
241 break;
242 case WindowBoundary::UNBOUNDED_FOLLOWING:
243 bounds.window_end = bounds.partition_end;
244 break;
245 case WindowBoundary::EXPR_PRECEDING:
246 assert(boundary_end_collection.column_count() > 0);
247 bounds.window_end =
248 (int64_t)row_idx -
249 boundary_end_collection.GetValue(0, wexpr->end_expr->IsScalar() ? 0 : row_idx).GetValue<int64_t>() + 1;
250 break;
251 case WindowBoundary::EXPR_FOLLOWING:
252 assert(boundary_end_collection.column_count() > 0);
253 bounds.window_end =
254 row_idx +
255 boundary_end_collection.GetValue(0, wexpr->end_expr->IsScalar() ? 0 : row_idx).GetValue<int64_t>() + 1;
256
257 break;
258 default:
259 throw NotImplementedException("Unsupported boundary");
260 }
261
262 // clamp windows to partitions if they should exceed
263 if (bounds.window_start < (int64_t)bounds.partition_start) {
264 bounds.window_start = bounds.partition_start;
265 }
266 if ((idx_t)bounds.window_end > bounds.partition_end) {
267 bounds.window_end = bounds.partition_end;
268 }
269
270 if (bounds.window_start < 0 || bounds.window_end < 0) {
271 throw Exception("Failed to compute window boundaries");
272 }
273}
274
275static void ComputeWindowExpression(ClientContext &context, BoundWindowExpression *wexpr, ChunkCollection &input,
276 ChunkCollection &output, idx_t output_idx) {
277
278 ChunkCollection sort_collection;
279 bool needs_sorting = wexpr->partitions.size() + wexpr->orders.size() > 0;
280 if (needs_sorting) {
281 SortCollectionForWindow(context, wexpr, input, output, sort_collection);
282 }
283
284 // evaluate inner expressions of window functions, could be more complex
285 ChunkCollection payload_collection;
286 vector<Expression *> exprs;
287 for (auto &child : wexpr->children) {
288 exprs.push_back(child.get());
289 }
290 // TODO: child may be a scalar, don't need to materialize the whole collection then
291 MaterializeExpressions(context, exprs.data(), exprs.size(), input, payload_collection);
292
293 ChunkCollection leadlag_offset_collection;
294 ChunkCollection leadlag_default_collection;
295 if (wexpr->type == ExpressionType::WINDOW_LEAD || wexpr->type == ExpressionType::WINDOW_LAG) {
296 if (wexpr->offset_expr) {
297 MaterializeExpression(context, wexpr->offset_expr.get(), input, leadlag_offset_collection,
298 wexpr->offset_expr->IsScalar());
299 }
300 if (wexpr->default_expr) {
301 MaterializeExpression(context, wexpr->default_expr.get(), input, leadlag_default_collection,
302 wexpr->default_expr->IsScalar());
303 }
304 }
305
306 // evaluate boundaries if present.
307 ChunkCollection boundary_start_collection;
308 if (wexpr->start_expr &&
309 (wexpr->start == WindowBoundary::EXPR_PRECEDING || wexpr->start == WindowBoundary::EXPR_FOLLOWING)) {
310 MaterializeExpression(context, wexpr->start_expr.get(), input, boundary_start_collection,
311 wexpr->start_expr->IsScalar());
312 }
313 ChunkCollection boundary_end_collection;
314 if (wexpr->end_expr &&
315 (wexpr->end == WindowBoundary::EXPR_PRECEDING || wexpr->end == WindowBoundary::EXPR_FOLLOWING)) {
316 MaterializeExpression(context, wexpr->end_expr.get(), input, boundary_end_collection,
317 wexpr->end_expr->IsScalar());
318 }
319
320 // build a segment tree for frame-adhering aggregates
321 // see http://www.vldb.org/pvldb/vol8/p1058-leis.pdf
322 unique_ptr<WindowSegmentTree> segment_tree = nullptr;
323
324 if (wexpr->aggregate) {
325 segment_tree = make_unique<WindowSegmentTree>(*(wexpr->aggregate), wexpr->return_type, &payload_collection);
326 }
327
328 WindowBoundariesState bounds;
329 uint64_t dense_rank = 1, rank_equal = 0, rank = 1;
330
331 if (needs_sorting) {
332 bounds.row_prev = sort_collection.GetRow(0);
333 }
334
335 // this is the main loop, go through all sorted rows and compute window function result
336 for (idx_t row_idx = 0; row_idx < input.count; row_idx++) {
337 // special case, OVER (), aggregate over everything
338 UpdateWindowBoundaries(wexpr, sort_collection, input.count, row_idx, boundary_start_collection,
339 boundary_end_collection, bounds);
340 if (WindowNeedsRank(wexpr)) {
341 if (!bounds.is_same_partition || row_idx == 0) { // special case for first row, need to init
342 dense_rank = 1;
343 rank = 1;
344 rank_equal = 0;
345 } else if (!bounds.is_peer) {
346 dense_rank++;
347 rank += rank_equal;
348 rank_equal = 0;
349 }
350 rank_equal++;
351 }
352
353 auto res = Value();
354
355 // if no values are read for window, result is NULL
356 if (bounds.window_start >= bounds.window_end) {
357 output.SetValue(output_idx, row_idx, res);
358 continue;
359 }
360
361 switch (wexpr->type) {
362 case ExpressionType::WINDOW_AGGREGATE: {
363 res = segment_tree->Compute(bounds.window_start, bounds.window_end);
364 break;
365 }
366 case ExpressionType::WINDOW_ROW_NUMBER: {
367 res = Value::Numeric(wexpr->return_type, row_idx - bounds.partition_start + 1);
368 break;
369 }
370 case ExpressionType::WINDOW_RANK_DENSE: {
371 res = Value::Numeric(wexpr->return_type, dense_rank);
372 break;
373 }
374 case ExpressionType::WINDOW_RANK: {
375 res = Value::Numeric(wexpr->return_type, rank);
376 break;
377 }
378 case ExpressionType::WINDOW_PERCENT_RANK: {
379 int64_t denom = (int64_t)bounds.partition_end - bounds.partition_start - 1;
380 double percent_rank = denom > 0 ? ((double)rank - 1) / denom : 0;
381 res = Value(percent_rank);
382 break;
383 }
384 case ExpressionType::WINDOW_CUME_DIST: {
385 int64_t denom = (int64_t)bounds.partition_end - bounds.partition_start;
386 double cume_dist = denom > 0 ? ((double)(bounds.peer_end - bounds.partition_start)) / denom : 0;
387 res = Value(cume_dist);
388 break;
389 }
390 case ExpressionType::WINDOW_NTILE: {
391 if (payload_collection.column_count() != 1) {
392 throw Exception("NTILE needs a parameter");
393 }
394 auto n_param = payload_collection.GetValue(0, row_idx).GetValue<int64_t>();
395 // With thanks from SQLite's ntileValueFunc()
396 int64_t n_total = bounds.partition_end - bounds.partition_start;
397 int64_t n_size = (n_total / n_param);
398 if (n_size > 0) {
399 int64_t n_large = n_total - n_param * n_size;
400 int64_t i_small = n_large * (n_size + 1);
401
402 assert((n_large * (n_size + 1) + (n_param - n_large) * n_size) == n_total);
403
404 if (row_idx < (idx_t)i_small) {
405 res = Value::Numeric(wexpr->return_type, 1 + row_idx / (n_size + 1));
406 } else {
407 res = Value::Numeric(wexpr->return_type, 1 + n_large + (row_idx - i_small) / n_size);
408 }
409 }
410 break;
411 }
412 case ExpressionType::WINDOW_LEAD:
413 case ExpressionType::WINDOW_LAG: {
414 Value def_val = Value(wexpr->return_type);
415 idx_t offset = 1;
416 if (wexpr->offset_expr) {
417 offset = leadlag_offset_collection.GetValue(0, wexpr->offset_expr->IsScalar() ? 0 : row_idx)
418 .GetValue<int64_t>();
419 }
420 if (wexpr->default_expr) {
421 def_val = leadlag_default_collection.GetValue(0, wexpr->default_expr->IsScalar() ? 0 : row_idx);
422 }
423 if (wexpr->type == ExpressionType::WINDOW_LEAD) {
424 auto lead_idx = row_idx + 1;
425 if (lead_idx < bounds.partition_end) {
426 res = payload_collection.GetValue(0, lead_idx);
427 } else {
428 res = def_val;
429 }
430 } else {
431 int64_t lag_idx = (int64_t)row_idx - offset;
432 if (lag_idx >= 0 && (idx_t)lag_idx >= bounds.partition_start) {
433 res = payload_collection.GetValue(0, lag_idx);
434 } else {
435 res = def_val;
436 }
437 }
438
439 break;
440 }
441 case ExpressionType::WINDOW_FIRST_VALUE: {
442 res = payload_collection.GetValue(0, bounds.window_start);
443 break;
444 }
445 case ExpressionType::WINDOW_LAST_VALUE: {
446 res = payload_collection.GetValue(0, bounds.window_end - 1);
447 break;
448 }
449 default:
450 throw NotImplementedException("Window aggregate type %s", ExpressionTypeToString(wexpr->type).c_str());
451 }
452
453 output.SetValue(output_idx, row_idx, res);
454 }
455}
456
457void PhysicalWindow::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) {
458 auto state = reinterpret_cast<PhysicalWindowOperatorState *>(state_);
459 ChunkCollection &big_data = state->tuples;
460 ChunkCollection &window_results = state->window_results;
461
462 // this is a blocking operator, so compute complete result on first invocation
463 if (state->position == 0) {
464 do {
465 children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
466 big_data.Append(state->child_chunk);
467 } while (state->child_chunk.size() != 0);
468
469 if (big_data.count == 0) {
470 return;
471 }
472
473 vector<TypeId> window_types;
474 for (idx_t expr_idx = 0; expr_idx < select_list.size(); expr_idx++) {
475 window_types.push_back(select_list[expr_idx]->return_type);
476 }
477
478 for (idx_t i = 0; i < big_data.chunks.size(); i++) {
479 DataChunk window_chunk;
480 window_chunk.Initialize(window_types);
481 window_chunk.SetCardinality(big_data.chunks[i]->size());
482 for (idx_t col_idx = 0; col_idx < window_chunk.column_count(); col_idx++) {
483 window_chunk.data[col_idx].vector_type = VectorType::CONSTANT_VECTOR;
484 ConstantVector::SetNull(window_chunk.data[col_idx], true);
485 }
486
487 window_chunk.Verify();
488 window_results.Append(window_chunk);
489 }
490
491 assert(window_results.column_count() == select_list.size());
492 idx_t window_output_idx = 0;
493 // we can have multiple window functions
494 for (idx_t expr_idx = 0; expr_idx < select_list.size(); expr_idx++) {
495 assert(select_list[expr_idx]->GetExpressionClass() == ExpressionClass::BOUND_WINDOW);
496 // sort by partition and order clause in window def
497 auto wexpr = reinterpret_cast<BoundWindowExpression *>(select_list[expr_idx].get());
498 ComputeWindowExpression(context, wexpr, big_data, window_results, window_output_idx++);
499 }
500 }
501
502 if (state->position >= big_data.count) {
503 return;
504 }
505
506 // just return what was computed before, appending the result cols of the window expressions at the end
507 auto &proj_ch = big_data.GetChunk(state->position);
508 auto &wind_ch = window_results.GetChunk(state->position);
509
510 idx_t out_idx = 0;
511 assert(proj_ch.size() == wind_ch.size());
512 chunk.SetCardinality(proj_ch);
513 for (idx_t col_idx = 0; col_idx < proj_ch.column_count(); col_idx++) {
514 chunk.data[out_idx++].Reference(proj_ch.data[col_idx]);
515 }
516 for (idx_t col_idx = 0; col_idx < wind_ch.column_count(); col_idx++) {
517 chunk.data[out_idx++].Reference(wind_ch.data[col_idx]);
518 }
519 state->position += STANDARD_VECTOR_SIZE;
520}
521
522unique_ptr<PhysicalOperatorState> PhysicalWindow::GetOperatorState() {
523 return make_unique<PhysicalWindowOperatorState>(children[0].get());
524}
525