1#include "duckdb/execution/operator/projection/physical_unnest.hpp"
2
3#include "duckdb/common/vector_operations/vector_operations.hpp"
4#include "duckdb/execution/expression_executor.hpp"
5#include "duckdb/planner/expression/bound_reference_expression.hpp"
6#include "duckdb/planner/expression/bound_unnest_expression.hpp"
7
8using namespace duckdb;
9using namespace std;
10
11//! The operator state of the window
12class PhysicalUnnestOperatorState : public PhysicalOperatorState {
13public:
14 PhysicalUnnestOperatorState(PhysicalOperator *child)
15 : PhysicalOperatorState(child), parent_position(0), list_position(0), list_length(-1) {
16 }
17
18 idx_t parent_position;
19 idx_t list_position;
20 int64_t list_length = -1;
21
22 DataChunk list_data;
23};
24
25// this implements a sorted window functions variant
26PhysicalUnnest::PhysicalUnnest(LogicalOperator &op, vector<unique_ptr<Expression>> select_list,
27 PhysicalOperatorType type)
28 : PhysicalOperator(type, op.types), select_list(std::move(select_list)) {
29
30 assert(this->select_list.size() > 0);
31}
32
33void PhysicalUnnest::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) {
34 auto state = reinterpret_cast<PhysicalUnnestOperatorState *>(state_);
35 while (true) { // repeat until we actually have produced some rows
36 if (state->child_chunk.size() == 0 || state->parent_position >= state->child_chunk.size()) {
37 // get the child data
38 children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
39 if (state->child_chunk.size() == 0) {
40 return;
41 }
42 state->parent_position = 0;
43 state->list_position = 0;
44 state->list_length = -1;
45
46 // get the list data to unnest
47 ExpressionExecutor executor;
48 vector<TypeId> list_data_types;
49 for (auto &exp : select_list) {
50 assert(exp->type == ExpressionType::BOUND_UNNEST);
51 auto bue = (BoundUnnestExpression *)exp.get();
52 list_data_types.push_back(bue->child->return_type);
53 executor.AddExpression(*bue->child.get());
54 }
55 state->list_data.Destroy();
56 state->list_data.Initialize(list_data_types);
57 executor.Execute(state->child_chunk, state->list_data);
58
59 // paranoia aplenty
60 state->child_chunk.Verify();
61 state->list_data.Verify();
62 assert(state->child_chunk.size() == state->list_data.size());
63 assert(state->list_data.column_count() == select_list.size());
64 }
65
66 // need to figure out how many times we need to repeat for current row
67 if (state->list_length < 0) {
68 for (idx_t col_idx = 0; col_idx < state->list_data.column_count(); col_idx++) {
69 auto &v = state->list_data.data[col_idx];
70
71 assert(v.type == TypeId::LIST);
72 // TODO deal with NULL values here!
73 auto list_data = FlatVector::GetData<list_entry_t>(v);
74 auto list_entry = list_data[state->parent_position];
75 if ((int64_t)list_entry.length > state->list_length) {
76 state->list_length = list_entry.length;
77 }
78 }
79 }
80
81 assert(state->list_length >= 0);
82
83 auto this_chunk_len = min((idx_t)STANDARD_VECTOR_SIZE, state->list_length - state->list_position);
84
85 // first cols are from child, last n cols from unnest
86 chunk.SetCardinality(this_chunk_len);
87
88 for (idx_t col_idx = 0; col_idx < state->child_chunk.column_count(); col_idx++) {
89 auto val = state->child_chunk.data[col_idx].GetValue(state->parent_position);
90 chunk.data[col_idx].Reference(val);
91 }
92
93 // FIXME do not use GetValue/SetValue here
94 // TODO now that list entries are chunk collections, simply scan them!
95 for (idx_t col_idx = 0; col_idx < state->list_data.column_count(); col_idx++) {
96 auto target_col = col_idx + state->child_chunk.column_count();
97 auto &v = state->list_data.data[col_idx];
98 auto list_data = FlatVector::GetData<list_entry_t>(v);
99 auto list_entry = list_data[state->parent_position];
100 auto &child_cc = ListVector::GetEntry(v);
101
102 idx_t i = 0;
103 if (list_entry.length > state->list_position) {
104 for (i = 0; i < min((idx_t)this_chunk_len, list_entry.length - state->list_position); i++) {
105 chunk.data[target_col].SetValue(i,
106 child_cc.GetValue(0, list_entry.offset + i + state->list_position));
107 }
108 }
109 for (; i < (idx_t)this_chunk_len; i++) {
110 chunk.data[target_col].SetValue(i, Value());
111 }
112 }
113
114 state->list_position += this_chunk_len;
115 if ((int64_t)state->list_position == state->list_length) {
116 state->parent_position++;
117 state->list_length = -1;
118 state->list_position = 0;
119 }
120
121 chunk.Verify();
122 if (chunk.size() > 0) {
123 return;
124 }
125 }
126}
127
128unique_ptr<PhysicalOperatorState> PhysicalUnnest::GetOperatorState() {
129 return make_unique<PhysicalUnnestOperatorState>(children[0].get());
130}
131