1#include "duckdb/execution/operator/order/physical_top_n.hpp"
2
3#include "duckdb/common/assert.hpp"
4#include "duckdb/common/value_operations/value_operations.hpp"
5#include "duckdb/common/vector_operations/vector_operations.hpp"
6#include "duckdb/execution/expression_executor.hpp"
7#include "duckdb/storage/data_table.hpp"
8
9using namespace duckdb;
10using namespace std;
11
12class PhysicalTopNOperatorState : public PhysicalOperatorState {
13public:
14 PhysicalTopNOperatorState(PhysicalOperator *child) : PhysicalOperatorState(child), position(0) {
15 }
16
17 idx_t position;
18 idx_t current_offset;
19 ChunkCollection sorted_data;
20 unique_ptr<idx_t[]> heap;
21};
22
23void PhysicalTopN::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) {
24 auto state = reinterpret_cast<PhysicalTopNOperatorState *>(state_);
25 ChunkCollection &big_data = state->sorted_data;
26
27 if (state->position == 0) {
28 // first concatenate all the data of the child chunks
29 do {
30 children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
31 big_data.Append(state->child_chunk);
32 } while (state->child_chunk.size() != 0);
33
34 // now perform the actual ordering of the data
35 // compute the sorting columns from the input data
36 ExpressionExecutor executor;
37 vector<TypeId> sort_types;
38 vector<OrderType> order_types;
39 for (idx_t i = 0; i < orders.size(); i++) {
40 auto &expr = orders[i].expression;
41 sort_types.push_back(expr->return_type);
42 order_types.push_back(orders[i].type);
43 executor.AddExpression(*expr);
44 }
45
46 CalculateHeapSize(big_data.count);
47 if (heap_size == 0) {
48 return;
49 }
50
51 ChunkCollection heap_collection;
52 for (idx_t i = 0; i < big_data.chunks.size(); i++) {
53 DataChunk heap_chunk;
54 heap_chunk.Initialize(sort_types);
55
56 executor.Execute(*big_data.chunks[i], heap_chunk);
57 heap_collection.Append(heap_chunk);
58 }
59
60 assert(heap_collection.count == big_data.count);
61
62 // create and use the heap
63 state->heap = unique_ptr<idx_t[]>(new idx_t[heap_size]);
64 heap_collection.Heap(order_types, state->heap.get(), heap_size);
65 }
66
67 if (state->position >= heap_size) {
68 return;
69 } else if (state->position < offset) {
70 state->position = offset;
71 }
72
73 state->position += big_data.MaterializeHeapChunk(chunk, state->heap.get(), state->position, heap_size);
74}
75
76unique_ptr<PhysicalOperatorState> PhysicalTopN::GetOperatorState() {
77 return make_unique<PhysicalTopNOperatorState>(children[0].get());
78}
79
80void PhysicalTopN::CalculateHeapSize(idx_t rows) {
81 heap_size = (rows > offset) ? min(limit + offset, rows) : 0;
82}
83