| 1 | #include "duckdb/execution/operator/helper/physical_limit.hpp" |
| 2 | |
| 3 | #include "duckdb/common/algorithm.hpp" |
| 4 | #include "duckdb/main/config.hpp" |
| 5 | |
| 6 | #include "duckdb/execution/expression_executor.hpp" |
| 7 | #include "duckdb/common/types/batched_data_collection.hpp" |
| 8 | #include "duckdb/execution/operator/helper/physical_streaming_limit.hpp" |
| 9 | |
| 10 | namespace duckdb { |
| 11 | |
| 12 | PhysicalLimit::PhysicalLimit(vector<LogicalType> types, idx_t limit, idx_t offset, |
| 13 | unique_ptr<Expression> limit_expression, unique_ptr<Expression> offset_expression, |
| 14 | idx_t estimated_cardinality) |
| 15 | : PhysicalOperator(PhysicalOperatorType::LIMIT, std::move(types), estimated_cardinality), limit_value(limit), |
| 16 | offset_value(offset), limit_expression(std::move(limit_expression)), |
| 17 | offset_expression(std::move(offset_expression)) { |
| 18 | } |
| 19 | |
| 20 | //===--------------------------------------------------------------------===// |
| 21 | // Sink |
| 22 | //===--------------------------------------------------------------------===// |
| 23 | class LimitGlobalState : public GlobalSinkState { |
| 24 | public: |
| 25 | explicit LimitGlobalState(ClientContext &context, const PhysicalLimit &op) : data(op.types) { |
| 26 | limit = 0; |
| 27 | offset = 0; |
| 28 | } |
| 29 | |
| 30 | mutex glock; |
| 31 | idx_t limit; |
| 32 | idx_t offset; |
| 33 | BatchedDataCollection data; |
| 34 | }; |
| 35 | |
| 36 | class LimitLocalState : public LocalSinkState { |
| 37 | public: |
| 38 | explicit LimitLocalState(ClientContext &context, const PhysicalLimit &op) : current_offset(0), data(op.types) { |
| 39 | this->limit = op.limit_expression ? DConstants::INVALID_INDEX : op.limit_value; |
| 40 | this->offset = op.offset_expression ? DConstants::INVALID_INDEX : op.offset_value; |
| 41 | } |
| 42 | |
| 43 | idx_t current_offset; |
| 44 | idx_t limit; |
| 45 | idx_t offset; |
| 46 | BatchedDataCollection data; |
| 47 | }; |
| 48 | |
| 49 | unique_ptr<GlobalSinkState> PhysicalLimit::GetGlobalSinkState(ClientContext &context) const { |
| 50 | return make_uniq<LimitGlobalState>(args&: context, args: *this); |
| 51 | } |
| 52 | |
| 53 | unique_ptr<LocalSinkState> PhysicalLimit::GetLocalSinkState(ExecutionContext &context) const { |
| 54 | return make_uniq<LimitLocalState>(args&: context.client, args: *this); |
| 55 | } |
| 56 | |
| 57 | bool PhysicalLimit::ComputeOffset(ExecutionContext &context, DataChunk &input, idx_t &limit, idx_t &offset, |
| 58 | idx_t current_offset, idx_t &max_element, Expression *limit_expression, |
| 59 | Expression *offset_expression) { |
| 60 | if (limit != DConstants::INVALID_INDEX && offset != DConstants::INVALID_INDEX) { |
| 61 | max_element = limit + offset; |
| 62 | if ((limit == 0 || current_offset >= max_element) && !(limit_expression || offset_expression)) { |
| 63 | return false; |
| 64 | } |
| 65 | } |
| 66 | |
| 67 | // get the next chunk from the child |
| 68 | if (limit == DConstants::INVALID_INDEX) { |
| 69 | limit = 1ULL << 62ULL; |
| 70 | Value val = GetDelimiter(context, input, expr: limit_expression); |
| 71 | if (!val.IsNull()) { |
| 72 | limit = val.GetValue<idx_t>(); |
| 73 | } |
| 74 | if (limit > 1ULL << 62ULL) { |
| 75 | throw BinderException("Max value %lld for LIMIT/OFFSET is %lld" , limit, 1ULL << 62ULL); |
| 76 | } |
| 77 | } |
| 78 | if (offset == DConstants::INVALID_INDEX) { |
| 79 | offset = 0; |
| 80 | Value val = GetDelimiter(context, input, expr: offset_expression); |
| 81 | if (!val.IsNull()) { |
| 82 | offset = val.GetValue<idx_t>(); |
| 83 | } |
| 84 | if (offset > 1ULL << 62ULL) { |
| 85 | throw BinderException("Max value %lld for LIMIT/OFFSET is %lld" , offset, 1ULL << 62ULL); |
| 86 | } |
| 87 | } |
| 88 | max_element = limit + offset; |
| 89 | if (limit == 0 || current_offset >= max_element) { |
| 90 | return false; |
| 91 | } |
| 92 | return true; |
| 93 | } |
| 94 | |
| 95 | SinkResultType PhysicalLimit::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const { |
| 96 | |
| 97 | D_ASSERT(chunk.size() > 0); |
| 98 | auto &state = input.local_state.Cast<LimitLocalState>(); |
| 99 | auto &limit = state.limit; |
| 100 | auto &offset = state.offset; |
| 101 | |
| 102 | idx_t max_element; |
| 103 | if (!ComputeOffset(context, input&: chunk, limit, offset, current_offset: state.current_offset, max_element, limit_expression: limit_expression.get(), |
| 104 | offset_expression: offset_expression.get())) { |
| 105 | return SinkResultType::FINISHED; |
| 106 | } |
| 107 | auto max_cardinality = max_element - state.current_offset; |
| 108 | if (max_cardinality < chunk.size()) { |
| 109 | chunk.SetCardinality(max_cardinality); |
| 110 | } |
| 111 | state.data.Append(input&: chunk, batch_index: state.partition_info.batch_index.GetIndex()); |
| 112 | state.current_offset += chunk.size(); |
| 113 | if (state.current_offset == max_element) { |
| 114 | return SinkResultType::FINISHED; |
| 115 | } |
| 116 | return SinkResultType::NEED_MORE_INPUT; |
| 117 | } |
| 118 | |
| 119 | void PhysicalLimit::Combine(ExecutionContext &context, GlobalSinkState &gstate_p, LocalSinkState &lstate_p) const { |
| 120 | auto &gstate = gstate_p.Cast<LimitGlobalState>(); |
| 121 | auto &state = lstate_p.Cast<LimitLocalState>(); |
| 122 | |
| 123 | lock_guard<mutex> lock(gstate.glock); |
| 124 | gstate.limit = state.limit; |
| 125 | gstate.offset = state.offset; |
| 126 | gstate.data.Merge(other&: state.data); |
| 127 | } |
| 128 | |
| 129 | //===--------------------------------------------------------------------===// |
| 130 | // Source |
| 131 | //===--------------------------------------------------------------------===// |
| 132 | class LimitSourceState : public GlobalSourceState { |
| 133 | public: |
| 134 | LimitSourceState() { |
| 135 | initialized = false; |
| 136 | current_offset = 0; |
| 137 | } |
| 138 | |
| 139 | bool initialized; |
| 140 | idx_t current_offset; |
| 141 | BatchedChunkScanState scan_state; |
| 142 | }; |
| 143 | |
| 144 | unique_ptr<GlobalSourceState> PhysicalLimit::GetGlobalSourceState(ClientContext &context) const { |
| 145 | return make_uniq<LimitSourceState>(); |
| 146 | } |
| 147 | |
| 148 | SourceResultType PhysicalLimit::GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const { |
| 149 | auto &gstate = sink_state->Cast<LimitGlobalState>(); |
| 150 | auto &state = input.global_state.Cast<LimitSourceState>(); |
| 151 | while (state.current_offset < gstate.limit + gstate.offset) { |
| 152 | if (!state.initialized) { |
| 153 | gstate.data.InitializeScan(state&: state.scan_state); |
| 154 | state.initialized = true; |
| 155 | } |
| 156 | gstate.data.Scan(state&: state.scan_state, output&: chunk); |
| 157 | if (chunk.size() == 0) { |
| 158 | return SourceResultType::FINISHED; |
| 159 | } |
| 160 | if (HandleOffset(input&: chunk, current_offset&: state.current_offset, offset: gstate.offset, limit: gstate.limit)) { |
| 161 | break; |
| 162 | } |
| 163 | } |
| 164 | |
| 165 | return chunk.size() > 0 ? SourceResultType::HAVE_MORE_OUTPUT : SourceResultType::FINISHED; |
| 166 | } |
| 167 | |
| 168 | bool PhysicalLimit::HandleOffset(DataChunk &input, idx_t ¤t_offset, idx_t offset, idx_t limit) { |
| 169 | idx_t max_element = limit + offset; |
| 170 | if (limit == DConstants::INVALID_INDEX) { |
| 171 | max_element = DConstants::INVALID_INDEX; |
| 172 | } |
| 173 | idx_t input_size = input.size(); |
| 174 | if (current_offset < offset) { |
| 175 | // we are not yet at the offset point |
| 176 | if (current_offset + input.size() > offset) { |
| 177 | // however we will reach it in this chunk |
| 178 | // we have to copy part of the chunk with an offset |
| 179 | idx_t start_position = offset - current_offset; |
| 180 | auto chunk_count = MinValue<idx_t>(a: limit, b: input.size() - start_position); |
| 181 | SelectionVector sel(STANDARD_VECTOR_SIZE); |
| 182 | for (idx_t i = 0; i < chunk_count; i++) { |
| 183 | sel.set_index(idx: i, loc: start_position + i); |
| 184 | } |
| 185 | // set up a slice of the input chunks |
| 186 | input.Slice(other&: input, sel, count: chunk_count); |
| 187 | } else { |
| 188 | current_offset += input_size; |
| 189 | return false; |
| 190 | } |
| 191 | } else { |
| 192 | // have to copy either the entire chunk or part of it |
| 193 | idx_t chunk_count; |
| 194 | if (current_offset + input.size() >= max_element) { |
| 195 | // have to limit the count of the chunk |
| 196 | chunk_count = max_element - current_offset; |
| 197 | } else { |
| 198 | // we copy the entire chunk |
| 199 | chunk_count = input.size(); |
| 200 | } |
| 201 | // instead of copying we just change the pointer in the current chunk |
| 202 | input.Reference(chunk&: input); |
| 203 | input.SetCardinality(chunk_count); |
| 204 | } |
| 205 | |
| 206 | current_offset += input_size; |
| 207 | return true; |
| 208 | } |
| 209 | |
| 210 | Value PhysicalLimit::GetDelimiter(ExecutionContext &context, DataChunk &input, Expression *expr) { |
| 211 | DataChunk limit_chunk; |
| 212 | vector<LogicalType> types {expr->return_type}; |
| 213 | auto &allocator = Allocator::Get(context&: context.client); |
| 214 | limit_chunk.Initialize(allocator, types); |
| 215 | ExpressionExecutor limit_executor(context.client, expr); |
| 216 | auto input_size = input.size(); |
| 217 | input.SetCardinality(1); |
| 218 | limit_executor.Execute(input, result&: limit_chunk); |
| 219 | input.SetCardinality(input_size); |
| 220 | auto limit_value = limit_chunk.GetValue(col_idx: 0, index: 0); |
| 221 | return limit_value; |
| 222 | } |
| 223 | |
| 224 | } // namespace duckdb |
| 225 | |