1 | #include "duckdb/execution/operator/helper/physical_streaming_limit.hpp" |
2 | #include "duckdb/execution/operator/helper/physical_limit.hpp" |
3 | |
4 | namespace duckdb { |
5 | |
6 | PhysicalStreamingLimit::PhysicalStreamingLimit(vector<LogicalType> types, idx_t limit, idx_t offset, |
7 | unique_ptr<Expression> limit_expression, |
8 | unique_ptr<Expression> offset_expression, idx_t estimated_cardinality, |
9 | bool parallel) |
10 | : PhysicalOperator(PhysicalOperatorType::STREAMING_LIMIT, std::move(types), estimated_cardinality), |
11 | limit_value(limit), offset_value(offset), limit_expression(std::move(limit_expression)), |
12 | offset_expression(std::move(offset_expression)), parallel(parallel) { |
13 | } |
14 | |
15 | //===--------------------------------------------------------------------===// |
16 | // Operator |
17 | //===--------------------------------------------------------------------===// |
18 | class StreamingLimitOperatorState : public OperatorState { |
19 | public: |
20 | explicit StreamingLimitOperatorState(const PhysicalStreamingLimit &op) { |
21 | this->limit = op.limit_expression ? DConstants::INVALID_INDEX : op.limit_value; |
22 | this->offset = op.offset_expression ? DConstants::INVALID_INDEX : op.offset_value; |
23 | } |
24 | |
25 | idx_t limit; |
26 | idx_t offset; |
27 | }; |
28 | |
29 | class StreamingLimitGlobalState : public GlobalOperatorState { |
30 | public: |
31 | StreamingLimitGlobalState() : current_offset(0) { |
32 | } |
33 | |
34 | std::atomic<idx_t> current_offset; |
35 | }; |
36 | |
37 | unique_ptr<OperatorState> PhysicalStreamingLimit::GetOperatorState(ExecutionContext &context) const { |
38 | return make_uniq<StreamingLimitOperatorState>(args: *this); |
39 | } |
40 | |
41 | unique_ptr<GlobalOperatorState> PhysicalStreamingLimit::GetGlobalOperatorState(ClientContext &context) const { |
42 | return make_uniq<StreamingLimitGlobalState>(); |
43 | } |
44 | |
45 | OperatorResultType PhysicalStreamingLimit::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk, |
46 | GlobalOperatorState &gstate_p, OperatorState &state_p) const { |
47 | auto &gstate = gstate_p.Cast<StreamingLimitGlobalState>(); |
48 | auto &state = state_p.Cast<StreamingLimitOperatorState>(); |
49 | auto &limit = state.limit; |
50 | auto &offset = state.offset; |
51 | idx_t current_offset = gstate.current_offset.fetch_add(i: input.size()); |
52 | idx_t max_element; |
53 | if (!PhysicalLimit::ComputeOffset(context, input, limit, offset, current_offset, max_element, |
54 | limit_expression: limit_expression.get(), offset_expression: offset_expression.get())) { |
55 | return OperatorResultType::FINISHED; |
56 | } |
57 | if (PhysicalLimit::HandleOffset(input, current_offset, offset, limit)) { |
58 | chunk.Reference(chunk&: input); |
59 | } |
60 | return OperatorResultType::NEED_MORE_INPUT; |
61 | } |
62 | |
63 | OrderPreservationType PhysicalStreamingLimit::OperatorOrder() const { |
64 | return OrderPreservationType::FIXED_ORDER; |
65 | } |
66 | |
67 | bool PhysicalStreamingLimit::ParallelOperator() const { |
68 | return parallel; |
69 | } |
70 | |
71 | } // namespace duckdb |
72 | |