1/*-------------------------------------------------------------------------
2 *
3 * hashjoin.h
4 * internal structures for hash joins
5 *
6 *
7 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * src/include/executor/hashjoin.h
11 *
12 *-------------------------------------------------------------------------
13 */
14#ifndef HASHJOIN_H
15#define HASHJOIN_H
16
17#include "nodes/execnodes.h"
18#include "port/atomics.h"
19#include "storage/barrier.h"
20#include "storage/buffile.h"
21#include "storage/lwlock.h"
22
23/* ----------------------------------------------------------------
24 * hash-join hash table structures
25 *
26 * Each active hashjoin has a HashJoinTable control block, which is
27 * palloc'd in the executor's per-query context. All other storage needed
28 * for the hashjoin is kept in private memory contexts, two for each hashjoin.
29 * This makes it easy and fast to release the storage when we don't need it
30 * anymore. (Exception: data associated with the temp files lives in the
31 * per-query context too, since we always call buffile.c in that context.)
32 *
33 * The hashtable contexts are made children of the per-query context, ensuring
34 * that they will be discarded at end of statement even if the join is
35 * aborted early by an error. (Likewise, any temporary files we make will
36 * be cleaned up by the virtual file manager in event of an error.)
37 *
38 * Storage that should live through the entire join is allocated from the
39 * "hashCxt", while storage that is only wanted for the current batch is
40 * allocated in the "batchCxt". By resetting the batchCxt at the end of
41 * each batch, we free all the per-batch storage reliably and without tedium.
42 *
43 * During first scan of inner relation, we get its tuples from executor.
44 * If nbatch > 1 then tuples that don't belong in first batch get saved
45 * into inner-batch temp files. The same statements apply for the
46 * first scan of the outer relation, except we write tuples to outer-batch
47 * temp files. After finishing the first scan, we do the following for
48 * each remaining batch:
49 * 1. Read tuples from inner batch file, load into hash buckets.
50 * 2. Read tuples from outer batch file, match to hash buckets and output.
51 *
52 * It is possible to increase nbatch on the fly if the in-memory hash table
53 * gets too big. The hash-value-to-batch computation is arranged so that this
54 * can only cause a tuple to go into a later batch than previously thought,
55 * never into an earlier batch. When we increase nbatch, we rescan the hash
56 * table and dump out any tuples that are now of a later batch to the correct
57 * inner batch file. Subsequently, while reading either inner or outer batch
58 * files, we might find tuples that no longer belong to the current batch;
59 * if so, we just dump them out to the correct batch file.
60 * ----------------------------------------------------------------
61 */
62
63/* these are in nodes/execnodes.h: */
64/* typedef struct HashJoinTupleData *HashJoinTuple; */
65/* typedef struct HashJoinTableData *HashJoinTable; */
66
/*
 * Header placed in front of every tuple stored in a hash join's hash table.
 *
 * The tuple itself is not a member of this struct: it is stored in
 * MinimalTuple format directly after the header, HJTUPLE_OVERHEAD bytes from
 * the start of the header, and is reached with HJTUPLE_MINTUPLE().
 */
typedef struct HashJoinTupleData
{
	/*
	 * Link to next tuple in same bucket.  The link is kept either as an
	 * ordinary pointer ("unshared") or as a dsa_pointer ("shared"); both
	 * members occupy the same storage, so only one form is meaningful for a
	 * given hash table.
	 */
	union
	{
		struct HashJoinTupleData *unshared;
		dsa_pointer shared;
	}			next;
	uint32		hashvalue;		/* tuple's hash code */
	/* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */
} HashJoinTupleData;
78
/* Size of the HashJoinTupleData header, rounded up to a MAXALIGN boundary. */
#define HJTUPLE_OVERHEAD  MAXALIGN(sizeof(HashJoinTupleData))
/*
 * Given a pointer to a HashJoinTupleData header, yield a MinimalTuple
 * pointing HJTUPLE_OVERHEAD bytes past it, i.e. at the tuple data that
 * follows the header.  The argument is evaluated exactly once.
 */
#define HJTUPLE_MINTUPLE(hjtup)  \
	((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))
