| 1 | #include "duckdb/execution/operator/helper/physical_limit_percent.hpp" |
| 2 | |
| 3 | #include "duckdb/common/algorithm.hpp" |
| 4 | #include "duckdb/common/types/column/column_data_collection.hpp" |
| 5 | #include "duckdb/execution/expression_executor.hpp" |
| 6 | #include "duckdb/execution/operator/helper/physical_limit.hpp" |
| 7 | |
| 8 | namespace duckdb { |
| 9 | |
| 10 | //===--------------------------------------------------------------------===// |
| 11 | // Sink |
| 12 | //===--------------------------------------------------------------------===// |
| 13 | class LimitPercentGlobalState : public GlobalSinkState { |
| 14 | public: |
| 15 | explicit LimitPercentGlobalState(ClientContext &context, const PhysicalLimitPercent &op) |
| 16 | : current_offset(0), data(context, op.GetTypes()) { |
| 17 | if (!op.limit_expression) { |
| 18 | this->limit_percent = op.limit_percent; |
| 19 | is_limit_percent_delimited = true; |
| 20 | } else { |
| 21 | this->limit_percent = 100.0; |
| 22 | } |
| 23 | |
| 24 | if (!op.offset_expression) { |
| 25 | this->offset = op.offset_value; |
| 26 | is_offset_delimited = true; |
| 27 | } else { |
| 28 | this->offset = 0; |
| 29 | } |
| 30 | } |
| 31 | |
| 32 | idx_t current_offset; |
| 33 | double limit_percent; |
| 34 | idx_t offset; |
| 35 | ColumnDataCollection data; |
| 36 | |
| 37 | bool is_limit_percent_delimited = false; |
| 38 | bool is_offset_delimited = false; |
| 39 | }; |
| 40 | |
| 41 | unique_ptr<GlobalSinkState> PhysicalLimitPercent::GetGlobalSinkState(ClientContext &context) const { |
| 42 | return make_uniq<LimitPercentGlobalState>(args&: context, args: *this); |
| 43 | } |
| 44 | |
| 45 | SinkResultType PhysicalLimitPercent::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const { |
| 46 | D_ASSERT(chunk.size() > 0); |
| 47 | auto &state = input.global_state.Cast<LimitPercentGlobalState>(); |
| 48 | auto &limit_percent = state.limit_percent; |
| 49 | auto &offset = state.offset; |
| 50 | |
| 51 | // get the next chunk from the child |
| 52 | if (!state.is_limit_percent_delimited) { |
| 53 | Value val = PhysicalLimit::GetDelimiter(context, input&: chunk, expr: limit_expression.get()); |
| 54 | if (!val.IsNull()) { |
| 55 | limit_percent = val.GetValue<double>(); |
| 56 | } |
| 57 | if (limit_percent < 0.0) { |
| 58 | throw BinderException("Percentage value(%f) can't be negative" , limit_percent); |
| 59 | } |
| 60 | state.is_limit_percent_delimited = true; |
| 61 | } |
| 62 | if (!state.is_offset_delimited) { |
| 63 | Value val = PhysicalLimit::GetDelimiter(context, input&: chunk, expr: offset_expression.get()); |
| 64 | if (!val.IsNull()) { |
| 65 | offset = val.GetValue<idx_t>(); |
| 66 | } |
| 67 | if (offset > 1ULL << 62ULL) { |
| 68 | throw BinderException("Max value %lld for LIMIT/OFFSET is %lld" , offset, 1ULL << 62ULL); |
| 69 | } |
| 70 | state.is_offset_delimited = true; |
| 71 | } |
| 72 | |
| 73 | if (!PhysicalLimit::HandleOffset(input&: chunk, current_offset&: state.current_offset, offset, limit: DConstants::INVALID_INDEX)) { |
| 74 | return SinkResultType::NEED_MORE_INPUT; |
| 75 | } |
| 76 | |
| 77 | state.data.Append(new_chunk&: chunk); |
| 78 | return SinkResultType::NEED_MORE_INPUT; |
| 79 | } |
| 80 | |
| 81 | //===--------------------------------------------------------------------===// |
| 82 | // Source |
| 83 | //===--------------------------------------------------------------------===// |
| 84 | class LimitPercentOperatorState : public GlobalSourceState { |
| 85 | public: |
| 86 | explicit LimitPercentOperatorState(const PhysicalLimitPercent &op) |
| 87 | : limit(DConstants::INVALID_INDEX), current_offset(0) { |
| 88 | D_ASSERT(op.sink_state); |
| 89 | auto &gstate = op.sink_state->Cast<LimitPercentGlobalState>(); |
| 90 | gstate.data.InitializeScan(state&: scan_state); |
| 91 | } |
| 92 | |
| 93 | ColumnDataScanState scan_state; |
| 94 | idx_t limit; |
| 95 | idx_t current_offset; |
| 96 | }; |
| 97 | |
| 98 | unique_ptr<GlobalSourceState> PhysicalLimitPercent::GetGlobalSourceState(ClientContext &context) const { |
| 99 | return make_uniq<LimitPercentOperatorState>(args: *this); |
| 100 | } |
| 101 | |
| 102 | SourceResultType PhysicalLimitPercent::GetData(ExecutionContext &context, DataChunk &chunk, |
| 103 | OperatorSourceInput &input) const { |
| 104 | auto &gstate = sink_state->Cast<LimitPercentGlobalState>(); |
| 105 | auto &state = input.global_state.Cast<LimitPercentOperatorState>(); |
| 106 | auto &percent_limit = gstate.limit_percent; |
| 107 | auto &offset = gstate.offset; |
| 108 | auto &limit = state.limit; |
| 109 | auto ¤t_offset = state.current_offset; |
| 110 | |
| 111 | if (gstate.is_limit_percent_delimited && limit == DConstants::INVALID_INDEX) { |
| 112 | idx_t count = gstate.data.Count(); |
| 113 | if (count > 0) { |
| 114 | count += offset; |
| 115 | } |
| 116 | if (Value::IsNan(input: percent_limit) || percent_limit < 0 || percent_limit > 100) { |
| 117 | throw OutOfRangeException("Limit percent out of range, should be between 0% and 100%" ); |
| 118 | } |
| 119 | double limit_dbl = percent_limit / 100 * count; |
| 120 | if (limit_dbl > count) { |
| 121 | limit = count; |
| 122 | } else { |
| 123 | limit = idx_t(limit_dbl); |
| 124 | } |
| 125 | if (limit == 0) { |
| 126 | return SourceResultType::FINISHED; |
| 127 | } |
| 128 | } |
| 129 | |
| 130 | if (current_offset >= limit) { |
| 131 | return SourceResultType::FINISHED; |
| 132 | } |
| 133 | if (!gstate.data.Scan(state&: state.scan_state, result&: chunk)) { |
| 134 | return SourceResultType::FINISHED; |
| 135 | } |
| 136 | |
| 137 | PhysicalLimit::HandleOffset(input&: chunk, current_offset, offset: 0, limit); |
| 138 | |
| 139 | return SourceResultType::HAVE_MORE_OUTPUT; |
| 140 | } |
| 141 | |
| 142 | } // namespace duckdb |
| 143 | |