1#include "duckdb/execution/operator/join/physical_piecewise_merge_join.hpp"
2
3#include "duckdb/common/vector_operations/vector_operations.hpp"
4#include "duckdb/execution/expression_executor.hpp"
5#include "duckdb/execution/merge_join.hpp"
6#include "duckdb/common/operator/comparison_operators.hpp"
7
8using namespace duckdb;
9using namespace std;
10
// Orders the non-NULL entries among the first `count` rows of `vector`: fills order.vdata (via Orrify),
// writes the sorted row positions into order.order and the number of non-NULL rows into order.count.
// Declared here because GetChunkInternal uses it before its definition at the end of this file.
static void OrderVector(Vector &vector, idx_t count, MergeOrder &order);
12
13class PhysicalPiecewiseMergeJoinState : public PhysicalComparisonJoinState {
14public:
15 PhysicalPiecewiseMergeJoinState(PhysicalOperator *left, PhysicalOperator *right, vector<JoinCondition> &conditions)
16 : PhysicalComparisonJoinState(left, right, conditions), initialized(false), left_position(0), right_position(0),
17 right_chunk_index(0), has_null(false) {
18 }
19
20 bool initialized;
21 idx_t left_position;
22 idx_t right_position;
23 idx_t right_chunk_index;
24 DataChunk left_chunk;
25 DataChunk join_keys;
26 MergeOrder left_orders;
27 ChunkCollection right_chunks;
28 ChunkCollection right_conditions;
29 vector<MergeOrder> right_orders;
30 bool has_null;
31};
32
33PhysicalPiecewiseMergeJoin::PhysicalPiecewiseMergeJoin(LogicalOperator &op, unique_ptr<PhysicalOperator> left,
34 unique_ptr<PhysicalOperator> right, vector<JoinCondition> cond,
35 JoinType join_type)
36 : PhysicalComparisonJoin(op, PhysicalOperatorType::PIECEWISE_MERGE_JOIN, move(cond), join_type) {
37 // for now we only support one condition!
38 assert(conditions.size() == 1);
39 for (auto &cond : conditions) {
40 // COMPARE NOT EQUAL not supported yet with merge join
41 assert(cond.comparison != ExpressionType::COMPARE_NOTEQUAL);
42 assert(cond.left->return_type == cond.right->return_type);
43 join_key_types.push_back(cond.left->return_type);
44 }
45 children.push_back(move(left));
46 children.push_back(move(right));
47}
48
49void PhysicalPiecewiseMergeJoin::GetChunkInternal(ClientContext &context, DataChunk &chunk,
50 PhysicalOperatorState *state_) {
51 auto state = reinterpret_cast<PhysicalPiecewiseMergeJoinState *>(state_);
52 assert(conditions.size() == 1);
53 if (!state->initialized) {
54 // create the sorted pieces
55 auto right_state = children[1]->GetOperatorState();
56 auto types = children[1]->GetTypes();
57
58 DataChunk right_chunk;
59 right_chunk.Initialize(types);
60 state->join_keys.Initialize(join_key_types);
61 // first fetch the entire right side
62 while (true) {
63 children[1]->GetChunk(context, right_chunk, right_state.get());
64 if (right_chunk.size() == 0) {
65 break;
66 }
67 // resolve the join keys for this chunk
68 state->rhs_executor.SetChunk(right_chunk);
69
70 state->join_keys.Reset();
71 state->join_keys.SetCardinality(right_chunk);
72 for (idx_t k = 0; k < conditions.size(); k++) {
73 // resolve the join key
74 state->rhs_executor.ExecuteExpression(k, state->join_keys.data[k]);
75 }
76 // append the join keys and the chunk to the chunk collection
77 state->right_chunks.Append(right_chunk);
78 state->right_conditions.Append(state->join_keys);
79 }
80 if (state->right_chunks.count == 0 && (type == JoinType::INNER || type == JoinType::SEMI)) {
81 // empty RHS with INNER or SEMI join means empty result set
82 return;
83 }
84 // now order all the chunks
85 state->right_orders.resize(state->right_conditions.chunks.size());
86 for (idx_t i = 0; i < state->right_conditions.chunks.size(); i++) {
87 auto &chunk_to_order = *state->right_conditions.chunks[i];
88 assert(chunk_to_order.column_count() == 1);
89 for (idx_t col_idx = 0; col_idx < chunk_to_order.column_count(); col_idx++) {
90 OrderVector(chunk_to_order.data[col_idx], chunk_to_order.size(), state->right_orders[i]);
91 if (state->right_orders[i].count < chunk_to_order.size()) {
92 // the amount of entries in the order vector is smaller than the amount of entries in the vector
93 // this only happens if there are NULL values in the right-hand side
94 // hence we set the has_null to true (this is required for the MARK join)
95 state->has_null = true;
96 }
97 }
98 }
99 state->right_chunk_index = state->right_orders.size();
100 state->initialized = true;
101 }
102
103 do {
104 // check if we have to fetch a child from the left side
105 if (state->right_chunk_index == state->right_orders.size()) {
106 // fetch the chunk from the left side
107 children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
108 if (state->child_chunk.size() == 0) {
109 return;
110 }
111
112 // resolve the join keys for the left chunk
113 state->join_keys.Reset();
114 state->lhs_executor.SetChunk(state->child_chunk);
115 state->join_keys.SetCardinality(state->child_chunk);
116 for (idx_t k = 0; k < conditions.size(); k++) {
117 state->lhs_executor.ExecuteExpression(k, state->join_keys.data[k]);
118 // sort by join key
119 OrderVector(state->join_keys.data[k], state->join_keys.size(), state->left_orders);
120 }
121 state->right_chunk_index = 0;
122 state->left_position = 0;
123 state->right_position = 0;
124 }
125
126 ScalarMergeInfo left_info(state->left_orders, state->join_keys.data[0].type, state->left_position);
127
128 // first check if the join type is MARK, SEMI or ANTI
129 // in this case we loop over the entire right collection immediately
130 // because we can never return more than STANDARD_VECTOR_SIZE rows from a join
131 switch (type) {
132 case JoinType::MARK: {
133 // MARK join
134 if (state->right_chunks.count > 0) {
135 ChunkMergeInfo right_info(state->right_conditions, state->right_orders);
136 // first perform the MARK join
137 // this method uses the LHS to loop over the entire RHS looking for matches
138 MergeJoinMark::Perform(left_info, right_info, conditions[0].comparison);
139 // now construct the mark join result from the found matches
140 PhysicalJoin::ConstructMarkJoinResult(state->join_keys, state->child_chunk, chunk,
141 right_info.found_match, state->has_null);
142 } else {
143 // RHS empty: result is false for everything
144 chunk.Reference(state->child_chunk);
145 auto &mark_vector = chunk.data.back();
146 mark_vector.vector_type = VectorType::CONSTANT_VECTOR;
147 mark_vector.SetValue(0, Value::BOOLEAN(false));
148 }
149 state->right_chunk_index = state->right_orders.size();
150 return;
151 }
152 default:
153 // INNER, LEFT OUTER, etc... join that can return >STANDARD_VECTOR_SIZE entries
154 break;
155 }
156
157 // perform the actual merge join
158 auto &right_chunk = *state->right_chunks.chunks[state->right_chunk_index];
159 auto &right_condition_chunk = *state->right_conditions.chunks[state->right_chunk_index];
160 auto &right_orders = state->right_orders[state->right_chunk_index];
161
162 ScalarMergeInfo right(right_orders, right_condition_chunk.data[0].type, state->right_position);
163 // perform the merge join
164 switch (type) {
165 case JoinType::INNER: {
166 idx_t result_count = MergeJoinInner::Perform(left_info, right, conditions[0].comparison);
167 if (result_count == 0) {
168 // exhausted this chunk on the right side
169 // move to the next
170 state->right_chunk_index++;
171 state->left_position = 0;
172 state->right_position = 0;
173 } else {
174 chunk.Slice(state->child_chunk, left_info.result, result_count);
175 chunk.Slice(right_chunk, right.result, result_count, state->child_chunk.column_count());
176 }
177 break;
178 }
179 default:
180 throw NotImplementedException("Unimplemented join type for merge join");
181 }
182 } while (chunk.size() == 0);
183}
184
185unique_ptr<PhysicalOperatorState> PhysicalPiecewiseMergeJoin::GetOperatorState() {
186 return make_unique<PhysicalPiecewiseMergeJoinState>(children[0].get(), children[1].get(), conditions);
187}
188
189template <class T, class OP>
190static sel_t templated_quicksort_initial(T *data, const SelectionVector &sel, const SelectionVector &not_null_sel,
191 idx_t count, SelectionVector &result) {
192 // select pivot
193 auto pivot_idx = not_null_sel.get_index(0);
194 auto dpivot_idx = sel.get_index(pivot_idx);
195 sel_t low = 0, high = count - 1;
196 // now insert elements
197 for (idx_t i = 1; i < count; i++) {
198 auto idx = not_null_sel.get_index(i);
199 auto didx = sel.get_index(idx);
200 if (OP::Operation(data[didx], data[dpivot_idx])) {
201 result.set_index(low++, idx);
202 } else {
203 result.set_index(high--, idx);
204 }
205 }
206 assert(low == high);
207 result.set_index(low, pivot_idx);
208 return low;
209}
210
211template <class T, class OP>
212static void templated_quicksort_inplace(T *data, const SelectionVector &sel, idx_t count, SelectionVector &result,
213 sel_t left, sel_t right) {
214 if (left >= right) {
215 return;
216 }
217
218 sel_t middle = left + (right - left) / 2;
219 sel_t dpivot_idx = sel.get_index(result.get_index(middle));
220
221 // move the mid point value to the front.
222 sel_t i = left + 1;
223 sel_t j = right;
224
225 result.swap(middle, left);
226 while (i <= j) {
227 while (i <= j && (OP::Operation(data[sel.get_index(result.get_index(i))], data[dpivot_idx]))) {
228 i++;
229 }
230
231 while (i <= j && !OP::Operation(data[sel.get_index(result.get_index(j))], data[dpivot_idx])) {
232 j--;
233 }
234
235 if (i < j) {
236 result.swap(i, j);
237 }
238 }
239 result.swap(i - 1, left);
240 sel_t part = i - 1;
241
242 if (part > 0) {
243 templated_quicksort_inplace<T, OP>(data, sel, count, result, left, part - 1);
244 }
245 templated_quicksort_inplace<T, OP>(data, sel, count, result, part + 1, right);
246}
247
248template <class T, class OP>
249void templated_quicksort(T *__restrict data, const SelectionVector &sel, const SelectionVector &not_null_sel,
250 idx_t count, SelectionVector &result) {
251 auto part = templated_quicksort_initial<T, OP>(data, sel, not_null_sel, count, result);
252 if (part > count) {
253 return;
254 }
255 templated_quicksort_inplace<T, OP>(data, sel, count, result, 0, part);
256 templated_quicksort_inplace<T, OP>(data, sel, count, result, part + 1, count - 1);
257}
258
259template <class T>
260static void templated_quicksort(VectorData &vdata, const SelectionVector &not_null_sel, idx_t not_null_count,
261 SelectionVector &result) {
262 if (not_null_count == 0) {
263 return;
264 }
265 templated_quicksort<T, duckdb::LessThanEquals>((T *)vdata.data, *vdata.sel, not_null_sel, not_null_count, result);
266}
267
268void OrderVector(Vector &vector, idx_t count, MergeOrder &order) {
269 if (count == 0) {
270 order.count = 0;
271 return;
272 }
273 vector.Orrify(count, order.vdata);
274 auto &vdata = order.vdata;
275
276 // first filter out all the non-null values
277 SelectionVector not_null(STANDARD_VECTOR_SIZE);
278 idx_t not_null_count = 0;
279 for (idx_t i = 0; i < count; i++) {
280 auto idx = vdata.sel->get_index(i);
281 if (!(*vdata.nullmask)[idx]) {
282 not_null.set_index(not_null_count++, i);
283 }
284 }
285
286 order.count = not_null_count;
287 order.order.Initialize(STANDARD_VECTOR_SIZE);
288 switch (vector.type) {
289 case TypeId::BOOL:
290 case TypeId::INT8:
291 templated_quicksort<int8_t>(vdata, not_null, not_null_count, order.order);
292 break;
293 case TypeId::INT16:
294 templated_quicksort<int16_t>(vdata, not_null, not_null_count, order.order);
295 break;
296 case TypeId::INT32:
297 templated_quicksort<int32_t>(vdata, not_null, not_null_count, order.order);
298 break;
299 case TypeId::INT64:
300 templated_quicksort<int64_t>(vdata, not_null, not_null_count, order.order);
301 break;
302 case TypeId::FLOAT:
303 templated_quicksort<float>(vdata, not_null, not_null_count, order.order);
304 break;
305 case TypeId::DOUBLE:
306 templated_quicksort<double>(vdata, not_null, not_null_count, order.order);
307 break;
308 case TypeId::VARCHAR:
309 templated_quicksort<string_t>(vdata, not_null, not_null_count, order.order);
310 break;
311 default:
312 throw NotImplementedException("Unimplemented type for sort");
313 }
314}
315