82
83/*
84 * If the outer relation's distribution is sufficiently nonuniform, we attempt
85 * to optimize the join by treating the hash values corresponding to the outer
86 * relation's MCVs specially. Inner relation tuples matching these hash
87 * values go into the "skew" hashtable instead of the main hashtable, and
88 * outer relation tuples with these hash values are matched against that
89 * table instead of the main one. Thus, tuples with these hash values are
90 * effectively handled as part of the first batch and will never go to disk.
91 * The skew hashtable is limited to SKEW_WORK_MEM_PERCENT of the total memory
92 * allowed for the join; while building the hashtables, we decrease the number
93 * of MCVs being specially treated if needed to stay under this limit.
94 *
95 * Note: you might wonder why we look at the outer relation stats for this,
96 * rather than the inner. One reason is that the outer relation is typically
97 * bigger, so we get more I/O savings by optimizing for its most common values.
98 * Also, for similarly-sized relations, the planner prefers to put the more
99 * uniformly distributed relation on the inside, so we're more likely to find
100 * interesting skew in the outer relation.
101 */
/*
 * One bucket of the skew hashtable: the inner-relation tuples collected for
 * a single specially-treated hash value.
 */
typedef struct HashSkewBucket
{
	uint32		hashvalue;		/* common hash value */
	HashJoinTuple tuples;		/* linked list of inner-relation tuples */
} HashSkewBucket;
107
/* Size of a HashSkewBucket, rounded up to a MAXALIGN boundary. */
#define SKEW_BUCKET_OVERHEAD  MAXALIGN(sizeof(HashSkewBucket))
/*
 * NOTE(review): not used within this header; presumably the "no skew bucket"
 * sentinel for code that looks up a skew bucket number -- confirm at callers.
 */
#define INVALID_SKEW_BUCKET_NO	(-1)
/* Percentage of the join's allowed memory that the skew hashtable may use. */
#define SKEW_WORK_MEM_PERCENT  2
/*
 * NOTE(review): not used within this header; presumably the minimum fraction
 * of the outer relation that the chosen MCVs must account for before the
 * skew optimization is attempted -- confirm at callers.
 */
#define SKEW_MIN_OUTER_FRACTION  0.01
112
113/*
114 * To reduce palloc overhead, the HashJoinTuples for the current batch are
115 * packed in 32kB buffers instead of pallocing each tuple individually.
116 */
/*
 * Header of one chunk of densely packed tuples.  Chunks are chained into a
 * linked list through "next"; the tuple buffer is not a member of the struct
 * but follows it in memory (see HASH_CHUNK_DATA).
 */
typedef struct HashMemoryChunkData
{
	int			ntuples;		/* number of tuples stored in this chunk */
	size_t		maxlen;			/* size of the chunk's tuple buffer */
	size_t		used;			/* number of buffer bytes already used */

	/*
	 * pointer to the next chunk (linked list), held either as an ordinary
	 * pointer ("unshared") or as a dsa_pointer ("shared")
	 */
	union
	{
		struct HashMemoryChunkData *unshared;
		dsa_pointer shared;
	}			next;

	/*
	 * The chunk's tuple buffer starts after the HashMemoryChunkData struct,
	 * at offset HASH_CHUNK_HEADER_SIZE (which must be maxaligned).  Note that
	 * that offset is not included in "maxlen" or "used".
	 */
} HashMemoryChunkData;
136
/* Pointer to a chunk header; note that the typedef hides the indirection. */
typedef struct HashMemoryChunkData *HashMemoryChunk;

/* nominal chunk size: 32kB (the constant has type long) */
#define HASH_CHUNK_SIZE			(32 * 1024L)
/* size of the chunk header, rounded up to a MAXALIGN boundary */
#define HASH_CHUNK_HEADER_SIZE	MAXALIGN(sizeof(HashMemoryChunkData))
/* start of the tuple buffer that follows chunk header "hc" (a char *) */
#define HASH_CHUNK_DATA(hc)		(((char *) (hc)) + HASH_CHUNK_HEADER_SIZE)
/*
 * tuples exceeding HASH_CHUNK_THRESHOLD bytes (one quarter of
 * HASH_CHUNK_SIZE, i.e. 8kB) are put in their own chunk
 */
#define HASH_CHUNK_THRESHOLD	(HASH_CHUNK_SIZE / 4)
144
145/*
146 * For each batch of a Parallel Hash Join, we have a ParallelHashJoinBatch
147 * object in shared memory to coordinate access to it. Since they are
148 * followed by variable-sized objects, they are arranged in contiguous memory
149 * but not accessed directly as an array.
150 */
/*
 * Shared-memory state for one batch of a Parallel Hash Join.  The struct is
 * only the fixed-size prefix of a larger object: the batch's SharedTuplestore
 * objects are stored directly after it.
 */
