| 1 | #include "duckdb/execution/operator/order/physical_top_n.hpp" |
| 2 | |
| 3 | #include "duckdb/common/assert.hpp" |
| 4 | #include "duckdb/common/value_operations/value_operations.hpp" |
| 5 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
| 6 | #include "duckdb/execution/expression_executor.hpp" |
| 7 | #include "duckdb/storage/data_table.hpp" |
| 8 | |
| 9 | using namespace duckdb; |
| 10 | using namespace std; |
| 11 | |
| 12 | class PhysicalTopNOperatorState : public PhysicalOperatorState { |
| 13 | public: |
| 14 | PhysicalTopNOperatorState(PhysicalOperator *child) : PhysicalOperatorState(child), position(0) { |
| 15 | } |
| 16 | |
| 17 | idx_t position; |
| 18 | idx_t current_offset; |
| 19 | ChunkCollection sorted_data; |
| 20 | unique_ptr<idx_t[]> heap; |
| 21 | }; |
| 22 | |
//! Produce the next output chunk of the TOP N (ORDER BY + LIMIT/OFFSET)
//! operator. The first invocation (state->position == 0) exhausts the child,
//! builds a heap over the sort keys, and stores it in the state; every
//! invocation then materializes one chunk from the heap starting at the
//! current scan position.
void PhysicalTopN::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) {
	auto state = reinterpret_cast<PhysicalTopNOperatorState *>(state_);
	ChunkCollection &big_data = state->sorted_data;

	if (state->position == 0) {
		// first concatenate all the data of the child chunks
		do {
			children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
			big_data.Append(state->child_chunk);
		} while (state->child_chunk.size() != 0);

		// now perform the actual ordering of the data
		// compute the sorting columns from the input data
		ExpressionExecutor executor;
		vector<TypeId> sort_types;
		vector<OrderType> order_types;
		for (idx_t i = 0; i < orders.size(); i++) {
			auto &expr = orders[i].expression;
			sort_types.push_back(expr->return_type);
			order_types.push_back(orders[i].type);
			executor.AddExpression(*expr);
		}

		// heap_size = min(limit + offset, row count), or 0 if OFFSET skips
		// every row (see CalculateHeapSize)
		CalculateHeapSize(big_data.count);
		if (heap_size == 0) {
			// nothing to emit; chunk is left empty, signaling exhaustion.
			// NOTE(review): state->position stays 0 here, so a subsequent call
			// would re-enter this branch — presumably callers stop after an
			// empty chunk; confirm against the operator contract.
			return;
		}

		// evaluate the ORDER BY expressions for every input chunk, producing a
		// parallel collection of sort-key chunks
		ChunkCollection heap_collection;
		for (idx_t i = 0; i < big_data.chunks.size(); i++) {
			DataChunk heap_chunk;
			heap_chunk.Initialize(sort_types);

			executor.Execute(*big_data.chunks[i], heap_chunk);
			heap_collection.Append(heap_chunk);
		}

		// sort keys must cover exactly one entry per input row
		assert(heap_collection.count == big_data.count);

		// create and use the heap: fills state->heap with the row indices of
		// the top heap_size rows, ordered by the sort keys
		state->heap = unique_ptr<idx_t[]>(new idx_t[heap_size]);
		heap_collection.Heap(order_types, state->heap.get(), heap_size);
	}

	if (state->position >= heap_size) {
		// all requested rows have been emitted; leave chunk empty
		return;
	} else if (state->position < offset) {
		// skip the first OFFSET heap entries before emitting anything
		state->position = offset;
	}

	// emit the next chunk of heap-ordered rows and advance the scan position
	// by however many rows were materialized
	state->position += big_data.MaterializeHeapChunk(chunk, state->heap.get(), state->position, heap_size);
}
| 75 | |
| 76 | unique_ptr<PhysicalOperatorState> PhysicalTopN::GetOperatorState() { |
| 77 | return make_unique<PhysicalTopNOperatorState>(children[0].get()); |
| 78 | } |
| 79 | |
| 80 | void PhysicalTopN::CalculateHeapSize(idx_t rows) { |
| 81 | heap_size = (rows > offset) ? min(limit + offset, rows) : 0; |
| 82 | } |
| 83 | |