1#include "duckdb/execution/operator/helper/physical_streaming_limit.hpp"
2#include "duckdb/execution/operator/helper/physical_limit.hpp"
3
4namespace duckdb {
5
6PhysicalStreamingLimit::PhysicalStreamingLimit(vector<LogicalType> types, idx_t limit, idx_t offset,
7 unique_ptr<Expression> limit_expression,
8 unique_ptr<Expression> offset_expression, idx_t estimated_cardinality,
9 bool parallel)
10 : PhysicalOperator(PhysicalOperatorType::STREAMING_LIMIT, std::move(types), estimated_cardinality),
11 limit_value(limit), offset_value(offset), limit_expression(std::move(limit_expression)),
12 offset_expression(std::move(offset_expression)), parallel(parallel) {
13}
14
15//===--------------------------------------------------------------------===//
16// Operator
17//===--------------------------------------------------------------------===//
18class StreamingLimitOperatorState : public OperatorState {
19public:
20 explicit StreamingLimitOperatorState(const PhysicalStreamingLimit &op) {
21 this->limit = op.limit_expression ? DConstants::INVALID_INDEX : op.limit_value;
22 this->offset = op.offset_expression ? DConstants::INVALID_INDEX : op.offset_value;
23 }
24
25 idx_t limit;
26 idx_t offset;
27};
28
29class StreamingLimitGlobalState : public GlobalOperatorState {
30public:
31 StreamingLimitGlobalState() : current_offset(0) {
32 }
33
34 std::atomic<idx_t> current_offset;
35};
36
37unique_ptr<OperatorState> PhysicalStreamingLimit::GetOperatorState(ExecutionContext &context) const {
38 return make_uniq<StreamingLimitOperatorState>(args: *this);
39}
40
41unique_ptr<GlobalOperatorState> PhysicalStreamingLimit::GetGlobalOperatorState(ClientContext &context) const {
42 return make_uniq<StreamingLimitGlobalState>();
43}
44
45OperatorResultType PhysicalStreamingLimit::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
46 GlobalOperatorState &gstate_p, OperatorState &state_p) const {
47 auto &gstate = gstate_p.Cast<StreamingLimitGlobalState>();
48 auto &state = state_p.Cast<StreamingLimitOperatorState>();
49 auto &limit = state.limit;
50 auto &offset = state.offset;
51 idx_t current_offset = gstate.current_offset.fetch_add(i: input.size());
52 idx_t max_element;
53 if (!PhysicalLimit::ComputeOffset(context, input, limit, offset, current_offset, max_element,
54 limit_expression: limit_expression.get(), offset_expression: offset_expression.get())) {
55 return OperatorResultType::FINISHED;
56 }
57 if (PhysicalLimit::HandleOffset(input, current_offset, offset, limit)) {
58 chunk.Reference(chunk&: input);
59 }
60 return OperatorResultType::NEED_MORE_INPUT;
61}
62
63OrderPreservationType PhysicalStreamingLimit::OperatorOrder() const {
64 return OrderPreservationType::FIXED_ORDER;
65}
66
67bool PhysicalStreamingLimit::ParallelOperator() const {
68 return parallel;
69}
70
71} // namespace duckdb
72