typedef struct ParallelHashJoinBatch
{
	dsa_pointer buckets;		/* array of hash table buckets */
	Barrier		batch_barrier;	/* synchronization for joining this batch */

	dsa_pointer chunks;			/* chunks of tuples loaded */
	size_t		size;			/* size of buckets + chunks in memory */
	size_t		estimated_size; /* size of buckets + chunks while writing */
	size_t		ntuples;		/* number of tuples loaded */
	size_t		old_ntuples;	/* number of tuples before repartitioning */

	/*
	 * NOTE(review): undocumented in the original; presumably set once this
	 * batch is found not to fit in the memory budget -- confirm at the sites
	 * that write it.
	 */
	bool		space_exhausted;

	/*
	 * Variable-sized SharedTuplestore objects follow this struct in memory.
	 * See the accessor macros below.
	 */
} ParallelHashJoinBatch;
168
/*
 * Memory layout assumed by the macros below, for one batch:
 *
 *	ParallelHashJoinBatch, padded to MAXALIGN(sizeof(ParallelHashJoinBatch))
 *	inner SharedTuplestore, MAXALIGN(sts_estimate(nparticipants)) bytes
 *	outer SharedTuplestore, MAXALIGN(sts_estimate(nparticipants)) bytes
 *
 * Consecutive batches are placed back to back, each occupying
 * EstimateParallelHashJoinBatch() bytes.
 */

/* Accessor for inner batch tuplestore following a ParallelHashJoinBatch. */
#define ParallelHashJoinBatchInner(batch)							\
	((SharedTuplestore *)											\
	 ((char *) (batch) + MAXALIGN(sizeof(ParallelHashJoinBatch))))

/*
 * Accessor for outer batch tuplestore following a ParallelHashJoinBatch.
 * It is found by skipping over the inner tuplestore, so "nparticipants" must
 * be the same value that was used to size the batch.
 */
#define ParallelHashJoinBatchOuter(batch, nparticipants) \
	((SharedTuplestore *)												\
	 ((char *) ParallelHashJoinBatchInner(batch) +						\
	  MAXALIGN(sts_estimate(nparticipants))))

/*
 * Total size of a ParallelHashJoinBatch and tuplestores.  The participant
 * count is read from (hashtable)->parallel_state->nparticipants.
 */
#define EstimateParallelHashJoinBatch(hashtable)						\
	(MAXALIGN(sizeof(ParallelHashJoinBatch)) +							\
	 MAXALIGN(sts_estimate((hashtable)->parallel_state->nparticipants)) * 2)

/*
 * Accessor for the nth ParallelHashJoinBatch given the base.
 *
 * NOTE: the hash table is not a parameter of this macro.  The expansion
 * refers to an identifier literally named "hashtable", so a variable of that
 * name (with a parallel_state member) must be in scope wherever the macro is
 * used; otherwise the use will not compile, or will silently pick up an
 * unrelated variable that happens to have that name.
 */
#define NthParallelHashJoinBatch(base, n) \
	((ParallelHashJoinBatch *)											\
	 ((char *) (base) +													\
	  EstimateParallelHashJoinBatch(hashtable) * (n)))
190
191/*
192 * Each backend requires a small amount of per-batch state to interact with
193 * each ParallelHashJoinBatch.
194 */
/*
 * Backend-local companion to one ParallelHashJoinBatch: a pointer to the
 * shared object plus this backend's private counters and tuplestore
 * accessors for that batch.
 */
typedef struct ParallelHashJoinBatchAccessor
{
	ParallelHashJoinBatch *shared;	/* pointer to shared state */

	/* Per-backend partial counters to reduce contention. */
	size_t		preallocated;	/* pre-allocated space for this backend */
	size_t		ntuples;		/* number of tuples */
	size_t		size;			/* size of partition in memory */
	size_t		estimated_size; /* size of partition on disk */
	size_t		old_ntuples;	/* how many tuples before repartitioning? */
	bool		at_least_one_chunk; /* has this backend allocated a chunk? */

	bool		done;			/* flag to remember that a batch is done */
	/* NOTE(review): presumably this backend's handles on the batch's inner */
	/* and outer SharedTuplestore objects -- confirm where they are set. */
	SharedTuplestoreAccessor *inner_tuples;
	SharedTuplestoreAccessor *outer_tuples;
} ParallelHashJoinBatchAccessor;
211
212/*
213 * While hashing the inner relation, any participant might determine that it's
214 * time to increase the number of buckets to reduce the load factor or batches
215 * to reduce the memory size. This is indicated by setting the growth flag to
216 * these values.
217 */
/*
 * Values of the growth flag (the "growth" member of ParallelHashJoinState).
 * The enumerators take the default values 0 through 3 in the order listed.
 */
