| 1 | #include "duckdb/execution/operator/aggregate/physical_window.hpp" |
| 2 | |
| 3 | #include "duckdb/common/types/chunk_collection.hpp" |
| 4 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
| 5 | #include "duckdb/execution/expression_executor.hpp" |
| 6 | #include "duckdb/execution/window_segment_tree.hpp" |
| 7 | #include "duckdb/planner/expression/bound_reference_expression.hpp" |
| 8 | #include "duckdb/planner/expression/bound_window_expression.hpp" |
| 9 | |
| 10 | #include <cmath> |
| 11 | |
| 12 | using namespace duckdb; |
| 13 | using namespace std; |
| 14 | |
//! The operator state of the window
class PhysicalWindowOperatorState : public PhysicalOperatorState {
public:
	PhysicalWindowOperatorState(PhysicalOperator *child) : PhysicalOperatorState(child), position(0) {
	}

	//! Scan offset (in rows) into the materialized result; 0 means the result
	//! has not been computed yet (see GetChunkInternal)
	idx_t position;
	//! The fully materialized child input (window is a blocking operator)
	ChunkCollection tuples;
	//! One result column per window expression, row-aligned with `tuples`
	ChunkCollection window_results;
};
| 25 | |
// this implements a sorted window functions variant
//! Constructs the window operator; `select_list` holds one BoundWindowExpression
//! per window function to evaluate, `op.types` provides the output schema.
PhysicalWindow::PhysicalWindow(LogicalOperator &op, vector<unique_ptr<Expression>> select_list,
                               PhysicalOperatorType type)
    : PhysicalOperator(type, op.types), select_list(std::move(select_list)) {
}
| 31 | |
| 32 | static bool EqualsSubset(vector<Value> &a, vector<Value> &b, idx_t start, idx_t end) { |
| 33 | assert(start <= end); |
| 34 | for (idx_t i = start; i < end; i++) { |
| 35 | if (a[i] != b[i]) { |
| 36 | return false; |
| 37 | } |
| 38 | } |
| 39 | return true; |
| 40 | } |
| 41 | |
| 42 | static idx_t BinarySearchRightmost(ChunkCollection &input, vector<Value> row, idx_t l, idx_t r, idx_t comp_cols) { |
| 43 | if (comp_cols == 0) { |
| 44 | return r - 1; |
| 45 | } |
| 46 | while (l < r) { |
| 47 | idx_t m = floor((l + r) / 2); |
| 48 | bool less_than_equals = true; |
| 49 | for (idx_t i = 0; i < comp_cols; i++) { |
| 50 | if (input.GetRow(m)[i] > row[i]) { |
| 51 | less_than_equals = false; |
| 52 | break; |
| 53 | } |
| 54 | } |
| 55 | if (less_than_equals) { |
| 56 | l = m + 1; |
| 57 | } else { |
| 58 | r = m; |
| 59 | } |
| 60 | } |
| 61 | return l - 1; |
| 62 | } |
| 63 | |
| 64 | static void MaterializeExpressions(ClientContext &context, Expression **exprs, idx_t expr_count, ChunkCollection &input, |
| 65 | ChunkCollection &output, bool scalar = false) { |
| 66 | if (expr_count == 0) { |
| 67 | return; |
| 68 | } |
| 69 | |
| 70 | vector<TypeId> types; |
| 71 | ExpressionExecutor executor; |
| 72 | for (idx_t expr_idx = 0; expr_idx < expr_count; ++expr_idx) { |
| 73 | types.push_back(exprs[expr_idx]->return_type); |
| 74 | executor.AddExpression(*exprs[expr_idx]); |
| 75 | } |
| 76 | |
| 77 | for (idx_t i = 0; i < input.chunks.size(); i++) { |
| 78 | DataChunk chunk; |
| 79 | chunk.Initialize(types); |
| 80 | |
| 81 | executor.Execute(*input.chunks[i], chunk); |
| 82 | |
| 83 | chunk.Verify(); |
| 84 | output.Append(chunk); |
| 85 | |
| 86 | if (scalar) { |
| 87 | break; |
| 88 | } |
| 89 | } |
| 90 | } |
| 91 | |
| 92 | static void MaterializeExpression(ClientContext &context, Expression *expr, ChunkCollection &input, |
| 93 | ChunkCollection &output, bool scalar = false) { |
| 94 | MaterializeExpressions(context, &expr, 1, input, output, scalar); |
| 95 | } |
| 96 | |
//! Sorts `input` (and, in lockstep, `output`) by the window's PARTITION BY
//! expressions followed by its ORDER BY expressions. The evaluated sort key
//! columns themselves are materialized into `sort_collection`, which is also
//! reordered so it stays row-aligned with `input`/`output`.
static void SortCollectionForWindow(ClientContext &context, BoundWindowExpression *wexpr, ChunkCollection &input,
                                    ChunkCollection &output, ChunkCollection &sort_collection) {
	vector<TypeId> sort_types;
	vector<OrderType> orders;
	ExpressionExecutor executor;

	// we sort by both 1) partition by expression list and 2) order by expressions
	// partition columns always sort ascending; their order is irrelevant, they
	// only need to group equal values together
	for (idx_t prt_idx = 0; prt_idx < wexpr->partitions.size(); prt_idx++) {
		auto &pexpr = wexpr->partitions[prt_idx];
		sort_types.push_back(pexpr->return_type);
		orders.push_back(OrderType::ASCENDING);
		executor.AddExpression(*pexpr);
	}

	// ORDER BY columns use the ordering requested in the window definition
	for (idx_t ord_idx = 0; ord_idx < wexpr->orders.size(); ord_idx++) {
		auto &oexpr = wexpr->orders[ord_idx].expression;
		sort_types.push_back(oexpr->return_type);
		orders.push_back(wexpr->orders[ord_idx].type);
		executor.AddExpression(*oexpr);
	}

	assert(sort_types.size() > 0);

	// create a chunkcollection for the results of the expressions in the window definitions
	for (idx_t i = 0; i < input.chunks.size(); i++) {
		DataChunk sort_chunk;
		sort_chunk.Initialize(sort_types);

		executor.Execute(*input.chunks[i], sort_chunk);

		sort_chunk.Verify();
		sort_collection.Append(sort_chunk);
	}

	assert(input.count == sort_collection.count);

	// compute the sorted permutation once, then apply it to all three
	// collections so they remain row-aligned
	auto sorted_vector = unique_ptr<idx_t[]>(new idx_t[input.count]);
	sort_collection.Sort(orders, sorted_vector.get());

	input.Reorder(sorted_vector.get());
	output.Reorder(sorted_vector.get());
	sort_collection.Reorder(sorted_vector.get());
}
| 140 | |
//! Incremental state tracking the current partition, peer group and window
//! frame boundaries while scanning the sorted rows in order.
struct WindowBoundariesState {
	idx_t partition_start = 0;  // first row of the current partition
	idx_t partition_end = 0;    // one past the last row of the current partition
	idx_t peer_start = 0;       // first row of the current peer group
	idx_t peer_end = 0;         // one past the last row of the current peer group
	int64_t window_start = -1;  // frame start for the current row (-1 = unset)
	int64_t window_end = -1;    // frame end (exclusive) for the current row (-1 = unset)
	bool is_same_partition = false; // current row is in the same partition as the previous row
	bool is_peer = false;           // current row is a peer (same sort key) of the previous row
	vector<Value> row_prev;         // sort-key values of the previously visited row
};
| 152 | |
| 153 | static bool WindowNeedsRank(BoundWindowExpression *wexpr) { |
| 154 | return wexpr->type == ExpressionType::WINDOW_PERCENT_RANK || wexpr->type == ExpressionType::WINDOW_RANK || |
| 155 | wexpr->type == ExpressionType::WINDOW_RANK_DENSE || wexpr->type == ExpressionType::WINDOW_CUME_DIST; |
| 156 | } |
| 157 | |
//! Updates `bounds` with the partition, peer-group and frame boundaries for the
//! row at `row_idx`. `input` is the sorted sort-key collection (empty for the
//! OVER () case). Relies on rows being visited in ascending sorted order so the
//! partition end only needs recomputing when the partition changes.
static void UpdateWindowBoundaries(BoundWindowExpression *wexpr, ChunkCollection &input, idx_t input_size,
                                   idx_t row_idx, ChunkCollection &boundary_start_collection,
                                   ChunkCollection &boundary_end_collection, WindowBoundariesState &bounds) {

	if (input.column_count() > 0) {
		vector<Value> row_cur = input.GetRow(row_idx);
		idx_t sort_col_count = wexpr->partitions.size() + wexpr->orders.size();

		// determine partition and peer group boundaries to ultimately figure out window size
		// partition columns occupy the first slots of the sort key, order columns the rest
		bounds.is_same_partition = EqualsSubset(bounds.row_prev, row_cur, 0, wexpr->partitions.size());
		bounds.is_peer = bounds.is_same_partition &&
		                 EqualsSubset(bounds.row_prev, row_cur, wexpr->partitions.size(), sort_col_count);
		bounds.row_prev = row_cur;

		// when the partition changes, recompute the boundaries
		if (!bounds.is_same_partition || row_idx == 0) { // special case for first row, need to init
			bounds.partition_start = row_idx;
			bounds.peer_start = row_idx;

			// find end of partition
			bounds.partition_end =
			    BinarySearchRightmost(input, row_cur, bounds.partition_start, input.count, wexpr->partitions.size()) +
			    1;

		} else if (!bounds.is_peer) {
			// same partition, but a new peer group starts at this row
			bounds.peer_start = row_idx;
		}

		// the peer group end is only needed for RANGE frames and CUME_DIST
		if (wexpr->end == WindowBoundary::CURRENT_ROW_RANGE || wexpr->type == ExpressionType::WINDOW_CUME_DIST) {
			bounds.peer_end = BinarySearchRightmost(input, row_cur, row_idx, bounds.partition_end, sort_col_count) + 1;
		}
	} else {
		// OVER (): no sort columns, the whole input is one partition and one peer group
		bounds.is_same_partition = 0;
		bounds.is_peer = true;
		bounds.partition_end = input_size;
		bounds.peer_end = bounds.partition_end;
	}

	// determine window boundaries depending on the type of expression
	bounds.window_start = -1;
	bounds.window_end = -1;

	switch (wexpr->start) {
	case WindowBoundary::UNBOUNDED_PRECEDING:
		bounds.window_start = bounds.partition_start;
		break;
	case WindowBoundary::CURRENT_ROW_ROWS:
		bounds.window_start = row_idx;
		break;
	case WindowBoundary::CURRENT_ROW_RANGE:
		bounds.window_start = bounds.peer_start;
		break;
	case WindowBoundary::UNBOUNDED_FOLLOWING:
		assert(0); // disallowed
		break;
	case WindowBoundary::EXPR_PRECEDING: {
		// frame starts <expr> rows before the current row; a scalar boundary
		// expression was materialized once (index 0), otherwise per row
		assert(boundary_start_collection.column_count() > 0);
		bounds.window_start =
		    (int64_t)row_idx -
		    boundary_start_collection.GetValue(0, wexpr->start_expr->IsScalar() ? 0 : row_idx).GetValue<int64_t>();
		break;
	}
	case WindowBoundary::EXPR_FOLLOWING: {
		assert(boundary_start_collection.column_count() > 0);
		bounds.window_start =
		    row_idx +
		    boundary_start_collection.GetValue(0, wexpr->start_expr->IsScalar() ? 0 : row_idx).GetValue<int64_t>();
		break;
	}

	default:
		throw NotImplementedException("Unsupported boundary" );
	}

	switch (wexpr->end) {
	case WindowBoundary::UNBOUNDED_PRECEDING:
		assert(0); // disallowed
		break;
	case WindowBoundary::CURRENT_ROW_ROWS:
		bounds.window_end = row_idx + 1;
		break;
	case WindowBoundary::CURRENT_ROW_RANGE:
		bounds.window_end = bounds.peer_end;
		break;
	case WindowBoundary::UNBOUNDED_FOLLOWING:
		bounds.window_end = bounds.partition_end;
		break;
	case WindowBoundary::EXPR_PRECEDING:
		// +1 because window_end is exclusive
		assert(boundary_end_collection.column_count() > 0);
		bounds.window_end =
		    (int64_t)row_idx -
		    boundary_end_collection.GetValue(0, wexpr->end_expr->IsScalar() ? 0 : row_idx).GetValue<int64_t>() + 1;
		break;
	case WindowBoundary::EXPR_FOLLOWING:
		assert(boundary_end_collection.column_count() > 0);
		bounds.window_end =
		    row_idx +
		    boundary_end_collection.GetValue(0, wexpr->end_expr->IsScalar() ? 0 : row_idx).GetValue<int64_t>() + 1;

		break;
	default:
		throw NotImplementedException("Unsupported boundary" );
	}

	// clamp windows to partitions if they should exceed
	if (bounds.window_start < (int64_t)bounds.partition_start) {
		bounds.window_start = bounds.partition_start;
	}
	if ((idx_t)bounds.window_end > bounds.partition_end) {
		bounds.window_end = bounds.partition_end;
	}

	if (bounds.window_start < 0 || bounds.window_end < 0) {
		throw Exception("Failed to compute window boundaries" );
	}
}
| 274 | |
| 275 | static void ComputeWindowExpression(ClientContext &context, BoundWindowExpression *wexpr, ChunkCollection &input, |
| 276 | ChunkCollection &output, idx_t output_idx) { |
| 277 | |
| 278 | ChunkCollection sort_collection; |
| 279 | bool needs_sorting = wexpr->partitions.size() + wexpr->orders.size() > 0; |
| 280 | if (needs_sorting) { |
| 281 | SortCollectionForWindow(context, wexpr, input, output, sort_collection); |
| 282 | } |
| 283 | |
| 284 | // evaluate inner expressions of window functions, could be more complex |
| 285 | ChunkCollection payload_collection; |
| 286 | vector<Expression *> exprs; |
| 287 | for (auto &child : wexpr->children) { |
| 288 | exprs.push_back(child.get()); |
| 289 | } |
| 290 | // TODO: child may be a scalar, don't need to materialize the whole collection then |
| 291 | MaterializeExpressions(context, exprs.data(), exprs.size(), input, payload_collection); |
| 292 | |
| 293 | ChunkCollection leadlag_offset_collection; |
| 294 | ChunkCollection leadlag_default_collection; |
| 295 | if (wexpr->type == ExpressionType::WINDOW_LEAD || wexpr->type == ExpressionType::WINDOW_LAG) { |
| 296 | if (wexpr->offset_expr) { |
| 297 | MaterializeExpression(context, wexpr->offset_expr.get(), input, leadlag_offset_collection, |
| 298 | wexpr->offset_expr->IsScalar()); |
| 299 | } |
| 300 | if (wexpr->default_expr) { |
| 301 | MaterializeExpression(context, wexpr->default_expr.get(), input, leadlag_default_collection, |
| 302 | wexpr->default_expr->IsScalar()); |
| 303 | } |
| 304 | } |
| 305 | |
| 306 | // evaluate boundaries if present. |
| 307 | ChunkCollection boundary_start_collection; |
| 308 | if (wexpr->start_expr && |
| 309 | (wexpr->start == WindowBoundary::EXPR_PRECEDING || wexpr->start == WindowBoundary::EXPR_FOLLOWING)) { |
| 310 | MaterializeExpression(context, wexpr->start_expr.get(), input, boundary_start_collection, |
| 311 | wexpr->start_expr->IsScalar()); |
| 312 | } |
| 313 | ChunkCollection boundary_end_collection; |
| 314 | if (wexpr->end_expr && |
| 315 | (wexpr->end == WindowBoundary::EXPR_PRECEDING || wexpr->end == WindowBoundary::EXPR_FOLLOWING)) { |
| 316 | MaterializeExpression(context, wexpr->end_expr.get(), input, boundary_end_collection, |
| 317 | wexpr->end_expr->IsScalar()); |
| 318 | } |
| 319 | |
| 320 | // build a segment tree for frame-adhering aggregates |
| 321 | // see http://www.vldb.org/pvldb/vol8/p1058-leis.pdf |
| 322 | unique_ptr<WindowSegmentTree> segment_tree = nullptr; |
| 323 | |
| 324 | if (wexpr->aggregate) { |
| 325 | segment_tree = make_unique<WindowSegmentTree>(*(wexpr->aggregate), wexpr->return_type, &payload_collection); |
| 326 | } |
| 327 | |
| 328 | WindowBoundariesState bounds; |
| 329 | uint64_t dense_rank = 1, rank_equal = 0, rank = 1; |
| 330 | |
| 331 | if (needs_sorting) { |
| 332 | bounds.row_prev = sort_collection.GetRow(0); |
| 333 | } |
| 334 | |
| 335 | // this is the main loop, go through all sorted rows and compute window function result |
| 336 | for (idx_t row_idx = 0; row_idx < input.count; row_idx++) { |
| 337 | // special case, OVER (), aggregate over everything |
| 338 | UpdateWindowBoundaries(wexpr, sort_collection, input.count, row_idx, boundary_start_collection, |
| 339 | boundary_end_collection, bounds); |
| 340 | if (WindowNeedsRank(wexpr)) { |
| 341 | if (!bounds.is_same_partition || row_idx == 0) { // special case for first row, need to init |
| 342 | dense_rank = 1; |
| 343 | rank = 1; |
| 344 | rank_equal = 0; |
| 345 | } else if (!bounds.is_peer) { |
| 346 | dense_rank++; |
| 347 | rank += rank_equal; |
| 348 | rank_equal = 0; |
| 349 | } |
| 350 | rank_equal++; |
| 351 | } |
| 352 | |
| 353 | auto res = Value(); |
| 354 | |
| 355 | // if no values are read for window, result is NULL |
| 356 | if (bounds.window_start >= bounds.window_end) { |
| 357 | output.SetValue(output_idx, row_idx, res); |
| 358 | continue; |
| 359 | } |
| 360 | |
| 361 | switch (wexpr->type) { |
| 362 | case ExpressionType::WINDOW_AGGREGATE: { |
| 363 | res = segment_tree->Compute(bounds.window_start, bounds.window_end); |
| 364 | break; |
| 365 | } |
| 366 | case ExpressionType::WINDOW_ROW_NUMBER: { |
| 367 | res = Value::Numeric(wexpr->return_type, row_idx - bounds.partition_start + 1); |
| 368 | break; |
| 369 | } |
| 370 | case ExpressionType::WINDOW_RANK_DENSE: { |
| 371 | res = Value::Numeric(wexpr->return_type, dense_rank); |
| 372 | break; |
| 373 | } |
| 374 | case ExpressionType::WINDOW_RANK: { |
| 375 | res = Value::Numeric(wexpr->return_type, rank); |
| 376 | break; |
| 377 | } |
| 378 | case ExpressionType::WINDOW_PERCENT_RANK: { |
| 379 | int64_t denom = (int64_t)bounds.partition_end - bounds.partition_start - 1; |
| 380 | double percent_rank = denom > 0 ? ((double)rank - 1) / denom : 0; |
| 381 | res = Value(percent_rank); |
| 382 | break; |
| 383 | } |
| 384 | case ExpressionType::WINDOW_CUME_DIST: { |
| 385 | int64_t denom = (int64_t)bounds.partition_end - bounds.partition_start; |
| 386 | double cume_dist = denom > 0 ? ((double)(bounds.peer_end - bounds.partition_start)) / denom : 0; |
| 387 | res = Value(cume_dist); |
| 388 | break; |
| 389 | } |
| 390 | case ExpressionType::WINDOW_NTILE: { |
| 391 | if (payload_collection.column_count() != 1) { |
| 392 | throw Exception("NTILE needs a parameter" ); |
| 393 | } |
| 394 | auto n_param = payload_collection.GetValue(0, row_idx).GetValue<int64_t>(); |
| 395 | // With thanks from SQLite's ntileValueFunc() |
| 396 | int64_t n_total = bounds.partition_end - bounds.partition_start; |
| 397 | int64_t n_size = (n_total / n_param); |
| 398 | if (n_size > 0) { |
| 399 | int64_t n_large = n_total - n_param * n_size; |
| 400 | int64_t i_small = n_large * (n_size + 1); |
| 401 | |
| 402 | assert((n_large * (n_size + 1) + (n_param - n_large) * n_size) == n_total); |
| 403 | |
| 404 | if (row_idx < (idx_t)i_small) { |
| 405 | res = Value::Numeric(wexpr->return_type, 1 + row_idx / (n_size + 1)); |
| 406 | } else { |
| 407 | res = Value::Numeric(wexpr->return_type, 1 + n_large + (row_idx - i_small) / n_size); |
| 408 | } |
| 409 | } |
| 410 | break; |
| 411 | } |
| 412 | case ExpressionType::WINDOW_LEAD: |
| 413 | case ExpressionType::WINDOW_LAG: { |
| 414 | Value def_val = Value(wexpr->return_type); |
| 415 | idx_t offset = 1; |
| 416 | if (wexpr->offset_expr) { |
| 417 | offset = leadlag_offset_collection.GetValue(0, wexpr->offset_expr->IsScalar() ? 0 : row_idx) |
| 418 | .GetValue<int64_t>(); |
| 419 | } |
| 420 | if (wexpr->default_expr) { |
| 421 | def_val = leadlag_default_collection.GetValue(0, wexpr->default_expr->IsScalar() ? 0 : row_idx); |
| 422 | } |
| 423 | if (wexpr->type == ExpressionType::WINDOW_LEAD) { |
| 424 | auto lead_idx = row_idx + 1; |
| 425 | if (lead_idx < bounds.partition_end) { |
| 426 | res = payload_collection.GetValue(0, lead_idx); |
| 427 | } else { |
| 428 | res = def_val; |
| 429 | } |
| 430 | } else { |
| 431 | int64_t lag_idx = (int64_t)row_idx - offset; |
| 432 | if (lag_idx >= 0 && (idx_t)lag_idx >= bounds.partition_start) { |
| 433 | res = payload_collection.GetValue(0, lag_idx); |
| 434 | } else { |
| 435 | res = def_val; |
| 436 | } |
| 437 | } |
| 438 | |
| 439 | break; |
| 440 | } |
| 441 | case ExpressionType::WINDOW_FIRST_VALUE: { |
| 442 | res = payload_collection.GetValue(0, bounds.window_start); |
| 443 | break; |
| 444 | } |
| 445 | case ExpressionType::WINDOW_LAST_VALUE: { |
| 446 | res = payload_collection.GetValue(0, bounds.window_end - 1); |
| 447 | break; |
| 448 | } |
| 449 | default: |
| 450 | throw NotImplementedException("Window aggregate type %s" , ExpressionTypeToString(wexpr->type).c_str()); |
| 451 | } |
| 452 | |
| 453 | output.SetValue(output_idx, row_idx, res); |
| 454 | } |
| 455 | } |
| 456 | |
//! Produces the next output chunk. On the first call, exhausts the child,
//! computes all window expression results up front (window is a blocking
//! operator), and thereafter streams the materialized rows chunk by chunk,
//! appending the window result columns after the child's columns.
void PhysicalWindow::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) {
	auto state = reinterpret_cast<PhysicalWindowOperatorState *>(state_);
	ChunkCollection &big_data = state->tuples;
	ChunkCollection &window_results = state->window_results;

	// this is a blocking operator, so compute complete result on first invocation
	if (state->position == 0) {
		// drain the child pipeline completely into big_data
		do {
			children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
			big_data.Append(state->child_chunk);
		} while (state->child_chunk.size() != 0);

		if (big_data.count == 0) {
			return;
		}

		vector<TypeId> window_types;
		for (idx_t expr_idx = 0; expr_idx < select_list.size(); expr_idx++) {
			window_types.push_back(select_list[expr_idx]->return_type);
		}

		// pre-fill the result collection with NULL constant vectors, mirroring
		// big_data's chunk layout; ComputeWindowExpression overwrites these
		for (idx_t i = 0; i < big_data.chunks.size(); i++) {
			DataChunk window_chunk;
			window_chunk.Initialize(window_types);
			window_chunk.SetCardinality(big_data.chunks[i]->size());
			for (idx_t col_idx = 0; col_idx < window_chunk.column_count(); col_idx++) {
				window_chunk.data[col_idx].vector_type = VectorType::CONSTANT_VECTOR;
				ConstantVector::SetNull(window_chunk.data[col_idx], true);
			}

			window_chunk.Verify();
			window_results.Append(window_chunk);
		}

		assert(window_results.column_count() == select_list.size());
		idx_t window_output_idx = 0;
		// we can have multiple window functions
		for (idx_t expr_idx = 0; expr_idx < select_list.size(); expr_idx++) {
			assert(select_list[expr_idx]->GetExpressionClass() == ExpressionClass::BOUND_WINDOW);
			// sort by partition and order clause in window def
			auto wexpr = reinterpret_cast<BoundWindowExpression *>(select_list[expr_idx].get());
			ComputeWindowExpression(context, wexpr, big_data, window_results, window_output_idx++);
		}
	}

	if (state->position >= big_data.count) {
		// all materialized rows have been emitted
		return;
	}

	// just return what was computed before, appending the result cols of the window expressions at the end
	auto &proj_ch = big_data.GetChunk(state->position);
	auto &wind_ch = window_results.GetChunk(state->position);

	idx_t out_idx = 0;
	assert(proj_ch.size() == wind_ch.size());
	chunk.SetCardinality(proj_ch);
	// reference (zero-copy) the child columns first, then the window columns
	for (idx_t col_idx = 0; col_idx < proj_ch.column_count(); col_idx++) {
		chunk.data[out_idx++].Reference(proj_ch.data[col_idx]);
	}
	for (idx_t col_idx = 0; col_idx < wind_ch.column_count(); col_idx++) {
		chunk.data[out_idx++].Reference(wind_ch.data[col_idx]);
	}
	state->position += STANDARD_VECTOR_SIZE;
}
| 521 | |
| 522 | unique_ptr<PhysicalOperatorState> PhysicalWindow::GetOperatorState() { |
| 523 | return make_unique<PhysicalWindowOperatorState>(children[0].get()); |
| 524 | } |
| 525 | |