1#include "duckdb/execution/operator/join/physical_hash_join.hpp"
2
3#include "duckdb/storage/storage_manager.hpp"
4#include "duckdb/common/vector_operations/vector_operations.hpp"
5#include "duckdb/execution/expression_executor.hpp"
6#include "duckdb/storage/buffer_manager.hpp"
7
8using namespace duckdb;
9using namespace std;
10
11class PhysicalHashJoinState : public PhysicalComparisonJoinState {
12public:
13 PhysicalHashJoinState(PhysicalOperator *left, PhysicalOperator *right, vector<JoinCondition> &conditions)
14 : PhysicalComparisonJoinState(left, right, conditions), initialized(false) {
15 }
16
17 bool initialized;
18 DataChunk cached_chunk;
19 DataChunk join_keys;
20 unique_ptr<JoinHashTable::ScanStructure> scan_structure;
21};
22
23PhysicalHashJoin::PhysicalHashJoin(ClientContext &context, LogicalOperator &op, unique_ptr<PhysicalOperator> left,
24 unique_ptr<PhysicalOperator> right, vector<JoinCondition> cond, JoinType join_type,
25 vector<idx_t> left_projection_map, vector<idx_t> right_projection_map)
26 : PhysicalComparisonJoin(op, PhysicalOperatorType::HASH_JOIN, move(cond), join_type),
27 right_projection_map(right_projection_map) {
28 children.push_back(move(left));
29 children.push_back(move(right));
30
31 assert(left_projection_map.size() == 0);
32
33 hash_table =
34 make_unique<JoinHashTable>(BufferManager::GetBufferManager(context), conditions,
35 LogicalOperator::MapTypes(children[1]->GetTypes(), right_projection_map), type);
36}
37
38PhysicalHashJoin::PhysicalHashJoin(ClientContext &context, LogicalOperator &op, unique_ptr<PhysicalOperator> left,
39 unique_ptr<PhysicalOperator> right, vector<JoinCondition> cond, JoinType join_type)
40 : PhysicalHashJoin(context, op, move(left), move(right), move(cond), join_type, {}, {}) {
41}
42
43void PhysicalHashJoin::BuildHashTable(ClientContext &context, PhysicalOperatorState *state_) {
44 auto state = reinterpret_cast<PhysicalHashJoinState *>(state_);
45
46 // build the HT
47 auto right_state = children[1]->GetOperatorState();
48 auto types = children[1]->GetTypes();
49
50 DataChunk right_chunk, build_chunk;
51 right_chunk.Initialize(types);
52
53 if (right_projection_map.size() > 0) {
54 build_chunk.Initialize(hash_table->build_types);
55 }
56
57 state->join_keys.Initialize(hash_table->condition_types);
58 while (true) {
59 // get the child chunk
60 children[1]->GetChunk(context, right_chunk, right_state.get());
61 if (right_chunk.size() == 0) {
62 break;
63 }
64 // resolve the join keys for the right chunk
65 state->rhs_executor.Execute(right_chunk, state->join_keys);
66 // build the HT
67 if (right_projection_map.size() > 0) {
68 // there is a projection map: fill the build chunk with the projected columns
69 build_chunk.Reset();
70 build_chunk.SetCardinality(right_chunk);
71 for (idx_t i = 0; i < right_projection_map.size(); i++) {
72 build_chunk.data[i].Reference(right_chunk.data[right_projection_map[i]]);
73 }
74 hash_table->Build(state->join_keys, build_chunk);
75 } else {
76 // there is not a projected map: place the entire right chunk in the HT
77 hash_table->Build(state->join_keys, right_chunk);
78 }
79 }
80 hash_table->Finalize();
81}
82
83void PhysicalHashJoin::ProbeHashTable(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) {
84 auto state = reinterpret_cast<PhysicalHashJoinState *>(state_);
85 if (state->child_chunk.size() > 0 && state->scan_structure) {
86 // still have elements remaining from the previous probe (i.e. we got
87 // >1024 elements in the previous probe)
88 state->scan_structure->Next(state->join_keys, state->child_chunk, chunk);
89 if (chunk.size() > 0) {
90 return;
91 }
92 state->scan_structure = nullptr;
93 }
94
95 // probe the HT
96 do {
97 // fetch the chunk from the left side
98 children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
99 if (state->child_chunk.size() == 0) {
100 return;
101 }
102 // remove any selection vectors
103 if (hash_table->size() == 0) {
104 // empty hash table, special case
105 if (hash_table->join_type == JoinType::ANTI) {
106 // anti join with empty hash table, NOP join
107 // return the input
108 assert(chunk.column_count() == state->child_chunk.column_count());
109 chunk.Reference(state->child_chunk);
110 return;
111 } else if (hash_table->join_type == JoinType::MARK) {
112 // MARK join with empty hash table
113 assert(hash_table->join_type == JoinType::MARK);
114 assert(chunk.column_count() == state->child_chunk.column_count() + 1);
115 auto &result_vector = chunk.data.back();
116 assert(result_vector.type == TypeId::BOOL);
117 // for every data vector, we just reference the child chunk
118 chunk.SetCardinality(state->child_chunk);
119 for (idx_t i = 0; i < state->child_chunk.column_count(); i++) {
120 chunk.data[i].Reference(state->child_chunk.data[i]);
121 }
122 // for the MARK vector:
123 // if the HT has no NULL values (i.e. empty result set), return a vector that has false for every input
124 // entry if the HT has NULL values (i.e. result set had values, but all were NULL), return a vector that
125 // has NULL for every input entry
126 if (!hash_table->has_null) {
127 auto bool_result = FlatVector::GetData<bool>(result_vector);
128 for (idx_t i = 0; i < chunk.size(); i++) {
129 bool_result[i] = false;
130 }
131 } else {
132 FlatVector::Nullmask(result_vector).set();
133 }
134 return;
135 } else if (hash_table->join_type == JoinType::LEFT || hash_table->join_type == JoinType::OUTER ||
136 hash_table->join_type == JoinType::SINGLE) {
137 // LEFT/FULL OUTER/SINGLE join and build side is empty
138 // for the LHS we reference the data
139 chunk.SetCardinality(state->child_chunk.size());
140 for (idx_t i = 0; i < state->child_chunk.column_count(); i++) {
141 chunk.data[i].Reference(state->child_chunk.data[i]);
142 }
143 // for the RHS
144 for (idx_t k = state->child_chunk.column_count(); k < chunk.column_count(); k++) {
145 chunk.data[k].vector_type = VectorType::CONSTANT_VECTOR;
146 ConstantVector::SetNull(chunk.data[k], true);
147 }
148 return;
149 }
150 }
151 // resolve the join keys for the left chunk
152 state->lhs_executor.Execute(state->child_chunk, state->join_keys);
153
154 // perform the actual probe
155 state->scan_structure = hash_table->Probe(state->join_keys);
156 state->scan_structure->Next(state->join_keys, state->child_chunk, chunk);
157 } while (chunk.size() == 0);
158}
159
160unique_ptr<PhysicalOperatorState> PhysicalHashJoin::GetOperatorState() {
161 return make_unique<PhysicalHashJoinState>(children[0].get(), children[1].get(), conditions);
162}
163
164void PhysicalHashJoin::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) {
165 auto state = reinterpret_cast<PhysicalHashJoinState *>(state_);
166 if (!state->initialized) {
167 state->cached_chunk.Initialize(types);
168 BuildHashTable(context, state_);
169 state->initialized = true;
170
171 if (hash_table->size() == 0 &&
172 (hash_table->join_type == JoinType::INNER || hash_table->join_type == JoinType::SEMI)) {
173 // empty hash table with INNER or SEMI join means empty result set
174 return;
175 }
176 }
177 do {
178 ProbeHashTable(context, chunk, state);
179#if STANDARD_VECTOR_SIZE >= 128
180 if (chunk.size() == 0) {
181 if (state->cached_chunk.size() > 0) {
182 // finished probing but cached data remains, return cached chunk
183 chunk.Reference(state->cached_chunk);
184 state->cached_chunk.Reset();
185 }
186 return;
187 } else if (chunk.size() < 64) {
188 // small chunk: add it to chunk cache and continue
189 state->cached_chunk.Append(chunk);
190 if (state->cached_chunk.size() >= (STANDARD_VECTOR_SIZE - 64)) {
191 // chunk cache full: return it
192 chunk.Reference(state->cached_chunk);
193 state->cached_chunk.Reset();
194 return;
195 } else {
196 // chunk cache not full: probe again
197 chunk.Reset();
198 }
199 } else {
200 return;
201 }
202#else
203 return;
204#endif
205 } while (true);
206}
207