typedef enum ParallelHashGrowth
{
	/* The current dimensions are sufficient. */
	PHJ_GROWTH_OK,
	/* The load factor is too high, so we need to add buckets. */
	PHJ_GROWTH_NEED_MORE_BUCKETS,
	/* The memory budget would be exhausted, so we need to repartition. */
	PHJ_GROWTH_NEED_MORE_BATCHES,
	/* Repartitioning didn't help last time, so don't try to do that again. */
	PHJ_GROWTH_DISABLED
} ParallelHashGrowth;
229
230/*
231 * The shared state used to coordinate a Parallel Hash Join. This is stored
232 * in the DSM segment.
233 */
/*
 * Top-level shared control block of a Parallel Hash Join.  The members down
 * to "lock" are protected by that lock; the barriers, the atomic counter and
 * the file set that follow are not covered by the lock's comment.
 */
typedef struct ParallelHashJoinState
{
	dsa_pointer batches;		/* array of ParallelHashJoinBatch */
	dsa_pointer old_batches;	/* previous generation during repartition */
	int			nbatch;			/* number of batches now */
	int			old_nbatch;		/* previous number of batches */
	int			nbuckets;		/* number of buckets */
	ParallelHashGrowth growth;	/* control batch/bucket growth */
	dsa_pointer chunk_work_queue;	/* chunk work queue */
	int			nparticipants;	/* participant count; sizes the per-batch
								 * tuplestores (see
								 * EstimateParallelHashJoinBatch) */
	size_t		space_allowed;	/* NOTE(review): presumably the memory budget
								 * for the hash table -- confirm */
	size_t		total_tuples;	/* total number of inner tuples */
	LWLock		lock;			/* lock protecting the above */

	Barrier		build_barrier;	/* synchronization for the build phases */
	Barrier		grow_batches_barrier;	/* uses the PHJ_GROW_BATCHES_* phases */
	Barrier		grow_buckets_barrier;	/* uses the PHJ_GROW_BUCKETS_* phases */
	pg_atomic_uint32 distributor;	/* counter for load balancing */

	SharedFileSet fileset;		/* space for shared temporary files */
} ParallelHashJoinState;
255
/*
 * The phases for building batches, used by build_barrier.  The values are
 * consecutive integers starting at zero, in the order listed.
 */
#define PHJ_BUILD_ELECTING				0
#define PHJ_BUILD_ALLOCATING			1
#define PHJ_BUILD_HASHING_INNER			2
#define PHJ_BUILD_HASHING_OUTER			3
#define PHJ_BUILD_DONE					4
262
/*
 * The phases for probing each batch, used by batch_barrier.  The values are
 * consecutive integers starting at zero, in the order listed.
 */
#define PHJ_BATCH_ELECTING				0
#define PHJ_BATCH_ALLOCATING			1
#define PHJ_BATCH_LOADING				2
#define PHJ_BATCH_PROBING				3
#define PHJ_BATCH_DONE					4
269
/*
 * The phases of batch growth while hashing, for grow_batches_barrier.
 *
 * The sequence is circular: PHJ_GROW_BATCHES_PHASE() reduces a phase number
 * modulo the number of phases, which is expressed here as "last phase + 1"
 * so that it cannot drift out of step with the list.
 */
#define PHJ_GROW_BATCHES_ELECTING		0
#define PHJ_GROW_BATCHES_ALLOCATING		1
#define PHJ_GROW_BATCHES_REPARTITIONING 2
#define PHJ_GROW_BATCHES_DECIDING		3
#define PHJ_GROW_BATCHES_FINISHING		4
#define PHJ_GROW_BATCHES_PHASE(n) \
	((n) % (PHJ_GROW_BATCHES_FINISHING + 1))	/* circular phases */
277
/*
 * The phases of bucket growth while hashing, for grow_buckets_barrier.
 *
 * The sequence is circular: PHJ_GROW_BUCKETS_PHASE() reduces a phase number
 * modulo the number of phases, which is expressed here as "last phase + 1"
 * so that it cannot drift out of step with the list.
 */
