1 | #include "duckdb/execution/operator/join/physical_piecewise_merge_join.hpp" |
2 | |
3 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
4 | #include "duckdb/execution/expression_executor.hpp" |
5 | #include "duckdb/execution/merge_join.hpp" |
6 | #include "duckdb/common/operator/comparison_operators.hpp" |
7 | |
8 | using namespace duckdb; |
9 | using namespace std; |
10 | |
11 | static void OrderVector(Vector &vector, idx_t count, MergeOrder &order); |
12 | |
13 | class PhysicalPiecewiseMergeJoinState : public PhysicalComparisonJoinState { |
14 | public: |
15 | PhysicalPiecewiseMergeJoinState(PhysicalOperator *left, PhysicalOperator *right, vector<JoinCondition> &conditions) |
16 | : PhysicalComparisonJoinState(left, right, conditions), initialized(false), left_position(0), right_position(0), |
17 | right_chunk_index(0), has_null(false) { |
18 | } |
19 | |
20 | bool initialized; |
21 | idx_t left_position; |
22 | idx_t right_position; |
23 | idx_t right_chunk_index; |
24 | DataChunk left_chunk; |
25 | DataChunk join_keys; |
26 | MergeOrder left_orders; |
27 | ChunkCollection right_chunks; |
28 | ChunkCollection right_conditions; |
29 | vector<MergeOrder> right_orders; |
30 | bool has_null; |
31 | }; |
32 | |
33 | PhysicalPiecewiseMergeJoin::PhysicalPiecewiseMergeJoin(LogicalOperator &op, unique_ptr<PhysicalOperator> left, |
34 | unique_ptr<PhysicalOperator> right, vector<JoinCondition> cond, |
35 | JoinType join_type) |
36 | : PhysicalComparisonJoin(op, PhysicalOperatorType::PIECEWISE_MERGE_JOIN, move(cond), join_type) { |
37 | // for now we only support one condition! |
38 | assert(conditions.size() == 1); |
39 | for (auto &cond : conditions) { |
40 | // COMPARE NOT EQUAL not supported yet with merge join |
41 | assert(cond.comparison != ExpressionType::COMPARE_NOTEQUAL); |
42 | assert(cond.left->return_type == cond.right->return_type); |
43 | join_key_types.push_back(cond.left->return_type); |
44 | } |
45 | children.push_back(move(left)); |
46 | children.push_back(move(right)); |
47 | } |
48 | |
49 | void PhysicalPiecewiseMergeJoin::GetChunkInternal(ClientContext &context, DataChunk &chunk, |
50 | PhysicalOperatorState *state_) { |
51 | auto state = reinterpret_cast<PhysicalPiecewiseMergeJoinState *>(state_); |
52 | assert(conditions.size() == 1); |
53 | if (!state->initialized) { |
54 | // create the sorted pieces |
55 | auto right_state = children[1]->GetOperatorState(); |
56 | auto types = children[1]->GetTypes(); |
57 | |
58 | DataChunk right_chunk; |
59 | right_chunk.Initialize(types); |
60 | state->join_keys.Initialize(join_key_types); |
61 | // first fetch the entire right side |
62 | while (true) { |
63 | children[1]->GetChunk(context, right_chunk, right_state.get()); |
64 | if (right_chunk.size() == 0) { |
65 | break; |
66 | } |
67 | // resolve the join keys for this chunk |
68 | state->rhs_executor.SetChunk(right_chunk); |
69 | |
70 | state->join_keys.Reset(); |
71 | state->join_keys.SetCardinality(right_chunk); |
72 | for (idx_t k = 0; k < conditions.size(); k++) { |
73 | // resolve the join key |
74 | state->rhs_executor.ExecuteExpression(k, state->join_keys.data[k]); |
75 | } |
76 | // append the join keys and the chunk to the chunk collection |
77 | state->right_chunks.Append(right_chunk); |
78 | state->right_conditions.Append(state->join_keys); |
79 | } |
80 | if (state->right_chunks.count == 0 && (type == JoinType::INNER || type == JoinType::SEMI)) { |
81 | // empty RHS with INNER or SEMI join means empty result set |
82 | return; |
83 | } |
84 | // now order all the chunks |
85 | state->right_orders.resize(state->right_conditions.chunks.size()); |
86 | for (idx_t i = 0; i < state->right_conditions.chunks.size(); i++) { |
87 | auto &chunk_to_order = *state->right_conditions.chunks[i]; |
88 | assert(chunk_to_order.column_count() == 1); |
89 | for (idx_t col_idx = 0; col_idx < chunk_to_order.column_count(); col_idx++) { |
90 | OrderVector(chunk_to_order.data[col_idx], chunk_to_order.size(), state->right_orders[i]); |
91 | if (state->right_orders[i].count < chunk_to_order.size()) { |
92 | // the amount of entries in the order vector is smaller than the amount of entries in the vector |
93 | // this only happens if there are NULL values in the right-hand side |
94 | // hence we set the has_null to true (this is required for the MARK join) |
95 | state->has_null = true; |
96 | } |
97 | } |
98 | } |
99 | state->right_chunk_index = state->right_orders.size(); |
100 | state->initialized = true; |
101 | } |
102 | |
103 | do { |
104 | // check if we have to fetch a child from the left side |
105 | if (state->right_chunk_index == state->right_orders.size()) { |
106 | // fetch the chunk from the left side |
107 | children[0]->GetChunk(context, state->child_chunk, state->child_state.get()); |
108 | if (state->child_chunk.size() == 0) { |
109 | return; |
110 | } |
111 | |
112 | // resolve the join keys for the left chunk |
113 | state->join_keys.Reset(); |
114 | state->lhs_executor.SetChunk(state->child_chunk); |
115 | state->join_keys.SetCardinality(state->child_chunk); |
116 | for (idx_t k = 0; k < conditions.size(); k++) { |
117 | state->lhs_executor.ExecuteExpression(k, state->join_keys.data[k]); |
118 | // sort by join key |
119 | OrderVector(state->join_keys.data[k], state->join_keys.size(), state->left_orders); |
120 | } |
121 | state->right_chunk_index = 0; |
122 | state->left_position = 0; |
123 | state->right_position = 0; |
124 | } |
125 | |
126 | ScalarMergeInfo left_info(state->left_orders, state->join_keys.data[0].type, state->left_position); |
127 | |
128 | // first check if the join type is MARK, SEMI or ANTI |
129 | // in this case we loop over the entire right collection immediately |
130 | // because we can never return more than STANDARD_VECTOR_SIZE rows from a join |
131 | switch (type) { |
132 | case JoinType::MARK: { |
133 | // MARK join |
134 | if (state->right_chunks.count > 0) { |
135 | ChunkMergeInfo right_info(state->right_conditions, state->right_orders); |
136 | // first perform the MARK join |
137 | // this method uses the LHS to loop over the entire RHS looking for matches |
138 | MergeJoinMark::Perform(left_info, right_info, conditions[0].comparison); |
139 | // now construct the mark join result from the found matches |
140 | PhysicalJoin::ConstructMarkJoinResult(state->join_keys, state->child_chunk, chunk, |
141 | right_info.found_match, state->has_null); |
142 | } else { |
143 | // RHS empty: result is false for everything |
144 | chunk.Reference(state->child_chunk); |
145 | auto &mark_vector = chunk.data.back(); |
146 | mark_vector.vector_type = VectorType::CONSTANT_VECTOR; |
147 | mark_vector.SetValue(0, Value::BOOLEAN(false)); |
148 | } |
149 | state->right_chunk_index = state->right_orders.size(); |
150 | return; |
151 | } |
152 | default: |
153 | // INNER, LEFT OUTER, etc... join that can return >STANDARD_VECTOR_SIZE entries |
154 | break; |
155 | } |
156 | |
157 | // perform the actual merge join |
158 | auto &right_chunk = *state->right_chunks.chunks[state->right_chunk_index]; |
159 | auto &right_condition_chunk = *state->right_conditions.chunks[state->right_chunk_index]; |
160 | auto &right_orders = state->right_orders[state->right_chunk_index]; |
161 | |
162 | ScalarMergeInfo right(right_orders, right_condition_chunk.data[0].type, state->right_position); |
163 | // perform the merge join |
164 | switch (type) { |
165 | case JoinType::INNER: { |
166 | idx_t result_count = MergeJoinInner::Perform(left_info, right, conditions[0].comparison); |
167 | if (result_count == 0) { |
168 | // exhausted this chunk on the right side |
169 | // move to the next |
170 | state->right_chunk_index++; |
171 | state->left_position = 0; |
172 | state->right_position = 0; |
173 | } else { |
174 | chunk.Slice(state->child_chunk, left_info.result, result_count); |
175 | chunk.Slice(right_chunk, right.result, result_count, state->child_chunk.column_count()); |
176 | } |
177 | break; |
178 | } |
179 | default: |
180 | throw NotImplementedException("Unimplemented join type for merge join" ); |
181 | } |
182 | } while (chunk.size() == 0); |
183 | } |
184 | |
185 | unique_ptr<PhysicalOperatorState> PhysicalPiecewiseMergeJoin::GetOperatorState() { |
186 | return make_unique<PhysicalPiecewiseMergeJoinState>(children[0].get(), children[1].get(), conditions); |
187 | } |
188 | |
189 | template <class T, class OP> |
190 | static sel_t templated_quicksort_initial(T *data, const SelectionVector &sel, const SelectionVector ¬_null_sel, |
191 | idx_t count, SelectionVector &result) { |
192 | // select pivot |
193 | auto pivot_idx = not_null_sel.get_index(0); |
194 | auto dpivot_idx = sel.get_index(pivot_idx); |
195 | sel_t low = 0, high = count - 1; |
196 | // now insert elements |
197 | for (idx_t i = 1; i < count; i++) { |
198 | auto idx = not_null_sel.get_index(i); |
199 | auto didx = sel.get_index(idx); |
200 | if (OP::Operation(data[didx], data[dpivot_idx])) { |
201 | result.set_index(low++, idx); |
202 | } else { |
203 | result.set_index(high--, idx); |
204 | } |
205 | } |
206 | assert(low == high); |
207 | result.set_index(low, pivot_idx); |
208 | return low; |
209 | } |
210 | |
211 | template <class T, class OP> |
212 | static void templated_quicksort_inplace(T *data, const SelectionVector &sel, idx_t count, SelectionVector &result, |
213 | sel_t left, sel_t right) { |
214 | if (left >= right) { |
215 | return; |
216 | } |
217 | |
218 | sel_t middle = left + (right - left) / 2; |
219 | sel_t dpivot_idx = sel.get_index(result.get_index(middle)); |
220 | |
221 | // move the mid point value to the front. |
222 | sel_t i = left + 1; |
223 | sel_t j = right; |
224 | |
225 | result.swap(middle, left); |
226 | while (i <= j) { |
227 | while (i <= j && (OP::Operation(data[sel.get_index(result.get_index(i))], data[dpivot_idx]))) { |
228 | i++; |
229 | } |
230 | |
231 | while (i <= j && !OP::Operation(data[sel.get_index(result.get_index(j))], data[dpivot_idx])) { |
232 | j--; |
233 | } |
234 | |
235 | if (i < j) { |
236 | result.swap(i, j); |
237 | } |
238 | } |
239 | result.swap(i - 1, left); |
240 | sel_t part = i - 1; |
241 | |
242 | if (part > 0) { |
243 | templated_quicksort_inplace<T, OP>(data, sel, count, result, left, part - 1); |
244 | } |
245 | templated_quicksort_inplace<T, OP>(data, sel, count, result, part + 1, right); |
246 | } |
247 | |
248 | template <class T, class OP> |
249 | void templated_quicksort(T *__restrict data, const SelectionVector &sel, const SelectionVector ¬_null_sel, |
250 | idx_t count, SelectionVector &result) { |
251 | auto part = templated_quicksort_initial<T, OP>(data, sel, not_null_sel, count, result); |
252 | if (part > count) { |
253 | return; |
254 | } |
255 | templated_quicksort_inplace<T, OP>(data, sel, count, result, 0, part); |
256 | templated_quicksort_inplace<T, OP>(data, sel, count, result, part + 1, count - 1); |
257 | } |
258 | |
259 | template <class T> |
260 | static void templated_quicksort(VectorData &vdata, const SelectionVector ¬_null_sel, idx_t not_null_count, |
261 | SelectionVector &result) { |
262 | if (not_null_count == 0) { |
263 | return; |
264 | } |
265 | templated_quicksort<T, duckdb::LessThanEquals>((T *)vdata.data, *vdata.sel, not_null_sel, not_null_count, result); |
266 | } |
267 | |
268 | void OrderVector(Vector &vector, idx_t count, MergeOrder &order) { |
269 | if (count == 0) { |
270 | order.count = 0; |
271 | return; |
272 | } |
273 | vector.Orrify(count, order.vdata); |
274 | auto &vdata = order.vdata; |
275 | |
276 | // first filter out all the non-null values |
277 | SelectionVector not_null(STANDARD_VECTOR_SIZE); |
278 | idx_t not_null_count = 0; |
279 | for (idx_t i = 0; i < count; i++) { |
280 | auto idx = vdata.sel->get_index(i); |
281 | if (!(*vdata.nullmask)[idx]) { |
282 | not_null.set_index(not_null_count++, i); |
283 | } |
284 | } |
285 | |
286 | order.count = not_null_count; |
287 | order.order.Initialize(STANDARD_VECTOR_SIZE); |
288 | switch (vector.type) { |
289 | case TypeId::BOOL: |
290 | case TypeId::INT8: |
291 | templated_quicksort<int8_t>(vdata, not_null, not_null_count, order.order); |
292 | break; |
293 | case TypeId::INT16: |
294 | templated_quicksort<int16_t>(vdata, not_null, not_null_count, order.order); |
295 | break; |
296 | case TypeId::INT32: |
297 | templated_quicksort<int32_t>(vdata, not_null, not_null_count, order.order); |
298 | break; |
299 | case TypeId::INT64: |
300 | templated_quicksort<int64_t>(vdata, not_null, not_null_count, order.order); |
301 | break; |
302 | case TypeId::FLOAT: |
303 | templated_quicksort<float>(vdata, not_null, not_null_count, order.order); |
304 | break; |
305 | case TypeId::DOUBLE: |
306 | templated_quicksort<double>(vdata, not_null, not_null_count, order.order); |
307 | break; |
308 | case TypeId::VARCHAR: |
309 | templated_quicksort<string_t>(vdata, not_null, not_null_count, order.order); |
310 | break; |
311 | default: |
312 | throw NotImplementedException("Unimplemented type for sort" ); |
313 | } |
314 | } |
315 | |