1 | #include "duckdb/execution/operator/projection/physical_unnest.hpp" |
2 | |
3 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
4 | #include "duckdb/common/algorithm.hpp" |
5 | #include "duckdb/execution/expression_executor.hpp" |
6 | #include "duckdb/planner/expression/bound_reference_expression.hpp" |
7 | #include "duckdb/planner/expression/bound_unnest_expression.hpp" |
8 | |
9 | namespace duckdb { |
10 | |
11 | class UnnestOperatorState : public OperatorState { |
12 | public: |
13 | UnnestOperatorState(ClientContext &context, const vector<unique_ptr<Expression>> &select_list) |
14 | : current_row(0), list_position(0), longest_list_length(DConstants::INVALID_INDEX), first_fetch(true), |
15 | executor(context) { |
16 | |
17 | // for each UNNEST in the select_list, we add the child expression to the expression executor |
18 | // and set the return type in the list_data chunk, which will contain the evaluated expression results |
19 | vector<LogicalType> list_data_types; |
20 | for (auto &exp : select_list) { |
21 | D_ASSERT(exp->type == ExpressionType::BOUND_UNNEST); |
22 | auto &bue = exp->Cast<BoundUnnestExpression>(); |
23 | list_data_types.push_back(x: bue.child->return_type); |
24 | executor.AddExpression(expr: *bue.child.get()); |
25 | } |
26 | |
27 | auto &allocator = Allocator::Get(context); |
28 | list_data.Initialize(allocator, types: list_data_types); |
29 | |
30 | list_vector_data.resize(new_size: list_data.ColumnCount()); |
31 | list_child_data.resize(new_size: list_data.ColumnCount()); |
32 | } |
33 | |
34 | idx_t current_row; |
35 | idx_t list_position; |
36 | idx_t longest_list_length; |
37 | bool first_fetch; |
38 | |
39 | ExpressionExecutor executor; |
40 | DataChunk list_data; |
41 | vector<UnifiedVectorFormat> list_vector_data; |
42 | vector<UnifiedVectorFormat> list_child_data; |
43 | |
44 | public: |
45 | //! Reset the fields of the unnest operator state |
46 | void Reset(); |
47 | //! Set the longest list's length for the current row |
48 | void SetLongestListLength(); |
49 | }; |
50 | |
51 | void UnnestOperatorState::Reset() { |
52 | current_row = 0; |
53 | list_position = 0; |
54 | longest_list_length = DConstants::INVALID_INDEX; |
55 | first_fetch = true; |
56 | } |
57 | |
58 | void UnnestOperatorState::SetLongestListLength() { |
59 | longest_list_length = 0; |
60 | for (idx_t col_idx = 0; col_idx < list_data.ColumnCount(); col_idx++) { |
61 | |
62 | auto &vector_data = list_vector_data[col_idx]; |
63 | auto current_idx = vector_data.sel->get_index(idx: current_row); |
64 | |
65 | if (vector_data.validity.RowIsValid(row_idx: current_idx)) { |
66 | |
67 | // check if this list is longer |
68 | auto list_data_entries = UnifiedVectorFormat::GetData<list_entry_t>(format: vector_data); |
69 | auto list_entry = list_data_entries[current_idx]; |
70 | if (list_entry.length > longest_list_length) { |
71 | longest_list_length = list_entry.length; |
72 | } |
73 | } |
74 | } |
75 | } |
76 | |
77 | PhysicalUnnest::PhysicalUnnest(vector<LogicalType> types, vector<unique_ptr<Expression>> select_list, |
78 | idx_t estimated_cardinality, PhysicalOperatorType type) |
79 | : PhysicalOperator(type, std::move(types), estimated_cardinality), select_list(std::move(select_list)) { |
80 | D_ASSERT(!this->select_list.empty()); |
81 | } |
82 | |
83 | static void UnnestNull(idx_t start, idx_t end, Vector &result) { |
84 | |
85 | D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR); |
86 | auto &validity = FlatVector::Validity(vector&: result); |
87 | for (idx_t i = start; i < end; i++) { |
88 | validity.SetInvalid(i); |
89 | } |
90 | if (result.GetType().InternalType() == PhysicalType::STRUCT) { |
91 | auto &struct_children = StructVector::GetEntries(vector&: result); |
92 | for (auto &child : struct_children) { |
93 | UnnestNull(start, end, result&: *child); |
94 | } |
95 | } |
96 | } |
97 | |
98 | template <class T> |
99 | static void TemplatedUnnest(UnifiedVectorFormat &vector_data, idx_t start, idx_t end, Vector &result) { |
100 | |
101 | auto source_data = UnifiedVectorFormat::GetData<T>(vector_data); |
102 | auto &source_mask = vector_data.validity; |
103 | |
104 | D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR); |
105 | auto result_data = FlatVector::GetData<T>(result); |
106 | auto &result_mask = FlatVector::Validity(vector&: result); |
107 | |
108 | for (idx_t i = start; i < end; i++) { |
109 | auto source_idx = vector_data.sel->get_index(idx: i); |
110 | auto target_idx = i - start; |
111 | if (source_mask.RowIsValid(row_idx: source_idx)) { |
112 | result_data[target_idx] = source_data[source_idx]; |
113 | result_mask.SetValid(target_idx); |
114 | } else { |
115 | result_mask.SetInvalid(target_idx); |
116 | } |
117 | } |
118 | } |
119 | |
120 | static void UnnestValidity(UnifiedVectorFormat &vector_data, idx_t start, idx_t end, Vector &result) { |
121 | |
122 | auto &source_mask = vector_data.validity; |
123 | D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR); |
124 | auto &result_mask = FlatVector::Validity(vector&: result); |
125 | |
126 | for (idx_t i = start; i < end; i++) { |
127 | auto source_idx = vector_data.sel->get_index(idx: i); |
128 | auto target_idx = i - start; |
129 | result_mask.Set(row_idx: target_idx, valid: source_mask.RowIsValid(row_idx: source_idx)); |
130 | } |
131 | } |
132 | |
133 | static void UnnestVector(UnifiedVectorFormat &child_vector_data, Vector &child_vector, idx_t list_size, idx_t start, |
134 | idx_t end, Vector &result) { |
135 | |
136 | D_ASSERT(child_vector.GetType() == result.GetType()); |
137 | switch (result.GetType().InternalType()) { |
138 | case PhysicalType::BOOL: |
139 | case PhysicalType::INT8: |
140 | TemplatedUnnest<int8_t>(vector_data&: child_vector_data, start, end, result); |
141 | break; |
142 | case PhysicalType::INT16: |
143 | TemplatedUnnest<int16_t>(vector_data&: child_vector_data, start, end, result); |
144 | break; |
145 | case PhysicalType::INT32: |
146 | TemplatedUnnest<int32_t>(vector_data&: child_vector_data, start, end, result); |
147 | break; |
148 | case PhysicalType::INT64: |
149 | TemplatedUnnest<int64_t>(vector_data&: child_vector_data, start, end, result); |
150 | break; |
151 | case PhysicalType::INT128: |
152 | TemplatedUnnest<hugeint_t>(vector_data&: child_vector_data, start, end, result); |
153 | break; |
154 | case PhysicalType::UINT8: |
155 | TemplatedUnnest<uint8_t>(vector_data&: child_vector_data, start, end, result); |
156 | break; |
157 | case PhysicalType::UINT16: |
158 | TemplatedUnnest<uint16_t>(vector_data&: child_vector_data, start, end, result); |
159 | break; |
160 | case PhysicalType::UINT32: |
161 | TemplatedUnnest<uint32_t>(vector_data&: child_vector_data, start, end, result); |
162 | break; |
163 | case PhysicalType::UINT64: |
164 | TemplatedUnnest<uint64_t>(vector_data&: child_vector_data, start, end, result); |
165 | break; |
166 | case PhysicalType::FLOAT: |
167 | TemplatedUnnest<float>(vector_data&: child_vector_data, start, end, result); |
168 | break; |
169 | case PhysicalType::DOUBLE: |
170 | TemplatedUnnest<double>(vector_data&: child_vector_data, start, end, result); |
171 | break; |
172 | case PhysicalType::INTERVAL: |
173 | TemplatedUnnest<interval_t>(vector_data&: child_vector_data, start, end, result); |
174 | break; |
175 | case PhysicalType::VARCHAR: |
176 | TemplatedUnnest<string_t>(vector_data&: child_vector_data, start, end, result); |
177 | break; |
178 | case PhysicalType::LIST: { |
179 | // the child vector of result now references the child vector source |
180 | // FIXME: only reference relevant children (start - end) instead of all |
181 | auto &target = ListVector::GetEntry(vector&: result); |
182 | target.Reference(other&: ListVector::GetEntry(vector&: child_vector)); |
183 | ListVector::SetListSize(vec&: result, size: ListVector::GetListSize(vector: child_vector)); |
184 | // unnest |
185 | TemplatedUnnest<list_entry_t>(vector_data&: child_vector_data, start, end, result); |
186 | break; |
187 | } |
188 | case PhysicalType::STRUCT: { |
189 | auto &child_vector_entries = StructVector::GetEntries(vector&: child_vector); |
190 | auto &result_entries = StructVector::GetEntries(vector&: result); |
191 | |
192 | // set the validity mask for the 'outer' struct vector before unnesting its children |
193 | UnnestValidity(vector_data&: child_vector_data, start, end, result); |
194 | |
195 | for (idx_t i = 0; i < child_vector_entries.size(); i++) { |
196 | UnifiedVectorFormat child_vector_entries_data; |
197 | child_vector_entries[i]->ToUnifiedFormat(count: list_size, data&: child_vector_entries_data); |
198 | UnnestVector(child_vector_data&: child_vector_entries_data, child_vector&: *child_vector_entries[i], list_size, start, end, |
199 | result&: *result_entries[i]); |
200 | } |
201 | break; |
202 | } |
203 | default: |
204 | throw InternalException("Unimplemented type for UNNEST." ); |
205 | } |
206 | } |
207 | |
208 | static void PrepareInput(UnnestOperatorState &state, DataChunk &input, |
209 | const vector<unique_ptr<Expression>> &select_list) { |
210 | |
211 | state.list_data.Reset(); |
212 | // execute the expressions inside each UNNEST in the select_list to get the list data |
213 | // execution results (lists) are kept in state.list_data chunk |
214 | state.executor.Execute(input, result&: state.list_data); |
215 | |
216 | // verify incoming lists |
217 | state.list_data.Verify(); |
218 | D_ASSERT(input.size() == state.list_data.size()); |
219 | D_ASSERT(state.list_data.ColumnCount() == select_list.size()); |
220 | D_ASSERT(state.list_vector_data.size() == state.list_data.ColumnCount()); |
221 | D_ASSERT(state.list_child_data.size() == state.list_data.ColumnCount()); |
222 | |
223 | // get the UnifiedVectorFormat of each list_data vector (LIST vectors for the different UNNESTs) |
224 | // both for the vector itself and its child vector |
225 | for (idx_t col_idx = 0; col_idx < state.list_data.ColumnCount(); col_idx++) { |
226 | |
227 | auto &list_vector = state.list_data.data[col_idx]; |
228 | list_vector.ToUnifiedFormat(count: state.list_data.size(), data&: state.list_vector_data[col_idx]); |
229 | |
230 | if (list_vector.GetType() == LogicalType::SQLNULL) { |
231 | // UNNEST(NULL): SQLNULL vectors don't have child vectors, but we need to point to the child vector of |
232 | // each vector, so we just get the UnifiedVectorFormat of the vector itself |
233 | auto &child_vector = list_vector; |
234 | child_vector.ToUnifiedFormat(count: 0, data&: state.list_child_data[col_idx]); |
235 | } else { |
236 | auto list_size = ListVector::GetListSize(vector: list_vector); |
237 | auto &child_vector = ListVector::GetEntry(vector&: list_vector); |
238 | child_vector.ToUnifiedFormat(count: list_size, data&: state.list_child_data[col_idx]); |
239 | } |
240 | } |
241 | |
242 | state.first_fetch = false; |
243 | } |
244 | |
245 | unique_ptr<OperatorState> PhysicalUnnest::GetOperatorState(ExecutionContext &context) const { |
246 | return PhysicalUnnest::GetState(context, select_list); |
247 | } |
248 | |
249 | unique_ptr<OperatorState> PhysicalUnnest::GetState(ExecutionContext &context, |
250 | const vector<unique_ptr<Expression>> &select_list) { |
251 | return make_uniq<UnnestOperatorState>(args&: context.client, args: select_list); |
252 | } |
253 | |
254 | OperatorResultType PhysicalUnnest::ExecuteInternal(ExecutionContext &context, DataChunk &input, DataChunk &chunk, |
255 | OperatorState &state_p, |
256 | const vector<unique_ptr<Expression>> &select_list, |
257 | bool include_input) { |
258 | |
259 | auto &state = state_p.Cast<UnnestOperatorState>(); |
260 | |
261 | do { |
262 | // reset validities, if previous loop iteration contained UNNEST(NULL) |
263 | if (include_input) { |
264 | chunk.Reset(); |
265 | } |
266 | |
267 | // prepare the input data by executing any expressions and getting the |
268 | // UnifiedVectorFormat of each LIST vector (list_vector_data) and its child vector (list_child_data) |
269 | if (state.first_fetch) { |
270 | PrepareInput(state, input, select_list); |
271 | } |
272 | |
273 | // finished with all rows of this input chunk, reset |
274 | if (state.current_row >= input.size()) { |
275 | state.Reset(); |
276 | return OperatorResultType::NEED_MORE_INPUT; |
277 | } |
278 | |
279 | // each UNNEST in the select_list contains a list (or NULL) for this row, find the longest list |
280 | // because this length determines how many times we need to repeat for the current row |
281 | if (state.longest_list_length == DConstants::INVALID_INDEX) { |
282 | state.SetLongestListLength(); |
283 | } |
284 | D_ASSERT(state.longest_list_length != DConstants::INVALID_INDEX); |
285 | |
286 | // we emit chunks of either STANDARD_VECTOR_SIZE or smaller |
287 | auto this_chunk_len = MinValue<idx_t>(STANDARD_VECTOR_SIZE, b: state.longest_list_length - state.list_position); |
288 | chunk.SetCardinality(this_chunk_len); |
289 | |
290 | // if we include other projection input columns, e.g. SELECT 1, UNNEST([1, 2]);, then |
291 | // we need to add them as a constant vector to the resulting chunk |
292 | // FIXME: emit multiple unnested rows. Currently, we never emit a chunk containing multiple unnested input rows, |
293 | // so setting a constant vector for the value at state.current_row is fine |
294 | idx_t col_offset = 0; |
295 | if (include_input) { |
296 | for (idx_t col_idx = 0; col_idx < input.ColumnCount(); col_idx++) { |
297 | ConstantVector::Reference(vector&: chunk.data[col_idx], source&: input.data[col_idx], position: state.current_row, count: input.size()); |
298 | } |
299 | col_offset = input.ColumnCount(); |
300 | } |
301 | |
302 | // unnest the lists |
303 | for (idx_t col_idx = 0; col_idx < state.list_data.ColumnCount(); col_idx++) { |
304 | |
305 | auto &result_vector = chunk.data[col_idx + col_offset]; |
306 | |
307 | if (state.list_data.data[col_idx].GetType() == LogicalType::SQLNULL) { |
308 | // UNNEST(NULL) |
309 | chunk.SetCardinality(0); |
310 | break; |
311 | |
312 | } else { |
313 | |
314 | auto &vector_data = state.list_vector_data[col_idx]; |
315 | auto current_idx = vector_data.sel->get_index(idx: state.current_row); |
316 | |
317 | if (!vector_data.validity.RowIsValid(row_idx: current_idx)) { |
318 | UnnestNull(start: 0, end: this_chunk_len, result&: result_vector); |
319 | |
320 | } else { |
321 | |
322 | auto list_data = UnifiedVectorFormat::GetData<list_entry_t>(format: vector_data); |
323 | auto list_entry = list_data[current_idx]; |
324 | |
325 | idx_t list_count = 0; |
326 | if (state.list_position < list_entry.length) { |
327 | // there are still list_count elements to unnest |
328 | list_count = MinValue<idx_t>(a: this_chunk_len, b: list_entry.length - state.list_position); |
329 | |
330 | auto &list_vector = state.list_data.data[col_idx]; |
331 | auto &child_vector = ListVector::GetEntry(vector&: list_vector); |
332 | auto list_size = ListVector::GetListSize(vector: list_vector); |
333 | auto &child_vector_data = state.list_child_data[col_idx]; |
334 | |
335 | auto base_offset = list_entry.offset + state.list_position; |
336 | UnnestVector(child_vector_data, child_vector, list_size, start: base_offset, end: base_offset + list_count, |
337 | result&: result_vector); |
338 | } |
339 | |
340 | // fill the rest with NULLs |
341 | if (list_count != this_chunk_len) { |
342 | UnnestNull(start: list_count, end: this_chunk_len, result&: result_vector); |
343 | } |
344 | } |
345 | } |
346 | } |
347 | |
348 | chunk.Verify(); |
349 | |
350 | state.list_position += this_chunk_len; |
351 | if (state.list_position == state.longest_list_length) { |
352 | state.current_row++; |
353 | state.longest_list_length = DConstants::INVALID_INDEX; |
354 | state.list_position = 0; |
355 | } |
356 | |
357 | // we only emit one unnested row (that contains data) at a time |
358 | } while (chunk.size() == 0); |
359 | return OperatorResultType::HAVE_MORE_OUTPUT; |
360 | } |
361 | |
362 | OperatorResultType PhysicalUnnest::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk, |
363 | GlobalOperatorState &, OperatorState &state) const { |
364 | return ExecuteInternal(context, input, chunk, state_p&: state, select_list); |
365 | } |
366 | |
367 | } // namespace duckdb |
368 | |