1 | #include "duckdb/execution/operator/helper/physical_limit.hpp" |
2 | |
3 | #include "duckdb/common/algorithm.hpp" |
4 | #include "duckdb/main/config.hpp" |
5 | |
6 | #include "duckdb/execution/expression_executor.hpp" |
7 | #include "duckdb/common/types/batched_data_collection.hpp" |
8 | #include "duckdb/execution/operator/helper/physical_streaming_limit.hpp" |
9 | |
10 | namespace duckdb { |
11 | |
12 | PhysicalLimit::PhysicalLimit(vector<LogicalType> types, idx_t limit, idx_t offset, |
13 | unique_ptr<Expression> limit_expression, unique_ptr<Expression> offset_expression, |
14 | idx_t estimated_cardinality) |
15 | : PhysicalOperator(PhysicalOperatorType::LIMIT, std::move(types), estimated_cardinality), limit_value(limit), |
16 | offset_value(offset), limit_expression(std::move(limit_expression)), |
17 | offset_expression(std::move(offset_expression)) { |
18 | } |
19 | |
20 | //===--------------------------------------------------------------------===// |
21 | // Sink |
22 | //===--------------------------------------------------------------------===// |
23 | class LimitGlobalState : public GlobalSinkState { |
24 | public: |
25 | explicit LimitGlobalState(ClientContext &context, const PhysicalLimit &op) : data(op.types) { |
26 | limit = 0; |
27 | offset = 0; |
28 | } |
29 | |
30 | mutex glock; |
31 | idx_t limit; |
32 | idx_t offset; |
33 | BatchedDataCollection data; |
34 | }; |
35 | |
36 | class LimitLocalState : public LocalSinkState { |
37 | public: |
38 | explicit LimitLocalState(ClientContext &context, const PhysicalLimit &op) : current_offset(0), data(op.types) { |
39 | this->limit = op.limit_expression ? DConstants::INVALID_INDEX : op.limit_value; |
40 | this->offset = op.offset_expression ? DConstants::INVALID_INDEX : op.offset_value; |
41 | } |
42 | |
43 | idx_t current_offset; |
44 | idx_t limit; |
45 | idx_t offset; |
46 | BatchedDataCollection data; |
47 | }; |
48 | |
49 | unique_ptr<GlobalSinkState> PhysicalLimit::GetGlobalSinkState(ClientContext &context) const { |
50 | return make_uniq<LimitGlobalState>(args&: context, args: *this); |
51 | } |
52 | |
53 | unique_ptr<LocalSinkState> PhysicalLimit::GetLocalSinkState(ExecutionContext &context) const { |
54 | return make_uniq<LimitLocalState>(args&: context.client, args: *this); |
55 | } |
56 | |
57 | bool PhysicalLimit::ComputeOffset(ExecutionContext &context, DataChunk &input, idx_t &limit, idx_t &offset, |
58 | idx_t current_offset, idx_t &max_element, Expression *limit_expression, |
59 | Expression *offset_expression) { |
60 | if (limit != DConstants::INVALID_INDEX && offset != DConstants::INVALID_INDEX) { |
61 | max_element = limit + offset; |
62 | if ((limit == 0 || current_offset >= max_element) && !(limit_expression || offset_expression)) { |
63 | return false; |
64 | } |
65 | } |
66 | |
67 | // get the next chunk from the child |
68 | if (limit == DConstants::INVALID_INDEX) { |
69 | limit = 1ULL << 62ULL; |
70 | Value val = GetDelimiter(context, input, expr: limit_expression); |
71 | if (!val.IsNull()) { |
72 | limit = val.GetValue<idx_t>(); |
73 | } |
74 | if (limit > 1ULL << 62ULL) { |
75 | throw BinderException("Max value %lld for LIMIT/OFFSET is %lld" , limit, 1ULL << 62ULL); |
76 | } |
77 | } |
78 | if (offset == DConstants::INVALID_INDEX) { |
79 | offset = 0; |
80 | Value val = GetDelimiter(context, input, expr: offset_expression); |
81 | if (!val.IsNull()) { |
82 | offset = val.GetValue<idx_t>(); |
83 | } |
84 | if (offset > 1ULL << 62ULL) { |
85 | throw BinderException("Max value %lld for LIMIT/OFFSET is %lld" , offset, 1ULL << 62ULL); |
86 | } |
87 | } |
88 | max_element = limit + offset; |
89 | if (limit == 0 || current_offset >= max_element) { |
90 | return false; |
91 | } |
92 | return true; |
93 | } |
94 | |
95 | SinkResultType PhysicalLimit::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const { |
96 | |
97 | D_ASSERT(chunk.size() > 0); |
98 | auto &state = input.local_state.Cast<LimitLocalState>(); |
99 | auto &limit = state.limit; |
100 | auto &offset = state.offset; |
101 | |
102 | idx_t max_element; |
103 | if (!ComputeOffset(context, input&: chunk, limit, offset, current_offset: state.current_offset, max_element, limit_expression: limit_expression.get(), |
104 | offset_expression: offset_expression.get())) { |
105 | return SinkResultType::FINISHED; |
106 | } |
107 | auto max_cardinality = max_element - state.current_offset; |
108 | if (max_cardinality < chunk.size()) { |
109 | chunk.SetCardinality(max_cardinality); |
110 | } |
111 | state.data.Append(input&: chunk, batch_index: state.partition_info.batch_index.GetIndex()); |
112 | state.current_offset += chunk.size(); |
113 | if (state.current_offset == max_element) { |
114 | return SinkResultType::FINISHED; |
115 | } |
116 | return SinkResultType::NEED_MORE_INPUT; |
117 | } |
118 | |
119 | void PhysicalLimit::Combine(ExecutionContext &context, GlobalSinkState &gstate_p, LocalSinkState &lstate_p) const { |
120 | auto &gstate = gstate_p.Cast<LimitGlobalState>(); |
121 | auto &state = lstate_p.Cast<LimitLocalState>(); |
122 | |
123 | lock_guard<mutex> lock(gstate.glock); |
124 | gstate.limit = state.limit; |
125 | gstate.offset = state.offset; |
126 | gstate.data.Merge(other&: state.data); |
127 | } |
128 | |
129 | //===--------------------------------------------------------------------===// |
130 | // Source |
131 | //===--------------------------------------------------------------------===// |
132 | class LimitSourceState : public GlobalSourceState { |
133 | public: |
134 | LimitSourceState() { |
135 | initialized = false; |
136 | current_offset = 0; |
137 | } |
138 | |
139 | bool initialized; |
140 | idx_t current_offset; |
141 | BatchedChunkScanState scan_state; |
142 | }; |
143 | |
144 | unique_ptr<GlobalSourceState> PhysicalLimit::GetGlobalSourceState(ClientContext &context) const { |
145 | return make_uniq<LimitSourceState>(); |
146 | } |
147 | |
148 | SourceResultType PhysicalLimit::GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const { |
149 | auto &gstate = sink_state->Cast<LimitGlobalState>(); |
150 | auto &state = input.global_state.Cast<LimitSourceState>(); |
151 | while (state.current_offset < gstate.limit + gstate.offset) { |
152 | if (!state.initialized) { |
153 | gstate.data.InitializeScan(state&: state.scan_state); |
154 | state.initialized = true; |
155 | } |
156 | gstate.data.Scan(state&: state.scan_state, output&: chunk); |
157 | if (chunk.size() == 0) { |
158 | return SourceResultType::FINISHED; |
159 | } |
160 | if (HandleOffset(input&: chunk, current_offset&: state.current_offset, offset: gstate.offset, limit: gstate.limit)) { |
161 | break; |
162 | } |
163 | } |
164 | |
165 | return chunk.size() > 0 ? SourceResultType::HAVE_MORE_OUTPUT : SourceResultType::FINISHED; |
166 | } |
167 | |
168 | bool PhysicalLimit::HandleOffset(DataChunk &input, idx_t ¤t_offset, idx_t offset, idx_t limit) { |
169 | idx_t max_element = limit + offset; |
170 | if (limit == DConstants::INVALID_INDEX) { |
171 | max_element = DConstants::INVALID_INDEX; |
172 | } |
173 | idx_t input_size = input.size(); |
174 | if (current_offset < offset) { |
175 | // we are not yet at the offset point |
176 | if (current_offset + input.size() > offset) { |
177 | // however we will reach it in this chunk |
178 | // we have to copy part of the chunk with an offset |
179 | idx_t start_position = offset - current_offset; |
180 | auto chunk_count = MinValue<idx_t>(a: limit, b: input.size() - start_position); |
181 | SelectionVector sel(STANDARD_VECTOR_SIZE); |
182 | for (idx_t i = 0; i < chunk_count; i++) { |
183 | sel.set_index(idx: i, loc: start_position + i); |
184 | } |
185 | // set up a slice of the input chunks |
186 | input.Slice(other&: input, sel, count: chunk_count); |
187 | } else { |
188 | current_offset += input_size; |
189 | return false; |
190 | } |
191 | } else { |
192 | // have to copy either the entire chunk or part of it |
193 | idx_t chunk_count; |
194 | if (current_offset + input.size() >= max_element) { |
195 | // have to limit the count of the chunk |
196 | chunk_count = max_element - current_offset; |
197 | } else { |
198 | // we copy the entire chunk |
199 | chunk_count = input.size(); |
200 | } |
201 | // instead of copying we just change the pointer in the current chunk |
202 | input.Reference(chunk&: input); |
203 | input.SetCardinality(chunk_count); |
204 | } |
205 | |
206 | current_offset += input_size; |
207 | return true; |
208 | } |
209 | |
210 | Value PhysicalLimit::GetDelimiter(ExecutionContext &context, DataChunk &input, Expression *expr) { |
211 | DataChunk limit_chunk; |
212 | vector<LogicalType> types {expr->return_type}; |
213 | auto &allocator = Allocator::Get(context&: context.client); |
214 | limit_chunk.Initialize(allocator, types); |
215 | ExpressionExecutor limit_executor(context.client, expr); |
216 | auto input_size = input.size(); |
217 | input.SetCardinality(1); |
218 | limit_executor.Execute(input, result&: limit_chunk); |
219 | input.SetCardinality(input_size); |
220 | auto limit_value = limit_chunk.GetValue(col_idx: 0, index: 0); |
221 | return limit_value; |
222 | } |
223 | |
224 | } // namespace duckdb |
225 | |