1 | #include "duckdb/execution/operator/helper/physical_limit_percent.hpp" |
2 | |
3 | #include "duckdb/common/algorithm.hpp" |
4 | #include "duckdb/common/types/column/column_data_collection.hpp" |
5 | #include "duckdb/execution/expression_executor.hpp" |
6 | #include "duckdb/execution/operator/helper/physical_limit.hpp" |
7 | |
8 | namespace duckdb { |
9 | |
10 | //===--------------------------------------------------------------------===// |
11 | // Sink |
12 | //===--------------------------------------------------------------------===// |
13 | class LimitPercentGlobalState : public GlobalSinkState { |
14 | public: |
15 | explicit LimitPercentGlobalState(ClientContext &context, const PhysicalLimitPercent &op) |
16 | : current_offset(0), data(context, op.GetTypes()) { |
17 | if (!op.limit_expression) { |
18 | this->limit_percent = op.limit_percent; |
19 | is_limit_percent_delimited = true; |
20 | } else { |
21 | this->limit_percent = 100.0; |
22 | } |
23 | |
24 | if (!op.offset_expression) { |
25 | this->offset = op.offset_value; |
26 | is_offset_delimited = true; |
27 | } else { |
28 | this->offset = 0; |
29 | } |
30 | } |
31 | |
32 | idx_t current_offset; |
33 | double limit_percent; |
34 | idx_t offset; |
35 | ColumnDataCollection data; |
36 | |
37 | bool is_limit_percent_delimited = false; |
38 | bool is_offset_delimited = false; |
39 | }; |
40 | |
41 | unique_ptr<GlobalSinkState> PhysicalLimitPercent::GetGlobalSinkState(ClientContext &context) const { |
42 | return make_uniq<LimitPercentGlobalState>(args&: context, args: *this); |
43 | } |
44 | |
45 | SinkResultType PhysicalLimitPercent::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const { |
46 | D_ASSERT(chunk.size() > 0); |
47 | auto &state = input.global_state.Cast<LimitPercentGlobalState>(); |
48 | auto &limit_percent = state.limit_percent; |
49 | auto &offset = state.offset; |
50 | |
51 | // get the next chunk from the child |
52 | if (!state.is_limit_percent_delimited) { |
53 | Value val = PhysicalLimit::GetDelimiter(context, input&: chunk, expr: limit_expression.get()); |
54 | if (!val.IsNull()) { |
55 | limit_percent = val.GetValue<double>(); |
56 | } |
57 | if (limit_percent < 0.0) { |
58 | throw BinderException("Percentage value(%f) can't be negative" , limit_percent); |
59 | } |
60 | state.is_limit_percent_delimited = true; |
61 | } |
62 | if (!state.is_offset_delimited) { |
63 | Value val = PhysicalLimit::GetDelimiter(context, input&: chunk, expr: offset_expression.get()); |
64 | if (!val.IsNull()) { |
65 | offset = val.GetValue<idx_t>(); |
66 | } |
67 | if (offset > 1ULL << 62ULL) { |
68 | throw BinderException("Max value %lld for LIMIT/OFFSET is %lld" , offset, 1ULL << 62ULL); |
69 | } |
70 | state.is_offset_delimited = true; |
71 | } |
72 | |
73 | if (!PhysicalLimit::HandleOffset(input&: chunk, current_offset&: state.current_offset, offset, limit: DConstants::INVALID_INDEX)) { |
74 | return SinkResultType::NEED_MORE_INPUT; |
75 | } |
76 | |
77 | state.data.Append(new_chunk&: chunk); |
78 | return SinkResultType::NEED_MORE_INPUT; |
79 | } |
80 | |
81 | //===--------------------------------------------------------------------===// |
82 | // Source |
83 | //===--------------------------------------------------------------------===// |
84 | class LimitPercentOperatorState : public GlobalSourceState { |
85 | public: |
86 | explicit LimitPercentOperatorState(const PhysicalLimitPercent &op) |
87 | : limit(DConstants::INVALID_INDEX), current_offset(0) { |
88 | D_ASSERT(op.sink_state); |
89 | auto &gstate = op.sink_state->Cast<LimitPercentGlobalState>(); |
90 | gstate.data.InitializeScan(state&: scan_state); |
91 | } |
92 | |
93 | ColumnDataScanState scan_state; |
94 | idx_t limit; |
95 | idx_t current_offset; |
96 | }; |
97 | |
98 | unique_ptr<GlobalSourceState> PhysicalLimitPercent::GetGlobalSourceState(ClientContext &context) const { |
99 | return make_uniq<LimitPercentOperatorState>(args: *this); |
100 | } |
101 | |
102 | SourceResultType PhysicalLimitPercent::GetData(ExecutionContext &context, DataChunk &chunk, |
103 | OperatorSourceInput &input) const { |
104 | auto &gstate = sink_state->Cast<LimitPercentGlobalState>(); |
105 | auto &state = input.global_state.Cast<LimitPercentOperatorState>(); |
106 | auto &percent_limit = gstate.limit_percent; |
107 | auto &offset = gstate.offset; |
108 | auto &limit = state.limit; |
109 | auto ¤t_offset = state.current_offset; |
110 | |
111 | if (gstate.is_limit_percent_delimited && limit == DConstants::INVALID_INDEX) { |
112 | idx_t count = gstate.data.Count(); |
113 | if (count > 0) { |
114 | count += offset; |
115 | } |
116 | if (Value::IsNan(input: percent_limit) || percent_limit < 0 || percent_limit > 100) { |
117 | throw OutOfRangeException("Limit percent out of range, should be between 0% and 100%" ); |
118 | } |
119 | double limit_dbl = percent_limit / 100 * count; |
120 | if (limit_dbl > count) { |
121 | limit = count; |
122 | } else { |
123 | limit = idx_t(limit_dbl); |
124 | } |
125 | if (limit == 0) { |
126 | return SourceResultType::FINISHED; |
127 | } |
128 | } |
129 | |
130 | if (current_offset >= limit) { |
131 | return SourceResultType::FINISHED; |
132 | } |
133 | if (!gstate.data.Scan(state&: state.scan_state, result&: chunk)) { |
134 | return SourceResultType::FINISHED; |
135 | } |
136 | |
137 | PhysicalLimit::HandleOffset(input&: chunk, current_offset, offset: 0, limit); |
138 | |
139 | return SourceResultType::HAVE_MORE_OUTPUT; |
140 | } |
141 | |
142 | } // namespace duckdb |
143 | |