/*-------------------------------------------------------------------------
 *
 * hashjoin.h
 *	  internal structures for hash joins
 *
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/include/executor/hashjoin.h
 *
 *-------------------------------------------------------------------------
 */
#ifndef HASHJOIN_H
#define HASHJOIN_H

#include "nodes/execnodes.h"
#include "port/atomics.h"
#include "storage/barrier.h"
#include "storage/buffile.h"
#include "storage/lwlock.h"

/* ----------------------------------------------------------------
 *				hash-join hash table structures
 *
 * Each active hashjoin has a HashJoinTable control block, which is
 * palloc'd in the executor's per-query context.  All other storage needed
 * for the hashjoin is kept in private memory contexts, two for each hashjoin.
 * This makes it easy and fast to release the storage when we don't need it
 * anymore.  (Exception: data associated with the temp files lives in the
 * per-query context too, since we always call buffile.c in that context.)
 *
 * The hashtable contexts are made children of the per-query context, ensuring
 * that they will be discarded at end of statement even if the join is
 * aborted early by an error.  (Likewise, any temporary files we make will
 * be cleaned up by the virtual file manager in event of an error.)
 *
 * Storage that should live through the entire join is allocated from the
 * "hashCxt", while storage that is only wanted for the current batch is
 * allocated in the "batchCxt".  By resetting the batchCxt at the end of
 * each batch, we free all the per-batch storage reliably and without tedium.
 *
 * During the first scan of the inner relation, we get its tuples from the
 * executor.  If nbatch > 1 then tuples that don't belong in the first batch
 * get saved into inner-batch temp files.  The same applies for the first
 * scan of the outer relation, except that we write tuples to outer-batch
 * temp files.  After finishing the first scan, we do the following for
 * each remaining batch:
 *	1. Read tuples from inner batch file, load into hash buckets.
 *	2. Read tuples from outer batch file, match to hash buckets and output.
 *
 * It is possible to increase nbatch on the fly if the in-memory hash table
 * gets too big.  The hash-value-to-batch computation is arranged so that this
 * can only cause a tuple to go into a later batch than previously thought,
 * never into an earlier batch.  When we increase nbatch, we rescan the hash
 * table and dump out any tuples that are now of a later batch to the correct
 * inner batch file.  Subsequently, while reading either inner or outer batch
 * files, we might find tuples that no longer belong to the current batch;
 * if so, we just dump them out to the correct batch file.
 * ----------------------------------------------------------------
 */
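
/*
 * To make the "later batch only" property concrete, here is a simplified,
 * illustrative sketch of how bucket and batch numbers can be derived from a
 * hash value (the real computation lives in nodeHash.c; "hashtable" is a
 * HashJoinTable):
 *
 *		bucketno = hashvalue & (hashtable->nbuckets - 1);
 *		batchno = (hashvalue >> hashtable->log2_nbuckets) &
 *			(hashtable->nbatch - 1);
 *
 * Because doubling nbatch only widens the batch mask by one high-order bit,
 * a tuple's batch number can stay the same or grow, but never shrink.
 */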

/* these are in nodes/execnodes.h: */
/* typedef struct HashJoinTupleData *HashJoinTuple; */
/* typedef struct HashJoinTableData *HashJoinTable; */

typedef struct HashJoinTupleData
{
	/* link to next tuple in same bucket */
	union
	{
		struct HashJoinTupleData *unshared;
		dsa_pointer shared;
	}			next;
	uint32		hashvalue;		/* tuple's hash code */
	/* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */
} HashJoinTupleData;

#define HJTUPLE_OVERHEAD	MAXALIGN(sizeof(HashJoinTupleData))
#define HJTUPLE_MINTUPLE(hjtup) \
	((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))
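
/*
 * Illustrative sketch (not part of this header's API) of how the macros
 * above are typically used when probing a non-shared, in-memory bucket;
 * the variable names here are hypothetical:
 *
 *		HashJoinTuple tuple = hashtable->buckets.unshared[bucketno];
 *
 *		while (tuple != NULL)
 *		{
 *			if (tuple->hashvalue == hashvalue)
 *			{
 *				MinimalTuple mtup = HJTUPLE_MINTUPLE(tuple);
 *
 *				... evaluate the join quals against mtup ...
 *			}
 *			tuple = tuple->next.unshared;
 *		}
 */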

/*
 * If the outer relation's distribution is sufficiently nonuniform, we attempt
 * to optimize the join by treating the hash values corresponding to the outer
 * relation's most common values (MCVs) specially.  Inner relation tuples
 * matching these hash values go into the "skew" hashtable instead of the main
 * hashtable, and outer relation tuples with these hash values are matched
 * against that table instead of the main one.  Thus, tuples with these hash
 * values are effectively handled as part of the first batch and will never go
 * to disk.  The skew hashtable is limited to SKEW_WORK_MEM_PERCENT of the
 * total memory allowed for the join; while building the hashtables, we
 * decrease the number of MCVs being specially treated if needed to stay under
 * this limit.
 *
 * Note: you might wonder why we look at the outer relation stats for this,
 * rather than the inner.  One reason is that the outer relation is typically
 * bigger, so we get more I/O savings by optimizing for its most common values.
 * Also, for similarly-sized relations, the planner prefers to put the more
 * uniformly distributed relation on the inside, so we're more likely to find
 * interesting skew in the outer relation.
 */
typedef struct HashSkewBucket
{
	uint32		hashvalue;		/* common hash value */
	HashJoinTuple tuples;		/* linked list of inner-relation tuples */
} HashSkewBucket;

#define SKEW_BUCKET_OVERHEAD	MAXALIGN(sizeof(HashSkewBucket))
#define INVALID_SKEW_BUCKET_NO	(-1)
#define SKEW_WORK_MEM_PERCENT	2
#define SKEW_MIN_OUTER_FRACTION 0.01
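
/*
 * Illustrative sketch of how a skew bucket is located for a given hash value
 * (a simplified version of the lookup in nodeHash.c): skewBucket is an
 * open-addressing hash table searched with linear probing, and a miss yields
 * INVALID_SKEW_BUCKET_NO.
 *
 *		int			bucket = hashvalue & (hashtable->skewBucketLen - 1);
 *
 *		while (hashtable->skewBucket[bucket] != NULL &&
 *			   hashtable->skewBucket[bucket]->hashvalue != hashvalue)
 *			bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
 *
 *		if (hashtable->skewBucket[bucket] != NULL)
 *			return bucket;				... matching skew bucket found ...
 *		return INVALID_SKEW_BUCKET_NO;	... use the main hashtable instead ...
 */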

/*
 * To reduce palloc overhead, the HashJoinTuples for the current batch are
 * packed in 32kB buffers instead of pallocing each tuple individually.
 */
typedef struct HashMemoryChunkData
{
	int			ntuples;		/* number of tuples stored in this chunk */
	size_t		maxlen;			/* size of the chunk's tuple buffer */
	size_t		used;			/* number of buffer bytes already used */

	/* pointer to the next chunk (linked list) */
	union
	{
		struct HashMemoryChunkData *unshared;
		dsa_pointer shared;
	}			next;

	/*
	 * The chunk's tuple buffer starts after the HashMemoryChunkData struct,
	 * at offset HASH_CHUNK_HEADER_SIZE (which must be maxaligned).  Note
	 * that that offset is not included in "maxlen" or "used".
	 */
} HashMemoryChunkData;

typedef struct HashMemoryChunkData *HashMemoryChunk;

#define HASH_CHUNK_SIZE			(32 * 1024L)
#define HASH_CHUNK_HEADER_SIZE	MAXALIGN(sizeof(HashMemoryChunkData))
#define HASH_CHUNK_DATA(hc)		(((char *) (hc)) + HASH_CHUNK_HEADER_SIZE)
/* tuples exceeding HASH_CHUNK_THRESHOLD bytes are put in their own chunk */
#define HASH_CHUNK_THRESHOLD	(HASH_CHUNK_SIZE / 4)
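
/*
 * Illustrative sketch of dense allocation from the current chunk (a
 * simplified version of the logic in nodeHash.c; chunk creation and error
 * handling omitted).  Here "size" is the MAXALIGN'd space needed for one
 * HashJoinTupleData plus its MinimalTuple:
 *
 *		HashMemoryChunk chunk = hashtable->chunks;
 *		char	   *ptr;
 *
 *		if (chunk == NULL || chunk->maxlen - chunk->used < size)
 *			... allocate a fresh HASH_CHUNK_SIZE chunk and link it in ...
 *
 *		ptr = HASH_CHUNK_DATA(chunk) + chunk->used;
 *		chunk->used += size;
 *		chunk->ntuples += 1;
 *
 * Tuples larger than HASH_CHUNK_THRESHOLD instead get a chunk of their own,
 * so that oversized tuples do not waste space in the shared 32kB chunks.
 */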

/*
 * For each batch of a Parallel Hash Join, we have a ParallelHashJoinBatch
 * object in shared memory to coordinate access to it.  Since they are
 * followed by variable-sized objects, they are arranged in contiguous memory
 * but not accessed directly as an array.
 */
typedef struct ParallelHashJoinBatch
{
	dsa_pointer buckets;		/* array of hash table buckets */
	Barrier		batch_barrier;	/* synchronization for joining this batch */

	dsa_pointer chunks;			/* chunks of tuples loaded */
	size_t		size;			/* size of buckets + chunks in memory */
	size_t		estimated_size; /* size of buckets + chunks while writing */
	size_t		ntuples;		/* number of tuples loaded */
	size_t		old_ntuples;	/* number of tuples before repartitioning */
	bool		space_exhausted;

	/*
	 * Variable-sized SharedTuplestore objects follow this struct in memory.
	 * See the accessor macros below.
	 */
} ParallelHashJoinBatch;

/* Accessor for inner batch tuplestore following a ParallelHashJoinBatch. */
#define ParallelHashJoinBatchInner(batch)						\
	((SharedTuplestore *)										\
	 ((char *) (batch) + MAXALIGN(sizeof(ParallelHashJoinBatch))))

/* Accessor for outer batch tuplestore following a ParallelHashJoinBatch. */
#define ParallelHashJoinBatchOuter(batch, nparticipants)		\
	((SharedTuplestore *)										\
	 ((char *) ParallelHashJoinBatchInner(batch) +				\
	  MAXALIGN(sts_estimate(nparticipants))))

/* Total size of a ParallelHashJoinBatch and tuplestores. */
#define EstimateParallelHashJoinBatch(hashtable)				\
	(MAXALIGN(sizeof(ParallelHashJoinBatch)) +					\
	 MAXALIGN(sts_estimate((hashtable)->parallel_state->nparticipants)) * 2)

/* Accessor for the nth ParallelHashJoinBatch given the base. */
#define NthParallelHashJoinBatch(base, n)						\
	((ParallelHashJoinBatch *)									\
	 ((char *) (base) +											\
	  EstimateParallelHashJoinBatch(hashtable) * (n)))
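
/*
 * Illustrative sketch of walking the shared batch array with the macros
 * above.  Note that NthParallelHashJoinBatch deliberately refers to a
 * variable named "hashtable" that must be in scope at the call site.  This
 * is a hedged example, not code from the executor:
 *
 *		ParallelHashJoinState *pstate = hashtable->parallel_state;
 *		char	   *base = dsa_get_address(hashtable->area, pstate->batches);
 *
 *		for (int i = 0; i < pstate->nbatch; i++)
 *		{
 *			ParallelHashJoinBatch *batch = NthParallelHashJoinBatch(base, i);
 *			SharedTuplestore *inner = ParallelHashJoinBatchInner(batch);
 *			SharedTuplestore *outer =
 *				ParallelHashJoinBatchOuter(batch, pstate->nparticipants);
 *
 *			... attach to or inspect the batch here ...
 *		}
 */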

/*
 * Each backend requires a small amount of per-batch state to interact with
 * each ParallelHashJoinBatch.
 */
typedef struct ParallelHashJoinBatchAccessor
{
	ParallelHashJoinBatch *shared;	/* pointer to shared state */

	/* Per-backend partial counters to reduce contention. */
	size_t		preallocated;	/* pre-allocated space for this backend */
	size_t		ntuples;		/* number of tuples */
	size_t		size;			/* size of partition in memory */
	size_t		estimated_size; /* size of partition on disk */
	size_t		old_ntuples;	/* how many tuples before repartitioning? */
	bool		at_least_one_chunk; /* has this backend allocated a chunk? */

	bool		done;			/* flag to remember that a batch is done */
	SharedTuplestoreAccessor *inner_tuples;
	SharedTuplestoreAccessor *outer_tuples;
} ParallelHashJoinBatchAccessor;

/*
 * While hashing the inner relation, any participant might determine that it's
 * time to increase the number of buckets (to reduce the load factor) or the
 * number of batches (to reduce the in-memory size).  This is signaled by
 * setting the growth flag to one of the values below.
 */
typedef enum ParallelHashGrowth
{
	/* The current dimensions are sufficient. */
	PHJ_GROWTH_OK,
	/* The load factor is too high, so we need to add buckets. */
	PHJ_GROWTH_NEED_MORE_BUCKETS,
	/* The memory budget would be exhausted, so we need to repartition. */
	PHJ_GROWTH_NEED_MORE_BATCHES,
	/* Repartitioning didn't help last time, so don't try to do that again. */
	PHJ_GROWTH_DISABLED
} ParallelHashGrowth;
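
/*
 * Illustrative sketch of how a participant might request repartitioning when
 * it runs out of space (a hedged simplification; the real decision logic is
 * in nodeHash.c and also considers bucket growth and PHJ_GROWTH_DISABLED).
 * Since ParallelHashJoinState.lock protects these fields, the flag is
 * examined and updated while holding it:
 *
 *		ParallelHashJoinState *pstate = hashtable->parallel_state;
 *
 *		LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
 *		if (pstate->growth == PHJ_GROWTH_OK)
 *			pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
 *		LWLockRelease(&pstate->lock);
 */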

/*
 * The shared state used to coordinate a Parallel Hash Join.  This is stored
 * in the DSM segment.
 */
typedef struct ParallelHashJoinState
{
	dsa_pointer batches;		/* array of ParallelHashJoinBatch */
	dsa_pointer old_batches;	/* previous generation during repartition */
	int			nbatch;			/* number of batches now */
	int			old_nbatch;		/* previous number of batches */
	int			nbuckets;		/* number of buckets */
	ParallelHashGrowth growth;	/* control batch/bucket growth */
	dsa_pointer chunk_work_queue;	/* chunk work queue */
	int			nparticipants;
	size_t		space_allowed;
	size_t		total_tuples;	/* total number of inner tuples */
	LWLock		lock;			/* lock protecting the above */

	Barrier		build_barrier;	/* synchronization for the build phases */
	Barrier		grow_batches_barrier;
	Barrier		grow_buckets_barrier;
	pg_atomic_uint32 distributor;	/* counter for load balancing */

	SharedFileSet fileset;		/* space for shared temporary files */
} ParallelHashJoinState;

/* The phases for building batches, used by build_barrier. */
#define PHJ_BUILD_ELECTING				0
#define PHJ_BUILD_ALLOCATING			1
#define PHJ_BUILD_HASHING_INNER			2
#define PHJ_BUILD_HASHING_OUTER			3
#define PHJ_BUILD_DONE					4

/* The phases for probing each batch, used by batch_barrier. */
#define PHJ_BATCH_ELECTING				0
#define PHJ_BATCH_ALLOCATING			1
#define PHJ_BATCH_LOADING				2
#define PHJ_BATCH_PROBING				3
#define PHJ_BATCH_DONE					4

/* The phases of batch growth while hashing, for grow_batches_barrier. */
#define PHJ_GROW_BATCHES_ELECTING		0
#define PHJ_GROW_BATCHES_ALLOCATING		1
#define PHJ_GROW_BATCHES_REPARTITIONING 2
#define PHJ_GROW_BATCHES_DECIDING		3
#define PHJ_GROW_BATCHES_FINISHING		4
#define PHJ_GROW_BATCHES_PHASE(n)		((n) % 5)	/* circular phases */

/* The phases of bucket growth while hashing, for grow_buckets_barrier. */
#define PHJ_GROW_BUCKETS_ELECTING		0
#define PHJ_GROW_BUCKETS_ALLOCATING		1
#define PHJ_GROW_BUCKETS_REINSERTING	2
#define PHJ_GROW_BUCKETS_PHASE(n)		((n) % 3)	/* circular phases */
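
/*
 * The grow barriers are reused each time the corresponding operation runs,
 * so their ever-increasing phase numbers are mapped onto the small phase
 * sets above with the _PHASE macros.  An illustrative, hedged sketch of the
 * usual pattern (the real state machines are in nodeHash.c):
 *
 *		switch (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)))
 *		{
 *			case PHJ_GROW_BATCHES_ELECTING:
 *				... one participant is elected to allocate the new batches ...
 *				break;
 *			...
 *		}
 */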

typedef struct HashJoinTableData
{
	int			nbuckets;		/* # buckets in the in-memory hash table */
	int			log2_nbuckets;	/* its log2 (nbuckets must be a power of 2) */

	int			nbuckets_original;	/* # buckets when starting the first hash */
	int			nbuckets_optimal;	/* optimal # buckets (per batch) */
	int			log2_nbuckets_optimal;	/* log2(nbuckets_optimal) */

	/* buckets[i] is head of list of tuples in i'th in-memory bucket */
	union
	{
		/* unshared array is per-batch storage, as are all the tuples */
		struct HashJoinTupleData **unshared;
		/* shared array is per-query DSA area, as are all the tuples */
		dsa_pointer_atomic *shared;
	}			buckets;

	bool		keepNulls;		/* true to store unmatchable NULL tuples */

	bool		skewEnabled;	/* are we using skew optimization? */
	HashSkewBucket **skewBucket;	/* hashtable of skew buckets */
	int			skewBucketLen;	/* size of skewBucket array (a power of 2!) */
	int			nSkewBuckets;	/* number of active skew buckets */
	int		   *skewBucketNums; /* array indexes of active skew buckets */

	int			nbatch;			/* number of batches */
	int			curbatch;		/* current batch #; 0 during 1st pass */

	int			nbatch_original;	/* nbatch when we started inner scan */
	int			nbatch_outstart;	/* nbatch when we started outer scan */

	bool		growEnabled;	/* flag to shut off nbatch increases */

	double		totalTuples;	/* # tuples obtained from inner plan */
	double		partialTuples;	/* # tuples obtained from inner plan by me */
	double		skewTuples;		/* # tuples inserted into skew hashtable */

	/*
	 * These arrays are allocated for the life of the hash join, but only if
	 * nbatch > 1.  A file is opened only when we first write a tuple into it
	 * (otherwise its pointer remains NULL).  Note that the zero'th array
	 * elements never get used, since we will process rather than dump out
	 * any tuples of batch zero.
	 */
	BufFile   **innerBatchFile; /* buffered virtual temp file per batch */
	BufFile   **outerBatchFile; /* buffered virtual temp file per batch */

	/*
	 * Info about the datatype-specific hash functions for the datatypes being
	 * hashed.  These are arrays of the same length as the number of hash join
	 * clauses (hash keys).
	 */
	FmgrInfo   *outer_hashfunctions;	/* lookup data for hash functions */
	FmgrInfo   *inner_hashfunctions;	/* lookup data for hash functions */
	bool	   *hashStrict;		/* is each hash join operator strict? */
	Oid		   *collations;

	Size		spaceUsed;		/* memory space currently used by tuples */
	Size		spaceAllowed;	/* upper limit for space used */
	Size		spacePeak;		/* peak space used */
	Size		spaceUsedSkew;	/* skew hash table's current space usage */
	Size		spaceAllowedSkew;	/* upper limit for skew hashtable */

	MemoryContext hashCxt;		/* context for whole-hash-join storage */
	MemoryContext batchCxt;		/* context for this-batch-only storage */

	/* used for dense allocation of tuples (into linked chunks) */
	HashMemoryChunk chunks;		/* one list for the whole batch */

	/* Shared and private state for Parallel Hash. */
	HashMemoryChunk current_chunk;	/* this backend's current chunk */
	dsa_area   *area;			/* DSA area to allocate memory from */
	ParallelHashJoinState *parallel_state;
	ParallelHashJoinBatchAccessor *batches;
	dsa_pointer current_chunk_shared;
} HashJoinTableData;
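
/*
 * Illustrative sketch of how a batch file slot in innerBatchFile[] or
 * outerBatchFile[] is lazily opened and written (a hedged simplification of
 * the tuple-saving logic in nodeHashjoin.c; the function name is
 * hypothetical and error checking is omitted):
 *
 *		void
 *		save_tuple_sketch(MinimalTuple tuple, uint32 hashvalue,
 *						  BufFile **fileptr)
 *		{
 *			BufFile    *file = *fileptr;
 *
 *			if (file == NULL)
 *			{
 *				... first write to this batch: open the temp file ...
 *				file = BufFileCreateTemp(false);
 *				*fileptr = file;
 *			}
 *			BufFileWrite(file, &hashvalue, sizeof(uint32));
 *			BufFileWrite(file, tuple, tuple->t_len);
 *		}
 */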

#endif							/* HASHJOIN_H */