1 | #include "duckdb/execution/operator/join/physical_hash_join.hpp" |
2 | |
3 | #include "duckdb/storage/storage_manager.hpp" |
4 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
5 | #include "duckdb/execution/expression_executor.hpp" |
6 | #include "duckdb/storage/buffer_manager.hpp" |
7 | |
8 | using namespace duckdb; |
9 | using namespace std; |
10 | |
11 | class PhysicalHashJoinState : public PhysicalComparisonJoinState { |
12 | public: |
13 | PhysicalHashJoinState(PhysicalOperator *left, PhysicalOperator *right, vector<JoinCondition> &conditions) |
14 | : PhysicalComparisonJoinState(left, right, conditions), initialized(false) { |
15 | } |
16 | |
17 | bool initialized; |
18 | DataChunk cached_chunk; |
19 | DataChunk join_keys; |
20 | unique_ptr<JoinHashTable::ScanStructure> scan_structure; |
21 | }; |
22 | |
23 | PhysicalHashJoin::PhysicalHashJoin(ClientContext &context, LogicalOperator &op, unique_ptr<PhysicalOperator> left, |
24 | unique_ptr<PhysicalOperator> right, vector<JoinCondition> cond, JoinType join_type, |
25 | vector<idx_t> left_projection_map, vector<idx_t> right_projection_map) |
26 | : PhysicalComparisonJoin(op, PhysicalOperatorType::HASH_JOIN, move(cond), join_type), |
27 | right_projection_map(right_projection_map) { |
28 | children.push_back(move(left)); |
29 | children.push_back(move(right)); |
30 | |
31 | assert(left_projection_map.size() == 0); |
32 | |
33 | hash_table = |
34 | make_unique<JoinHashTable>(BufferManager::GetBufferManager(context), conditions, |
35 | LogicalOperator::MapTypes(children[1]->GetTypes(), right_projection_map), type); |
36 | } |
37 | |
38 | PhysicalHashJoin::PhysicalHashJoin(ClientContext &context, LogicalOperator &op, unique_ptr<PhysicalOperator> left, |
39 | unique_ptr<PhysicalOperator> right, vector<JoinCondition> cond, JoinType join_type) |
40 | : PhysicalHashJoin(context, op, move(left), move(right), move(cond), join_type, {}, {}) { |
41 | } |
42 | |
43 | void PhysicalHashJoin::BuildHashTable(ClientContext &context, PhysicalOperatorState *state_) { |
44 | auto state = reinterpret_cast<PhysicalHashJoinState *>(state_); |
45 | |
46 | // build the HT |
47 | auto right_state = children[1]->GetOperatorState(); |
48 | auto types = children[1]->GetTypes(); |
49 | |
50 | DataChunk right_chunk, build_chunk; |
51 | right_chunk.Initialize(types); |
52 | |
53 | if (right_projection_map.size() > 0) { |
54 | build_chunk.Initialize(hash_table->build_types); |
55 | } |
56 | |
57 | state->join_keys.Initialize(hash_table->condition_types); |
58 | while (true) { |
59 | // get the child chunk |
60 | children[1]->GetChunk(context, right_chunk, right_state.get()); |
61 | if (right_chunk.size() == 0) { |
62 | break; |
63 | } |
64 | // resolve the join keys for the right chunk |
65 | state->rhs_executor.Execute(right_chunk, state->join_keys); |
66 | // build the HT |
67 | if (right_projection_map.size() > 0) { |
68 | // there is a projection map: fill the build chunk with the projected columns |
69 | build_chunk.Reset(); |
70 | build_chunk.SetCardinality(right_chunk); |
71 | for (idx_t i = 0; i < right_projection_map.size(); i++) { |
72 | build_chunk.data[i].Reference(right_chunk.data[right_projection_map[i]]); |
73 | } |
74 | hash_table->Build(state->join_keys, build_chunk); |
75 | } else { |
76 | // there is not a projected map: place the entire right chunk in the HT |
77 | hash_table->Build(state->join_keys, right_chunk); |
78 | } |
79 | } |
80 | hash_table->Finalize(); |
81 | } |
82 | |
83 | void PhysicalHashJoin::ProbeHashTable(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) { |
84 | auto state = reinterpret_cast<PhysicalHashJoinState *>(state_); |
85 | if (state->child_chunk.size() > 0 && state->scan_structure) { |
86 | // still have elements remaining from the previous probe (i.e. we got |
87 | // >1024 elements in the previous probe) |
88 | state->scan_structure->Next(state->join_keys, state->child_chunk, chunk); |
89 | if (chunk.size() > 0) { |
90 | return; |
91 | } |
92 | state->scan_structure = nullptr; |
93 | } |
94 | |
95 | // probe the HT |
96 | do { |
97 | // fetch the chunk from the left side |
98 | children[0]->GetChunk(context, state->child_chunk, state->child_state.get()); |
99 | if (state->child_chunk.size() == 0) { |
100 | return; |
101 | } |
102 | // remove any selection vectors |
103 | if (hash_table->size() == 0) { |
104 | // empty hash table, special case |
105 | if (hash_table->join_type == JoinType::ANTI) { |
106 | // anti join with empty hash table, NOP join |
107 | // return the input |
108 | assert(chunk.column_count() == state->child_chunk.column_count()); |
109 | chunk.Reference(state->child_chunk); |
110 | return; |
111 | } else if (hash_table->join_type == JoinType::MARK) { |
112 | // MARK join with empty hash table |
113 | assert(hash_table->join_type == JoinType::MARK); |
114 | assert(chunk.column_count() == state->child_chunk.column_count() + 1); |
115 | auto &result_vector = chunk.data.back(); |
116 | assert(result_vector.type == TypeId::BOOL); |
117 | // for every data vector, we just reference the child chunk |
118 | chunk.SetCardinality(state->child_chunk); |
119 | for (idx_t i = 0; i < state->child_chunk.column_count(); i++) { |
120 | chunk.data[i].Reference(state->child_chunk.data[i]); |
121 | } |
122 | // for the MARK vector: |
123 | // if the HT has no NULL values (i.e. empty result set), return a vector that has false for every input |
124 | // entry if the HT has NULL values (i.e. result set had values, but all were NULL), return a vector that |
125 | // has NULL for every input entry |
126 | if (!hash_table->has_null) { |
127 | auto bool_result = FlatVector::GetData<bool>(result_vector); |
128 | for (idx_t i = 0; i < chunk.size(); i++) { |
129 | bool_result[i] = false; |
130 | } |
131 | } else { |
132 | FlatVector::Nullmask(result_vector).set(); |
133 | } |
134 | return; |
135 | } else if (hash_table->join_type == JoinType::LEFT || hash_table->join_type == JoinType::OUTER || |
136 | hash_table->join_type == JoinType::SINGLE) { |
137 | // LEFT/FULL OUTER/SINGLE join and build side is empty |
138 | // for the LHS we reference the data |
139 | chunk.SetCardinality(state->child_chunk.size()); |
140 | for (idx_t i = 0; i < state->child_chunk.column_count(); i++) { |
141 | chunk.data[i].Reference(state->child_chunk.data[i]); |
142 | } |
143 | // for the RHS |
144 | for (idx_t k = state->child_chunk.column_count(); k < chunk.column_count(); k++) { |
145 | chunk.data[k].vector_type = VectorType::CONSTANT_VECTOR; |
146 | ConstantVector::SetNull(chunk.data[k], true); |
147 | } |
148 | return; |
149 | } |
150 | } |
151 | // resolve the join keys for the left chunk |
152 | state->lhs_executor.Execute(state->child_chunk, state->join_keys); |
153 | |
154 | // perform the actual probe |
155 | state->scan_structure = hash_table->Probe(state->join_keys); |
156 | state->scan_structure->Next(state->join_keys, state->child_chunk, chunk); |
157 | } while (chunk.size() == 0); |
158 | } |
159 | |
160 | unique_ptr<PhysicalOperatorState> PhysicalHashJoin::GetOperatorState() { |
161 | return make_unique<PhysicalHashJoinState>(children[0].get(), children[1].get(), conditions); |
162 | } |
163 | |
164 | void PhysicalHashJoin::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) { |
165 | auto state = reinterpret_cast<PhysicalHashJoinState *>(state_); |
166 | if (!state->initialized) { |
167 | state->cached_chunk.Initialize(types); |
168 | BuildHashTable(context, state_); |
169 | state->initialized = true; |
170 | |
171 | if (hash_table->size() == 0 && |
172 | (hash_table->join_type == JoinType::INNER || hash_table->join_type == JoinType::SEMI)) { |
173 | // empty hash table with INNER or SEMI join means empty result set |
174 | return; |
175 | } |
176 | } |
177 | do { |
178 | ProbeHashTable(context, chunk, state); |
179 | #if STANDARD_VECTOR_SIZE >= 128 |
180 | if (chunk.size() == 0) { |
181 | if (state->cached_chunk.size() > 0) { |
182 | // finished probing but cached data remains, return cached chunk |
183 | chunk.Reference(state->cached_chunk); |
184 | state->cached_chunk.Reset(); |
185 | } |
186 | return; |
187 | } else if (chunk.size() < 64) { |
188 | // small chunk: add it to chunk cache and continue |
189 | state->cached_chunk.Append(chunk); |
190 | if (state->cached_chunk.size() >= (STANDARD_VECTOR_SIZE - 64)) { |
191 | // chunk cache full: return it |
192 | chunk.Reference(state->cached_chunk); |
193 | state->cached_chunk.Reset(); |
194 | return; |
195 | } else { |
196 | // chunk cache not full: probe again |
197 | chunk.Reset(); |
198 | } |
199 | } else { |
200 | return; |
201 | } |
202 | #else |
203 | return; |
204 | #endif |
205 | } while (true); |
206 | } |
207 | |