1#include "duckdb/execution/operator/helper/physical_limit.hpp"
2
3#include "duckdb/common/algorithm.hpp"
4#include "duckdb/main/config.hpp"
5
6#include "duckdb/execution/expression_executor.hpp"
7#include "duckdb/common/types/batched_data_collection.hpp"
8#include "duckdb/execution/operator/helper/physical_streaming_limit.hpp"
9
10namespace duckdb {
11
12PhysicalLimit::PhysicalLimit(vector<LogicalType> types, idx_t limit, idx_t offset,
13 unique_ptr<Expression> limit_expression, unique_ptr<Expression> offset_expression,
14 idx_t estimated_cardinality)
15 : PhysicalOperator(PhysicalOperatorType::LIMIT, std::move(types), estimated_cardinality), limit_value(limit),
16 offset_value(offset), limit_expression(std::move(limit_expression)),
17 offset_expression(std::move(offset_expression)) {
18}
19
20//===--------------------------------------------------------------------===//
21// Sink
22//===--------------------------------------------------------------------===//
23class LimitGlobalState : public GlobalSinkState {
24public:
25 explicit LimitGlobalState(ClientContext &context, const PhysicalLimit &op) : data(op.types) {
26 limit = 0;
27 offset = 0;
28 }
29
30 mutex glock;
31 idx_t limit;
32 idx_t offset;
33 BatchedDataCollection data;
34};
35
36class LimitLocalState : public LocalSinkState {
37public:
38 explicit LimitLocalState(ClientContext &context, const PhysicalLimit &op) : current_offset(0), data(op.types) {
39 this->limit = op.limit_expression ? DConstants::INVALID_INDEX : op.limit_value;
40 this->offset = op.offset_expression ? DConstants::INVALID_INDEX : op.offset_value;
41 }
42
43 idx_t current_offset;
44 idx_t limit;
45 idx_t offset;
46 BatchedDataCollection data;
47};
48
49unique_ptr<GlobalSinkState> PhysicalLimit::GetGlobalSinkState(ClientContext &context) const {
50 return make_uniq<LimitGlobalState>(args&: context, args: *this);
51}
52
53unique_ptr<LocalSinkState> PhysicalLimit::GetLocalSinkState(ExecutionContext &context) const {
54 return make_uniq<LimitLocalState>(args&: context.client, args: *this);
55}
56
57bool PhysicalLimit::ComputeOffset(ExecutionContext &context, DataChunk &input, idx_t &limit, idx_t &offset,
58 idx_t current_offset, idx_t &max_element, Expression *limit_expression,
59 Expression *offset_expression) {
60 if (limit != DConstants::INVALID_INDEX && offset != DConstants::INVALID_INDEX) {
61 max_element = limit + offset;
62 if ((limit == 0 || current_offset >= max_element) && !(limit_expression || offset_expression)) {
63 return false;
64 }
65 }
66
67 // get the next chunk from the child
68 if (limit == DConstants::INVALID_INDEX) {
69 limit = 1ULL << 62ULL;
70 Value val = GetDelimiter(context, input, expr: limit_expression);
71 if (!val.IsNull()) {
72 limit = val.GetValue<idx_t>();
73 }
74 if (limit > 1ULL << 62ULL) {
75 throw BinderException("Max value %lld for LIMIT/OFFSET is %lld", limit, 1ULL << 62ULL);
76 }
77 }
78 if (offset == DConstants::INVALID_INDEX) {
79 offset = 0;
80 Value val = GetDelimiter(context, input, expr: offset_expression);
81 if (!val.IsNull()) {
82 offset = val.GetValue<idx_t>();
83 }
84 if (offset > 1ULL << 62ULL) {
85 throw BinderException("Max value %lld for LIMIT/OFFSET is %lld", offset, 1ULL << 62ULL);
86 }
87 }
88 max_element = limit + offset;
89 if (limit == 0 || current_offset >= max_element) {
90 return false;
91 }
92 return true;
93}
94
95SinkResultType PhysicalLimit::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
96
97 D_ASSERT(chunk.size() > 0);
98 auto &state = input.local_state.Cast<LimitLocalState>();
99 auto &limit = state.limit;
100 auto &offset = state.offset;
101
102 idx_t max_element;
103 if (!ComputeOffset(context, input&: chunk, limit, offset, current_offset: state.current_offset, max_element, limit_expression: limit_expression.get(),
104 offset_expression: offset_expression.get())) {
105 return SinkResultType::FINISHED;
106 }
107 auto max_cardinality = max_element - state.current_offset;
108 if (max_cardinality < chunk.size()) {
109 chunk.SetCardinality(max_cardinality);
110 }
111 state.data.Append(input&: chunk, batch_index: state.partition_info.batch_index.GetIndex());
112 state.current_offset += chunk.size();
113 if (state.current_offset == max_element) {
114 return SinkResultType::FINISHED;
115 }
116 return SinkResultType::NEED_MORE_INPUT;
117}
118
119void PhysicalLimit::Combine(ExecutionContext &context, GlobalSinkState &gstate_p, LocalSinkState &lstate_p) const {
120 auto &gstate = gstate_p.Cast<LimitGlobalState>();
121 auto &state = lstate_p.Cast<LimitLocalState>();
122
123 lock_guard<mutex> lock(gstate.glock);
124 gstate.limit = state.limit;
125 gstate.offset = state.offset;
126 gstate.data.Merge(other&: state.data);
127}
128
129//===--------------------------------------------------------------------===//
130// Source
131//===--------------------------------------------------------------------===//
132class LimitSourceState : public GlobalSourceState {
133public:
134 LimitSourceState() {
135 initialized = false;
136 current_offset = 0;
137 }
138
139 bool initialized;
140 idx_t current_offset;
141 BatchedChunkScanState scan_state;
142};
143
144unique_ptr<GlobalSourceState> PhysicalLimit::GetGlobalSourceState(ClientContext &context) const {
145 return make_uniq<LimitSourceState>();
146}
147
148SourceResultType PhysicalLimit::GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const {
149 auto &gstate = sink_state->Cast<LimitGlobalState>();
150 auto &state = input.global_state.Cast<LimitSourceState>();
151 while (state.current_offset < gstate.limit + gstate.offset) {
152 if (!state.initialized) {
153 gstate.data.InitializeScan(state&: state.scan_state);
154 state.initialized = true;
155 }
156 gstate.data.Scan(state&: state.scan_state, output&: chunk);
157 if (chunk.size() == 0) {
158 return SourceResultType::FINISHED;
159 }
160 if (HandleOffset(input&: chunk, current_offset&: state.current_offset, offset: gstate.offset, limit: gstate.limit)) {
161 break;
162 }
163 }
164
165 return chunk.size() > 0 ? SourceResultType::HAVE_MORE_OUTPUT : SourceResultType::FINISHED;
166}
167
168bool PhysicalLimit::HandleOffset(DataChunk &input, idx_t &current_offset, idx_t offset, idx_t limit) {
169 idx_t max_element = limit + offset;
170 if (limit == DConstants::INVALID_INDEX) {
171 max_element = DConstants::INVALID_INDEX;
172 }
173 idx_t input_size = input.size();
174 if (current_offset < offset) {
175 // we are not yet at the offset point
176 if (current_offset + input.size() > offset) {
177 // however we will reach it in this chunk
178 // we have to copy part of the chunk with an offset
179 idx_t start_position = offset - current_offset;
180 auto chunk_count = MinValue<idx_t>(a: limit, b: input.size() - start_position);
181 SelectionVector sel(STANDARD_VECTOR_SIZE);
182 for (idx_t i = 0; i < chunk_count; i++) {
183 sel.set_index(idx: i, loc: start_position + i);
184 }
185 // set up a slice of the input chunks
186 input.Slice(other&: input, sel, count: chunk_count);
187 } else {
188 current_offset += input_size;
189 return false;
190 }
191 } else {
192 // have to copy either the entire chunk or part of it
193 idx_t chunk_count;
194 if (current_offset + input.size() >= max_element) {
195 // have to limit the count of the chunk
196 chunk_count = max_element - current_offset;
197 } else {
198 // we copy the entire chunk
199 chunk_count = input.size();
200 }
201 // instead of copying we just change the pointer in the current chunk
202 input.Reference(chunk&: input);
203 input.SetCardinality(chunk_count);
204 }
205
206 current_offset += input_size;
207 return true;
208}
209
210Value PhysicalLimit::GetDelimiter(ExecutionContext &context, DataChunk &input, Expression *expr) {
211 DataChunk limit_chunk;
212 vector<LogicalType> types {expr->return_type};
213 auto &allocator = Allocator::Get(context&: context.client);
214 limit_chunk.Initialize(allocator, types);
215 ExpressionExecutor limit_executor(context.client, expr);
216 auto input_size = input.size();
217 input.SetCardinality(1);
218 limit_executor.Execute(input, result&: limit_chunk);
219 input.SetCardinality(input_size);
220 auto limit_value = limit_chunk.GetValue(col_idx: 0, index: 0);
221 return limit_value;
222}
223
224} // namespace duckdb
225