1#include "duckdb/execution/operator/helper/physical_limit_percent.hpp"
2
3#include "duckdb/common/algorithm.hpp"
4#include "duckdb/common/types/column/column_data_collection.hpp"
5#include "duckdb/execution/expression_executor.hpp"
6#include "duckdb/execution/operator/helper/physical_limit.hpp"
7
8namespace duckdb {
9
10//===--------------------------------------------------------------------===//
11// Sink
12//===--------------------------------------------------------------------===//
13class LimitPercentGlobalState : public GlobalSinkState {
14public:
15 explicit LimitPercentGlobalState(ClientContext &context, const PhysicalLimitPercent &op)
16 : current_offset(0), data(context, op.GetTypes()) {
17 if (!op.limit_expression) {
18 this->limit_percent = op.limit_percent;
19 is_limit_percent_delimited = true;
20 } else {
21 this->limit_percent = 100.0;
22 }
23
24 if (!op.offset_expression) {
25 this->offset = op.offset_value;
26 is_offset_delimited = true;
27 } else {
28 this->offset = 0;
29 }
30 }
31
32 idx_t current_offset;
33 double limit_percent;
34 idx_t offset;
35 ColumnDataCollection data;
36
37 bool is_limit_percent_delimited = false;
38 bool is_offset_delimited = false;
39};
40
41unique_ptr<GlobalSinkState> PhysicalLimitPercent::GetGlobalSinkState(ClientContext &context) const {
42 return make_uniq<LimitPercentGlobalState>(args&: context, args: *this);
43}
44
45SinkResultType PhysicalLimitPercent::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
46 D_ASSERT(chunk.size() > 0);
47 auto &state = input.global_state.Cast<LimitPercentGlobalState>();
48 auto &limit_percent = state.limit_percent;
49 auto &offset = state.offset;
50
51 // get the next chunk from the child
52 if (!state.is_limit_percent_delimited) {
53 Value val = PhysicalLimit::GetDelimiter(context, input&: chunk, expr: limit_expression.get());
54 if (!val.IsNull()) {
55 limit_percent = val.GetValue<double>();
56 }
57 if (limit_percent < 0.0) {
58 throw BinderException("Percentage value(%f) can't be negative", limit_percent);
59 }
60 state.is_limit_percent_delimited = true;
61 }
62 if (!state.is_offset_delimited) {
63 Value val = PhysicalLimit::GetDelimiter(context, input&: chunk, expr: offset_expression.get());
64 if (!val.IsNull()) {
65 offset = val.GetValue<idx_t>();
66 }
67 if (offset > 1ULL << 62ULL) {
68 throw BinderException("Max value %lld for LIMIT/OFFSET is %lld", offset, 1ULL << 62ULL);
69 }
70 state.is_offset_delimited = true;
71 }
72
73 if (!PhysicalLimit::HandleOffset(input&: chunk, current_offset&: state.current_offset, offset, limit: DConstants::INVALID_INDEX)) {
74 return SinkResultType::NEED_MORE_INPUT;
75 }
76
77 state.data.Append(new_chunk&: chunk);
78 return SinkResultType::NEED_MORE_INPUT;
79}
80
81//===--------------------------------------------------------------------===//
82// Source
83//===--------------------------------------------------------------------===//
84class LimitPercentOperatorState : public GlobalSourceState {
85public:
86 explicit LimitPercentOperatorState(const PhysicalLimitPercent &op)
87 : limit(DConstants::INVALID_INDEX), current_offset(0) {
88 D_ASSERT(op.sink_state);
89 auto &gstate = op.sink_state->Cast<LimitPercentGlobalState>();
90 gstate.data.InitializeScan(state&: scan_state);
91 }
92
93 ColumnDataScanState scan_state;
94 idx_t limit;
95 idx_t current_offset;
96};
97
98unique_ptr<GlobalSourceState> PhysicalLimitPercent::GetGlobalSourceState(ClientContext &context) const {
99 return make_uniq<LimitPercentOperatorState>(args: *this);
100}
101
102SourceResultType PhysicalLimitPercent::GetData(ExecutionContext &context, DataChunk &chunk,
103 OperatorSourceInput &input) const {
104 auto &gstate = sink_state->Cast<LimitPercentGlobalState>();
105 auto &state = input.global_state.Cast<LimitPercentOperatorState>();
106 auto &percent_limit = gstate.limit_percent;
107 auto &offset = gstate.offset;
108 auto &limit = state.limit;
109 auto &current_offset = state.current_offset;
110
111 if (gstate.is_limit_percent_delimited && limit == DConstants::INVALID_INDEX) {
112 idx_t count = gstate.data.Count();
113 if (count > 0) {
114 count += offset;
115 }
116 if (Value::IsNan(input: percent_limit) || percent_limit < 0 || percent_limit > 100) {
117 throw OutOfRangeException("Limit percent out of range, should be between 0% and 100%");
118 }
119 double limit_dbl = percent_limit / 100 * count;
120 if (limit_dbl > count) {
121 limit = count;
122 } else {
123 limit = idx_t(limit_dbl);
124 }
125 if (limit == 0) {
126 return SourceResultType::FINISHED;
127 }
128 }
129
130 if (current_offset >= limit) {
131 return SourceResultType::FINISHED;
132 }
133 if (!gstate.data.Scan(state&: state.scan_state, result&: chunk)) {
134 return SourceResultType::FINISHED;
135 }
136
137 PhysicalLimit::HandleOffset(input&: chunk, current_offset, offset: 0, limit);
138
139 return SourceResultType::HAVE_MORE_OUTPUT;
140}
141
142} // namespace duckdb
143