//===----------------------------------------------------------------------===//
// DuckDB
//
// duckdb/execution/join_hashtable.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "duckdb/common/common.hpp"
#include "duckdb/common/radix_partitioning.hpp"
#include "duckdb/common/types/column/column_data_consumer.hpp"
#include "duckdb/common/types/data_chunk.hpp"
#include "duckdb/common/types/null_value.hpp"
#include "duckdb/common/types/row/tuple_data_iterator.hpp"
#include "duckdb/common/types/row/tuple_data_layout.hpp"
#include "duckdb/common/types/vector.hpp"
#include "duckdb/execution/aggregate_hashtable.hpp"
#include "duckdb/planner/operator/logical_comparison_join.hpp"
#include "duckdb/storage/storage_info.hpp"

namespace duckdb {

class BufferManager;
class BufferHandle;
class ColumnDataCollection;
struct ColumnDataAppendState;
class ClientContext;
struct ClientConfig;

struct JoinHTScanState {
public:
	JoinHTScanState(TupleDataCollection &collection, idx_t chunk_idx_from, idx_t chunk_idx_to,
	                TupleDataPinProperties properties = TupleDataPinProperties::ALREADY_PINNED)
	    : iterator(collection, properties, chunk_idx_from, chunk_idx_to, false), offset_in_chunk(0) {
	}

	TupleDataChunkIterator iterator;
	idx_t offset_in_chunk;

private:
	//! Implicit copying is not allowed
	JoinHTScanState(const JoinHTScanState &) = delete;
};

//! JoinHashTable is a chaining hash table that is used for computing joins
/*!
   The JoinHashTable concatenates incoming chunks inside a linked list of
   data pointers. The storage looks like this internally.
   [SERIALIZED ROW][NEXT POINTER]
   [SERIALIZED ROW][NEXT POINTER]
   There is a separate hash map of pointers that point into this table.
   This is what is used to resolve the hashes.
   [POINTER]
   [POINTER]
   [POINTER]
   The pointers are either NULL, or point to the first row whose hash maps to
   that entry; each row's NEXT pointer continues the chain for that bucket.
*/
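//! A minimal sketch of how a probe walks a bucket chain (illustrative only; `hash`,
//! `pointer_table`, and `row` are placeholder names, while `bitmask` and `pointer_offset`
//! are the members declared below):
//!
//!     data_ptr_t row = pointer_table[hash & bitmask];
//!     while (row) {
//!         // ... compare the serialized keys stored at 'row' against the probe keys ...
//!         row = Load<data_ptr_t>(row + pointer_offset); // follow the NEXT pointer
//!     }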
class JoinHashTable {
public:
	using ValidityBytes = TemplatedValidityMask<uint8_t>;

	//! Scan structure that can be used to resume scans, as a single probe can
	//! return 1024*N values (where N is the size of the HT). This is
	//! returned by JoinHashTable::Probe and can be used to resume a probe.
	struct ScanStructure {
		unsafe_unique_array<UnifiedVectorFormat> key_data;
		Vector pointers;
		idx_t count;
		SelectionVector sel_vector;
		//! Whether or not the given tuple has found a match
		unsafe_unique_array<bool> found_match;
		JoinHashTable &ht;
		bool finished;

		explicit ScanStructure(JoinHashTable &ht);
		//! Get the next batch of data from the scan structure
		void Next(DataChunk &keys, DataChunk &left, DataChunk &result);

	private:
		//! Next operator for the inner join
		void NextInnerJoin(DataChunk &keys, DataChunk &left, DataChunk &result);
		//! Next operator for the semi join
		void NextSemiJoin(DataChunk &keys, DataChunk &left, DataChunk &result);
		//! Next operator for the anti join
		void NextAntiJoin(DataChunk &keys, DataChunk &left, DataChunk &result);
		//! Next operator for the left outer join
		void NextLeftJoin(DataChunk &keys, DataChunk &left, DataChunk &result);
		//! Next operator for the mark join
		void NextMarkJoin(DataChunk &keys, DataChunk &left, DataChunk &result);
		//! Next operator for the single join
		void NextSingleJoin(DataChunk &keys, DataChunk &left, DataChunk &result);

		//! Scan the hashtable for matches of the specified keys, setting the found_match[] array to true or false
		//! for every tuple
		void ScanKeyMatches(DataChunk &keys);
		template <bool MATCH>
		void NextSemiOrAntiJoin(DataChunk &keys, DataChunk &left, DataChunk &result);

		void ConstructMarkJoinResult(DataChunk &join_keys, DataChunk &child, DataChunk &result);

		idx_t ScanInnerJoin(DataChunk &keys, SelectionVector &result_vector);

	public:
		void InitializeSelectionVector(const SelectionVector *&current_sel);
		void AdvancePointers();
		void AdvancePointers(const SelectionVector &sel, idx_t sel_count);
		void GatherResult(Vector &result, const SelectionVector &result_vector, const SelectionVector &sel_vector,
		                  const idx_t count, const idx_t col_idx);
		void GatherResult(Vector &result, const SelectionVector &sel_vector, const idx_t count, const idx_t col_idx);
		idx_t ResolvePredicates(DataChunk &keys, SelectionVector &match_sel, SelectionVector *no_match_sel);
	};

public:
	JoinHashTable(BufferManager &buffer_manager, const vector<JoinCondition> &conditions,
	              vector<LogicalType> build_types, JoinType type);
	~JoinHashTable();

	//! Add the given data to the HT
	void Build(PartitionedTupleDataAppendState &append_state, DataChunk &keys, DataChunk &input);
	//! Merge another HT into this one
	void Merge(JoinHashTable &other);
	//! Combines the partitions in sink_collection into data_collection, as if it were not partitioned
	void Unpartition();
	//! Initialize the pointer table for the probe
	void InitializePointerTable();
	//! Finalize the build of the HT, constructing the actual hash table and making the HT ready for probing.
	//! Finalize must be called before any call to Probe; after Finalize has been called, Build must not be
	//! called again.
	void Finalize(idx_t chunk_idx_from, idx_t chunk_idx_to, bool parallel);
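	//! Sketch of a single-threaded, in-memory build sequence (illustrative; append_state, keys and payload
	//! come from the caller, and TupleDataCollection::ChunkCount() is assumed for the chunk range):
	//!
	//!     ht.Build(append_state, keys, payload); // repeated for every input chunk
	//!     ht.Unpartition();                      // merge the partitions into the data collection
	//!     ht.InitializePointerTable();
	//!     ht.Finalize(0, ht.GetDataCollection().ChunkCount(), false);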
	//! Probe the HT with the given input chunk, returning a ScanStructure that produces the join result
	unique_ptr<ScanStructure> Probe(DataChunk &keys, Vector *precomputed_hashes = nullptr);
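	//! Typical probe loop (sketch; probe_keys, probe_chunk and result are assumed to be prepared by the
	//! caller, e.g. a physical hash join operator):
	//!
	//!     auto scan_structure = ht.Probe(probe_keys);
	//!     while (true) {
	//!         scan_structure->Next(probe_keys, probe_chunk, result);
	//!         if (result.size() == 0) {
	//!             break;
	//!         }
	//!         // ... emit 'result' ...
	//!     }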
	//! Scan the HT to construct the full outer join result
	void ScanFullOuter(JoinHTScanState &state, Vector &addresses, DataChunk &result);

	//! Fill the addresses vector with all the row addresses in the HT, for a full scan
	idx_t FillWithHTOffsets(JoinHTScanState &state, Vector &addresses);
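	//! Sketch of draining the HT for a full outer join with a resumable scan state (assuming
	//! TupleDataCollection::ChunkCount() for the chunk range; addresses and result are caller-provided):
	//!
	//!     JoinHTScanState state(ht.GetDataCollection(), 0, ht.GetDataCollection().ChunkCount());
	//!     do {
	//!         ht.ScanFullOuter(state, addresses, result);
	//!         // ... emit 'result' ...
	//!     } while (result.size() > 0);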

	idx_t Count() const {
		return data_collection->Count();
	}
	idx_t SizeInBytes() const {
		return data_collection->SizeInBytes();
	}

	PartitionedTupleData &GetSinkCollection() {
		return *sink_collection;
	}

	TupleDataCollection &GetDataCollection() {
		return *data_collection;
	}

	//! BufferManager
	BufferManager &buffer_manager;
	//! The join conditions
	const vector<JoinCondition> &conditions;
	//! The types of the keys used in equality comparison
	vector<LogicalType> equality_types;
	//! The types of all the join condition keys
	vector<LogicalType> condition_types;
	//! The types of the payload columns on the build side
	vector<LogicalType> build_types;
	//! The comparison predicates
	vector<ExpressionType> predicates;
	//! Data column layout
	TupleDataLayout layout;
	//! The size of an entry as stored in the HashTable
	idx_t entry_size;
	//! The total tuple size
	idx_t tuple_size;
	//! Next pointer offset in tuple
	idx_t pointer_offset;
	//! A constant false column for initialising right outer joins
	Vector vfound;
	//! The join type of the HT
	JoinType join_type;
	//! Whether or not the HT has been finalized
	bool finalized;
	//! Whether or not any of the key elements contain NULL
	bool has_null;
	//! Bitmask for getting relevant bits from the hashes to determine the position
	uint64_t bitmask;

	struct {
		mutex mj_lock;
		//! The types of the duplicate eliminated columns, only used in correlated MARK JOIN for flattening
		//! ANY()/ALL() expressions
		vector<LogicalType> correlated_types;
		//! The aggregate expression nodes used by the HT
		vector<unique_ptr<Expression>> correlated_aggregates;
		//! The HT that holds the group counts for every correlated column
		unique_ptr<GroupedAggregateHashTable> correlated_counts;
		//! Group chunk used for aggregating into correlated_counts
		DataChunk group_chunk;
		//! Payload chunk used for aggregating into correlated_counts
		DataChunk correlated_payload;
		//! Result chunk used for aggregating into correlated_counts
		DataChunk result_chunk;
	} correlated_mark_join_info;

private:
	unique_ptr<ScanStructure> InitializeScanStructure(DataChunk &keys, const SelectionVector *&current_sel);
	void Hash(DataChunk &keys, const SelectionVector &sel, idx_t count, Vector &hashes);

	//! Apply a bitmask to the hashes
	void ApplyBitmask(Vector &hashes, idx_t count);
	void ApplyBitmask(Vector &hashes, const SelectionVector &sel, idx_t count, Vector &pointers);

private:
	//! Insert the given set of locations into the HT with the given set of hashes
	void InsertHashes(Vector &hashes, idx_t count, data_ptr_t key_locations[], bool parallel);

	idx_t PrepareKeys(DataChunk &keys, unsafe_unique_array<UnifiedVectorFormat> &key_data,
	                  const SelectionVector *&current_sel, SelectionVector &sel, bool build_side);

	//! Lock for combining data_collection when merging HTs
	mutex data_lock;
	//! Partitioned data collection that the data is sunk into when building
	unique_ptr<PartitionedTupleData> sink_collection;
	//! The DataCollection holding the main data of the hash table
	unique_ptr<TupleDataCollection> data_collection;
	//! The hash map of the HT, created after finalization
	AllocatedData hash_map;
	//! Whether or not NULL values are considered equal in each of the comparisons
	vector<bool> null_values_are_equal;

	//! Copying not allowed
	JoinHashTable(const JoinHashTable &) = delete;

public:
	//===--------------------------------------------------------------------===//
	// External Join
	//===--------------------------------------------------------------------===//
	struct ProbeSpillLocalAppendState {
		//! Local partition and append state (if partitioned)
		PartitionedColumnData *local_partition;
		PartitionedColumnDataAppendState *local_partition_append_state;
		//! Local spill and append state (if not partitioned)
		ColumnDataCollection *local_spill_collection;
		ColumnDataAppendState *local_spill_append_state;
	};
	//! ProbeSpill represents materialized probe-side data that could not be probed during PhysicalHashJoin::Execute
	//! because the HashTable did not fit in memory. The ProbeSpill is not partitioned if the remaining data can be
	//! dealt with in just 1 more round of probing, otherwise it is radix partitioned in the same way as the HashTable
	struct ProbeSpill {
	public:
		ProbeSpill(JoinHashTable &ht, ClientContext &context, const vector<LogicalType> &probe_types);

	public:
		//! Create a state for a new thread
		ProbeSpillLocalAppendState RegisterThread();
		//! Append a chunk to this ProbeSpill
		void Append(DataChunk &chunk, ProbeSpillLocalAppendState &local_state);
		//! Finalize by merging the thread-local accumulated data
		void Finalize();

	public:
		//! Prepare the next probe round
		void PrepareNextProbe();
		//! Scans and consumes the ColumnDataCollection
		unique_ptr<ColumnDataConsumer> consumer;

	private:
		JoinHashTable &ht;
		mutex lock;
		ClientContext &context;

		//! Whether the probe data is partitioned
		bool partitioned;
		//! The types of the probe DataChunks
		const vector<LogicalType> &probe_types;
		//! The column ids
		vector<column_t> column_ids;

		//! The partitioned probe data (if partitioned) and append states
		unique_ptr<PartitionedColumnData> global_partitions;
		vector<unique_ptr<PartitionedColumnData>> local_partitions;
		vector<unique_ptr<PartitionedColumnDataAppendState>> local_partition_append_states;

		//! The probe data (if not partitioned) and append states
		unique_ptr<ColumnDataCollection> global_spill_collection;
		vector<unique_ptr<ColumnDataCollection>> local_spill_collections;
		vector<unique_ptr<ColumnDataAppendState>> local_spill_append_states;
	};
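
	//! Thread-local ProbeSpill usage (sketch; each probing thread registers once, appends the chunks it
	//! cannot probe yet, and Finalize is called once after all threads are done):
	//!
	//!     auto local_state = probe_spill.RegisterThread();
	//!     probe_spill.Append(chunk, local_state); // per chunk that must be deferred to a later round
	//!     probe_spill.Finalize();                 // once, after all threads have finished appending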

	//! Whether we are doing an external hash join
	bool external;
	//! The current number of radix bits used to partition
	idx_t radix_bits;
	//! The max size of the HT
	idx_t max_ht_size;
	//! Total count
	idx_t total_count;

	//! Capacity of the pointer table given the HT count
	//! (minimum of 1024 to reduce the chance of collisions for small HTs)
	static idx_t PointerTableCapacity(idx_t count) {
		return MaxValue<idx_t>(NextPowerOfTwo(count * 2), 1 << 10);
	}
	//! Size of the pointer table (in bytes)
	static idx_t PointerTableSize(idx_t count) {
		return PointerTableCapacity(count) * sizeof(data_ptr_t);
	}
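	//! Worked example: for a build side of 100,000 rows, PointerTableCapacity returns
	//! NextPowerOfTwo(200,000) = 262,144 slots, so PointerTableSize is
	//! 262,144 * sizeof(data_ptr_t) = 2 MiB on a 64-bit system.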

	//! Whether we need to do an external join
	bool RequiresExternalJoin(ClientConfig &config, vector<unique_ptr<JoinHashTable>> &local_hts);
	//! Computes partition sizes and number of radix bits (called before scheduling partition tasks)
	bool RequiresPartitioning(ClientConfig &config, vector<unique_ptr<JoinHashTable>> &local_hts);
	//! Partition this HT
	void Partition(JoinHashTable &global_ht);

	//! Delete blocks that belong to the current partitioned HT
	void Reset();
	//! Build HT for the next partitioned probe round
	bool PrepareExternalFinalize();
	//! Probe whatever we can, and spill the tuples that cannot be probed yet into the ProbeSpill
	unique_ptr<ScanStructure> ProbeAndSpill(DataChunk &keys, DataChunk &payload, ProbeSpill &probe_spill,
	                                        ProbeSpillLocalAppendState &spill_state, DataChunk &spill_chunk);
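
	//! Sketch of the external (out-of-core) probe rounds (illustrative; the driving loop is assumed to live
	//! in the physical hash join operator, and ChunkCount() is assumed for the chunk range):
	//!
	//!     while (ht.PrepareExternalFinalize()) { // build the HT for the next set of partitions
	//!         ht.InitializePointerTable();
	//!         ht.Finalize(0, ht.GetDataCollection().ChunkCount(), false);
	//!         probe_spill.PrepareNextProbe();
	//!         // ... probe the materialized probe-side data against this round's HT ...
	//!     }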

private:
	//! First and last partition of the current probe round
	idx_t partition_start;
	idx_t partition_end;
};

} // namespace duckdb