1 | #include "duckdb/execution/operator/aggregate/physical_window.hpp" |
2 | |
3 | #include "duckdb/common/operator/add.hpp" |
4 | #include "duckdb/common/operator/cast_operators.hpp" |
5 | #include "duckdb/common/operator/comparison_operators.hpp" |
6 | #include "duckdb/common/operator/subtract.hpp" |
7 | #include "duckdb/common/optional_ptr.hpp" |
8 | #include "duckdb/common/radix_partitioning.hpp" |
9 | #include "duckdb/common/row_operations/row_operations.hpp" |
10 | #include "duckdb/common/sort/partition_state.hpp" |
11 | #include "duckdb/common/types/chunk_collection.hpp" |
12 | #include "duckdb/common/types/column/column_data_consumer.hpp" |
13 | #include "duckdb/common/types/row/row_data_collection_scanner.hpp" |
14 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
15 | #include "duckdb/common/windows_undefs.hpp" |
16 | #include "duckdb/execution/expression_executor.hpp" |
17 | #include "duckdb/execution/partitionable_hashtable.hpp" |
18 | #include "duckdb/execution/window_segment_tree.hpp" |
19 | #include "duckdb/main/client_config.hpp" |
20 | #include "duckdb/main/config.hpp" |
21 | #include "duckdb/parallel/base_pipeline_event.hpp" |
22 | #include "duckdb/planner/expression/bound_reference_expression.hpp" |
23 | #include "duckdb/planner/expression/bound_window_expression.hpp" |
24 | |
25 | #include <algorithm> |
26 | #include <cmath> |
27 | #include <numeric> |
28 | |
29 | namespace duckdb { |
30 | |
31 | // Global sink state |
32 | class WindowGlobalSinkState : public GlobalSinkState { |
33 | public: |
34 | WindowGlobalSinkState(const PhysicalWindow &op, ClientContext &context) |
35 | : mode(DBConfig::GetConfig(context).options.window_mode) { |
36 | |
37 | D_ASSERT(op.select_list[0]->GetExpressionClass() == ExpressionClass::BOUND_WINDOW); |
38 | auto &wexpr = op.select_list[0]->Cast<BoundWindowExpression>(); |
39 | |
40 | global_partition = |
41 | make_uniq<PartitionGlobalSinkState>(args&: context, args&: wexpr.partitions, args&: wexpr.orders, args&: op.children[0]->types, |
42 | args&: wexpr.partitions_stats, args: op.estimated_cardinality); |
43 | } |
44 | |
45 | unique_ptr<PartitionGlobalSinkState> global_partition; |
46 | WindowAggregationMode mode; |
47 | }; |
48 | |
49 | // Per-thread sink state |
50 | class WindowLocalSinkState : public LocalSinkState { |
51 | public: |
52 | WindowLocalSinkState(ClientContext &context, const WindowGlobalSinkState &gstate) |
53 | : local_partition(context, *gstate.global_partition) { |
54 | } |
55 | |
56 | void Sink(DataChunk &input_chunk) { |
57 | local_partition.Sink(input_chunk); |
58 | } |
59 | |
60 | void Combine() { |
61 | local_partition.Combine(); |
62 | } |
63 | |
64 | PartitionLocalSinkState local_partition; |
65 | }; |
66 | |
67 | // this implements a sorted window functions variant |
68 | PhysicalWindow::PhysicalWindow(vector<LogicalType> types, vector<unique_ptr<Expression>> select_list_p, |
69 | idx_t estimated_cardinality, PhysicalOperatorType type) |
70 | : PhysicalOperator(type, std::move(types), estimated_cardinality), select_list(std::move(select_list_p)) { |
71 | is_order_dependent = false; |
72 | for (auto &expr : select_list) { |
73 | D_ASSERT(expr->expression_class == ExpressionClass::BOUND_WINDOW); |
74 | auto &bound_window = expr->Cast<BoundWindowExpression>(); |
75 | if (bound_window.partitions.empty() && bound_window.orders.empty()) { |
76 | is_order_dependent = true; |
77 | } |
78 | } |
79 | } |
80 | |
81 | static idx_t FindNextStart(const ValidityMask &mask, idx_t l, const idx_t r, idx_t &n) { |
82 | if (mask.AllValid()) { |
83 | auto start = MinValue(a: l + n - 1, b: r); |
84 | n -= MinValue(a: n, b: r - l); |
85 | return start; |
86 | } |
87 | |
88 | while (l < r) { |
89 | // If l is aligned with the start of a block, and the block is blank, then skip forward one block. |
90 | idx_t entry_idx; |
91 | idx_t shift; |
92 | mask.GetEntryIndex(row_idx: l, entry_idx, idx_in_entry&: shift); |
93 | |
94 | const auto block = mask.GetValidityEntry(entry_idx); |
95 | if (mask.NoneValid(entry: block) && !shift) { |
96 | l += ValidityMask::BITS_PER_VALUE; |
97 | continue; |
98 | } |
99 | |
100 | // Loop over the block |
101 | for (; shift < ValidityMask::BITS_PER_VALUE && l < r; ++shift, ++l) { |
102 | if (mask.RowIsValid(entry: block, idx_in_entry: shift) && --n == 0) { |
103 | return MinValue(a: l, b: r); |
104 | } |
105 | } |
106 | } |
107 | |
108 | // Didn't find a start so return the end of the range |
109 | return r; |
110 | } |
111 | |
112 | static idx_t FindPrevStart(const ValidityMask &mask, const idx_t l, idx_t r, idx_t &n) { |
113 | if (mask.AllValid()) { |
114 | auto start = (r <= l + n) ? l : r - n; |
115 | n -= r - start; |
116 | return start; |
117 | } |
118 | |
119 | while (l < r) { |
120 | // If r is aligned with the start of a block, and the previous block is blank, |
121 | // then skip backwards one block. |
122 | idx_t entry_idx; |
123 | idx_t shift; |
124 | mask.GetEntryIndex(row_idx: r - 1, entry_idx, idx_in_entry&: shift); |
125 | |
126 | const auto block = mask.GetValidityEntry(entry_idx); |
127 | if (mask.NoneValid(entry: block) && (shift + 1 == ValidityMask::BITS_PER_VALUE)) { |
128 | // r is nonzero (> l) and word aligned, so this will not underflow. |
129 | r -= ValidityMask::BITS_PER_VALUE; |
130 | continue; |
131 | } |
132 | |
133 | // Loop backwards over the block |
134 | // shift is probing r-1 >= l >= 0 |
135 | for (++shift; shift-- > 0; --r) { |
136 | if (mask.RowIsValid(entry: block, idx_in_entry: shift) && --n == 0) { |
137 | return MaxValue(a: l, b: r - 1); |
138 | } |
139 | } |
140 | } |
141 | |
142 | // Didn't find a start so return the start of the range |
143 | return l; |
144 | } |
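
// Both helpers above find the n'th set bit in a validity mask: FindNextStart scans forward from l,
// FindPrevStart scans backwards from r, and both decrement n as matches are found (n == 0 on return
// means the search succeeded). They serve double duty below: skipping NULL rows for IGNORE NULLS,
// and locating partition/peer boundaries in the "inverted" masks where a set bit marks a group start.
// Illustrative trace (hypothetical mask): with bits 0b0101 over rows 0..3, FindNextStart(mask, 0, 4, n = 2)
// hits the set bits at rows 0 and 2 and returns 2 with n == 0.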

static void PrepareInputExpressions(vector<unique_ptr<Expression>> &exprs, ExpressionExecutor &executor,
                                    DataChunk &chunk) {
	if (exprs.empty()) {
		return;
	}

	vector<LogicalType> types;
	for (idx_t expr_idx = 0; expr_idx < exprs.size(); ++expr_idx) {
		types.push_back(exprs[expr_idx]->return_type);
		executor.AddExpression(*exprs[expr_idx]);
	}

	if (!types.empty()) {
		auto &allocator = executor.GetAllocator();
		chunk.Initialize(allocator, types);
	}
}

static void PrepareInputExpression(Expression &expr, ExpressionExecutor &executor, DataChunk &chunk) {
	vector<LogicalType> types;
	types.push_back(expr.return_type);
	executor.AddExpression(expr);

	auto &allocator = executor.GetAllocator();
	chunk.Initialize(allocator, types);
}

struct WindowInputExpression {
	WindowInputExpression(optional_ptr<Expression> expr_p, ClientContext &context)
	    : expr(expr_p), ptype(PhysicalType::INVALID), scalar(true), executor(context) {
		if (expr) {
			PrepareInputExpression(*expr, executor, chunk);
			ptype = expr->return_type.InternalType();
			scalar = expr->IsScalar();
		}
	}

	void Execute(DataChunk &input_chunk) {
		if (expr) {
			chunk.Reset();
			executor.Execute(input_chunk, chunk);
			chunk.Verify();
		}
	}

	template <typename T>
	inline T GetCell(idx_t i) const {
		D_ASSERT(!chunk.data.empty());
		const auto data = FlatVector::GetData<T>(chunk.data[0]);
		return data[scalar ? 0 : i];
	}

	inline bool CellIsNull(idx_t i) const {
		D_ASSERT(!chunk.data.empty());
		if (chunk.data[0].GetVectorType() == VectorType::CONSTANT_VECTOR) {
			return ConstantVector::IsNull(chunk.data[0]);
		}
		return FlatVector::IsNull(chunk.data[0], i);
	}

	inline void CopyCell(Vector &target, idx_t target_offset) const {
		D_ASSERT(!chunk.data.empty());
		auto &source = chunk.data[0];
		auto source_offset = scalar ? 0 : target_offset;
		VectorOperations::Copy(source, target, source_offset + 1, source_offset, target_offset);
	}

	optional_ptr<Expression> expr;
	PhysicalType ptype;
	bool scalar;
	ExpressionExecutor executor;
	DataChunk chunk;
};
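
// A WindowInputExpression lazily wraps an optional argument expression (e.g. a boundary value or a
// LAG offset/default). When the expression is scalar, GetCell/CopyCell always read row 0, so a
// constant argument is evaluated once per chunk rather than once per row. A minimal usage sketch
// (hypothetical caller):
//
//	WindowInputExpression offset(wexpr.offset_expr.get(), context);
//	offset.Execute(input_chunk);
//	auto o = offset.GetCell<int64_t>(row); // row is ignored if the offset is a constant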

struct WindowInputColumn {
	WindowInputColumn(Expression *expr_p, ClientContext &context, idx_t capacity_p)
	    : input_expr(expr_p, context), count(0), capacity(capacity_p) {
		if (input_expr.expr) {
			target = make_uniq<Vector>(input_expr.chunk.data[0].GetType(), capacity);
		}
	}

	void Append(DataChunk &input_chunk) {
		if (input_expr.expr) {
			const auto source_count = input_chunk.size();
			D_ASSERT(count + source_count <= capacity);
			if (!input_expr.scalar || !count) {
				input_expr.Execute(input_chunk);
				auto &source = input_expr.chunk.data[0];
				VectorOperations::Copy(source, *target, source_count, 0, count);
			}
			count += source_count;
		}
	}

	inline bool CellIsNull(idx_t i) {
		D_ASSERT(target);
		D_ASSERT(i < count);
		return FlatVector::IsNull(*target, input_expr.scalar ? 0 : i);
	}

	template <typename T>
	inline T GetCell(idx_t i) const {
		D_ASSERT(target);
		D_ASSERT(i < count);
		const auto data = FlatVector::GetData<T>(*target);
		return data[input_expr.scalar ? 0 : i];
	}

	WindowInputExpression input_expr;

private:
	unique_ptr<Vector> target;
	idx_t count;
	idx_t capacity;
};

static inline bool BoundaryNeedsPeer(const WindowBoundary &boundary) {
	switch (boundary) {
	case WindowBoundary::CURRENT_ROW_RANGE:
	case WindowBoundary::EXPR_PRECEDING_RANGE:
	case WindowBoundary::EXPR_FOLLOWING_RANGE:
		return true;
	default:
		return false;
	}
}
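
// RANGE boundaries are defined in terms of peer groups (rows the ORDER BY considers equal), so any
// RANGE end boundary forces peer-group detection (as does CUME_DIST; see needs_peer below).
// For example, a frame such as
//
//	SUM(x) OVER (ORDER BY ts RANGE BETWEEN INTERVAL 1 DAY PRECEDING AND CURRENT ROW)
//
// must extend CURRENT ROW to the last peer of the current row, whereas the ROWS equivalent would
// end exactly at the current row. (Illustrative SQL, not taken from this file.)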

struct WindowBoundariesState {
	static inline bool IsScalar(const unique_ptr<Expression> &expr) {
		return expr ? expr->IsScalar() : true;
	}

	WindowBoundariesState(BoundWindowExpression &wexpr, const idx_t input_size)
	    : type(wexpr.type), input_size(input_size), start_boundary(wexpr.start), end_boundary(wexpr.end),
	      partition_count(wexpr.partitions.size()), order_count(wexpr.orders.size()),
	      range_sense(wexpr.orders.empty() ? OrderType::INVALID : wexpr.orders[0].type),
	      has_preceding_range(wexpr.start == WindowBoundary::EXPR_PRECEDING_RANGE ||
	                          wexpr.end == WindowBoundary::EXPR_PRECEDING_RANGE),
	      has_following_range(wexpr.start == WindowBoundary::EXPR_FOLLOWING_RANGE ||
	                          wexpr.end == WindowBoundary::EXPR_FOLLOWING_RANGE),
	      needs_peer(BoundaryNeedsPeer(wexpr.end) || wexpr.type == ExpressionType::WINDOW_CUME_DIST) {
	}

	void Update(const idx_t row_idx, WindowInputColumn &range_collection, const idx_t source_offset,
	            WindowInputExpression &boundary_start, WindowInputExpression &boundary_end,
	            const ValidityMask &partition_mask, const ValidityMask &order_mask);

	// Cached lookups
	const ExpressionType type;
	const idx_t input_size;
	const WindowBoundary start_boundary;
	const WindowBoundary end_boundary;
	const size_t partition_count;
	const size_t order_count;
	const OrderType range_sense;
	const bool has_preceding_range;
	const bool has_following_range;
	const bool needs_peer;

	idx_t partition_start = 0;
	idx_t partition_end = 0;
	idx_t peer_start = 0;
	idx_t peer_end = 0;
	idx_t valid_start = 0;
	idx_t valid_end = 0;
	int64_t window_start = -1;
	int64_t window_end = -1;
	bool is_same_partition = false;
	bool is_peer = false;
};

static bool WindowNeedsRank(const BoundWindowExpression &wexpr) {
	return wexpr.type == ExpressionType::WINDOW_PERCENT_RANK || wexpr.type == ExpressionType::WINDOW_RANK ||
	       wexpr.type == ExpressionType::WINDOW_RANK_DENSE || wexpr.type == ExpressionType::WINDOW_CUME_DIST;
}

template <typename T>
static T GetCell(DataChunk &chunk, idx_t column, idx_t index) {
	D_ASSERT(chunk.ColumnCount() > column);
	auto &source = chunk.data[column];
	const auto data = FlatVector::GetData<T>(source);
	return data[index];
}

static bool CellIsNull(DataChunk &chunk, idx_t column, idx_t index) {
	D_ASSERT(chunk.ColumnCount() > column);
	auto &source = chunk.data[column];
	return FlatVector::IsNull(source, index);
}

static void CopyCell(DataChunk &chunk, idx_t column, idx_t index, Vector &target, idx_t target_offset) {
	D_ASSERT(chunk.ColumnCount() > column);
	auto &source = chunk.data[column];
	VectorOperations::Copy(source, target, index + 1, index, target_offset);
}

template <typename T>
struct WindowColumnIterator {
	using iterator = WindowColumnIterator<T>;
	using iterator_category = std::forward_iterator_tag;
	using difference_type = std::ptrdiff_t;
	using value_type = T;
	using reference = T;
	using pointer = idx_t;

	explicit WindowColumnIterator(WindowInputColumn &coll_p, pointer pos_p = 0) : coll(&coll_p), pos(pos_p) {
	}

	inline reference operator*() const {
		return coll->GetCell<T>(pos);
	}
	inline explicit operator pointer() const {
		return pos;
	}

	inline iterator &operator++() {
		++pos;
		return *this;
	}
	inline iterator operator++(int) {
		auto result = *this;
		++(*this);
		return result;
	}

	friend inline bool operator==(const iterator &a, const iterator &b) {
		return a.pos == b.pos;
	}
	friend inline bool operator!=(const iterator &a, const iterator &b) {
		return a.pos != b.pos;
	}

private:
	optional_ptr<WindowInputColumn> coll;
	pointer pos;
};

template <typename T, typename OP>
struct OperationCompare : public std::function<bool(T, T)> {
	inline bool operator()(const T &lhs, const T &val) const {
		return OP::template Operation(lhs, val);
	}
};

template <typename T, typename OP, bool FROM>
static idx_t FindTypedRangeBound(WindowInputColumn &over, const idx_t order_begin, const idx_t order_end,
                                 WindowInputExpression &boundary, const idx_t boundary_row) {
	D_ASSERT(!boundary.CellIsNull(boundary_row));
	const auto val = boundary.GetCell<T>(boundary_row);

	OperationCompare<T, OP> comp;
	WindowColumnIterator<T> begin(over, order_begin);
	WindowColumnIterator<T> end(over, order_end);
	if (FROM) {
		return idx_t(std::lower_bound(begin, end, val, comp));
	} else {
		return idx_t(std::upper_bound(begin, end, val, comp));
	}
}

template <typename OP, bool FROM>
static idx_t FindRangeBound(WindowInputColumn &over, const idx_t order_begin, const idx_t order_end,
                            WindowInputExpression &boundary, const idx_t expr_idx) {
	D_ASSERT(boundary.chunk.ColumnCount() == 1);
	D_ASSERT(boundary.chunk.data[0].GetType().InternalType() == over.input_expr.ptype);

	switch (over.input_expr.ptype) {
	case PhysicalType::INT8:
		return FindTypedRangeBound<int8_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::INT16:
		return FindTypedRangeBound<int16_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::INT32:
		return FindTypedRangeBound<int32_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::INT64:
		return FindTypedRangeBound<int64_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::UINT8:
		return FindTypedRangeBound<uint8_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::UINT16:
		return FindTypedRangeBound<uint16_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::UINT32:
		return FindTypedRangeBound<uint32_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::UINT64:
		return FindTypedRangeBound<uint64_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::INT128:
		return FindTypedRangeBound<hugeint_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::FLOAT:
		return FindTypedRangeBound<float, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::DOUBLE:
		return FindTypedRangeBound<double, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case PhysicalType::INTERVAL:
		return FindTypedRangeBound<interval_t, OP, FROM>(over, order_begin, order_end, boundary, expr_idx);
	default:
		throw InternalException("Unsupported column type for RANGE");
	}
}

template <bool FROM>
static idx_t FindOrderedRangeBound(WindowInputColumn &over, const OrderType range_sense, const idx_t order_begin,
                                   const idx_t order_end, WindowInputExpression &boundary, const idx_t expr_idx) {
	switch (range_sense) {
	case OrderType::ASCENDING:
		return FindRangeBound<LessThan, FROM>(over, order_begin, order_end, boundary, expr_idx);
	case OrderType::DESCENDING:
		return FindRangeBound<GreaterThan, FROM>(over, order_begin, order_end, boundary, expr_idx);
	default:
		throw InternalException("Unsupported ORDER BY sense for RANGE");
	}
}
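
// The bound search treats the sorted ORDER BY column as a random-access sequence and binary
// searches it with the comparator matching the sort direction. FROM = true uses std::lower_bound
// (the first row not ordered before the boundary value, i.e. a frame start); FROM = false uses
// std::upper_bound (one past the last peer, i.e. a frame end). Illustrative values: for an
// ascending column [1, 3, 3, 7] and boundary value 3, the start bound is index 1 and the end
// bound is index 3.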

void WindowBoundariesState::Update(const idx_t row_idx, WindowInputColumn &range_collection, const idx_t expr_idx,
                                   WindowInputExpression &boundary_start, WindowInputExpression &boundary_end,
                                   const ValidityMask &partition_mask, const ValidityMask &order_mask) {

	auto &bounds = *this;
	if (bounds.partition_count + bounds.order_count > 0) {

		// determine partition and peer group boundaries to ultimately figure out the window size
		bounds.is_same_partition = !partition_mask.RowIsValidUnsafe(row_idx);
		bounds.is_peer = !order_mask.RowIsValidUnsafe(row_idx);

		// when the partition changes, recompute the boundaries
		if (!bounds.is_same_partition) {
			bounds.partition_start = row_idx;
			bounds.peer_start = row_idx;

			// find the end of the partition
			bounds.partition_end = bounds.input_size;
			if (bounds.partition_count) {
				idx_t n = 1;
				bounds.partition_end = FindNextStart(partition_mask, bounds.partition_start + 1, bounds.input_size, n);
			}

			// Find valid ordering values for the new partition
			// so we can exclude NULLs from RANGE expression computations
			bounds.valid_start = bounds.partition_start;
			bounds.valid_end = bounds.partition_end;

			if ((bounds.valid_start < bounds.valid_end) && bounds.has_preceding_range) {
				// Exclude any leading NULLs
				if (range_collection.CellIsNull(bounds.valid_start)) {
					idx_t n = 1;
					bounds.valid_start = FindNextStart(order_mask, bounds.valid_start + 1, bounds.valid_end, n);
				}
			}

			if ((bounds.valid_start < bounds.valid_end) && bounds.has_following_range) {
				// Exclude any trailing NULLs
				if (range_collection.CellIsNull(bounds.valid_end - 1)) {
					idx_t n = 1;
					bounds.valid_end = FindPrevStart(order_mask, bounds.valid_start, bounds.valid_end, n);
				}
			}

		} else if (!bounds.is_peer) {
			bounds.peer_start = row_idx;
		}

		if (bounds.needs_peer) {
			bounds.peer_end = bounds.partition_end;
			if (bounds.order_count) {
				idx_t n = 1;
				bounds.peer_end = FindNextStart(order_mask, bounds.peer_start + 1, bounds.partition_end, n);
			}
		}

	} else {
		bounds.is_same_partition = false;
		bounds.is_peer = true;
		bounds.partition_end = bounds.input_size;
		bounds.peer_end = bounds.partition_end;
	}

	// determine window boundaries depending on the type of expression
	bounds.window_start = -1;
	bounds.window_end = -1;

	switch (bounds.start_boundary) {
	case WindowBoundary::UNBOUNDED_PRECEDING:
		bounds.window_start = bounds.partition_start;
		break;
	case WindowBoundary::CURRENT_ROW_ROWS:
		bounds.window_start = row_idx;
		break;
	case WindowBoundary::CURRENT_ROW_RANGE:
		bounds.window_start = bounds.peer_start;
		break;
	case WindowBoundary::EXPR_PRECEDING_ROWS: {
		if (!TrySubtractOperator::Operation(int64_t(row_idx), boundary_start.GetCell<int64_t>(expr_idx),
		                                    bounds.window_start)) {
			throw OutOfRangeException("Overflow computing ROWS PRECEDING start");
		}
		break;
	}
	case WindowBoundary::EXPR_FOLLOWING_ROWS: {
		if (!TryAddOperator::Operation(int64_t(row_idx), boundary_start.GetCell<int64_t>(expr_idx),
		                               bounds.window_start)) {
			throw OutOfRangeException("Overflow computing ROWS FOLLOWING start");
		}
		break;
	}
	case WindowBoundary::EXPR_PRECEDING_RANGE: {
		if (boundary_start.CellIsNull(expr_idx)) {
			bounds.window_start = bounds.peer_start;
		} else {
			bounds.window_start = FindOrderedRangeBound<true>(range_collection, bounds.range_sense, bounds.valid_start,
			                                                  row_idx, boundary_start, expr_idx);
		}
		break;
	}
	case WindowBoundary::EXPR_FOLLOWING_RANGE: {
		if (boundary_start.CellIsNull(expr_idx)) {
			bounds.window_start = bounds.peer_start;
		} else {
			bounds.window_start = FindOrderedRangeBound<true>(range_collection, bounds.range_sense, row_idx,
			                                                  bounds.valid_end, boundary_start, expr_idx);
		}
		break;
	}
	default:
		throw InternalException("Unsupported window start boundary");
	}

	switch (bounds.end_boundary) {
	case WindowBoundary::CURRENT_ROW_ROWS:
		bounds.window_end = row_idx + 1;
		break;
	case WindowBoundary::CURRENT_ROW_RANGE:
		bounds.window_end = bounds.peer_end;
		break;
	case WindowBoundary::UNBOUNDED_FOLLOWING:
		bounds.window_end = bounds.partition_end;
		break;
	case WindowBoundary::EXPR_PRECEDING_ROWS:
		if (!TrySubtractOperator::Operation(int64_t(row_idx + 1), boundary_end.GetCell<int64_t>(expr_idx),
		                                    bounds.window_end)) {
			throw OutOfRangeException("Overflow computing ROWS PRECEDING end");
		}
		break;
	case WindowBoundary::EXPR_FOLLOWING_ROWS:
		if (!TryAddOperator::Operation(int64_t(row_idx + 1), boundary_end.GetCell<int64_t>(expr_idx),
		                               bounds.window_end)) {
			throw OutOfRangeException("Overflow computing ROWS FOLLOWING end");
		}
		break;
	case WindowBoundary::EXPR_PRECEDING_RANGE: {
		if (boundary_end.CellIsNull(expr_idx)) {
			bounds.window_end = bounds.peer_end;
		} else {
			bounds.window_end = FindOrderedRangeBound<false>(range_collection, bounds.range_sense, bounds.valid_start,
			                                                 row_idx, boundary_end, expr_idx);
		}
		break;
	}
	case WindowBoundary::EXPR_FOLLOWING_RANGE: {
		if (boundary_end.CellIsNull(expr_idx)) {
			bounds.window_end = bounds.peer_end;
		} else {
			bounds.window_end = FindOrderedRangeBound<false>(range_collection, bounds.range_sense, row_idx,
			                                                 bounds.valid_end, boundary_end, expr_idx);
		}
		break;
	}
	default:
		throw InternalException("Unsupported window end boundary");
	}

	// clamp windows to the partition if they exceed it
	if (bounds.window_start < (int64_t)bounds.partition_start) {
		bounds.window_start = bounds.partition_start;
	}
	if (bounds.window_start > (int64_t)bounds.partition_end) {
		bounds.window_start = bounds.partition_end;
	}
	if (bounds.window_end < (int64_t)bounds.partition_start) {
		bounds.window_end = bounds.partition_start;
	}
	if (bounds.window_end > (int64_t)bounds.partition_end) {
		bounds.window_end = bounds.partition_end;
	}

	if (bounds.window_start < 0 || bounds.window_end < 0) {
		throw InternalException("Failed to compute window boundaries");
	}
}
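
// Worked example (hypothetical): for ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING at row_idx = 5,
// the start case computes window_start = 5 - 1 = 4 and the end case window_end = (5 + 1) + 1 = 7,
// giving the half-open frame [4, 7). If the partition ends at row 6, the clamping above shrinks
// the frame to [4, 6).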

struct WindowExecutor {
	static bool IsConstantAggregate(const BoundWindowExpression &wexpr);

	WindowExecutor(BoundWindowExpression &wexpr, ClientContext &context, const ValidityMask &partition_mask,
	               const idx_t count);

	void Sink(DataChunk &input_chunk, const idx_t input_idx, const idx_t total_count);
	void Finalize(WindowAggregationMode mode);

	void Evaluate(idx_t row_idx, DataChunk &input_chunk, Vector &result, const ValidityMask &partition_mask,
	              const ValidityMask &order_mask);

	// The function
	BoundWindowExpression &wexpr;

	// Frame management
	WindowBoundariesState bounds;
	uint64_t dense_rank = 1;
	uint64_t rank_equal = 0;
	uint64_t rank = 1;

	// Expression collections
	DataChunk payload_collection;
	ExpressionExecutor payload_executor;
	DataChunk payload_chunk;

	ExpressionExecutor filter_executor;
	ValidityMask filter_mask;
	vector<validity_t> filter_bits;
	SelectionVector filter_sel;

	// LEAD/LAG evaluation
	WindowInputExpression leadlag_offset;
	WindowInputExpression leadlag_default;

	// evaluate boundaries if present. Parser has checked boundary types.
	WindowInputExpression boundary_start;
	WindowInputExpression boundary_end;

	// evaluate RANGE expressions, if needed
	WindowInputColumn range;

	// IGNORE NULLS
	ValidityMask ignore_nulls;

	// build a segment tree for frame-adhering aggregates
	// see http://www.vldb.org/pvldb/vol8/p1058-leis.pdf
	unique_ptr<WindowSegmentTree> segment_tree = nullptr;

	// all aggregate values are the same for each partition
	unique_ptr<WindowConstantAggregate> constant_aggregate = nullptr;
};
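
// Executor lifecycle (as driven by WindowLocalSourceState::GeneratePartition below): one executor
// is built per window expression and per sorted partition. Every sorted chunk is passed to Sink(),
// Finalize() then builds the segment tree (or finalizes the constant aggregate), and Evaluate() is
// called once per output chunk during the second scan. A sketch of the call sequence:
//
//	WindowExecutor exec(wexpr, context, partition_mask, count);
//	exec.Sink(chunk, input_idx, total_count); // repeated for each input chunk
//	exec.Finalize(mode);
//	exec.Evaluate(row_idx, chunk, result, partition_mask, order_mask); // per output chunk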

bool WindowExecutor::IsConstantAggregate(const BoundWindowExpression &wexpr) {
	if (!wexpr.aggregate) {
		return false;
	}

	// COUNT(*) is already handled efficiently by segment trees.
	if (wexpr.children.empty()) {
		return false;
	}

	/*
	    The default framing option is RANGE UNBOUNDED PRECEDING, which
	    is the same as RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT
	    ROW; it sets the frame to be all rows from the partition start
	    up through the current row's last peer (a row that the window's
	    ORDER BY clause considers equivalent to the current row; all
	    rows are peers if there is no ORDER BY). In general, UNBOUNDED
	    PRECEDING means that the frame starts with the first row of the
	    partition, and similarly UNBOUNDED FOLLOWING means that the
	    frame ends with the last row of the partition, regardless of
	    RANGE, ROWS or GROUPS mode. In ROWS mode, CURRENT ROW means that
	    the frame starts or ends with the current row; but in RANGE or
	    GROUPS mode it means that the frame starts or ends with the
	    current row's first or last peer in the ORDER BY ordering. The
	    offset PRECEDING and offset FOLLOWING options vary in meaning
	    depending on the frame mode.
	*/
	switch (wexpr.start) {
	case WindowBoundary::UNBOUNDED_PRECEDING:
		break;
	case WindowBoundary::CURRENT_ROW_RANGE:
		if (!wexpr.orders.empty()) {
			return false;
		}
		break;
	default:
		return false;
	}

	switch (wexpr.end) {
	case WindowBoundary::UNBOUNDED_FOLLOWING:
		break;
	case WindowBoundary::CURRENT_ROW_RANGE:
		if (!wexpr.orders.empty()) {
			return false;
		}
		break;
	default:
		return false;
	}

	return true;
}
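
// In other words, an aggregate is "constant" when its frame always covers the whole partition, so
// a single value can be computed per partition and broadcast to every row. An illustrative query
// that qualifies (no ORDER BY, so all rows are peers and the default RANGE frame spans the
// partition):
//
//	SELECT sum(amount) OVER (PARTITION BY region) FROM sales;
//
// (Illustrative SQL; table and column names are made up.)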

WindowExecutor::WindowExecutor(BoundWindowExpression &wexpr, ClientContext &context, const ValidityMask &partition_mask,
                               const idx_t count)
    : wexpr(wexpr), bounds(wexpr, count), payload_collection(), payload_executor(context), filter_executor(context),
      leadlag_offset(wexpr.offset_expr.get(), context), leadlag_default(wexpr.default_expr.get(), context),
      boundary_start(wexpr.start_expr.get(), context), boundary_end(wexpr.end_expr.get(), context),
      range((bounds.has_preceding_range || bounds.has_following_range) ? wexpr.orders[0].expression.get() : nullptr,
            context, count)

{
	// TODO: we could evaluate these expressions in parallel

	// Check for constant aggregate
	if (IsConstantAggregate(wexpr)) {
		constant_aggregate =
		    make_uniq<WindowConstantAggregate>(AggregateObject(wexpr), wexpr.return_type, partition_mask, count);
	}

	// evaluate the FILTER clause and stuff it into a large mask for compactness and reuse
	if (wexpr.filter_expr) {
		// Start with all invalid and set the ones that pass
		filter_bits.resize(ValidityMask::ValidityMaskSize(count), 0);
		filter_mask.Initialize(filter_bits.data());
		filter_executor.AddExpression(*wexpr.filter_expr);
		filter_sel.Initialize(STANDARD_VECTOR_SIZE);
	}

	// TODO: the child may be a scalar, in which case we don't need to materialize the whole collection

	// evaluate inner expressions of window functions, could be more complex
	PrepareInputExpressions(wexpr.children, payload_executor, payload_chunk);

	auto types = payload_chunk.GetTypes();
	if (!types.empty()) {
		payload_collection.Initialize(Allocator::Get(context), types);
	}
}

void WindowExecutor::Sink(DataChunk &input_chunk, const idx_t input_idx, const idx_t total_count) {
	// Single pass over the input to produce the global data.
	// Vectorisation for the win...

	// Set up a validity mask for IGNORE NULLS
	bool check_nulls = false;
	if (wexpr.ignore_nulls) {
		switch (wexpr.type) {
		case ExpressionType::WINDOW_LEAD:
		case ExpressionType::WINDOW_LAG:
		case ExpressionType::WINDOW_FIRST_VALUE:
		case ExpressionType::WINDOW_LAST_VALUE:
		case ExpressionType::WINDOW_NTH_VALUE:
			check_nulls = true;
			break;
		default:
			break;
		}
	}

	const auto count = input_chunk.size();

	idx_t filtered = 0;
	SelectionVector *filtering = nullptr;
	if (wexpr.filter_expr) {
		filtering = &filter_sel;
		filtered = filter_executor.SelectExpression(input_chunk, filter_sel);
		for (idx_t f = 0; f < filtered; ++f) {
			filter_mask.SetValid(input_idx + filter_sel[f]);
		}
	}

	if (!wexpr.children.empty()) {
		payload_chunk.Reset();
		payload_executor.Execute(input_chunk, payload_chunk);
		payload_chunk.Verify();
		if (constant_aggregate) {
			constant_aggregate->Sink(payload_chunk, filtering, filtered);
		} else {
			payload_collection.Append(payload_chunk, true);
		}

		// process payload chunks while they are still piping hot
		if (check_nulls) {
			UnifiedVectorFormat vdata;
			payload_chunk.data[0].ToUnifiedFormat(count, vdata);
			if (!vdata.validity.AllValid()) {
				// Lazily materialise the contents when we find the first NULL
				if (ignore_nulls.AllValid()) {
					ignore_nulls.Initialize(total_count);
				}
				// Write to the current position
				if (input_idx % ValidityMask::BITS_PER_VALUE == 0) {
					// If we are at the edge of an output entry, just copy the entries
					auto dst = ignore_nulls.GetData() + ignore_nulls.EntryCount(input_idx);
					auto src = vdata.validity.GetData();
					for (auto entry_count = vdata.validity.EntryCount(count); entry_count-- > 0;) {
						*dst++ = *src++;
					}
				} else {
					// If not, we have ragged data and need to copy one bit at a time.
					for (idx_t i = 0; i < count; ++i) {
						ignore_nulls.Set(input_idx + i, vdata.validity.RowIsValid(i));
					}
				}
			}
		}
	}

	range.Append(input_chunk);
}

void WindowExecutor::Finalize(WindowAggregationMode mode) {
	// build a segment tree for frame-adhering aggregates
	// see http://www.vldb.org/pvldb/vol8/p1058-leis.pdf
	if (constant_aggregate) {
		constant_aggregate->Finalize();
	} else if (wexpr.aggregate) {
		segment_tree = make_uniq<WindowSegmentTree>(AggregateObject(wexpr), wexpr.return_type, &payload_collection,
		                                            filter_mask, mode);
	}
}

void WindowExecutor::Evaluate(idx_t row_idx, DataChunk &input_chunk, Vector &result, const ValidityMask &partition_mask,
                              const ValidityMask &order_mask) {
	// Evaluate the row-level arguments
	boundary_start.Execute(input_chunk);
	boundary_end.Execute(input_chunk);

	leadlag_offset.Execute(input_chunk);
	leadlag_default.Execute(input_chunk);

	// this is the main loop: go through all sorted rows and compute the window function result
	for (idx_t output_offset = 0; output_offset < input_chunk.size(); ++output_offset, ++row_idx) {
		// special case, OVER (), aggregate over everything
		bounds.Update(row_idx, range, output_offset, boundary_start, boundary_end, partition_mask, order_mask);
		if (WindowNeedsRank(wexpr)) {
			if (!bounds.is_same_partition || row_idx == 0) { // special case for the first row, need to init
				dense_rank = 1;
				rank = 1;
				rank_equal = 0;
			} else if (!bounds.is_peer) {
				dense_rank++;
				rank += rank_equal;
				rank_equal = 0;
			}
			rank_equal++;
		}

		// if no values are read for the window, the result is NULL
		if (bounds.window_start >= bounds.window_end) {
			FlatVector::SetNull(result, output_offset, true);
			continue;
		}

		switch (wexpr.type) {
		case ExpressionType::WINDOW_AGGREGATE: {
			if (constant_aggregate) {
				constant_aggregate->Compute(result, output_offset, bounds.window_start, bounds.window_end);
			} else {
				segment_tree->Compute(result, output_offset, bounds.window_start, bounds.window_end);
			}
			break;
		}
		case ExpressionType::WINDOW_ROW_NUMBER: {
			auto rdata = FlatVector::GetData<int64_t>(result);
			rdata[output_offset] = row_idx - bounds.partition_start + 1;
			break;
		}
		case ExpressionType::WINDOW_RANK_DENSE: {
			auto rdata = FlatVector::GetData<int64_t>(result);
			rdata[output_offset] = dense_rank;
			break;
		}
		case ExpressionType::WINDOW_RANK: {
			auto rdata = FlatVector::GetData<int64_t>(result);
			rdata[output_offset] = rank;
			break;
		}
		case ExpressionType::WINDOW_PERCENT_RANK: {
			int64_t denom = (int64_t)bounds.partition_end - bounds.partition_start - 1;
			double percent_rank = denom > 0 ? ((double)rank - 1) / denom : 0;
			auto rdata = FlatVector::GetData<double>(result);
			rdata[output_offset] = percent_rank;
			break;
		}
		case ExpressionType::WINDOW_CUME_DIST: {
			int64_t denom = (int64_t)bounds.partition_end - bounds.partition_start;
			double cume_dist = denom > 0 ? ((double)(bounds.peer_end - bounds.partition_start)) / denom : 0;
			auto rdata = FlatVector::GetData<double>(result);
			rdata[output_offset] = cume_dist;
			break;
		}
		case ExpressionType::WINDOW_NTILE: {
			D_ASSERT(payload_collection.ColumnCount() == 1);
			if (CellIsNull(payload_collection, 0, row_idx)) {
				FlatVector::SetNull(result, output_offset, true);
			} else {
				auto n_param = GetCell<int64_t>(payload_collection, 0, row_idx);
				if (n_param < 1) {
					throw InvalidInputException("Argument for ntile must be greater than zero");
				}
				// With thanks from SQLite's ntileValueFunc()
				int64_t n_total = bounds.partition_end - bounds.partition_start;
				if (n_param > n_total) {
					// more groups allowed than we have values
					// map every entry to a unique group
					n_param = n_total;
				}
				int64_t n_size = (n_total / n_param);
				// find the row idx within the group
				D_ASSERT(row_idx >= bounds.partition_start);
				int64_t adjusted_row_idx = row_idx - bounds.partition_start;
				// now compute the ntile
				int64_t n_large = n_total - n_param * n_size;
				int64_t i_small = n_large * (n_size + 1);
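				// Illustrative example: n_total = 10, n_param = 3 gives n_size = 3, n_large = 1,
				// i_small = 4; the first 4 rows land in tile 1 and the remaining two tiles get
				// 3 rows each, so tile sizes differ by at most one.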
				int64_t result_ntile;

				D_ASSERT((n_large * (n_size + 1) + (n_param - n_large) * n_size) == n_total);

				if (adjusted_row_idx < i_small) {
					result_ntile = 1 + adjusted_row_idx / (n_size + 1);
				} else {
					result_ntile = 1 + n_large + (adjusted_row_idx - i_small) / n_size;
				}
				// result has to be between [1, NTILE]
				D_ASSERT(result_ntile >= 1 && result_ntile <= n_param);
				auto rdata = FlatVector::GetData<int64_t>(result);
				rdata[output_offset] = result_ntile;
			}
			break;
		}
		case ExpressionType::WINDOW_LEAD:
		case ExpressionType::WINDOW_LAG: {
			int64_t offset = 1;
			if (wexpr.offset_expr) {
				offset = leadlag_offset.GetCell<int64_t>(output_offset);
			}
			int64_t val_idx = (int64_t)row_idx;
			if (wexpr.type == ExpressionType::WINDOW_LEAD) {
				val_idx += offset;
			} else {
				val_idx -= offset;
			}

			idx_t delta = 0;
			if (val_idx < (int64_t)row_idx) {
				// Count backwards
				delta = idx_t(row_idx - val_idx);
				val_idx = FindPrevStart(ignore_nulls, bounds.partition_start, row_idx, delta);
			} else if (val_idx > (int64_t)row_idx) {
				delta = idx_t(val_idx - row_idx);
				val_idx = FindNextStart(ignore_nulls, row_idx + 1, bounds.partition_end, delta);
			}
			// else offset is zero, so don't move.

			if (!delta) {
				CopyCell(payload_collection, 0, val_idx, result, output_offset);
			} else if (wexpr.default_expr) {
				leadlag_default.CopyCell(result, output_offset);
			} else {
				FlatVector::SetNull(result, output_offset, true);
			}
			break;
		}
		case ExpressionType::WINDOW_FIRST_VALUE: {
			// Same as NTH_VALUE(..., 1)
			idx_t n = 1;
			const auto first_idx = FindNextStart(ignore_nulls, bounds.window_start, bounds.window_end, n);
			if (!n) {
				CopyCell(payload_collection, 0, first_idx, result, output_offset);
			} else {
				FlatVector::SetNull(result, output_offset, true);
			}
			break;
		}
		case ExpressionType::WINDOW_LAST_VALUE: {
			idx_t n = 1;
			const auto last_idx = FindPrevStart(ignore_nulls, bounds.window_start, bounds.window_end, n);
			if (!n) {
				CopyCell(payload_collection, 0, last_idx, result, output_offset);
			} else {
				FlatVector::SetNull(result, output_offset, true);
			}
			break;
		}
		case ExpressionType::WINDOW_NTH_VALUE: {
			D_ASSERT(payload_collection.ColumnCount() == 2);
			// Returns the value evaluated at the row that is the n'th row of the window frame (counting from 1);
			// returns NULL if there is no such row.
			if (CellIsNull(payload_collection, 1, row_idx)) {
				FlatVector::SetNull(result, output_offset, true);
			} else {
				auto n_param = GetCell<int64_t>(payload_collection, 1, row_idx);
				if (n_param < 1) {
					FlatVector::SetNull(result, output_offset, true);
				} else {
					auto n = idx_t(n_param);
					const auto nth_index = FindNextStart(ignore_nulls, bounds.window_start, bounds.window_end, n);
					if (!n) {
						CopyCell(payload_collection, 0, nth_index, result, output_offset);
					} else {
						FlatVector::SetNull(result, output_offset, true);
					}
				}
			}
			break;
		}
		default:
			throw InternalException("Window aggregate type %s", ExpressionTypeToString(wexpr.type));
		}
	}

	result.Verify(input_chunk.size());
}

//===--------------------------------------------------------------------===//
// Sink
//===--------------------------------------------------------------------===//
SinkResultType PhysicalWindow::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
	auto &lstate = input.local_state.Cast<WindowLocalSinkState>();

	lstate.Sink(chunk);

	return SinkResultType::NEED_MORE_INPUT;
}

void PhysicalWindow::Combine(ExecutionContext &context, GlobalSinkState &gstate_p, LocalSinkState &lstate_p) const {
	auto &lstate = lstate_p.Cast<WindowLocalSinkState>();
	lstate.Combine();
}

unique_ptr<LocalSinkState> PhysicalWindow::GetLocalSinkState(ExecutionContext &context) const {
	auto &gstate = sink_state->Cast<WindowGlobalSinkState>();
	return make_uniq<WindowLocalSinkState>(context.client, gstate);
}

unique_ptr<GlobalSinkState> PhysicalWindow::GetGlobalSinkState(ClientContext &context) const {
	return make_uniq<WindowGlobalSinkState>(*this, context);
}

SinkFinalizeType PhysicalWindow::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
                                          GlobalSinkState &gstate_p) const {
	auto &state = gstate_p.Cast<WindowGlobalSinkState>();

	// Did we get any data?
	if (!state.global_partition->count) {
		return SinkFinalizeType::NO_OUTPUT_POSSIBLE;
	}

	// Do we have any sorting to schedule?
	if (state.global_partition->rows) {
		D_ASSERT(!state.global_partition->grouping_data);
		return state.global_partition->rows->count ? SinkFinalizeType::READY : SinkFinalizeType::NO_OUTPUT_POSSIBLE;
	}

	// Find the first group to sort
	auto &groups = state.global_partition->grouping_data->GetPartitions();
	if (groups.empty()) {
		// Empty input!
		return SinkFinalizeType::NO_OUTPUT_POSSIBLE;
	}

	// Schedule all the sorts for maximum thread utilisation
	auto new_event = make_shared<PartitionMergeEvent>(*state.global_partition, pipeline);
	event.InsertEvent(std::move(new_event));

	return SinkFinalizeType::READY;
}

//===--------------------------------------------------------------------===//
// Source
//===--------------------------------------------------------------------===//
class WindowGlobalSourceState : public GlobalSourceState {
public:
	explicit WindowGlobalSourceState(WindowGlobalSinkState &gsink) : gsink(*gsink.global_partition), next_bin(0) {
	}

	PartitionGlobalSinkState &gsink;
	//! The output read position.
	atomic<idx_t> next_bin;

public:
	idx_t MaxThreads() override {
		// If there is only one partition, we have to process it on one thread.
		if (!gsink.grouping_data) {
			return 1;
		}

		// If there is not a lot of data, process serially.
		if (gsink.count < STANDARD_ROW_GROUPS_SIZE) {
			return 1;
		}

		return gsink.hash_groups.size();
	}
};

// Per-thread read state
class WindowLocalSourceState : public LocalSourceState {
public:
	using HashGroupPtr = unique_ptr<PartitionGlobalHashGroup>;
	using WindowExecutorPtr = unique_ptr<WindowExecutor>;
	using WindowExecutors = vector<WindowExecutorPtr>;

	WindowLocalSourceState(const PhysicalWindow &op_p, ExecutionContext &context, WindowGlobalSourceState &gsource)
	    : context(context.client), op(op_p), gsink(gsource.gsink) {

		vector<LogicalType> output_types;
		for (idx_t expr_idx = 0; expr_idx < op.select_list.size(); ++expr_idx) {
			D_ASSERT(op.select_list[expr_idx]->GetExpressionClass() == ExpressionClass::BOUND_WINDOW);
			auto &wexpr = op.select_list[expr_idx]->Cast<BoundWindowExpression>();
			output_types.emplace_back(wexpr.return_type);
		}
		output_chunk.Initialize(Allocator::Get(context.client), output_types);

		const auto &input_types = gsink.payload_types;
		layout.Initialize(input_types);
		input_chunk.Initialize(gsink.allocator, input_types);
	}

	void MaterializeSortedData();
	void GeneratePartition(WindowGlobalSinkState &gstate, const idx_t hash_bin);
	void Scan(DataChunk &chunk);

	HashGroupPtr hash_group;
	ClientContext &context;
	const PhysicalWindow &op;

	PartitionGlobalSinkState &gsink;

	//! The generated input chunks
	unique_ptr<RowDataCollection> rows;
	unique_ptr<RowDataCollection> heap;
	RowLayout layout;
	//! The partition boundary mask
	vector<validity_t> partition_bits;
	ValidityMask partition_mask;
	//! The order boundary mask
	vector<validity_t> order_bits;
	ValidityMask order_mask;
	//! The current execution functions
	WindowExecutors window_execs;

	//! The read partition
	idx_t hash_bin;
	//! The read cursor
	unique_ptr<RowDataCollectionScanner> scanner;
	//! Buffer for the inputs
	DataChunk input_chunk;
	//! Buffer for window results
	DataChunk output_chunk;
};

void WindowLocalSourceState::MaterializeSortedData() {
	auto &global_sort_state = *hash_group->global_sort;
	if (global_sort_state.sorted_blocks.empty()) {
		return;
	}

	// scan the sorted row data
	D_ASSERT(global_sort_state.sorted_blocks.size() == 1);
	auto &sb = *global_sort_state.sorted_blocks[0];

	// Free up some memory before allocating more
	sb.radix_sorting_data.clear();
	sb.blob_sorting_data = nullptr;

	// Move the sorting row blocks into our RDCs
	auto &buffer_manager = global_sort_state.buffer_manager;
	auto &sd = *sb.payload_data;

	// Data blocks are required
	D_ASSERT(!sd.data_blocks.empty());
	auto &block = sd.data_blocks[0];
	rows = make_uniq<RowDataCollection>(buffer_manager, block->capacity, block->entry_size);
	rows->blocks = std::move(sd.data_blocks);
	rows->count = std::accumulate(rows->blocks.begin(), rows->blocks.end(), idx_t(0),
	                              [&](idx_t c, const unique_ptr<RowDataBlock> &b) { return c + b->count; });

	// Heap blocks are optional, but we want both for iteration.
	if (!sd.heap_blocks.empty()) {
		auto &block = sd.heap_blocks[0];
		heap = make_uniq<RowDataCollection>(buffer_manager, block->capacity, block->entry_size);
		heap->blocks = std::move(sd.heap_blocks);
		hash_group.reset();
	} else {
		heap = make_uniq<RowDataCollection>(buffer_manager, (idx_t)Storage::BLOCK_SIZE, 1, true);
	}
	heap->count = std::accumulate(heap->blocks.begin(), heap->blocks.end(), idx_t(0),
	                              [&](idx_t c, const unique_ptr<RowDataBlock> &b) { return c + b->count; });
}

void WindowLocalSourceState::GeneratePartition(WindowGlobalSinkState &gstate, const idx_t hash_bin_p) {
	// Get rid of any stale data
	hash_bin = hash_bin_p;

	// There are three types of partitions:
	// 1. No partition (no sorting)
	// 2. One partition (sorting, but no hashing)
	// 3. Multiple partitions (sorting and hashing)

	// How big is the partition?
	idx_t count = 0;
	if (hash_bin < gsink.hash_groups.size() && gsink.hash_groups[hash_bin]) {
		count = gsink.hash_groups[hash_bin]->count;
	} else if (gsink.rows && !hash_bin) {
		count = gsink.count;
	} else {
		return;
	}

	// Initialise masks to false
	const auto bit_count = ValidityMask::ValidityMaskSize(count);
	partition_bits.clear();
	partition_bits.resize(bit_count, 0);
	partition_mask.Initialize(partition_bits.data());

	order_bits.clear();
	order_bits.resize(bit_count, 0);
	order_mask.Initialize(order_bits.data());

	// Scan the sorted data into new Collections
	auto external = gsink.external;
	if (gsink.rows && !hash_bin) {
		// Simple mask
		partition_mask.SetValidUnsafe(0);
		order_mask.SetValidUnsafe(0);
		// No partition - align the heap blocks with the row blocks
		rows = gsink.rows->CloneEmpty(gsink.rows->keep_pinned);
		heap = gsink.strings->CloneEmpty(gsink.strings->keep_pinned);
		RowDataCollectionScanner::AlignHeapBlocks(*rows, *heap, *gsink.rows, *gsink.strings, layout);
		external = true;
	} else if (hash_bin < gsink.hash_groups.size() && gsink.hash_groups[hash_bin]) {
		// Overwrite the collections with the sorted data
		hash_group = std::move(gsink.hash_groups[hash_bin]);
		hash_group->ComputeMasks(partition_mask, order_mask);
		external = hash_group->global_sort->external;
		MaterializeSortedData();
	} else {
		return;
	}

	// Create the executors for each function
	window_execs.clear();
	for (idx_t expr_idx = 0; expr_idx < op.select_list.size(); ++expr_idx) {
		D_ASSERT(op.select_list[expr_idx]->GetExpressionClass() == ExpressionClass::BOUND_WINDOW);
		auto &wexpr = op.select_list[expr_idx]->Cast<BoundWindowExpression>();
		auto wexec = make_uniq<WindowExecutor>(wexpr, context, partition_mask, count);
		window_execs.emplace_back(std::move(wexec));
	}

	// First pass over the input without flushing
	// TODO: Factor out the constructor data as global state
	scanner = make_uniq<RowDataCollectionScanner>(*rows, *heap, layout, external, false);
	idx_t input_idx = 0;
	while (true) {
		input_chunk.Reset();
		scanner->Scan(input_chunk);
		if (input_chunk.size() == 0) {
			break;
		}

		// TODO: Parallelization opportunity
		for (auto &wexec : window_execs) {
			wexec->Sink(input_chunk, input_idx, scanner->Count());
		}
		input_idx += input_chunk.size();
	}

	// TODO: Parallelization opportunity
	for (auto &wexec : window_execs) {
		wexec->Finalize(gstate.mode);
	}

	// External scanning assumes all blocks are swizzled.
	scanner->ReSwizzle();

	// Second pass can flush
	scanner->Reset(true);
}

void WindowLocalSourceState::Scan(DataChunk &result) {
	D_ASSERT(scanner);
	if (!scanner->Remaining()) {
		return;
	}

	const auto position = scanner->Scanned();
	input_chunk.Reset();
	scanner->Scan(input_chunk);

	output_chunk.Reset();
	for (idx_t expr_idx = 0; expr_idx < window_execs.size(); ++expr_idx) {
		auto &executor = *window_execs[expr_idx];
		executor.Evaluate(position, input_chunk, output_chunk.data[expr_idx], partition_mask, order_mask);
	}
	output_chunk.SetCardinality(input_chunk);
	output_chunk.Verify();

	idx_t out_idx = 0;
	result.SetCardinality(input_chunk);
	for (idx_t col_idx = 0; col_idx < input_chunk.ColumnCount(); col_idx++) {
		result.data[out_idx++].Reference(input_chunk.data[col_idx]);
	}
	for (idx_t col_idx = 0; col_idx < output_chunk.ColumnCount(); col_idx++) {
		result.data[out_idx++].Reference(output_chunk.data[col_idx]);
	}
	result.Verify();
}

unique_ptr<LocalSourceState> PhysicalWindow::GetLocalSourceState(ExecutionContext &context,
                                                                 GlobalSourceState &gstate_p) const {
	auto &gstate = gstate_p.Cast<WindowGlobalSourceState>();
	return make_uniq<WindowLocalSourceState>(*this, context, gstate);
}

unique_ptr<GlobalSourceState> PhysicalWindow::GetGlobalSourceState(ClientContext &context) const {
	auto &gsink = sink_state->Cast<WindowGlobalSinkState>();
	return make_uniq<WindowGlobalSourceState>(gsink);
}

SourceResultType PhysicalWindow::GetData(ExecutionContext &context, DataChunk &chunk,
                                         OperatorSourceInput &input) const {
	auto &lsource = input.local_state.Cast<WindowLocalSourceState>();
	auto &gsource = input.global_state.Cast<WindowGlobalSourceState>();
	auto &gsink = sink_state->Cast<WindowGlobalSinkState>();

	auto &hash_groups = gsink.global_partition->hash_groups;
	const auto bin_count = hash_groups.empty() ? 1 : hash_groups.size();

	while (chunk.size() == 0) {
		// Move to the next bin if we are done.
		while (!lsource.scanner || !lsource.scanner->Remaining()) {
			lsource.scanner.reset();
			lsource.rows.reset();
			lsource.heap.reset();
			lsource.hash_group.reset();
			auto hash_bin = gsource.next_bin++;
			if (hash_bin >= bin_count) {
				return chunk.size() > 0 ? SourceResultType::HAVE_MORE_OUTPUT : SourceResultType::FINISHED;
			}

			for (; hash_bin < hash_groups.size(); hash_bin = gsource.next_bin++) {
				if (hash_groups[hash_bin]) {
					break;
				}
			}
			lsource.GeneratePartition(gsink, hash_bin);
		}

		lsource.Scan(chunk);
	}

	return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT;
}

string PhysicalWindow::ParamsToString() const {
	string result;
	for (idx_t i = 0; i < select_list.size(); i++) {
		if (i > 0) {
			result += "\n";
		}
		result += select_list[i]->GetName();
	}
	return result;
}

} // namespace duckdb