1 | #include "duckdb/execution/operator/aggregate/physical_window.hpp" |
2 | |
3 | #include "duckdb/common/types/chunk_collection.hpp" |
4 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
5 | #include "duckdb/execution/expression_executor.hpp" |
6 | #include "duckdb/execution/window_segment_tree.hpp" |
7 | #include "duckdb/planner/expression/bound_reference_expression.hpp" |
8 | #include "duckdb/planner/expression/bound_window_expression.hpp" |
9 | |
10 | #include <cmath> |
11 | |
12 | using namespace duckdb; |
13 | using namespace std; |
14 | |
15 | //! The operator state of the window |
16 | class PhysicalWindowOperatorState : public PhysicalOperatorState { |
17 | public: |
18 | PhysicalWindowOperatorState(PhysicalOperator *child) : PhysicalOperatorState(child), position(0) { |
19 | } |
20 | |
21 | idx_t position; |
22 | ChunkCollection tuples; |
23 | ChunkCollection window_results; |
24 | }; |
25 | |
26 | // this implements a sorted window functions variant |
27 | PhysicalWindow::PhysicalWindow(LogicalOperator &op, vector<unique_ptr<Expression>> select_list, |
28 | PhysicalOperatorType type) |
29 | : PhysicalOperator(type, op.types), select_list(std::move(select_list)) { |
30 | } |
31 | |
32 | static bool EqualsSubset(vector<Value> &a, vector<Value> &b, idx_t start, idx_t end) { |
33 | assert(start <= end); |
34 | for (idx_t i = start; i < end; i++) { |
35 | if (a[i] != b[i]) { |
36 | return false; |
37 | } |
38 | } |
39 | return true; |
40 | } |
41 | |
42 | static idx_t BinarySearchRightmost(ChunkCollection &input, vector<Value> row, idx_t l, idx_t r, idx_t comp_cols) { |
43 | if (comp_cols == 0) { |
44 | return r - 1; |
45 | } |
46 | while (l < r) { |
47 | idx_t m = floor((l + r) / 2); |
48 | bool less_than_equals = true; |
49 | for (idx_t i = 0; i < comp_cols; i++) { |
50 | if (input.GetRow(m)[i] > row[i]) { |
51 | less_than_equals = false; |
52 | break; |
53 | } |
54 | } |
55 | if (less_than_equals) { |
56 | l = m + 1; |
57 | } else { |
58 | r = m; |
59 | } |
60 | } |
61 | return l - 1; |
62 | } |
63 | |
64 | static void MaterializeExpressions(ClientContext &context, Expression **exprs, idx_t expr_count, ChunkCollection &input, |
65 | ChunkCollection &output, bool scalar = false) { |
66 | if (expr_count == 0) { |
67 | return; |
68 | } |
69 | |
70 | vector<TypeId> types; |
71 | ExpressionExecutor executor; |
72 | for (idx_t expr_idx = 0; expr_idx < expr_count; ++expr_idx) { |
73 | types.push_back(exprs[expr_idx]->return_type); |
74 | executor.AddExpression(*exprs[expr_idx]); |
75 | } |
76 | |
77 | for (idx_t i = 0; i < input.chunks.size(); i++) { |
78 | DataChunk chunk; |
79 | chunk.Initialize(types); |
80 | |
81 | executor.Execute(*input.chunks[i], chunk); |
82 | |
83 | chunk.Verify(); |
84 | output.Append(chunk); |
85 | |
86 | if (scalar) { |
87 | break; |
88 | } |
89 | } |
90 | } |
91 | |
92 | static void MaterializeExpression(ClientContext &context, Expression *expr, ChunkCollection &input, |
93 | ChunkCollection &output, bool scalar = false) { |
94 | MaterializeExpressions(context, &expr, 1, input, output, scalar); |
95 | } |
96 | |
97 | static void SortCollectionForWindow(ClientContext &context, BoundWindowExpression *wexpr, ChunkCollection &input, |
98 | ChunkCollection &output, ChunkCollection &sort_collection) { |
99 | vector<TypeId> sort_types; |
100 | vector<OrderType> orders; |
101 | ExpressionExecutor executor; |
102 | |
103 | // we sort by both 1) partition by expression list and 2) order by expressions |
104 | for (idx_t prt_idx = 0; prt_idx < wexpr->partitions.size(); prt_idx++) { |
105 | auto &pexpr = wexpr->partitions[prt_idx]; |
106 | sort_types.push_back(pexpr->return_type); |
107 | orders.push_back(OrderType::ASCENDING); |
108 | executor.AddExpression(*pexpr); |
109 | } |
110 | |
111 | for (idx_t ord_idx = 0; ord_idx < wexpr->orders.size(); ord_idx++) { |
112 | auto &oexpr = wexpr->orders[ord_idx].expression; |
113 | sort_types.push_back(oexpr->return_type); |
114 | orders.push_back(wexpr->orders[ord_idx].type); |
115 | executor.AddExpression(*oexpr); |
116 | } |
117 | |
118 | assert(sort_types.size() > 0); |
119 | |
120 | // create a chunkcollection for the results of the expressions in the window definitions |
121 | for (idx_t i = 0; i < input.chunks.size(); i++) { |
122 | DataChunk sort_chunk; |
123 | sort_chunk.Initialize(sort_types); |
124 | |
125 | executor.Execute(*input.chunks[i], sort_chunk); |
126 | |
127 | sort_chunk.Verify(); |
128 | sort_collection.Append(sort_chunk); |
129 | } |
130 | |
131 | assert(input.count == sort_collection.count); |
132 | |
133 | auto sorted_vector = unique_ptr<idx_t[]>(new idx_t[input.count]); |
134 | sort_collection.Sort(orders, sorted_vector.get()); |
135 | |
136 | input.Reorder(sorted_vector.get()); |
137 | output.Reorder(sorted_vector.get()); |
138 | sort_collection.Reorder(sorted_vector.get()); |
139 | } |
140 | |
141 | struct WindowBoundariesState { |
142 | idx_t partition_start = 0; |
143 | idx_t partition_end = 0; |
144 | idx_t peer_start = 0; |
145 | idx_t peer_end = 0; |
146 | int64_t window_start = -1; |
147 | int64_t window_end = -1; |
148 | bool is_same_partition = false; |
149 | bool is_peer = false; |
150 | vector<Value> row_prev; |
151 | }; |
152 | |
153 | static bool WindowNeedsRank(BoundWindowExpression *wexpr) { |
154 | return wexpr->type == ExpressionType::WINDOW_PERCENT_RANK || wexpr->type == ExpressionType::WINDOW_RANK || |
155 | wexpr->type == ExpressionType::WINDOW_RANK_DENSE || wexpr->type == ExpressionType::WINDOW_CUME_DIST; |
156 | } |
157 | |
158 | static void UpdateWindowBoundaries(BoundWindowExpression *wexpr, ChunkCollection &input, idx_t input_size, |
159 | idx_t row_idx, ChunkCollection &boundary_start_collection, |
160 | ChunkCollection &boundary_end_collection, WindowBoundariesState &bounds) { |
161 | |
162 | if (input.column_count() > 0) { |
163 | vector<Value> row_cur = input.GetRow(row_idx); |
164 | idx_t sort_col_count = wexpr->partitions.size() + wexpr->orders.size(); |
165 | |
166 | // determine partition and peer group boundaries to ultimately figure out window size |
167 | bounds.is_same_partition = EqualsSubset(bounds.row_prev, row_cur, 0, wexpr->partitions.size()); |
168 | bounds.is_peer = bounds.is_same_partition && |
169 | EqualsSubset(bounds.row_prev, row_cur, wexpr->partitions.size(), sort_col_count); |
170 | bounds.row_prev = row_cur; |
171 | |
172 | // when the partition changes, recompute the boundaries |
173 | if (!bounds.is_same_partition || row_idx == 0) { // special case for first row, need to init |
174 | bounds.partition_start = row_idx; |
175 | bounds.peer_start = row_idx; |
176 | |
177 | // find end of partition |
178 | bounds.partition_end = |
179 | BinarySearchRightmost(input, row_cur, bounds.partition_start, input.count, wexpr->partitions.size()) + |
180 | 1; |
181 | |
182 | } else if (!bounds.is_peer) { |
183 | bounds.peer_start = row_idx; |
184 | } |
185 | |
186 | if (wexpr->end == WindowBoundary::CURRENT_ROW_RANGE || wexpr->type == ExpressionType::WINDOW_CUME_DIST) { |
187 | bounds.peer_end = BinarySearchRightmost(input, row_cur, row_idx, bounds.partition_end, sort_col_count) + 1; |
188 | } |
189 | } else { |
190 | bounds.is_same_partition = 0; |
191 | bounds.is_peer = true; |
192 | bounds.partition_end = input_size; |
193 | bounds.peer_end = bounds.partition_end; |
194 | } |
195 | |
196 | // determine window boundaries depending on the type of expression |
197 | bounds.window_start = -1; |
198 | bounds.window_end = -1; |
199 | |
200 | switch (wexpr->start) { |
201 | case WindowBoundary::UNBOUNDED_PRECEDING: |
202 | bounds.window_start = bounds.partition_start; |
203 | break; |
204 | case WindowBoundary::CURRENT_ROW_ROWS: |
205 | bounds.window_start = row_idx; |
206 | break; |
207 | case WindowBoundary::CURRENT_ROW_RANGE: |
208 | bounds.window_start = bounds.peer_start; |
209 | break; |
210 | case WindowBoundary::UNBOUNDED_FOLLOWING: |
211 | assert(0); // disallowed |
212 | break; |
213 | case WindowBoundary::EXPR_PRECEDING: { |
214 | assert(boundary_start_collection.column_count() > 0); |
215 | bounds.window_start = |
216 | (int64_t)row_idx - |
217 | boundary_start_collection.GetValue(0, wexpr->start_expr->IsScalar() ? 0 : row_idx).GetValue<int64_t>(); |
218 | break; |
219 | } |
220 | case WindowBoundary::EXPR_FOLLOWING: { |
221 | assert(boundary_start_collection.column_count() > 0); |
222 | bounds.window_start = |
223 | row_idx + |
224 | boundary_start_collection.GetValue(0, wexpr->start_expr->IsScalar() ? 0 : row_idx).GetValue<int64_t>(); |
225 | break; |
226 | } |
227 | |
228 | default: |
229 | throw NotImplementedException("Unsupported boundary" ); |
230 | } |
231 | |
232 | switch (wexpr->end) { |
233 | case WindowBoundary::UNBOUNDED_PRECEDING: |
234 | assert(0); // disallowed |
235 | break; |
236 | case WindowBoundary::CURRENT_ROW_ROWS: |
237 | bounds.window_end = row_idx + 1; |
238 | break; |
239 | case WindowBoundary::CURRENT_ROW_RANGE: |
240 | bounds.window_end = bounds.peer_end; |
241 | break; |
242 | case WindowBoundary::UNBOUNDED_FOLLOWING: |
243 | bounds.window_end = bounds.partition_end; |
244 | break; |
245 | case WindowBoundary::EXPR_PRECEDING: |
246 | assert(boundary_end_collection.column_count() > 0); |
247 | bounds.window_end = |
248 | (int64_t)row_idx - |
249 | boundary_end_collection.GetValue(0, wexpr->end_expr->IsScalar() ? 0 : row_idx).GetValue<int64_t>() + 1; |
250 | break; |
251 | case WindowBoundary::EXPR_FOLLOWING: |
252 | assert(boundary_end_collection.column_count() > 0); |
253 | bounds.window_end = |
254 | row_idx + |
255 | boundary_end_collection.GetValue(0, wexpr->end_expr->IsScalar() ? 0 : row_idx).GetValue<int64_t>() + 1; |
256 | |
257 | break; |
258 | default: |
259 | throw NotImplementedException("Unsupported boundary" ); |
260 | } |
261 | |
262 | // clamp windows to partitions if they should exceed |
263 | if (bounds.window_start < (int64_t)bounds.partition_start) { |
264 | bounds.window_start = bounds.partition_start; |
265 | } |
266 | if ((idx_t)bounds.window_end > bounds.partition_end) { |
267 | bounds.window_end = bounds.partition_end; |
268 | } |
269 | |
270 | if (bounds.window_start < 0 || bounds.window_end < 0) { |
271 | throw Exception("Failed to compute window boundaries" ); |
272 | } |
273 | } |
274 | |
275 | static void ComputeWindowExpression(ClientContext &context, BoundWindowExpression *wexpr, ChunkCollection &input, |
276 | ChunkCollection &output, idx_t output_idx) { |
277 | |
278 | ChunkCollection sort_collection; |
279 | bool needs_sorting = wexpr->partitions.size() + wexpr->orders.size() > 0; |
280 | if (needs_sorting) { |
281 | SortCollectionForWindow(context, wexpr, input, output, sort_collection); |
282 | } |
283 | |
284 | // evaluate inner expressions of window functions, could be more complex |
285 | ChunkCollection payload_collection; |
286 | vector<Expression *> exprs; |
287 | for (auto &child : wexpr->children) { |
288 | exprs.push_back(child.get()); |
289 | } |
290 | // TODO: child may be a scalar, don't need to materialize the whole collection then |
291 | MaterializeExpressions(context, exprs.data(), exprs.size(), input, payload_collection); |
292 | |
293 | ChunkCollection leadlag_offset_collection; |
294 | ChunkCollection leadlag_default_collection; |
295 | if (wexpr->type == ExpressionType::WINDOW_LEAD || wexpr->type == ExpressionType::WINDOW_LAG) { |
296 | if (wexpr->offset_expr) { |
297 | MaterializeExpression(context, wexpr->offset_expr.get(), input, leadlag_offset_collection, |
298 | wexpr->offset_expr->IsScalar()); |
299 | } |
300 | if (wexpr->default_expr) { |
301 | MaterializeExpression(context, wexpr->default_expr.get(), input, leadlag_default_collection, |
302 | wexpr->default_expr->IsScalar()); |
303 | } |
304 | } |
305 | |
306 | // evaluate boundaries if present. |
307 | ChunkCollection boundary_start_collection; |
308 | if (wexpr->start_expr && |
309 | (wexpr->start == WindowBoundary::EXPR_PRECEDING || wexpr->start == WindowBoundary::EXPR_FOLLOWING)) { |
310 | MaterializeExpression(context, wexpr->start_expr.get(), input, boundary_start_collection, |
311 | wexpr->start_expr->IsScalar()); |
312 | } |
313 | ChunkCollection boundary_end_collection; |
314 | if (wexpr->end_expr && |
315 | (wexpr->end == WindowBoundary::EXPR_PRECEDING || wexpr->end == WindowBoundary::EXPR_FOLLOWING)) { |
316 | MaterializeExpression(context, wexpr->end_expr.get(), input, boundary_end_collection, |
317 | wexpr->end_expr->IsScalar()); |
318 | } |
319 | |
320 | // build a segment tree for frame-adhering aggregates |
321 | // see http://www.vldb.org/pvldb/vol8/p1058-leis.pdf |
322 | unique_ptr<WindowSegmentTree> segment_tree = nullptr; |
323 | |
324 | if (wexpr->aggregate) { |
325 | segment_tree = make_unique<WindowSegmentTree>(*(wexpr->aggregate), wexpr->return_type, &payload_collection); |
326 | } |
327 | |
328 | WindowBoundariesState bounds; |
329 | uint64_t dense_rank = 1, rank_equal = 0, rank = 1; |
330 | |
331 | if (needs_sorting) { |
332 | bounds.row_prev = sort_collection.GetRow(0); |
333 | } |
334 | |
335 | // this is the main loop, go through all sorted rows and compute window function result |
336 | for (idx_t row_idx = 0; row_idx < input.count; row_idx++) { |
337 | // special case, OVER (), aggregate over everything |
338 | UpdateWindowBoundaries(wexpr, sort_collection, input.count, row_idx, boundary_start_collection, |
339 | boundary_end_collection, bounds); |
340 | if (WindowNeedsRank(wexpr)) { |
341 | if (!bounds.is_same_partition || row_idx == 0) { // special case for first row, need to init |
342 | dense_rank = 1; |
343 | rank = 1; |
344 | rank_equal = 0; |
345 | } else if (!bounds.is_peer) { |
346 | dense_rank++; |
347 | rank += rank_equal; |
348 | rank_equal = 0; |
349 | } |
350 | rank_equal++; |
351 | } |
352 | |
353 | auto res = Value(); |
354 | |
355 | // if no values are read for window, result is NULL |
356 | if (bounds.window_start >= bounds.window_end) { |
357 | output.SetValue(output_idx, row_idx, res); |
358 | continue; |
359 | } |
360 | |
361 | switch (wexpr->type) { |
362 | case ExpressionType::WINDOW_AGGREGATE: { |
363 | res = segment_tree->Compute(bounds.window_start, bounds.window_end); |
364 | break; |
365 | } |
366 | case ExpressionType::WINDOW_ROW_NUMBER: { |
367 | res = Value::Numeric(wexpr->return_type, row_idx - bounds.partition_start + 1); |
368 | break; |
369 | } |
370 | case ExpressionType::WINDOW_RANK_DENSE: { |
371 | res = Value::Numeric(wexpr->return_type, dense_rank); |
372 | break; |
373 | } |
374 | case ExpressionType::WINDOW_RANK: { |
375 | res = Value::Numeric(wexpr->return_type, rank); |
376 | break; |
377 | } |
378 | case ExpressionType::WINDOW_PERCENT_RANK: { |
379 | int64_t denom = (int64_t)bounds.partition_end - bounds.partition_start - 1; |
380 | double percent_rank = denom > 0 ? ((double)rank - 1) / denom : 0; |
381 | res = Value(percent_rank); |
382 | break; |
383 | } |
384 | case ExpressionType::WINDOW_CUME_DIST: { |
385 | int64_t denom = (int64_t)bounds.partition_end - bounds.partition_start; |
386 | double cume_dist = denom > 0 ? ((double)(bounds.peer_end - bounds.partition_start)) / denom : 0; |
387 | res = Value(cume_dist); |
388 | break; |
389 | } |
390 | case ExpressionType::WINDOW_NTILE: { |
391 | if (payload_collection.column_count() != 1) { |
392 | throw Exception("NTILE needs a parameter" ); |
393 | } |
394 | auto n_param = payload_collection.GetValue(0, row_idx).GetValue<int64_t>(); |
395 | // With thanks from SQLite's ntileValueFunc() |
396 | int64_t n_total = bounds.partition_end - bounds.partition_start; |
397 | int64_t n_size = (n_total / n_param); |
398 | if (n_size > 0) { |
399 | int64_t n_large = n_total - n_param * n_size; |
400 | int64_t i_small = n_large * (n_size + 1); |
401 | |
402 | assert((n_large * (n_size + 1) + (n_param - n_large) * n_size) == n_total); |
403 | |
404 | if (row_idx < (idx_t)i_small) { |
405 | res = Value::Numeric(wexpr->return_type, 1 + row_idx / (n_size + 1)); |
406 | } else { |
407 | res = Value::Numeric(wexpr->return_type, 1 + n_large + (row_idx - i_small) / n_size); |
408 | } |
409 | } |
410 | break; |
411 | } |
412 | case ExpressionType::WINDOW_LEAD: |
413 | case ExpressionType::WINDOW_LAG: { |
414 | Value def_val = Value(wexpr->return_type); |
415 | idx_t offset = 1; |
416 | if (wexpr->offset_expr) { |
417 | offset = leadlag_offset_collection.GetValue(0, wexpr->offset_expr->IsScalar() ? 0 : row_idx) |
418 | .GetValue<int64_t>(); |
419 | } |
420 | if (wexpr->default_expr) { |
421 | def_val = leadlag_default_collection.GetValue(0, wexpr->default_expr->IsScalar() ? 0 : row_idx); |
422 | } |
423 | if (wexpr->type == ExpressionType::WINDOW_LEAD) { |
424 | auto lead_idx = row_idx + 1; |
425 | if (lead_idx < bounds.partition_end) { |
426 | res = payload_collection.GetValue(0, lead_idx); |
427 | } else { |
428 | res = def_val; |
429 | } |
430 | } else { |
431 | int64_t lag_idx = (int64_t)row_idx - offset; |
432 | if (lag_idx >= 0 && (idx_t)lag_idx >= bounds.partition_start) { |
433 | res = payload_collection.GetValue(0, lag_idx); |
434 | } else { |
435 | res = def_val; |
436 | } |
437 | } |
438 | |
439 | break; |
440 | } |
441 | case ExpressionType::WINDOW_FIRST_VALUE: { |
442 | res = payload_collection.GetValue(0, bounds.window_start); |
443 | break; |
444 | } |
445 | case ExpressionType::WINDOW_LAST_VALUE: { |
446 | res = payload_collection.GetValue(0, bounds.window_end - 1); |
447 | break; |
448 | } |
449 | default: |
450 | throw NotImplementedException("Window aggregate type %s" , ExpressionTypeToString(wexpr->type).c_str()); |
451 | } |
452 | |
453 | output.SetValue(output_idx, row_idx, res); |
454 | } |
455 | } |
456 | |
457 | void PhysicalWindow::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) { |
458 | auto state = reinterpret_cast<PhysicalWindowOperatorState *>(state_); |
459 | ChunkCollection &big_data = state->tuples; |
460 | ChunkCollection &window_results = state->window_results; |
461 | |
462 | // this is a blocking operator, so compute complete result on first invocation |
463 | if (state->position == 0) { |
464 | do { |
465 | children[0]->GetChunk(context, state->child_chunk, state->child_state.get()); |
466 | big_data.Append(state->child_chunk); |
467 | } while (state->child_chunk.size() != 0); |
468 | |
469 | if (big_data.count == 0) { |
470 | return; |
471 | } |
472 | |
473 | vector<TypeId> window_types; |
474 | for (idx_t expr_idx = 0; expr_idx < select_list.size(); expr_idx++) { |
475 | window_types.push_back(select_list[expr_idx]->return_type); |
476 | } |
477 | |
478 | for (idx_t i = 0; i < big_data.chunks.size(); i++) { |
479 | DataChunk window_chunk; |
480 | window_chunk.Initialize(window_types); |
481 | window_chunk.SetCardinality(big_data.chunks[i]->size()); |
482 | for (idx_t col_idx = 0; col_idx < window_chunk.column_count(); col_idx++) { |
483 | window_chunk.data[col_idx].vector_type = VectorType::CONSTANT_VECTOR; |
484 | ConstantVector::SetNull(window_chunk.data[col_idx], true); |
485 | } |
486 | |
487 | window_chunk.Verify(); |
488 | window_results.Append(window_chunk); |
489 | } |
490 | |
491 | assert(window_results.column_count() == select_list.size()); |
492 | idx_t window_output_idx = 0; |
493 | // we can have multiple window functions |
494 | for (idx_t expr_idx = 0; expr_idx < select_list.size(); expr_idx++) { |
495 | assert(select_list[expr_idx]->GetExpressionClass() == ExpressionClass::BOUND_WINDOW); |
496 | // sort by partition and order clause in window def |
497 | auto wexpr = reinterpret_cast<BoundWindowExpression *>(select_list[expr_idx].get()); |
498 | ComputeWindowExpression(context, wexpr, big_data, window_results, window_output_idx++); |
499 | } |
500 | } |
501 | |
502 | if (state->position >= big_data.count) { |
503 | return; |
504 | } |
505 | |
506 | // just return what was computed before, appending the result cols of the window expressions at the end |
507 | auto &proj_ch = big_data.GetChunk(state->position); |
508 | auto &wind_ch = window_results.GetChunk(state->position); |
509 | |
510 | idx_t out_idx = 0; |
511 | assert(proj_ch.size() == wind_ch.size()); |
512 | chunk.SetCardinality(proj_ch); |
513 | for (idx_t col_idx = 0; col_idx < proj_ch.column_count(); col_idx++) { |
514 | chunk.data[out_idx++].Reference(proj_ch.data[col_idx]); |
515 | } |
516 | for (idx_t col_idx = 0; col_idx < wind_ch.column_count(); col_idx++) { |
517 | chunk.data[out_idx++].Reference(wind_ch.data[col_idx]); |
518 | } |
519 | state->position += STANDARD_VECTOR_SIZE; |
520 | } |
521 | |
522 | unique_ptr<PhysicalOperatorState> PhysicalWindow::GetOperatorState() { |
523 | return make_unique<PhysicalWindowOperatorState>(children[0].get()); |
524 | } |
525 | |