| 1 | #include "duckdb/execution/operator/helper/physical_streaming_limit.hpp" |
| 2 | #include "duckdb/execution/operator/helper/physical_limit.hpp" |
| 3 | |
| 4 | namespace duckdb { |
| 5 | |
| 6 | PhysicalStreamingLimit::PhysicalStreamingLimit(vector<LogicalType> types, idx_t limit, idx_t offset, |
| 7 | unique_ptr<Expression> limit_expression, |
| 8 | unique_ptr<Expression> offset_expression, idx_t estimated_cardinality, |
| 9 | bool parallel) |
| 10 | : PhysicalOperator(PhysicalOperatorType::STREAMING_LIMIT, std::move(types), estimated_cardinality), |
| 11 | limit_value(limit), offset_value(offset), limit_expression(std::move(limit_expression)), |
| 12 | offset_expression(std::move(offset_expression)), parallel(parallel) { |
| 13 | } |
| 14 | |
| 15 | //===--------------------------------------------------------------------===// |
| 16 | // Operator |
| 17 | //===--------------------------------------------------------------------===// |
| 18 | class StreamingLimitOperatorState : public OperatorState { |
| 19 | public: |
| 20 | explicit StreamingLimitOperatorState(const PhysicalStreamingLimit &op) { |
| 21 | this->limit = op.limit_expression ? DConstants::INVALID_INDEX : op.limit_value; |
| 22 | this->offset = op.offset_expression ? DConstants::INVALID_INDEX : op.offset_value; |
| 23 | } |
| 24 | |
| 25 | idx_t limit; |
| 26 | idx_t offset; |
| 27 | }; |
| 28 | |
| 29 | class StreamingLimitGlobalState : public GlobalOperatorState { |
| 30 | public: |
| 31 | StreamingLimitGlobalState() : current_offset(0) { |
| 32 | } |
| 33 | |
| 34 | std::atomic<idx_t> current_offset; |
| 35 | }; |
| 36 | |
| 37 | unique_ptr<OperatorState> PhysicalStreamingLimit::GetOperatorState(ExecutionContext &context) const { |
| 38 | return make_uniq<StreamingLimitOperatorState>(args: *this); |
| 39 | } |
| 40 | |
| 41 | unique_ptr<GlobalOperatorState> PhysicalStreamingLimit::GetGlobalOperatorState(ClientContext &context) const { |
| 42 | return make_uniq<StreamingLimitGlobalState>(); |
| 43 | } |
| 44 | |
| 45 | OperatorResultType PhysicalStreamingLimit::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk, |
| 46 | GlobalOperatorState &gstate_p, OperatorState &state_p) const { |
| 47 | auto &gstate = gstate_p.Cast<StreamingLimitGlobalState>(); |
| 48 | auto &state = state_p.Cast<StreamingLimitOperatorState>(); |
| 49 | auto &limit = state.limit; |
| 50 | auto &offset = state.offset; |
| 51 | idx_t current_offset = gstate.current_offset.fetch_add(i: input.size()); |
| 52 | idx_t max_element; |
| 53 | if (!PhysicalLimit::ComputeOffset(context, input, limit, offset, current_offset, max_element, |
| 54 | limit_expression: limit_expression.get(), offset_expression: offset_expression.get())) { |
| 55 | return OperatorResultType::FINISHED; |
| 56 | } |
| 57 | if (PhysicalLimit::HandleOffset(input, current_offset, offset, limit)) { |
| 58 | chunk.Reference(chunk&: input); |
| 59 | } |
| 60 | return OperatorResultType::NEED_MORE_INPUT; |
| 61 | } |
| 62 | |
| 63 | OrderPreservationType PhysicalStreamingLimit::OperatorOrder() const { |
| 64 | return OrderPreservationType::FIXED_ORDER; |
| 65 | } |
| 66 | |
| 67 | bool PhysicalStreamingLimit::ParallelOperator() const { |
| 68 | return parallel; |
| 69 | } |
| 70 | |
| 71 | } // namespace duckdb |
| 72 | |