#include "duckdb/execution/operator/order/physical_top_n.hpp"

#include "duckdb/common/assert.hpp"
#include "duckdb/common/value_operations/value_operations.hpp"
#include "duckdb/common/vector_operations/vector_operations.hpp"
#include "duckdb/execution/expression_executor.hpp"
#include "duckdb/storage/data_table.hpp"

using namespace duckdb;
using namespace std;
11 | |
12 | class PhysicalTopNOperatorState : public PhysicalOperatorState { |
13 | public: |
14 | PhysicalTopNOperatorState(PhysicalOperator *child) : PhysicalOperatorState(child), position(0) { |
15 | } |
16 | |
17 | idx_t position; |
18 | idx_t current_offset; |
19 | ChunkCollection sorted_data; |
20 | unique_ptr<idx_t[]> heap; |
21 | }; |
22 | |
// Produce the next chunk of the TOP-N result. On the first call this fully
// consumes the child operator, computes the sort keys, and builds a heap of
// row indices; subsequent calls materialize rows [offset, heap_size) from it.
void PhysicalTopN::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) {
	auto state = reinterpret_cast<PhysicalTopNOperatorState *>(state_);
	ChunkCollection &big_data = state->sorted_data;

	// position == 0 doubles as the "first call" flag: materialize before emitting
	if (state->position == 0) {
		// first concatenate all the data of the child chunks
		do {
			children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
			big_data.Append(state->child_chunk);
		} while (state->child_chunk.size() != 0);

		// now perform the actual ordering of the data
		// compute the sorting columns from the input data
		ExpressionExecutor executor;
		vector<TypeId> sort_types;
		vector<OrderType> order_types;
		for (idx_t i = 0; i < orders.size(); i++) {
			auto &expr = orders[i].expression;
			sort_types.push_back(expr->return_type);
			order_types.push_back(orders[i].type);
			executor.AddExpression(*expr);
		}

		// heap_size becomes min(limit + offset, rows), or 0 if OFFSET >= rows
		CalculateHeapSize(big_data.count);
		if (heap_size == 0) {
			// nothing to return: the OFFSET skips past all input rows
			return;
		}

		// evaluate the ORDER BY expressions for every input chunk into a
		// parallel collection that holds only the sort keys
		ChunkCollection heap_collection;
		for (idx_t i = 0; i < big_data.chunks.size(); i++) {
			DataChunk heap_chunk;
			heap_chunk.Initialize(sort_types);

			executor.Execute(*big_data.chunks[i], heap_chunk);
			heap_collection.Append(heap_chunk);
		}

		// sort keys and payload must stay row-aligned
		assert(heap_collection.count == big_data.count);

		// create and use the heap: fills state->heap with the row indices of
		// the top heap_size rows according to the sort keys
		state->heap = unique_ptr<idx_t[]>(new idx_t[heap_size]);
		heap_collection.Heap(order_types, state->heap.get(), heap_size);
	}

	if (state->position >= heap_size) {
		// all requested rows emitted; leave chunk empty to signal exhaustion
		return;
	} else if (state->position < offset) {
		// skip the first OFFSET heap entries before emitting anything
		state->position = offset;
	}

	// materialize the next batch of rows referenced by the heap and advance
	state->position += big_data.MaterializeHeapChunk(chunk, state->heap.get(), state->position, heap_size);
}
75 | |
76 | unique_ptr<PhysicalOperatorState> PhysicalTopN::GetOperatorState() { |
77 | return make_unique<PhysicalTopNOperatorState>(children[0].get()); |
78 | } |
79 | |
80 | void PhysicalTopN::CalculateHeapSize(idx_t rows) { |
81 | heap_size = (rows > offset) ? min(limit + offset, rows) : 0; |
82 | } |
83 | |