#include "duckdb/execution/operator/aggregate/physical_window.hpp"

#include "duckdb/common/operator/add.hpp"
#include "duckdb/common/operator/cast_operators.hpp"
#include "duckdb/common/operator/comparison_operators.hpp"
#include "duckdb/common/operator/subtract.hpp"
#include "duckdb/common/optional_ptr.hpp"
#include "duckdb/common/radix_partitioning.hpp"
#include "duckdb/common/row_operations/row_operations.hpp"
#include "duckdb/common/sort/partition_state.hpp"
#include "duckdb/common/types/chunk_collection.hpp"
#include "duckdb/common/types/column/column_data_consumer.hpp"
#include "duckdb/common/types/row/row_data_collection_scanner.hpp"
#include "duckdb/common/vector_operations/vector_operations.hpp"
#include "duckdb/common/windows_undefs.hpp"
#include "duckdb/execution/expression_executor.hpp"
#include "duckdb/execution/partitionable_hashtable.hpp"
#include "duckdb/execution/window_segment_tree.hpp"
#include "duckdb/main/client_config.hpp"
#include "duckdb/main/config.hpp"
#include "duckdb/parallel/base_pipeline_event.hpp"
#include "duckdb/planner/expression/bound_reference_expression.hpp"
#include "duckdb/planner/expression/bound_window_expression.hpp"

#include <algorithm>
#include <cmath>
#include <numeric>

namespace duckdb {

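// Informal overview: the window operator first sinks all input rows, hash-partitioning
// them by the PARTITION BY keys and sorting each partition by the ORDER BY keys.
// Each sorted partition is then scanned twice: once to materialise the data the
// window functions need (payloads, FILTER masks, RANGE columns), and once to
// evaluate the functions and emit results alongside the input columns.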
// Global sink state
class WindowGlobalSinkState : public GlobalSinkState {
public:
	WindowGlobalSinkState(const PhysicalWindow &op, ClientContext &context)
	    : mode(DBConfig::GetConfig(context).options.window_mode) {

		D_ASSERT(op.select_list[0]->GetExpressionClass() == ExpressionClass::BOUND_WINDOW);
		auto &wexpr = op.select_list[0]->Cast<BoundWindowExpression>();

		global_partition =
		    make_uniq<PartitionGlobalSinkState>(context, wexpr.partitions, wexpr.orders, op.children[0]->types,
		                                        wexpr.partitions_stats, op.estimated_cardinality);
	}

	unique_ptr<PartitionGlobalSinkState> global_partition;
	WindowAggregationMode mode;
};

// Per-thread sink state
class WindowLocalSinkState : public LocalSinkState {
public:
	WindowLocalSinkState(ClientContext &context, const WindowGlobalSinkState &gstate)
	    : local_partition(context, *gstate.global_partition) {
	}

	void Sink(DataChunk &input_chunk) {
		local_partition.Sink(input_chunk);
	}

	void Combine() {
		local_partition.Combine();
	}

	PartitionLocalSinkState local_partition;
};

// this implements a sorted window function variant
PhysicalWindow::PhysicalWindow(vector<LogicalType> types, vector<unique_ptr<Expression>> select_list_p,
                               idx_t estimated_cardinality, PhysicalOperatorType type)
    : PhysicalOperator(type, std::move(types), estimated_cardinality), select_list(std::move(select_list_p)) {
	is_order_dependent = false;
	for (auto &expr : select_list) {
		D_ASSERT(expr->expression_class == ExpressionClass::BOUND_WINDOW);
		auto &bound_window = expr->Cast<BoundWindowExpression>();
		if (bound_window.partitions.empty() && bound_window.orders.empty()) {
			is_order_dependent = true;
		}
	}
}

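// Finds the n-th valid (set) bit in mask within [l, r), skipping whole all-invalid
// entries where possible. On return, n holds the number of valid bits still
// missing (0 if found). Illustrative sketch: with mask bits 0b0110, l = 0 and
// n = 2, the second valid bit is at index 2, so 2 is returned and n becomes 0.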
static idx_t FindNextStart(const ValidityMask &mask, idx_t l, const idx_t r, idx_t &n) {
	if (mask.AllValid()) {
		auto start = MinValue(l + n - 1, r);
		n -= MinValue(n, r - l);
		return start;
	}

	while (l < r) {
		// If l is aligned with the start of a block, and the block is blank, then skip forward one block.
		idx_t entry_idx;
		idx_t shift;
		mask.GetEntryIndex(l, entry_idx, shift);

		const auto block = mask.GetValidityEntry(entry_idx);
		if (mask.NoneValid(block) && !shift) {
			l += ValidityMask::BITS_PER_VALUE;
			continue;
		}

		// Loop over the block
		for (; shift < ValidityMask::BITS_PER_VALUE && l < r; ++shift, ++l) {
			if (mask.RowIsValid(block, shift) && --n == 0) {
				return MinValue(l, r);
			}
		}
	}

	// Didn't find a start so return the end of the range
	return r;
}

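// Mirror of FindNextStart: finds the n-th valid bit scanning backwards from r
// (exclusive) down to l, skipping whole all-invalid blocks where possible.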
static idx_t FindPrevStart(const ValidityMask &mask, const idx_t l, idx_t r, idx_t &n) {
	if (mask.AllValid()) {
		auto start = (r <= l + n) ? l : r - n;
		n -= r - start;
		return start;
	}

	while (l < r) {
		// If r is aligned with the start of a block, and the previous block is blank,
		// then skip backwards one block.
		idx_t entry_idx;
		idx_t shift;
		mask.GetEntryIndex(r - 1, entry_idx, shift);

		const auto block = mask.GetValidityEntry(entry_idx);
		if (mask.NoneValid(block) && (shift + 1 == ValidityMask::BITS_PER_VALUE)) {
			// r is nonzero (> l) and word aligned, so this will not underflow.
			r -= ValidityMask::BITS_PER_VALUE;
			continue;
		}

		// Loop backwards over the block
		// shift is probing r-1 >= l >= 0
		for (++shift; shift-- > 0; --r) {
			if (mask.RowIsValid(block, shift) && --n == 0) {
				return MaxValue(l, r - 1);
			}
		}
	}

	// Didn't find a start so return the start of the range
	return l;
}

static void PrepareInputExpressions(vector<unique_ptr<Expression>> &exprs, ExpressionExecutor &executor,
                                    DataChunk &chunk) {
	if (exprs.empty()) {
		return;
	}

	vector<LogicalType> types;
	for (idx_t expr_idx = 0; expr_idx < exprs.size(); ++expr_idx) {
		types.push_back(exprs[expr_idx]->return_type);
		executor.AddExpression(*exprs[expr_idx]);
	}

	if (!types.empty()) {
		auto &allocator = executor.GetAllocator();
		chunk.Initialize(allocator, types);
	}
}

static void PrepareInputExpression(Expression &expr, ExpressionExecutor &executor, DataChunk &chunk) {
	vector<LogicalType> types;
	types.push_back(expr.return_type);
	executor.AddExpression(expr);

	auto &allocator = executor.GetAllocator();
	chunk.Initialize(allocator, types);
}

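// Wraps a single (possibly absent) expression that is evaluated against each input
// chunk. Scalar expressions are evaluated once and then read from row 0, which is
// why the accessors below index with (scalar ? 0 : i).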
struct WindowInputExpression {
	WindowInputExpression(optional_ptr<Expression> expr_p, ClientContext &context)
	    : expr(expr_p), ptype(PhysicalType::INVALID), scalar(true), executor(context) {
		if (expr) {
			PrepareInputExpression(*expr, executor, chunk);
			ptype = expr->return_type.InternalType();
			scalar = expr->IsScalar();
		}
	}

	void Execute(DataChunk &input_chunk) {
		if (expr) {
			chunk.Reset();
			executor.Execute(input_chunk, chunk);
			chunk.Verify();
		}
	}

	template <typename T>
	inline T GetCell(idx_t i) const {
		D_ASSERT(!chunk.data.empty());
		const auto data = FlatVector::GetData<T>(chunk.data[0]);
		return data[scalar ? 0 : i];
	}

	inline bool CellIsNull(idx_t i) const {
		D_ASSERT(!chunk.data.empty());
		if (chunk.data[0].GetVectorType() == VectorType::CONSTANT_VECTOR) {
			return ConstantVector::IsNull(chunk.data[0]);
		}
		return FlatVector::IsNull(chunk.data[0], i);
	}

	inline void CopyCell(Vector &target, idx_t target_offset) const {
		D_ASSERT(!chunk.data.empty());
		auto &source = chunk.data[0];
		auto source_offset = scalar ? 0 : target_offset;
		VectorOperations::Copy(source, target, source_offset + 1, source_offset, target_offset);
	}

	optional_ptr<Expression> expr;
	PhysicalType ptype;
	bool scalar;
	ExpressionExecutor executor;
	DataChunk chunk;
};

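// Buffers the result of one expression over the entire sorted input it will scan
// (up to `capacity` rows). RANGE frames need random access and binary search over
// the ORDER BY values, which a streaming chunk cannot provide, hence the
// up-front materialisation.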
struct WindowInputColumn {
	WindowInputColumn(Expression *expr_p, ClientContext &context, idx_t capacity_p)
	    : input_expr(expr_p, context), count(0), capacity(capacity_p) {
		if (input_expr.expr) {
			target = make_uniq<Vector>(input_expr.chunk.data[0].GetType(), capacity);
		}
	}

	void Append(DataChunk &input_chunk) {
		if (input_expr.expr) {
			const auto source_count = input_chunk.size();
			D_ASSERT(count + source_count <= capacity);
			if (!input_expr.scalar || !count) {
				input_expr.Execute(input_chunk);
				auto &source = input_expr.chunk.data[0];
				VectorOperations::Copy(source, *target, source_count, 0, count);
			}
			count += source_count;
		}
	}

	inline bool CellIsNull(idx_t i) {
		D_ASSERT(target);
		D_ASSERT(i < count);
		return FlatVector::IsNull(*target, input_expr.scalar ? 0 : i);
	}

	template <typename T>
	inline T GetCell(idx_t i) const {
		D_ASSERT(target);
		D_ASSERT(i < count);
		const auto data = FlatVector::GetData<T>(*target);
		return data[input_expr.scalar ? 0 : i];
	}

	WindowInputExpression input_expr;

private:
	unique_ptr<Vector> target;
	idx_t count;
	idx_t capacity;
};

static inline bool BoundaryNeedsPeer(const WindowBoundary &boundary) {
	switch (boundary) {
	case WindowBoundary::CURRENT_ROW_RANGE:
	case WindowBoundary::EXPR_PRECEDING_RANGE:
	case WindowBoundary::EXPR_FOLLOWING_RANGE:
		return true;
	default:
		return false;
	}
}

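// Per-function frame state: caches the frame specification and tracks the current
// partition, peer group, and window boundaries as Update() walks the sorted rows.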
struct WindowBoundariesState {
	static inline bool IsScalar(const unique_ptr<Expression> &expr) {
		return expr ? expr->IsScalar() : true;
	}

	WindowBoundariesState(BoundWindowExpression &wexpr, const idx_t input_size)
	    : type(wexpr.type), input_size(input_size), start_boundary(wexpr.start), end_boundary(wexpr.end),
	      partition_count(wexpr.partitions.size()), order_count(wexpr.orders.size()),
	      range_sense(wexpr.orders.empty() ? OrderType::INVALID : wexpr.orders[0].type),
	      has_preceding_range(wexpr.start == WindowBoundary::EXPR_PRECEDING_RANGE ||
	                          wexpr.end == WindowBoundary::EXPR_PRECEDING_RANGE),
	      has_following_range(wexpr.start == WindowBoundary::EXPR_FOLLOWING_RANGE ||
	                          wexpr.end == WindowBoundary::EXPR_FOLLOWING_RANGE),
	      needs_peer(BoundaryNeedsPeer(wexpr.end) || wexpr.type == ExpressionType::WINDOW_CUME_DIST) {
	}

	void Update(const idx_t row_idx, WindowInputColumn &range_collection, const idx_t expr_idx,
	            WindowInputExpression &boundary_start, WindowInputExpression &boundary_end,
	            const ValidityMask &partition_mask, const ValidityMask &order_mask);

	// Cached lookups
	const ExpressionType type;
	const idx_t input_size;
	const WindowBoundary start_boundary;
	const WindowBoundary end_boundary;
	const size_t partition_count;
	const size_t order_count;
	const OrderType range_sense;
	const bool has_preceding_range;
	const bool has_following_range;
	const bool needs_peer;

	idx_t partition_start = 0;
	idx_t partition_end = 0;
	idx_t peer_start = 0;
	idx_t peer_end = 0;
	idx_t valid_start = 0;
	idx_t valid_end = 0;
	int64_t window_start = -1;
	int64_t window_end = -1;
	bool is_same_partition = false;
	bool is_peer = false;
};

static bool WindowNeedsRank(const BoundWindowExpression &wexpr) {
	return wexpr.type == ExpressionType::WINDOW_PERCENT_RANK || wexpr.type == ExpressionType::WINDOW_RANK ||
	       wexpr.type == ExpressionType::WINDOW_RANK_DENSE || wexpr.type == ExpressionType::WINDOW_CUME_DIST;
}

template <typename T>
static T GetCell(DataChunk &chunk, idx_t column, idx_t index) {
	D_ASSERT(chunk.ColumnCount() > column);
	auto &source = chunk.data[column];
	const auto data = FlatVector::GetData<T>(source);
	return data[index];
}

static bool CellIsNull(DataChunk &chunk, idx_t column, idx_t index) {
	D_ASSERT(chunk.ColumnCount() > column);
	auto &source = chunk.data[column];
	return FlatVector::IsNull(source, index);
}

static void CopyCell(DataChunk &chunk, idx_t column, idx_t index, Vector &target, idx_t target_offset) {
	D_ASSERT(chunk.ColumnCount() > column);
	auto &source = chunk.data[column];
	VectorOperations::Copy(source, target, index + 1, index, target_offset);
}

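// Minimal forward iterator over a WindowInputColumn so that std::lower_bound /
// std::upper_bound can binary-search the materialised ORDER BY values for RANGE
// frames. Usage sketch (mirroring FindTypedRangeBound below):
//     WindowColumnIterator<T> begin(over, order_begin), end(over, order_end);
//     auto pos = idx_t(std::lower_bound(begin, end, val, comp));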
template <typename T>
struct WindowColumnIterator {
	using iterator = WindowColumnIterator<T>;
	using iterator_category = std::forward_iterator_tag;
	using difference_type = std::ptrdiff_t;
	using value_type = T;
	using reference = T;
	using pointer = idx_t;

	explicit WindowColumnIterator(WindowInputColumn &coll_p, pointer pos_p = 0) : coll(&coll_p), pos(pos_p) {
	}

	inline reference operator*() const {
		return coll->GetCell<T>(pos);
	}
	inline explicit operator pointer() const {
		return pos;
	}

	inline iterator &operator++() {
		++pos;
		return *this;
	}
	inline iterator operator++(int) {
		auto result = *this;
		++(*this);
		return result;
	}

	friend inline bool operator==(const iterator &a, const iterator &b) {
		return a.pos == b.pos;
	}
	friend inline bool operator!=(const iterator &a, const iterator &b) {
		return a.pos != b.pos;
	}

private:
	optional_ptr<WindowInputColumn> coll;
	pointer pos;
};

template <typename T, typename OP>
struct OperationCompare : public std::function<bool(T, T)> {
	inline bool operator()(const T &lhs, const T &val) const {
		return OP::template Operation(lhs, val);
	}
};

template <typename T, typename OP, bool FROM>
static idx_t FindTypedRangeBound(WindowInputColumn &over, const idx_t order_begin, const idx_t order_end,
                                 WindowInputExpression &boundary, const idx_t boundary_row) {
	D_ASSERT(!boundary.CellIsNull(boundary_row));
	const auto val = boundary.GetCell<T>(boundary_row);

	OperationCompare<T, OP> comp;
	WindowColumnIterator<T> begin(over, order_begin);
	WindowColumnIterator<T> end(over, order_end);
	if (FROM) {
		return idx_t(std::lower_bound(begin, end, val, comp));
	} else {
		return idx_t(std::upper_bound(begin, end, val, comp));
	}
}

template <typename OP, bool FROM>
static idx_t FindRangeBound(WindowInputColumn &over, const idx_t order_begin, const idx_t order_end,
                            WindowInputExpression &boundary, const idx_t expr_idx) {
	D_ASSERT(boundary.chunk.ColumnCount() == 1);
	D_ASSERT(boundary.chunk.data[0].GetType().InternalType() == over.input_expr.ptype);

	switch (over.input_expr.ptype) {
	case PhysicalType::INT8:
		return FindTypedRangeBound<int8_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::INT16:
		return FindTypedRangeBound<int16_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::INT32:
		return FindTypedRangeBound<int32_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::INT64:
		return FindTypedRangeBound<int64_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::UINT8:
		return FindTypedRangeBound<uint8_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::UINT16:
		return FindTypedRangeBound<uint16_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::UINT32:
		return FindTypedRangeBound<uint32_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::UINT64:
		return FindTypedRangeBound<uint64_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::INT128:
		return FindTypedRangeBound<hugeint_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::FLOAT:
		return FindTypedRangeBound<float, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::DOUBLE:
		return FindTypedRangeBound<double, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::INTERVAL:
		return FindTypedRangeBound<interval_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	default:
		throw InternalException("Unsupported column type for RANGE");
	}
}

template <bool FROM>
static idx_t FindOrderedRangeBound(WindowInputColumn &over, const OrderType range_sense, const idx_t order_begin,
                                   const idx_t order_end, WindowInputExpression &boundary, const idx_t expr_idx) {
	switch (range_sense) {
	case OrderType::ASCENDING:
		return FindRangeBound<LessThan, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case OrderType::DESCENDING:
		return FindRangeBound<GreaterThan, FROM>(over, order_begin, order_end, boundary, expr_idx);
	default:
		throw InternalException("Unsupported ORDER BY sense for RANGE");
	}
}

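// Recomputes all frame boundaries for the row at row_idx. The partition and order
// masks mark where a new partition or peer group starts, so most rows only need
// cheap incremental updates; a full recomputation happens at partition changes.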
void WindowBoundariesState::Update(const idx_t row_idx, WindowInputColumn &range_collection, const idx_t expr_idx,
                                   WindowInputExpression &boundary_start, WindowInputExpression &boundary_end,
                                   const ValidityMask &partition_mask, const ValidityMask &order_mask) {

	auto &bounds = *this;
	if (bounds.partition_count + bounds.order_count > 0) {

		// determine partition and peer group boundaries to ultimately figure out window size
		bounds.is_same_partition = !partition_mask.RowIsValidUnsafe(row_idx);
		bounds.is_peer = !order_mask.RowIsValidUnsafe(row_idx);

		// when the partition changes, recompute the boundaries
		if (!bounds.is_same_partition) {
			bounds.partition_start = row_idx;
			bounds.peer_start = row_idx;

			// find end of partition
			bounds.partition_end = bounds.input_size;
			if (bounds.partition_count) {
				idx_t n = 1;
				bounds.partition_end = FindNextStart(partition_mask, bounds.partition_start + 1, bounds.input_size, n);
			}

			// Find valid ordering values for the new partition
			// so we can exclude NULLs from RANGE expression computations
			bounds.valid_start = bounds.partition_start;
			bounds.valid_end = bounds.partition_end;

			if ((bounds.valid_start < bounds.valid_end) && bounds.has_preceding_range) {
				// Exclude any leading NULLs
				if (range_collection.CellIsNull(bounds.valid_start)) {
					idx_t n = 1;
					bounds.valid_start = FindNextStart(order_mask, bounds.valid_start + 1, bounds.valid_end, n);
				}
			}

			if ((bounds.valid_start < bounds.valid_end) && bounds.has_following_range) {
				// Exclude any trailing NULLs
				if (range_collection.CellIsNull(bounds.valid_end - 1)) {
					idx_t n = 1;
					bounds.valid_end = FindPrevStart(order_mask, bounds.valid_start, bounds.valid_end, n);
				}
			}

		} else if (!bounds.is_peer) {
			bounds.peer_start = row_idx;
		}

		if (bounds.needs_peer) {
			bounds.peer_end = bounds.partition_end;
			if (bounds.order_count) {
				idx_t n = 1;
				bounds.peer_end = FindNextStart(order_mask, bounds.peer_start + 1, bounds.partition_end, n);
			}
		}

	} else {
		bounds.is_same_partition = false;
		bounds.is_peer = true;
		bounds.partition_end = bounds.input_size;
		bounds.peer_end = bounds.partition_end;
	}

	// determine window boundaries depending on the type of expression
	bounds.window_start = -1;
	bounds.window_end = -1;

	switch (bounds.start_boundary) {
	case WindowBoundary::UNBOUNDED_PRECEDING:
		bounds.window_start = bounds.partition_start;
		break;
	case WindowBoundary::CURRENT_ROW_ROWS:
		bounds.window_start = row_idx;
		break;
	case WindowBoundary::CURRENT_ROW_RANGE:
		bounds.window_start = bounds.peer_start;
		break;
	case WindowBoundary::EXPR_PRECEDING_ROWS: {
		if (!TrySubtractOperator::Operation(int64_t(row_idx), boundary_start.GetCell<int64_t>(expr_idx),
		                                    bounds.window_start)) {
			throw OutOfRangeException("Overflow computing ROWS PRECEDING start");
		}
		break;
	}
	case WindowBoundary::EXPR_FOLLOWING_ROWS: {
		if (!TryAddOperator::Operation(int64_t(row_idx), boundary_start.GetCell<int64_t>(expr_idx),
		                               bounds.window_start)) {
			throw OutOfRangeException("Overflow computing ROWS FOLLOWING start");
		}
		break;
	}
	case WindowBoundary::EXPR_PRECEDING_RANGE: {
		if (boundary_start.CellIsNull(expr_idx)) {
			bounds.window_start = bounds.peer_start;
		} else {
			bounds.window_start = FindOrderedRangeBound<true>(range_collection, bounds.range_sense, bounds.valid_start,
			                                                  row_idx, boundary_start, expr_idx);
		}
		break;
	}
	case WindowBoundary::EXPR_FOLLOWING_RANGE: {
		if (boundary_start.CellIsNull(expr_idx)) {
			bounds.window_start = bounds.peer_start;
		} else {
			bounds.window_start = FindOrderedRangeBound<true>(range_collection, bounds.range_sense, row_idx,
			                                                  bounds.valid_end, boundary_start, expr_idx);
		}
		break;
	}
	default:
		throw InternalException("Unsupported window start boundary");
	}

	switch (bounds.end_boundary) {
	case WindowBoundary::CURRENT_ROW_ROWS:
		bounds.window_end = row_idx + 1;
		break;
	case WindowBoundary::CURRENT_ROW_RANGE:
		bounds.window_end = bounds.peer_end;
		break;
	case WindowBoundary::UNBOUNDED_FOLLOWING:
		bounds.window_end = bounds.partition_end;
		break;
	case WindowBoundary::EXPR_PRECEDING_ROWS:
		if (!TrySubtractOperator::Operation(int64_t(row_idx + 1), boundary_end.GetCell<int64_t>(expr_idx),
		                                    bounds.window_end)) {
			throw OutOfRangeException("Overflow computing ROWS PRECEDING end");
		}
		break;
	case WindowBoundary::EXPR_FOLLOWING_ROWS:
		if (!TryAddOperator::Operation(int64_t(row_idx + 1), boundary_end.GetCell<int64_t>(expr_idx),
		                               bounds.window_end)) {
			throw OutOfRangeException("Overflow computing ROWS FOLLOWING end");
		}
		break;
	case WindowBoundary::EXPR_PRECEDING_RANGE: {
		if (boundary_end.CellIsNull(expr_idx)) {
			bounds.window_end = bounds.peer_end;
		} else {
			bounds.window_end = FindOrderedRangeBound<false>(range_collection, bounds.range_sense, bounds.valid_start,
			                                                 row_idx, boundary_end, expr_idx);
		}
		break;
	}
	case WindowBoundary::EXPR_FOLLOWING_RANGE: {
		if (boundary_end.CellIsNull(expr_idx)) {
			bounds.window_end = bounds.peer_end;
		} else {
			bounds.window_end = FindOrderedRangeBound<false>(range_collection, bounds.range_sense, row_idx,
			                                                 bounds.valid_end, boundary_end, expr_idx);
		}
		break;
	}
	default:
		throw InternalException("Unsupported window end boundary");
	}

	// clamp windows to the partition boundaries if they exceed them
	if (bounds.window_start < (int64_t)bounds.partition_start) {
		bounds.window_start = bounds.partition_start;
	}
	if (bounds.window_start > (int64_t)bounds.partition_end) {
		bounds.window_start = bounds.partition_end;
	}
	if (bounds.window_end < (int64_t)bounds.partition_start) {
		bounds.window_end = bounds.partition_start;
	}
	if (bounds.window_end > (int64_t)bounds.partition_end) {
		bounds.window_end = bounds.partition_end;
	}

	if (bounds.window_start < 0 || bounds.window_end < 0) {
		throw InternalException("Failed to compute window boundaries");
	}
}

struct WindowExecutor {
	static bool IsConstantAggregate(const BoundWindowExpression &wexpr);

	WindowExecutor(BoundWindowExpression &wexpr, ClientContext &context, const ValidityMask &partition_mask,
	               const idx_t count);

	void Sink(DataChunk &input_chunk, const idx_t input_idx, const idx_t total_count);
	void Finalize(WindowAggregationMode mode);

	void Evaluate(idx_t row_idx, DataChunk &input_chunk, Vector &result, const ValidityMask &partition_mask,
	              const ValidityMask &order_mask);

	// The function
	BoundWindowExpression &wexpr;

	// Frame management
	WindowBoundariesState bounds;
	uint64_t dense_rank = 1;
	uint64_t rank_equal = 0;
	uint64_t rank = 1;

	// Expression collections
	DataChunk payload_collection;
	ExpressionExecutor payload_executor;
	DataChunk payload_chunk;

	ExpressionExecutor filter_executor;
	ValidityMask filter_mask;
	vector<validity_t> filter_bits;
	SelectionVector filter_sel;

	// LEAD/LAG evaluation
	WindowInputExpression leadlag_offset;
	WindowInputExpression leadlag_default;

	// evaluate boundaries if present. Parser has checked boundary types.
	WindowInputExpression boundary_start;
	WindowInputExpression boundary_end;

	// evaluate RANGE expressions, if needed
	WindowInputColumn range;

	// IGNORE NULLS
	ValidityMask ignore_nulls;

	// build a segment tree for frame-adhering aggregates
	// see http://www.vldb.org/pvldb/vol8/p1058-leis.pdf
	unique_ptr<WindowSegmentTree> segment_tree = nullptr;

	// all aggregate values are the same for each partition
	unique_ptr<WindowConstantAggregate> constant_aggregate = nullptr;
};

bool WindowExecutor::IsConstantAggregate(const BoundWindowExpression &wexpr) {
	if (!wexpr.aggregate) {
		return false;
	}

	// COUNT(*) is already handled efficiently by segment trees.
	if (wexpr.children.empty()) {
		return false;
	}

	/*
	The default framing option is RANGE UNBOUNDED PRECEDING, which
	is the same as RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT
	ROW; it sets the frame to be all rows from the partition start
	up through the current row's last peer (a row that the window's
	ORDER BY clause considers equivalent to the current row; all
	rows are peers if there is no ORDER BY). In general, UNBOUNDED
	PRECEDING means that the frame starts with the first row of the
	partition, and similarly UNBOUNDED FOLLOWING means that the
	frame ends with the last row of the partition, regardless of
	RANGE, ROWS or GROUPS mode. In ROWS mode, CURRENT ROW means that
	the frame starts or ends with the current row; but in RANGE or
	GROUPS mode it means that the frame starts or ends with the
	current row's first or last peer in the ORDER BY ordering. The
	offset PRECEDING and offset FOLLOWING options vary in meaning
	depending on the frame mode.
	*/
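	// Illustrative examples (not exhaustive): SUM(x) OVER (PARTITION BY p) and
	// SUM(x) OVER (PARTITION BY p ORDER BY o RANGE BETWEEN UNBOUNDED PRECEDING
	// AND UNBOUNDED FOLLOWING) both frame the entire partition for every row, so
	// the aggregate is constant per partition. SUM(x) OVER (ORDER BY o) is not:
	// its frame end moves with the current row's peer group.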
	switch (wexpr.start) {
	case WindowBoundary::UNBOUNDED_PRECEDING:
		break;
	case WindowBoundary::CURRENT_ROW_RANGE:
		if (!wexpr.orders.empty()) {
			return false;
		}
		break;
	default:
		return false;
	}

	switch (wexpr.end) {
	case WindowBoundary::UNBOUNDED_FOLLOWING:
		break;
	case WindowBoundary::CURRENT_ROW_RANGE:
		if (!wexpr.orders.empty()) {
			return false;
		}
		break;
	default:
		return false;
	}

	return true;
}

WindowExecutor::WindowExecutor(BoundWindowExpression &wexpr, ClientContext &context, const ValidityMask &partition_mask,
                               const idx_t count)
    : wexpr(wexpr), bounds(wexpr, count), payload_collection(), payload_executor(context), filter_executor(context),
      leadlag_offset(wexpr.offset_expr.get(), context), leadlag_default(wexpr.default_expr.get(), context),
      boundary_start(wexpr.start_expr.get(), context), boundary_end(wexpr.end_expr.get(), context),
      range((bounds.has_preceding_range || bounds.has_following_range) ? wexpr.orders[0].expression.get() : nullptr,
            context, count) {
	// TODO: we could evaluate these expressions in parallel

	// Check for constant aggregate
	if (IsConstantAggregate(wexpr)) {
		constant_aggregate =
		    make_uniq<WindowConstantAggregate>(AggregateObject(wexpr), wexpr.return_type, partition_mask, count);
	}

	// evaluate the FILTER clause and stuff it into a large mask for compactness and reuse
	if (wexpr.filter_expr) {
		// Start with all invalid and set the ones that pass
		filter_bits.resize(ValidityMask::ValidityMaskSize(count), 0);
		filter_mask.Initialize(filter_bits.data());
		filter_executor.AddExpression(*wexpr.filter_expr);
		filter_sel.Initialize(STANDARD_VECTOR_SIZE);
	}

	// TODO: the child may be a scalar, in which case we don't need to materialize the whole collection

	// evaluate inner expressions of window functions, could be more complex
	PrepareInputExpressions(wexpr.children, payload_executor, payload_chunk);

	auto types = payload_chunk.GetTypes();
	if (!types.empty()) {
		payload_collection.Initialize(Allocator::Get(context), types);
	}
}

void WindowExecutor::Sink(DataChunk &input_chunk, const idx_t input_idx, const idx_t total_count) {
	// Single pass over the input to produce the global data.
	// Vectorisation for the win...

	// Set up a validity mask for IGNORE NULLS
	bool check_nulls = false;
	if (wexpr.ignore_nulls) {
		switch (wexpr.type) {
		case ExpressionType::WINDOW_LEAD:
		case ExpressionType::WINDOW_LAG:
		case ExpressionType::WINDOW_FIRST_VALUE:
		case ExpressionType::WINDOW_LAST_VALUE:
		case ExpressionType::WINDOW_NTH_VALUE:
			check_nulls = true;
			break;
		default:
			break;
		}
	}

	const auto count = input_chunk.size();

	idx_t filtered = 0;
	SelectionVector *filtering = nullptr;
	if (wexpr.filter_expr) {
		filtering = &filter_sel;
		filtered = filter_executor.SelectExpression(input_chunk, filter_sel);
		for (idx_t f = 0; f < filtered; ++f) {
			filter_mask.SetValid(input_idx + filter_sel[f]);
		}
	}

	if (!wexpr.children.empty()) {
		payload_chunk.Reset();
		payload_executor.Execute(input_chunk, payload_chunk);
		payload_chunk.Verify();
		if (constant_aggregate) {
			constant_aggregate->Sink(payload_chunk, filtering, filtered);
		} else {
			payload_collection.Append(payload_chunk, true);
		}

		// process payload chunks while they are still piping hot
		if (check_nulls) {
			UnifiedVectorFormat vdata;
			payload_chunk.data[0].ToUnifiedFormat(count, vdata);
			if (!vdata.validity.AllValid()) {
				// Lazily materialise the contents when we find the first NULL
				if (ignore_nulls.AllValid()) {
					ignore_nulls.Initialize(total_count);
				}
				// Write to the current position
				if (input_idx % ValidityMask::BITS_PER_VALUE == 0) {
					// If we are at the edge of an output entry, just copy the entries
					auto dst = ignore_nulls.GetData() + ignore_nulls.EntryCount(input_idx);
					auto src = vdata.validity.GetData();
					for (auto entry_count = vdata.validity.EntryCount(count); entry_count-- > 0;) {
						*dst++ = *src++;
					}
				} else {
					// If not, we have ragged data and need to copy one bit at a time.
					for (idx_t i = 0; i < count; ++i) {
						ignore_nulls.Set(input_idx + i, vdata.validity.RowIsValid(i));
					}
				}
			}
		}
	}

	range.Append(input_chunk);
}

void WindowExecutor::Finalize(WindowAggregationMode mode) {
	// build a segment tree for frame-adhering aggregates
	// see http://www.vldb.org/pvldb/vol8/p1058-leis.pdf
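	// Informally, the segment tree keeps partially aggregated states over runs of
	// rows so that an arbitrary frame [begin, end) can be combined from O(log n)
	// precomputed pieces instead of rescanning every row in the frame; see the
	// paper above for the details of the structure.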
	if (constant_aggregate) {
		constant_aggregate->Finalize();
	} else if (wexpr.aggregate) {
		segment_tree = make_uniq<WindowSegmentTree>(AggregateObject(wexpr), wexpr.return_type, &payload_collection,
		                                            filter_mask, mode);
	}
}

void WindowExecutor::Evaluate(idx_t row_idx, DataChunk &input_chunk, Vector &result, const ValidityMask &partition_mask,
                              const ValidityMask &order_mask) {
	// Evaluate the row-level arguments
	boundary_start.Execute(input_chunk);
	boundary_end.Execute(input_chunk);

	leadlag_offset.Execute(input_chunk);
	leadlag_default.Execute(input_chunk);

	// this is the main loop: go through all sorted rows and compute the window function result
	for (idx_t output_offset = 0; output_offset < input_chunk.size(); ++output_offset, ++row_idx) {
		// special case, OVER (), aggregate over everything
		bounds.Update(row_idx, range, output_offset, boundary_start, boundary_end, partition_mask, order_mask);
		if (WindowNeedsRank(wexpr)) {
			if (!bounds.is_same_partition || row_idx == 0) { // special case for first row, need to init
				dense_rank = 1;
				rank = 1;
				rank_equal = 0;
			} else if (!bounds.is_peer) {
				dense_rank++;
				rank += rank_equal;
				rank_equal = 0;
			}
			rank_equal++;
		}

		// if no values are read for window, result is NULL
		if (bounds.window_start >= bounds.window_end) {
			FlatVector::SetNull(result, output_offset, true);
			continue;
		}

		switch (wexpr.type) {
		case ExpressionType::WINDOW_AGGREGATE: {
			if (constant_aggregate) {
				constant_aggregate->Compute(result, output_offset, bounds.window_start, bounds.window_end);
			} else {
				segment_tree->Compute(result, output_offset, bounds.window_start, bounds.window_end);
			}
			break;
		}
		case ExpressionType::WINDOW_ROW_NUMBER: {
			auto rdata = FlatVector::GetData<int64_t>(result);
			rdata[output_offset] = row_idx - bounds.partition_start + 1;
			break;
		}
		case ExpressionType::WINDOW_RANK_DENSE: {
			auto rdata = FlatVector::GetData<int64_t>(result);
			rdata[output_offset] = dense_rank;
			break;
		}
		case ExpressionType::WINDOW_RANK: {
			auto rdata = FlatVector::GetData<int64_t>(result);
			rdata[output_offset] = rank;
			break;
		}
		case ExpressionType::WINDOW_PERCENT_RANK: {
			int64_t denom = (int64_t)bounds.partition_end - bounds.partition_start - 1;
			double percent_rank = denom > 0 ? ((double)rank - 1) / denom : 0;
			auto rdata = FlatVector::GetData<double>(result);
			rdata[output_offset] = percent_rank;
			break;
		}
		case ExpressionType::WINDOW_CUME_DIST: {
			int64_t denom = (int64_t)bounds.partition_end - bounds.partition_start;
			double cume_dist = denom > 0 ? ((double)(bounds.peer_end - bounds.partition_start)) / denom : 0;
			auto rdata = FlatVector::GetData<double>(result);
			rdata[output_offset] = cume_dist;
			break;
		}
		case ExpressionType::WINDOW_NTILE: {
			D_ASSERT(payload_collection.ColumnCount() == 1);
			if (CellIsNull(payload_collection, 0, row_idx)) {
				FlatVector::SetNull(result, output_offset, true);
			} else {
				auto n_param = GetCell<int64_t>(payload_collection, 0, row_idx);
				if (n_param < 1) {
					throw InvalidInputException("Argument for ntile must be greater than zero");
				}
				// With thanks to SQLite's ntileValueFunc()
				int64_t n_total = bounds.partition_end - bounds.partition_start;
				if (n_param > n_total) {
					// more groups allowed than we have values
					// map every entry to a unique group
					n_param = n_total;
				}
				int64_t n_size = (n_total / n_param);
				// find the row idx within the group
				D_ASSERT(row_idx >= bounds.partition_start);
				int64_t adjusted_row_idx = row_idx - bounds.partition_start;
				// now compute the ntile
				int64_t n_large = n_total - n_param * n_size;
				int64_t i_small = n_large * (n_size + 1);
				int64_t result_ntile;

				D_ASSERT((n_large * (n_size + 1) + (n_param - n_large) * n_size) == n_total);

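				// Worked example (informal): n_total = 10 rows and n_param = 3 buckets
				// give n_size = 3, n_large = 1, i_small = 4; rows 0-3 land in bucket 1,
				// rows 4-6 in bucket 2, and rows 7-9 in bucket 3.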
				if (adjusted_row_idx < i_small) {
					result_ntile = 1 + adjusted_row_idx / (n_size + 1);
				} else {
					result_ntile = 1 + n_large + (adjusted_row_idx - i_small) / n_size;
				}
				// result has to be between [1, NTILE]
				D_ASSERT(result_ntile >= 1 && result_ntile <= n_param);
				auto rdata = FlatVector::GetData<int64_t>(result);
				rdata[output_offset] = result_ntile;
			}
			break;
		}
		case ExpressionType::WINDOW_LEAD:
		case ExpressionType::WINDOW_LAG: {
			int64_t offset = 1;
			if (wexpr.offset_expr) {
				offset = leadlag_offset.GetCell<int64_t>(output_offset);
			}
			int64_t val_idx = (int64_t)row_idx;
			if (wexpr.type == ExpressionType::WINDOW_LEAD) {
				val_idx += offset;
			} else {
				val_idx -= offset;
			}

			idx_t delta = 0;
			if (val_idx < (int64_t)row_idx) {
				// Count backwards
				delta = idx_t(row_idx - val_idx);
				val_idx = FindPrevStart(ignore_nulls, bounds.partition_start, row_idx, delta);
			} else if (val_idx > (int64_t)row_idx) {
				delta = idx_t(val_idx - row_idx);
				val_idx = FindNextStart(ignore_nulls, row_idx + 1, bounds.partition_end, delta);
			}
			// else offset is zero, so don't move.

			if (!delta) {
				CopyCell(payload_collection, 0, val_idx, result, output_offset);
			} else if (wexpr.default_expr) {
				leadlag_default.CopyCell(result, output_offset);
			} else {
				FlatVector::SetNull(result, output_offset, true);
			}
			break;
		}
		case ExpressionType::WINDOW_FIRST_VALUE: {
			// Same as NTH_VALUE(..., 1)
			idx_t n = 1;
			const auto first_idx = FindNextStart(ignore_nulls, bounds.window_start, bounds.window_end, n);
			if (!n) {
				CopyCell(payload_collection, 0, first_idx, result, output_offset);
			} else {
				FlatVector::SetNull(result, output_offset, true);
			}
			break;
		}
		case ExpressionType::WINDOW_LAST_VALUE: {
			idx_t n = 1;
			const auto last_idx = FindPrevStart(ignore_nulls, bounds.window_start, bounds.window_end, n);
			if (!n) {
				CopyCell(payload_collection, 0, last_idx, result, output_offset);
			} else {
				FlatVector::SetNull(result, output_offset, true);
			}
			break;
		}
		case ExpressionType::WINDOW_NTH_VALUE: {
			D_ASSERT(payload_collection.ColumnCount() == 2);
			// Returns the value evaluated at the row that is the n'th row of the window frame (counting from 1);
			// returns NULL if there is no such row.
			if (CellIsNull(payload_collection, 1, row_idx)) {
				FlatVector::SetNull(result, output_offset, true);
			} else {
				auto n_param = GetCell<int64_t>(payload_collection, 1, row_idx);
				if (n_param < 1) {
					FlatVector::SetNull(result, output_offset, true);
				} else {
					auto n = idx_t(n_param);
					const auto nth_index = FindNextStart(ignore_nulls, bounds.window_start, bounds.window_end, n);
					if (!n) {
						CopyCell(payload_collection, 0, nth_index, result, output_offset);
					} else {
						FlatVector::SetNull(result, output_offset, true);
					}
				}
			}
			break;
		}
		default:
			throw InternalException("Window aggregate type %s", ExpressionTypeToString(wexpr.type));
		}
	}

	result.Verify(input_chunk.size());
}

//===--------------------------------------------------------------------===//
// Sink
//===--------------------------------------------------------------------===//
SinkResultType PhysicalWindow::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
	auto &lstate = input.local_state.Cast<WindowLocalSinkState>();

	lstate.Sink(chunk);

	return SinkResultType::NEED_MORE_INPUT;
}

void PhysicalWindow::Combine(ExecutionContext &context, GlobalSinkState &gstate_p, LocalSinkState &lstate_p) const {
	auto &lstate = lstate_p.Cast<WindowLocalSinkState>();
	lstate.Combine();
}

unique_ptr<LocalSinkState> PhysicalWindow::GetLocalSinkState(ExecutionContext &context) const {
	auto &gstate = sink_state->Cast<WindowGlobalSinkState>();
	return make_uniq<WindowLocalSinkState>(context.client, gstate);
}

unique_ptr<GlobalSinkState> PhysicalWindow::GetGlobalSinkState(ClientContext &context) const {
	return make_uniq<WindowGlobalSinkState>(*this, context);
}

SinkFinalizeType PhysicalWindow::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
                                          GlobalSinkState &gstate_p) const {
	auto &state = gstate_p.Cast<WindowGlobalSinkState>();

	// Did we get any data?
	if (!state.global_partition->count) {
		return SinkFinalizeType::NO_OUTPUT_POSSIBLE;
	}

	// Do we have any sorting to schedule?
	if (state.global_partition->rows) {
		D_ASSERT(!state.global_partition->grouping_data);
		return state.global_partition->rows->count ? SinkFinalizeType::READY : SinkFinalizeType::NO_OUTPUT_POSSIBLE;
	}

	// Find the first group to sort
	auto &groups = state.global_partition->grouping_data->GetPartitions();
	if (groups.empty()) {
		// Empty input!
		return SinkFinalizeType::NO_OUTPUT_POSSIBLE;
	}

	// Schedule all the sorts for maximum thread utilisation
	auto new_event = make_shared<PartitionMergeEvent>(*state.global_partition, pipeline);
	event.InsertEvent(std::move(new_event));

	return SinkFinalizeType::READY;
}

//===--------------------------------------------------------------------===//
// Source
//===--------------------------------------------------------------------===//
class WindowGlobalSourceState : public GlobalSourceState {
public:
	explicit WindowGlobalSourceState(WindowGlobalSinkState &gsink) : gsink(*gsink.global_partition), next_bin(0) {
	}

	PartitionGlobalSinkState &gsink;
	//! The output read position.
	atomic<idx_t> next_bin;

public:
	idx_t MaxThreads() override {
		// If there is only one partition, we have to process it on one thread.
		if (!gsink.grouping_data) {
			return 1;
		}

		// If there is not a lot of data, process serially.
		if (gsink.count < STANDARD_ROW_GROUPS_SIZE) {
			return 1;
		}

		return gsink.hash_groups.size();
	}
};

// Per-thread read state
class WindowLocalSourceState : public LocalSourceState {
public:
	using HashGroupPtr = unique_ptr<PartitionGlobalHashGroup>;
	using WindowExecutorPtr = unique_ptr<WindowExecutor>;
	using WindowExecutors = vector<WindowExecutorPtr>;

	WindowLocalSourceState(const PhysicalWindow &op_p, ExecutionContext &context, WindowGlobalSourceState &gsource)
	    : context(context.client), op(op_p), gsink(gsource.gsink) {

		vector<LogicalType> output_types;
		for (idx_t expr_idx = 0; expr_idx < op.select_list.size(); ++expr_idx) {
			D_ASSERT(op.select_list[expr_idx]->GetExpressionClass() == ExpressionClass::BOUND_WINDOW);
			auto &wexpr = op.select_list[expr_idx]->Cast<BoundWindowExpression>();
			output_types.emplace_back(wexpr.return_type);
		}
		output_chunk.Initialize(Allocator::Get(context.client), output_types);

		const auto &input_types = gsink.payload_types;
		layout.Initialize(input_types);
		input_chunk.Initialize(gsink.allocator, input_types);
	}

	void MaterializeSortedData();
	void GeneratePartition(WindowGlobalSinkState &gstate, const idx_t hash_bin);
	void Scan(DataChunk &chunk);

	HashGroupPtr hash_group;
	ClientContext &context;
	const PhysicalWindow &op;

	PartitionGlobalSinkState &gsink;

	//! The generated input chunks
	unique_ptr<RowDataCollection> rows;
	unique_ptr<RowDataCollection> heap;
	RowLayout layout;
	//! The partition boundary mask
	vector<validity_t> partition_bits;
	ValidityMask partition_mask;
	//! The order boundary mask
	vector<validity_t> order_bits;
	ValidityMask order_mask;
	//! The current execution functions
	WindowExecutors window_execs;

	//! The read partition
	idx_t hash_bin;
	//! The read cursor
	unique_ptr<RowDataCollectionScanner> scanner;
	//! Buffer for the inputs
	DataChunk input_chunk;
	//! Buffer for window results
	DataChunk output_chunk;
};

void WindowLocalSourceState::MaterializeSortedData() {
	auto &global_sort_state = *hash_group->global_sort;
	if (global_sort_state.sorted_blocks.empty()) {
		return;
	}

	// scan the sorted row data
	D_ASSERT(global_sort_state.sorted_blocks.size() == 1);
	auto &sb = *global_sort_state.sorted_blocks[0];

	// Free up some memory before allocating more
	sb.radix_sorting_data.clear();
	sb.blob_sorting_data = nullptr;

	// Move the sorting row blocks into our RDCs
	auto &buffer_manager = global_sort_state.buffer_manager;
	auto &sd = *sb.payload_data;

	// Data blocks are required
	D_ASSERT(!sd.data_blocks.empty());
	auto &block = sd.data_blocks[0];
	rows = make_uniq<RowDataCollection>(buffer_manager, block->capacity, block->entry_size);
	rows->blocks = std::move(sd.data_blocks);
	rows->count = std::accumulate(rows->blocks.begin(), rows->blocks.end(), idx_t(0),
	                              [&](idx_t c, const unique_ptr<RowDataBlock> &b) { return c + b->count; });

	// Heap blocks are optional, but we want both for iteration.
	if (!sd.heap_blocks.empty()) {
		auto &block = sd.heap_blocks[0];
		heap = make_uniq<RowDataCollection>(buffer_manager, block->capacity, block->entry_size);
		heap->blocks = std::move(sd.heap_blocks);
		hash_group.reset();
	} else {
		heap = make_uniq<RowDataCollection>(buffer_manager, (idx_t)Storage::BLOCK_SIZE, 1, true);
	}
	heap->count = std::accumulate(heap->blocks.begin(), heap->blocks.end(), idx_t(0),
	                              [&](idx_t c, const unique_ptr<RowDataBlock> &b) { return c + b->count; });
}

void WindowLocalSourceState::GeneratePartition(WindowGlobalSinkState &gstate, const idx_t hash_bin_p) {
	// Get rid of any stale data
	hash_bin = hash_bin_p;

	// There are three types of partitions:
	// 1. No partition (no sorting)
	// 2. One partition (sorting, but no hashing)
	// 3. Multiple partitions (sorting and hashing)

	// How big is the partition?
	idx_t count = 0;
	if (hash_bin < gsink.hash_groups.size() && gsink.hash_groups[hash_bin]) {
		count = gsink.hash_groups[hash_bin]->count;
	} else if (gsink.rows && !hash_bin) {
		count = gsink.count;
	} else {
		return;
	}

	// Initialise masks to false
	const auto bit_count = ValidityMask::ValidityMaskSize(count);
	partition_bits.clear();
	partition_bits.resize(bit_count, 0);
	partition_mask.Initialize(partition_bits.data());

	order_bits.clear();
	order_bits.resize(bit_count, 0);
	order_mask.Initialize(order_bits.data());

	// Scan the sorted data into new Collections
	auto external = gsink.external;
	if (gsink.rows && !hash_bin) {
		// Simple mask
		partition_mask.SetValidUnsafe(0);
		order_mask.SetValidUnsafe(0);
		// No partition - align the heap blocks with the row blocks
		rows = gsink.rows->CloneEmpty(gsink.rows->keep_pinned);
		heap = gsink.strings->CloneEmpty(gsink.strings->keep_pinned);
		RowDataCollectionScanner::AlignHeapBlocks(*rows, *heap, *gsink.rows, *gsink.strings, layout);
		external = true;
	} else if (hash_bin < gsink.hash_groups.size() && gsink.hash_groups[hash_bin]) {
		// Overwrite the collections with the sorted data
		hash_group = std::move(gsink.hash_groups[hash_bin]);
		hash_group->ComputeMasks(partition_mask, order_mask);
		external = hash_group->global_sort->external;
		MaterializeSortedData();
	} else {
		return;
	}

	// Create the executors for each function
	window_execs.clear();
	for (idx_t expr_idx = 0; expr_idx < op.select_list.size(); ++expr_idx) {
		D_ASSERT(op.select_list[expr_idx]->GetExpressionClass() == ExpressionClass::BOUND_WINDOW);
		auto &wexpr = op.select_list[expr_idx]->Cast<BoundWindowExpression>();
		auto wexec = make_uniq<WindowExecutor>(wexpr, context, partition_mask, count);
		window_execs.emplace_back(std::move(wexec));
	}

	// First pass over the input without flushing
	// TODO: Factor out the constructor data as global state
	scanner = make_uniq<RowDataCollectionScanner>(*rows, *heap, layout, external, false);
	idx_t input_idx = 0;
	while (true) {
		input_chunk.Reset();
		scanner->Scan(input_chunk);
		if (input_chunk.size() == 0) {
			break;
		}

		// TODO: Parallelization opportunity
		for (auto &wexec : window_execs) {
			wexec->Sink(input_chunk, input_idx, scanner->Count());
		}
		input_idx += input_chunk.size();
	}

	// TODO: Parallelization opportunity
	for (auto &wexec : window_execs) {
		wexec->Finalize(gstate.mode);
	}

	// External scanning assumes all blocks are swizzled.
	scanner->ReSwizzle();

	// Second pass can flush
	scanner->Reset(true);
}

void WindowLocalSourceState::Scan(DataChunk &result) {
	D_ASSERT(scanner);
	if (!scanner->Remaining()) {
		return;
	}

	const auto position = scanner->Scanned();
	input_chunk.Reset();
	scanner->Scan(input_chunk);

	output_chunk.Reset();
	for (idx_t expr_idx = 0; expr_idx < window_execs.size(); ++expr_idx) {
		auto &executor = *window_execs[expr_idx];
		executor.Evaluate(position, input_chunk, output_chunk.data[expr_idx], partition_mask, order_mask);
	}
	output_chunk.SetCardinality(input_chunk);
	output_chunk.Verify();

	idx_t out_idx = 0;
	result.SetCardinality(input_chunk);
	for (idx_t col_idx = 0; col_idx < input_chunk.ColumnCount(); col_idx++) {
		result.data[out_idx++].Reference(input_chunk.data[col_idx]);
	}
	for (idx_t col_idx = 0; col_idx < output_chunk.ColumnCount(); col_idx++) {
		result.data[out_idx++].Reference(output_chunk.data[col_idx]);
	}
	result.Verify();
}

unique_ptr<LocalSourceState> PhysicalWindow::GetLocalSourceState(ExecutionContext &context,
                                                                 GlobalSourceState &gstate_p) const {
	auto &gstate = gstate_p.Cast<WindowGlobalSourceState>();
	return make_uniq<WindowLocalSourceState>(*this, context, gstate);
}

unique_ptr<GlobalSourceState> PhysicalWindow::GetGlobalSourceState(ClientContext &context) const {
	auto &gsink = sink_state->Cast<WindowGlobalSinkState>();
	return make_uniq<WindowGlobalSourceState>(gsink);
}

SourceResultType PhysicalWindow::GetData(ExecutionContext &context, DataChunk &chunk,
                                         OperatorSourceInput &input) const {
	auto &lsource = input.local_state.Cast<WindowLocalSourceState>();
	auto &gsource = input.global_state.Cast<WindowGlobalSourceState>();
	auto &gsink = sink_state->Cast<WindowGlobalSinkState>();

	auto &hash_groups = gsink.global_partition->hash_groups;
	const auto bin_count = hash_groups.empty() ? 1 : hash_groups.size();

	while (chunk.size() == 0) {
		// Move to the next bin if we are done.
		while (!lsource.scanner || !lsource.scanner->Remaining()) {
			lsource.scanner.reset();
			lsource.rows.reset();
			lsource.heap.reset();
			lsource.hash_group.reset();
			auto hash_bin = gsource.next_bin++;
			if (hash_bin >= bin_count) {
				return chunk.size() > 0 ? SourceResultType::HAVE_MORE_OUTPUT : SourceResultType::FINISHED;
			}

			for (; hash_bin < hash_groups.size(); hash_bin = gsource.next_bin++) {
				if (hash_groups[hash_bin]) {
					break;
				}
			}
			lsource.GeneratePartition(gsink, hash_bin);
		}

		lsource.Scan(chunk);
	}

	return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT;
}

string PhysicalWindow::ParamsToString() const {
	string result;
	for (idx_t i = 0; i < select_list.size(); i++) {
		if (i > 0) {
			result += "\n";
		}
		result += select_list[i]->GetName();
	}
	return result;
}

} // namespace duckdb