#define PHJ_GROW_BUCKETS_ELECTING		0
#define PHJ_GROW_BUCKETS_ALLOCATING		1
#define PHJ_GROW_BUCKETS_REINSERTING	2
#define PHJ_GROW_BUCKETS_PHASE(n) \
	((n) % (PHJ_GROW_BUCKETS_REINSERTING + 1))	/* circular phases */
283
/*
 * Control block for one hash join's hash table.  It records the table's
 * dimensions, the in-memory buckets, skew-optimization state, batching state
 * and temp files, hash function lookup data, memory accounting, the two
 * private memory contexts, and the additional state used by Parallel Hash.
 */
typedef struct HashJoinTableData
{
	int			nbuckets;		/* # buckets in the in-memory hash table */
	int			log2_nbuckets;	/* its log2 (nbuckets must be a power of 2) */

	int			nbuckets_original;	/* # buckets when starting the first hash */
	int			nbuckets_optimal;	/* optimal # buckets (per batch) */
	int			log2_nbuckets_optimal;	/* log2(nbuckets_optimal) */

	/* buckets[i] is head of list of tuples in i'th in-memory bucket */
	union
	{
		/* unshared array is per-batch storage, as are all the tuples */
		struct HashJoinTupleData **unshared;
		/* shared array is per-query DSA area, as are all the tuples */
		dsa_pointer_atomic *shared;
	}			buckets;

	bool		keepNulls;		/* true to store unmatchable NULL tuples */

	bool		skewEnabled;	/* are we using skew optimization? */
	HashSkewBucket **skewBucket;	/* hashtable of skew buckets */
	int			skewBucketLen;	/* size of skewBucket array (a power of 2!) */
	int			nSkewBuckets;	/* number of active skew buckets */
	int		   *skewBucketNums; /* array indexes of active skew buckets */

	int			nbatch;			/* number of batches */
	int			curbatch;		/* current batch #; 0 during 1st pass */

	int			nbatch_original;	/* nbatch when we started inner scan */
	int			nbatch_outstart;	/* nbatch when we started outer scan */

	bool		growEnabled;	/* flag to shut off nbatch increases */

	double		totalTuples;	/* # tuples obtained from inner plan */
	double		partialTuples;	/* # tuples obtained from inner plan by me */
	double		skewTuples;		/* # tuples inserted into skew hashtable */

	/*
	 * These arrays are allocated for the life of the hash join, but only if
	 * nbatch > 1.  A file is opened only when we first write a tuple into it
	 * (otherwise its pointer remains NULL).  Note that the zero'th array
	 * elements never get used, since we will process rather than dump out any
	 * tuples of batch zero.
	 */
	BufFile   **innerBatchFile; /* buffered virtual temp file per batch */
	BufFile   **outerBatchFile; /* buffered virtual temp file per batch */

	/*
	 * Info about the datatype-specific hash functions for the datatypes being
	 * hashed. These are arrays of the same length as the number of hash join
	 * clauses (hash keys).
	 */
	FmgrInfo   *outer_hashfunctions;	/* lookup data for hash functions */
	FmgrInfo   *inner_hashfunctions;	/* lookup data for hash functions */
	bool	   *hashStrict;		/* is each hash join operator strict? */
	Oid		   *collations;		/* NOTE(review): presumably one collation OID
								 * per hash key, like the arrays above --
								 * confirm */

	Size		spaceUsed;		/* memory space currently used by tuples */
	Size		spaceAllowed;	/* upper limit for space used */
	Size		spacePeak;		/* peak space used */
	Size		spaceUsedSkew;	/* skew hash table's current space usage */
	Size		spaceAllowedSkew;	/* upper limit for skew hashtable */

	MemoryContext hashCxt;		/* context for whole-hash-join storage */
	MemoryContext batchCxt;		/* context for this-batch-only storage */

	/* used for dense allocation of tuples (into linked chunks) */
	HashMemoryChunk chunks;		/* one list for the whole batch */

	/* Shared and private state for Parallel Hash. */
	HashMemoryChunk current_chunk;	/* this backend's current chunk */
	dsa_area   *area;			/* DSA area to allocate memory from */
	ParallelHashJoinState *parallel_state;	/* shared control block; read by
											 * EstimateParallelHashJoinBatch */
	ParallelHashJoinBatchAccessor *batches; /* NOTE(review): presumably one
											 * accessor per batch, for this
											 * backend -- confirm */
	dsa_pointer current_chunk_shared;	/* NOTE(review): presumably the
										 * dsa_pointer form of current_chunk
										 * -- confirm */
} HashJoinTableData;
361
362#endif /* HASHJOIN_H */
363