1 | //===----------------------------------------------------------------------===// |
2 | // DuckDB |
3 | // |
4 | // duckdb/execution/join_hashtable.hpp |
5 | // |
6 | // |
7 | //===----------------------------------------------------------------------===// |
8 | |
9 | #pragma once |
10 | |
11 | #include "duckdb/common/common.hpp" |
12 | #include "duckdb/common/radix_partitioning.hpp" |
13 | #include "duckdb/common/types/column/column_data_consumer.hpp" |
14 | #include "duckdb/common/types/data_chunk.hpp" |
15 | #include "duckdb/common/types/null_value.hpp" |
16 | #include "duckdb/common/types/row/tuple_data_iterator.hpp" |
17 | #include "duckdb/common/types/row/tuple_data_layout.hpp" |
18 | #include "duckdb/common/types/vector.hpp" |
19 | #include "duckdb/execution/aggregate_hashtable.hpp" |
20 | #include "duckdb/planner/operator/logical_comparison_join.hpp" |
21 | #include "duckdb/storage/storage_info.hpp" |
22 | |
23 | namespace duckdb { |
24 | |
25 | class BufferManager; |
26 | class BufferHandle; |
27 | class ColumnDataCollection; |
28 | struct ColumnDataAppendState; |
29 | struct ClientConfig; |
30 | |
31 | struct JoinHTScanState { |
32 | public: |
33 | JoinHTScanState(TupleDataCollection &collection, idx_t chunk_idx_from, idx_t chunk_idx_to, |
34 | TupleDataPinProperties properties = TupleDataPinProperties::ALREADY_PINNED) |
35 | : iterator(collection, properties, chunk_idx_from, chunk_idx_to, false), offset_in_chunk(0) { |
36 | } |
37 | |
38 | TupleDataChunkIterator iterator; |
39 | idx_t offset_in_chunk; |
40 | |
41 | private: |
42 | //! Implicit copying is not allowed |
43 | JoinHTScanState(const JoinHTScanState &) = delete; |
44 | }; |
45 | |
46 | //! JoinHashTable is a linear probing HT that is used for computing joins |
47 | /*! |
48 | The JoinHashTable concatenates incoming chunks inside a linked list of |
49 | data ptrs. The storage looks like this internally. |
50 | [SERIALIZED ROW][NEXT POINTER] |
51 | [SERIALIZED ROW][NEXT POINTER] |
52 | There is a separate hash map of pointers that point into this table. |
53 | This is what is used to resolve the hashes. |
54 | [POINTER] |
55 | [POINTER] |
56 | [POINTER] |
57 | The pointers are either NULL |
58 | */ |
59 | class JoinHashTable { |
60 | public: |
61 | using ValidityBytes = TemplatedValidityMask<uint8_t>; |
62 | |
63 | //! Scan structure that can be used to resume scans, as a single probe can |
64 | //! return 1024*N values (where N is the size of the HT). This is |
65 | //! returned by the JoinHashTable::Scan function and can be used to resume a |
66 | //! probe. |
67 | struct ScanStructure { |
68 | unsafe_unique_array<UnifiedVectorFormat> key_data; |
69 | Vector pointers; |
70 | idx_t count; |
71 | SelectionVector sel_vector; |
72 | // whether or not the given tuple has found a match |
73 | unsafe_unique_array<bool> found_match; |
74 | JoinHashTable &ht; |
75 | bool finished; |
76 | |
77 | explicit ScanStructure(JoinHashTable &ht); |
78 | //! Get the next batch of data from the scan structure |
79 | void Next(DataChunk &keys, DataChunk &left, DataChunk &result); |
80 | |
81 | private: |
82 | //! Next operator for the inner join |
83 | void NextInnerJoin(DataChunk &keys, DataChunk &left, DataChunk &result); |
84 | //! Next operator for the semi join |
85 | void NextSemiJoin(DataChunk &keys, DataChunk &left, DataChunk &result); |
86 | //! Next operator for the anti join |
87 | void NextAntiJoin(DataChunk &keys, DataChunk &left, DataChunk &result); |
88 | //! Next operator for the left outer join |
89 | void NextLeftJoin(DataChunk &keys, DataChunk &left, DataChunk &result); |
90 | //! Next operator for the mark join |
91 | void NextMarkJoin(DataChunk &keys, DataChunk &left, DataChunk &result); |
92 | //! Next operator for the single join |
93 | void NextSingleJoin(DataChunk &keys, DataChunk &left, DataChunk &result); |
94 | |
95 | //! Scan the hashtable for matches of the specified keys, setting the found_match[] array to true or false |
96 | //! for every tuple |
97 | void ScanKeyMatches(DataChunk &keys); |
98 | template <bool MATCH> |
99 | void NextSemiOrAntiJoin(DataChunk &keys, DataChunk &left, DataChunk &result); |
100 | |
101 | void ConstructMarkJoinResult(DataChunk &join_keys, DataChunk &child, DataChunk &result); |
102 | |
103 | idx_t ScanInnerJoin(DataChunk &keys, SelectionVector &result_vector); |
104 | |
105 | public: |
106 | void InitializeSelectionVector(const SelectionVector *¤t_sel); |
107 | void AdvancePointers(); |
108 | void AdvancePointers(const SelectionVector &sel, idx_t sel_count); |
109 | void GatherResult(Vector &result, const SelectionVector &result_vector, const SelectionVector &sel_vector, |
110 | const idx_t count, const idx_t col_idx); |
111 | void GatherResult(Vector &result, const SelectionVector &sel_vector, const idx_t count, const idx_t col_idx); |
112 | idx_t ResolvePredicates(DataChunk &keys, SelectionVector &match_sel, SelectionVector *no_match_sel); |
113 | }; |
114 | |
115 | public: |
116 | JoinHashTable(BufferManager &buffer_manager, const vector<JoinCondition> &conditions, |
117 | vector<LogicalType> build_types, JoinType type); |
118 | ~JoinHashTable(); |
119 | |
120 | //! Add the given data to the HT |
121 | void Build(PartitionedTupleDataAppendState &append_state, DataChunk &keys, DataChunk &input); |
122 | //! Merge another HT into this one |
123 | void Merge(JoinHashTable &other); |
124 | //! Combines the partitions in sink_collection into data_collection, as if it were not partitioned |
125 | void Unpartition(); |
126 | //! Initialize the pointer table for the probe |
127 | void InitializePointerTable(); |
128 | //! Finalize the build of the HT, constructing the actual hash table and making the HT ready for probing. |
129 | //! Finalize must be called before any call to Probe, and after Finalize is called Build should no longer be |
130 | //! ever called. |
131 | void Finalize(idx_t chunk_idx_from, idx_t chunk_idx_to, bool parallel); |
132 | //! Probe the HT with the given input chunk, resulting in the given result |
133 | unique_ptr<ScanStructure> Probe(DataChunk &keys, Vector *precomputed_hashes = nullptr); |
134 | //! Scan the HT to construct the full outer join result |
135 | void ScanFullOuter(JoinHTScanState &state, Vector &addresses, DataChunk &result); |
136 | |
137 | //! Fill the pointer with all the addresses from the hashtable for full scan |
138 | idx_t FillWithHTOffsets(JoinHTScanState &state, Vector &addresses); |
139 | |
140 | idx_t Count() const { |
141 | return data_collection->Count(); |
142 | } |
143 | idx_t SizeInBytes() const { |
144 | return data_collection->SizeInBytes(); |
145 | } |
146 | |
147 | PartitionedTupleData &GetSinkCollection() { |
148 | return *sink_collection; |
149 | } |
150 | |
151 | TupleDataCollection &GetDataCollection() { |
152 | return *data_collection; |
153 | } |
154 | |
155 | //! BufferManager |
156 | BufferManager &buffer_manager; |
157 | //! The join conditions |
158 | const vector<JoinCondition> &conditions; |
159 | //! The types of the keys used in equality comparison |
160 | vector<LogicalType> equality_types; |
161 | //! The types of the keys |
162 | vector<LogicalType> condition_types; |
163 | //! The types of all conditions |
164 | vector<LogicalType> build_types; |
165 | //! The comparison predicates |
166 | vector<ExpressionType> predicates; |
167 | //! Data column layout |
168 | TupleDataLayout layout; |
169 | //! The size of an entry as stored in the HashTable |
170 | idx_t entry_size; |
171 | //! The total tuple size |
172 | idx_t tuple_size; |
173 | //! Next pointer offset in tuple |
174 | idx_t pointer_offset; |
175 | //! A constant false column for initialising right outer joins |
176 | Vector vfound; |
177 | //! The join type of the HT |
178 | JoinType join_type; |
179 | //! Whether or not the HT has been finalized |
180 | bool finalized; |
181 | //! Whether or not any of the key elements contain NULL |
182 | bool has_null; |
183 | //! Bitmask for getting relevant bits from the hashes to determine the position |
184 | uint64_t bitmask; |
185 | |
186 | struct { |
187 | mutex mj_lock; |
188 | //! The types of the duplicate eliminated columns, only used in correlated MARK JOIN for flattening |
189 | //! ANY()/ALL() expressions |
190 | vector<LogicalType> correlated_types; |
191 | //! The aggregate expression nodes used by the HT |
192 | vector<unique_ptr<Expression>> correlated_aggregates; |
193 | //! The HT that holds the group counts for every correlated column |
194 | unique_ptr<GroupedAggregateHashTable> correlated_counts; |
195 | //! Group chunk used for aggregating into correlated_counts |
196 | DataChunk group_chunk; |
197 | //! Payload chunk used for aggregating into correlated_counts |
198 | DataChunk correlated_payload; |
199 | //! Result chunk used for aggregating into correlated_counts |
200 | DataChunk result_chunk; |
201 | } correlated_mark_join_info; |
202 | |
203 | private: |
204 | unique_ptr<ScanStructure> InitializeScanStructure(DataChunk &keys, const SelectionVector *¤t_sel); |
205 | void Hash(DataChunk &keys, const SelectionVector &sel, idx_t count, Vector &hashes); |
206 | |
207 | //! Apply a bitmask to the hashes |
208 | void ApplyBitmask(Vector &hashes, idx_t count); |
209 | void ApplyBitmask(Vector &hashes, const SelectionVector &sel, idx_t count, Vector &pointers); |
210 | |
211 | private: |
212 | //! Insert the given set of locations into the HT with the given set of hashes |
213 | void InsertHashes(Vector &hashes, idx_t count, data_ptr_t key_locations[], bool parallel); |
214 | |
215 | idx_t PrepareKeys(DataChunk &keys, unsafe_unique_array<UnifiedVectorFormat> &key_data, |
216 | const SelectionVector *¤t_sel, SelectionVector &sel, bool build_side); |
217 | |
218 | //! Lock for combining data_collection when merging HTs |
219 | mutex data_lock; |
220 | //! Partitioned data collection that the data is sunk into when building |
221 | unique_ptr<PartitionedTupleData> sink_collection; |
222 | //! The DataCollection holding the main data of the hash table |
223 | unique_ptr<TupleDataCollection> data_collection; |
224 | //! The hash map of the HT, created after finalization |
225 | AllocatedData hash_map; |
226 | //! Whether or not NULL values are considered equal in each of the comparisons |
227 | vector<bool> null_values_are_equal; |
228 | |
229 | //! Copying not allowed |
230 | JoinHashTable(const JoinHashTable &) = delete; |
231 | |
232 | public: |
233 | //===--------------------------------------------------------------------===// |
234 | // External Join |
235 | //===--------------------------------------------------------------------===// |
236 | struct ProbeSpillLocalAppendState { |
237 | //! Local partition and append state (if partitioned) |
238 | PartitionedColumnData *local_partition; |
239 | PartitionedColumnDataAppendState *local_partition_append_state; |
240 | //! Local spill and append state (if not partitioned) |
241 | ColumnDataCollection *local_spill_collection; |
242 | ColumnDataAppendState *local_spill_append_state; |
243 | }; |
244 | //! ProbeSpill represents materialized probe-side data that could not be probed during PhysicalHashJoin::Execute |
245 | //! because the HashTable did not fit in memory. The ProbeSpill is not partitioned if the remaining data can be |
246 | //! dealt with in just 1 more round of probing, otherwise it is radix partitioned in the same way as the HashTable |
247 | struct ProbeSpill { |
248 | public: |
249 | ProbeSpill(JoinHashTable &ht, ClientContext &context, const vector<LogicalType> &probe_types); |
250 | |
251 | public: |
252 | //! Create a state for a new thread |
253 | ProbeSpillLocalAppendState RegisterThread(); |
254 | //! Append a chunk to this ProbeSpill |
255 | void Append(DataChunk &chunk, ProbeSpillLocalAppendState &local_state); |
256 | //! Finalize by merging the thread-local accumulated data |
257 | void Finalize(); |
258 | |
259 | public: |
260 | //! Prepare the next probe round |
261 | void PrepareNextProbe(); |
262 | //! Scans and consumes the ColumnDataCollection |
263 | unique_ptr<ColumnDataConsumer> consumer; |
264 | |
265 | private: |
266 | JoinHashTable &ht; |
267 | mutex lock; |
268 | ClientContext &context; |
269 | |
270 | //! Whether the probe data is partitioned |
271 | bool partitioned; |
272 | //! The types of the probe DataChunks |
273 | const vector<LogicalType> &probe_types; |
274 | //! The column ids |
275 | vector<column_t> column_ids; |
276 | |
277 | //! The partitioned probe data (if partitioned) and append states |
278 | unique_ptr<PartitionedColumnData> global_partitions; |
279 | vector<unique_ptr<PartitionedColumnData>> local_partitions; |
280 | vector<unique_ptr<PartitionedColumnDataAppendState>> local_partition_append_states; |
281 | |
282 | //! The probe data (if not partitioned) and append states |
283 | unique_ptr<ColumnDataCollection> global_spill_collection; |
284 | vector<unique_ptr<ColumnDataCollection>> local_spill_collections; |
285 | vector<unique_ptr<ColumnDataAppendState>> local_spill_append_states; |
286 | }; |
287 | |
288 | //! Whether we are doing an external hash join |
289 | bool external; |
290 | //! The current number of radix bits used to partition |
291 | idx_t radix_bits; |
292 | //! The max size of the HT |
293 | idx_t max_ht_size; |
294 | //! Total count |
295 | idx_t total_count; |
296 | |
297 | //! Capacity of the pointer table given the ht count |
298 | //! (minimum of 1024 to prevent collision chance for small HT's) |
299 | static idx_t PointerTableCapacity(idx_t count) { |
300 | return MaxValue<idx_t>(a: NextPowerOfTwo(v: count * 2), b: 1 << 10); |
301 | } |
302 | //! Size of the pointer table (in bytes) |
303 | static idx_t PointerTableSize(idx_t count) { |
304 | return PointerTableCapacity(count) * sizeof(data_ptr_t); |
305 | } |
306 | |
307 | //! Whether we need to do an external join |
308 | bool RequiresExternalJoin(ClientConfig &config, vector<unique_ptr<JoinHashTable>> &local_hts); |
309 | //! Computes partition sizes and number of radix bits (called before scheduling partition tasks) |
310 | bool RequiresPartitioning(ClientConfig &config, vector<unique_ptr<JoinHashTable>> &local_hts); |
311 | //! Partition this HT |
312 | void Partition(JoinHashTable &global_ht); |
313 | |
314 | //! Delete blocks that belong to the current partitioned HT |
315 | void Reset(); |
316 | //! Build HT for the next partitioned probe round |
317 | bool PrepareExternalFinalize(); |
318 | //! Probe whatever we can, sink the rest into a thread-local HT |
319 | unique_ptr<ScanStructure> ProbeAndSpill(DataChunk &keys, DataChunk &payload, ProbeSpill &probe_spill, |
320 | ProbeSpillLocalAppendState &spill_state, DataChunk &spill_chunk); |
321 | |
322 | private: |
323 | //! First and last partition of the current probe round |
324 | idx_t partition_start; |
325 | idx_t partition_end; |
326 | }; |
327 | |
328 | } // namespace duckdb |
329 | |