| 1 | #include "duckdb/execution/operator/order/physical_top_n.hpp" | 
|---|
| 2 |  | 
|---|
| 3 | #include "duckdb/common/assert.hpp" | 
|---|
| 4 | #include "duckdb/common/value_operations/value_operations.hpp" | 
|---|
| 5 | #include "duckdb/common/vector_operations/vector_operations.hpp" | 
|---|
| 6 | #include "duckdb/execution/expression_executor.hpp" | 
|---|
| 7 | #include "duckdb/storage/data_table.hpp" | 
|---|
| 8 |  | 
|---|
| 9 | using namespace duckdb; | 
|---|
| 10 | using namespace std; | 
|---|
| 11 |  | 
|---|
| 12 | class PhysicalTopNOperatorState : public PhysicalOperatorState { | 
|---|
| 13 | public: | 
|---|
| 14 | PhysicalTopNOperatorState(PhysicalOperator *child) : PhysicalOperatorState(child), position(0) { | 
|---|
| 15 | } | 
|---|
| 16 |  | 
|---|
| 17 | idx_t position; | 
|---|
| 18 | idx_t current_offset; | 
|---|
| 19 | ChunkCollection sorted_data; | 
|---|
| 20 | unique_ptr<idx_t[]> heap; | 
|---|
| 21 | }; | 
|---|
| 22 |  | 
|---|
//! Produce the next chunk of the TOP N result. On the first call this drains
//! the entire child pipeline into sorted_data, evaluates the ORDER BY
//! expressions, and builds a heap of the top LIMIT+OFFSET row indices; this
//! and subsequent calls then materialize heap entries in order, skipping the
//! first OFFSET rows.
void PhysicalTopN::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) {
	auto state = reinterpret_cast<PhysicalTopNOperatorState *>(state_);
	ChunkCollection &big_data = state->sorted_data;

	// position == 0 is the sentinel for "first call": child not yet consumed
	if (state->position == 0) {
		// first concatenate all the data of the child chunks
		do {
			children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
			big_data.Append(state->child_chunk);
		} while (state->child_chunk.size() != 0);

		// now perform the actual ordering of the data
		// compute the sorting columns from the input data
		ExpressionExecutor executor;
		vector<TypeId> sort_types;
		vector<OrderType> order_types;
		for (idx_t i = 0; i < orders.size(); i++) {
			auto &expr = orders[i].expression;
			sort_types.push_back(expr->return_type);
			order_types.push_back(orders[i].type);
			executor.AddExpression(*expr);
		}

		CalculateHeapSize(big_data.count);
		if (heap_size == 0) {
			// no rows past the OFFSET (or no data at all): empty result.
			// NOTE(review): position stays 0 here, so a later call re-enters
			// this setup; the child is already exhausted so the re-run only
			// appends empty chunks — redundant but presumably harmless
			return;
		}

		// evaluate the ORDER BY expressions chunk-by-chunk into a collection
		// that mirrors big_data row-for-row
		ChunkCollection heap_collection;
		for (idx_t i = 0; i < big_data.chunks.size(); i++) {
			DataChunk heap_chunk;
			heap_chunk.Initialize(sort_types);

			executor.Execute(*big_data.chunks[i], heap_chunk);
			heap_collection.Append(heap_chunk);
		}

		assert(heap_collection.count == big_data.count);

		// create and use the heap
		state->heap = unique_ptr<idx_t[]>(new idx_t[heap_size]);
		heap_collection.Heap(order_types, state->heap.get(), heap_size);
	}

	if (state->position >= heap_size) {
		// all requested rows have already been emitted
		return;
	} else if (state->position < offset) {
		// skip the first OFFSET heap entries before emitting anything
		state->position = offset;
	}

	// materialize the next chunk of heap entries from the buffered data and
	// advance the cursor by however many rows were produced
	state->position += big_data.MaterializeHeapChunk(chunk, state->heap.get(), state->position, heap_size);
}
| 75 |  | 
|---|
| 76 | unique_ptr<PhysicalOperatorState> PhysicalTopN::GetOperatorState() { | 
|---|
| 77 | return make_unique<PhysicalTopNOperatorState>(children[0].get()); | 
|---|
| 78 | } | 
|---|
| 79 |  | 
|---|
| 80 | void PhysicalTopN::CalculateHeapSize(idx_t rows) { | 
|---|
| 81 | heap_size = (rows > offset) ? min(limit + offset, rows) : 0; | 
|---|
| 82 | } | 
|---|
| 83 |  | 
|---|