/*-------------------------------------------------------------------------
 *
 * hashjoin.h
 *    internal structures for hash joins
 *
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/include/executor/hashjoin.h
 *
 *-------------------------------------------------------------------------
 */
#ifndef HASHJOIN_H
#define HASHJOIN_H

#include "nodes/execnodes.h"
#include "port/atomics.h"
#include "storage/barrier.h"
#include "storage/buffile.h"
#include "storage/lwlock.h"

/* ----------------------------------------------------------------
 *      hash-join hash table structures
 *
 * Each active hashjoin has a HashJoinTable control block, which is
 * palloc'd in the executor's per-query context. All other storage needed
 * for the hashjoin is kept in private memory contexts, two for each hashjoin.
 * This makes it easy and fast to release the storage when we don't need it
 * anymore. (Exception: data associated with the temp files lives in the
 * per-query context too, since we always call buffile.c in that context.)
 *
 * The hashtable contexts are made children of the per-query context, ensuring
 * that they will be discarded at end of statement even if the join is
 * aborted early by an error. (Likewise, any temporary files we make will
 * be cleaned up by the virtual file manager in event of an error.)
 *
 * Storage that should live through the entire join is allocated from the
 * "hashCxt", while storage that is only wanted for the current batch is
 * allocated in the "batchCxt". By resetting the batchCxt at the end of
 * each batch, we free all the per-batch storage reliably and without tedium.
 *
 * During the first scan of the inner relation, we get its tuples from the
 * executor. If nbatch > 1 then tuples that don't belong in the first batch
 * get saved into inner-batch temp files. The same applies to the first
 * scan of the outer relation, except that we write those tuples to
 * outer-batch temp files. After finishing the first scan, we do the
 * following for each remaining batch:
 *   1. Read tuples from inner batch file, load into hash buckets.
 *   2. Read tuples from outer batch file, match to hash buckets and output.
 *
 * It is possible to increase nbatch on the fly if the in-memory hash table
 * gets too big. The hash-value-to-batch computation is arranged so that this
 * can only cause a tuple to go into a later batch than previously thought,
 * never into an earlier batch. When we increase nbatch, we rescan the hash
 * table and dump out any tuples that are now of a later batch to the correct
 * inner batch file. Subsequently, while reading either inner or outer batch
 * files, we might find tuples that no longer belong to the current batch;
 * if so, we just dump them out to the correct batch file.
 * ----------------------------------------------------------------
 */
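
/*
 * Illustrative sketch (not part of this header): the executor derives a
 * tuple's bucket and batch number from its hash value roughly as shown
 * below; see ExecHashGetBucketAndBatch() in nodeHash.c for the
 * authoritative version. Because nbatch is a power of 2 and only ever
 * doubles, increasing it merely adds a high-order bit to the batch mask,
 * so a recomputed batchno can gain high bits but never lose low ones,
 * which is why a tuple can only move to a later batch, never an earlier one.
 *
 *      bucketno = hashvalue & (nbuckets - 1);
 *      if (nbatch > 1)
 *          batchno = (hashvalue >> log2_nbuckets) & (nbatch - 1);
 *      else
 *          batchno = 0;
 */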

/* these are in nodes/execnodes.h: */
/* typedef struct HashJoinTupleData *HashJoinTuple; */
/* typedef struct HashJoinTableData *HashJoinTable; */

typedef struct HashJoinTupleData
{
    /* link to next tuple in same bucket */
    union
    {
        struct HashJoinTupleData *unshared;
        dsa_pointer shared;
    } next;
    uint32 hashvalue;           /* tuple's hash code */
    /* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */
} HashJoinTupleData;

#define HJTUPLE_OVERHEAD  MAXALIGN(sizeof(HashJoinTupleData))
#define HJTUPLE_MINTUPLE(hjtup) \
    ((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))
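
/*
 * Illustrative sketch (not part of this header): each stored tuple is a
 * HashJoinTupleData header immediately followed by the tuple in
 * MinimalTuple format, so loading one into the (non-shared) hash table
 * looks roughly like the fragment below; compare ExecHashTableInsert() in
 * nodeHash.c, and note that dense_alloc() is a helper there, not something
 * declared in this header.
 *
 *      HashJoinTuple hashTuple;
 *
 *      hashTuple = (HashJoinTuple) dense_alloc(hashtable,
 *                                              HJTUPLE_OVERHEAD + tuple->t_len);
 *      hashTuple->hashvalue = hashvalue;
 *      memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
 *
 *      hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
 *      hashtable->buckets.unshared[bucketno] = hashTuple;
 */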

/*
 * If the outer relation's distribution is sufficiently nonuniform, we attempt
 * to optimize the join by treating the hash values corresponding to the outer
 * relation's MCVs specially. Inner relation tuples matching these hash
 * values go into the "skew" hashtable instead of the main hashtable, and
 * outer relation tuples with these hash values are matched against that
 * table instead of the main one. Thus, tuples with these hash values are
 * effectively handled as part of the first batch and will never go to disk.
 * The skew hashtable is limited to SKEW_WORK_MEM_PERCENT of the total memory
 * allowed for the join; while building the hashtables, we decrease the number
 * of MCVs being specially treated if needed to stay under this limit.
 *
 * Note: you might wonder why we look at the outer relation stats for this,
 * rather than the inner. One reason is that the outer relation is typically
 * bigger, so we get more I/O savings by optimizing for its most common values.
 * Also, for similarly-sized relations, the planner prefers to put the more
 * uniformly distributed relation on the inside, so we're more likely to find
 * interesting skew in the outer relation.
 */
typedef struct HashSkewBucket
{
    uint32 hashvalue;           /* common hash value */
    HashJoinTuple tuples;       /* linked list of inner-relation tuples */
} HashSkewBucket;

#define SKEW_BUCKET_OVERHEAD      MAXALIGN(sizeof(HashSkewBucket))
#define INVALID_SKEW_BUCKET_NO    (-1)
#define SKEW_WORK_MEM_PERCENT     2
#define SKEW_MIN_OUTER_FRACTION   0.01
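
/*
 * Illustrative sketch (not part of this header): the skew table is a small
 * open-addressing hash table over the MCV hash values, so probing it for a
 * tuple's hash value works roughly as below (compare ExecHashGetSkewBucket()
 * in nodeHash.c; the skewBucket and skewBucketLen fields referenced here are
 * declared in HashJoinTableData later in this file).
 *
 *      bucket = hashvalue & (hashtable->skewBucketLen - 1);
 *      while (hashtable->skewBucket[bucket] != NULL &&
 *             hashtable->skewBucket[bucket]->hashvalue != hashvalue)
 *          bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
 *
 * A non-NULL entry at the final position means the tuple belongs to that
 * skew bucket; otherwise the caller falls back to the main hash table
 * (INVALID_SKEW_BUCKET_NO).
 */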

/*
 * To reduce palloc overhead, the HashJoinTuples for the current batch are
 * packed in 32kB buffers instead of pallocing each tuple individually.
 */
typedef struct HashMemoryChunkData
{
    int ntuples;                /* number of tuples stored in this chunk */
    size_t maxlen;              /* size of the chunk's tuple buffer */
    size_t used;                /* number of buffer bytes already used */

    /* pointer to the next chunk (linked list) */
    union
    {
        struct HashMemoryChunkData *unshared;
        dsa_pointer shared;
    } next;

    /*
     * The chunk's tuple buffer starts after the HashMemoryChunkData struct,
     * at offset HASH_CHUNK_HEADER_SIZE (which must be maxaligned). Note that
     * that offset is not included in "maxlen" or "used".
     */
} HashMemoryChunkData;

typedef struct HashMemoryChunkData *HashMemoryChunk;

#define HASH_CHUNK_SIZE         (32 * 1024L)
#define HASH_CHUNK_HEADER_SIZE  MAXALIGN(sizeof(HashMemoryChunkData))
#define HASH_CHUNK_DATA(hc)     (((char *) (hc)) + HASH_CHUNK_HEADER_SIZE)
/* tuples exceeding HASH_CHUNK_THRESHOLD bytes are put in their own chunk */
#define HASH_CHUNK_THRESHOLD    (HASH_CHUNK_SIZE / 4)
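
/*
 * Illustrative sketch (not part of this header): tuples are carved out of
 * the current chunk while it has room; otherwise a fresh HASH_CHUNK_SIZE
 * chunk is allocated in the batch context and pushed onto the list, and
 * tuples larger than HASH_CHUNK_THRESHOLD get a dedicated chunk of their
 * own. Roughly, for the common case (compare dense_alloc() in nodeHash.c;
 * this is a simplified sketch, not the full logic):
 *
 *      size = MAXALIGN(size);
 *      if (hashtable->chunks == NULL ||
 *          hashtable->chunks->maxlen - hashtable->chunks->used < size)
 *      {
 *          chunk = (HashMemoryChunk)
 *              MemoryContextAlloc(hashtable->batchCxt,
 *                                 HASH_CHUNK_HEADER_SIZE + HASH_CHUNK_SIZE);
 *          chunk->maxlen = HASH_CHUNK_SIZE;
 *          chunk->used = 0;
 *          chunk->ntuples = 0;
 *          chunk->next.unshared = hashtable->chunks;
 *          hashtable->chunks = chunk;
 *      }
 *      ptr = HASH_CHUNK_DATA(hashtable->chunks) + hashtable->chunks->used;
 *      hashtable->chunks->used += size;
 *      hashtable->chunks->ntuples += 1;
 */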

/*
 * For each batch of a Parallel Hash Join, we have a ParallelHashJoinBatch
 * object in shared memory to coordinate access to it. Since they are
 * followed by variable-sized objects, they are arranged in contiguous memory
 * but not accessed directly as an array.
 */
typedef struct ParallelHashJoinBatch
{
    dsa_pointer buckets;        /* array of hash table buckets */
    Barrier batch_barrier;      /* synchronization for joining this batch */

    dsa_pointer chunks;         /* chunks of tuples loaded */
    size_t size;                /* size of buckets + chunks in memory */
    size_t estimated_size;      /* size of buckets + chunks while writing */
    size_t ntuples;             /* number of tuples loaded */
    size_t old_ntuples;         /* number of tuples before repartitioning */
    bool space_exhausted;

    /*
     * Variable-sized SharedTuplestore objects follow this struct in memory.
     * See the accessor macros below.
     */
} ParallelHashJoinBatch;

/* Accessor for inner batch tuplestore following a ParallelHashJoinBatch. */
#define ParallelHashJoinBatchInner(batch) \
    ((SharedTuplestore *) \
     ((char *) (batch) + MAXALIGN(sizeof(ParallelHashJoinBatch))))

/* Accessor for outer batch tuplestore following a ParallelHashJoinBatch. */
#define ParallelHashJoinBatchOuter(batch, nparticipants) \
    ((SharedTuplestore *) \
     ((char *) ParallelHashJoinBatchInner(batch) + \
      MAXALIGN(sts_estimate(nparticipants))))

/* Total size of a ParallelHashJoinBatch and tuplestores. */
#define EstimateParallelHashJoinBatch(hashtable) \
    (MAXALIGN(sizeof(ParallelHashJoinBatch)) + \
     MAXALIGN(sts_estimate((hashtable)->parallel_state->nparticipants)) * 2)

/* Accessor for the nth ParallelHashJoinBatch given the base. */
#define NthParallelHashJoinBatch(base, n) \
    ((ParallelHashJoinBatch *) \
     ((char *) (base) + \
      EstimateParallelHashJoinBatch(hashtable) * (n)))
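
/*
 * Illustrative sketch (not part of this header): because every
 * ParallelHashJoinBatch is immediately followed by its two variable-sized
 * SharedTuplestores, the shared batch array cannot be indexed as a plain C
 * array; backends step through it with NthParallelHashJoinBatch() instead,
 * roughly as below (compare ExecParallelHashEnsureBatchAccessors() in
 * nodeHash.c). Note that the macro expands
 * EstimateParallelHashJoinBatch(hashtable), so a variable named "hashtable"
 * must be in scope at the call site.
 *
 *      char *base = dsa_get_address(hashtable->area, pstate->batches);
 *
 *      for (i = 0; i < hashtable->nbatch; ++i)
 *      {
 *          ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i];
 *
 *          accessor->shared = NthParallelHashJoinBatch(base, i);
 *          accessor->inner_tuples =
 *              sts_attach(ParallelHashJoinBatchInner(accessor->shared),
 *                         ParallelWorkerNumber + 1, &pstate->fileset);
 *      }
 */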

/*
 * Each backend requires a small amount of per-batch state to interact with
 * each ParallelHashJoinBatch.
 */
typedef struct ParallelHashJoinBatchAccessor
{
    ParallelHashJoinBatch *shared;  /* pointer to shared state */

    /* Per-backend partial counters to reduce contention. */
    size_t preallocated;        /* pre-allocated space for this backend */
    size_t ntuples;             /* number of tuples */
    size_t size;                /* size of partition in memory */
    size_t estimated_size;      /* size of partition on disk */
    size_t old_ntuples;         /* how many tuples before repartitioning? */
    bool at_least_one_chunk;    /* has this backend allocated a chunk? */

    bool done;                  /* flag to remember that a batch is done */
    SharedTuplestoreAccessor *inner_tuples;
    SharedTuplestoreAccessor *outer_tuples;
} ParallelHashJoinBatchAccessor;

/*
 * While hashing the inner relation, any participant might determine that it's
 * time to increase the number of buckets to reduce the load factor or batches
 * to reduce the memory size. This is indicated by setting the growth flag to
 * these values.
 */
typedef enum ParallelHashGrowth
{
    /* The current dimensions are sufficient. */
    PHJ_GROWTH_OK,
    /* The load factor is too high, so we need to add buckets. */
    PHJ_GROWTH_NEED_MORE_BUCKETS,
    /* The memory budget would be exhausted, so we need to repartition. */
    PHJ_GROWTH_NEED_MORE_BATCHES,
    /* Repartitioning didn't help last time, so don't try to do that again. */
    PHJ_GROWTH_DISABLED
} ParallelHashGrowth;

/*
 * The shared state used to coordinate a Parallel Hash Join. This is stored
 * in the DSM segment.
 */
typedef struct ParallelHashJoinState
{
    dsa_pointer batches;        /* array of ParallelHashJoinBatch */
    dsa_pointer old_batches;    /* previous generation during repartition */
    int nbatch;                 /* number of batches now */
    int old_nbatch;             /* previous number of batches */
    int nbuckets;               /* number of buckets */
    ParallelHashGrowth growth;  /* control batch/bucket growth */
    dsa_pointer chunk_work_queue;   /* chunk work queue */
    int nparticipants;
    size_t space_allowed;
    size_t total_tuples;        /* total number of inner tuples */
    LWLock lock;                /* lock protecting the above */

    Barrier build_barrier;      /* synchronization for the build phases */
    Barrier grow_batches_barrier;
    Barrier grow_buckets_barrier;
    pg_atomic_uint32 distributor;   /* counter for load balancing */

    SharedFileSet fileset;      /* space for shared temporary files */
} ParallelHashJoinState;

/* The phases for building batches, used by build_barrier. */
#define PHJ_BUILD_ELECTING          0
#define PHJ_BUILD_ALLOCATING        1
#define PHJ_BUILD_HASHING_INNER     2
#define PHJ_BUILD_HASHING_OUTER     3
#define PHJ_BUILD_DONE              4

/* The phases for probing each batch, used by batch_barrier. */
#define PHJ_BATCH_ELECTING          0
#define PHJ_BATCH_ALLOCATING        1
#define PHJ_BATCH_LOADING           2
#define PHJ_BATCH_PROBING           3
#define PHJ_BATCH_DONE              4

/* The phases of batch growth while hashing, for grow_batches_barrier. */
#define PHJ_GROW_BATCHES_ELECTING        0
#define PHJ_GROW_BATCHES_ALLOCATING      1
#define PHJ_GROW_BATCHES_REPARTITIONING  2
#define PHJ_GROW_BATCHES_DECIDING        3
#define PHJ_GROW_BATCHES_FINISHING       4
#define PHJ_GROW_BATCHES_PHASE(n)        ((n) % 5)   /* circular phases */

/* The phases of bucket growth while hashing, for grow_buckets_barrier. */
#define PHJ_GROW_BUCKETS_ELECTING        0
#define PHJ_GROW_BUCKETS_ALLOCATING      1
#define PHJ_GROW_BUCKETS_REINSERTING     2
#define PHJ_GROW_BUCKETS_PHASE(n)        ((n) % 3)   /* circular phases */
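
/*
 * Illustrative sketch (not part of this header): a barrier's phase number
 * increases monotonically across uses, so the circular PHJ_GROW_*_PHASE()
 * macros map the absolute phase onto one of the steps above. A participant
 * joining in typically dispatches on the result, roughly (compare
 * ExecParallelHashIncreaseNumBuckets() in nodeHash.c):
 *
 *      switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier)))
 *      {
 *          case PHJ_GROW_BUCKETS_ELECTING:
 *              elect one worker to allocate the new bucket array
 *          case PHJ_GROW_BUCKETS_ALLOCATING:
 *              wait for that allocation to finish
 *          case PHJ_GROW_BUCKETS_REINSERTING:
 *              all workers reinsert tuples into the new buckets
 *      }
 *
 * The earlier cases end with BarrierArriveAndWait() and fall through to the
 * next, so late arrivals simply join at whatever phase is current.
 */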

typedef struct HashJoinTableData
{
    int nbuckets;               /* # buckets in the in-memory hash table */
    int log2_nbuckets;          /* its log2 (nbuckets must be a power of 2) */

    int nbuckets_original;      /* # buckets when starting the first hash */
    int nbuckets_optimal;       /* optimal # buckets (per batch) */
    int log2_nbuckets_optimal;  /* log2(nbuckets_optimal) */

    /* buckets[i] is head of list of tuples in i'th in-memory bucket */
    union
    {
        /* unshared array is per-batch storage, as are all the tuples */
        struct HashJoinTupleData **unshared;
        /* shared array is per-query DSA area, as are all the tuples */
        dsa_pointer_atomic *shared;
    } buckets;

    bool keepNulls;             /* true to store unmatchable NULL tuples */

    bool skewEnabled;           /* are we using skew optimization? */
    HashSkewBucket **skewBucket;    /* hashtable of skew buckets */
    int skewBucketLen;          /* size of skewBucket array (a power of 2!) */
    int nSkewBuckets;           /* number of active skew buckets */
    int *skewBucketNums;        /* array indexes of active skew buckets */

    int nbatch;                 /* number of batches */
    int curbatch;               /* current batch #; 0 during 1st pass */

    int nbatch_original;        /* nbatch when we started inner scan */
    int nbatch_outstart;        /* nbatch when we started outer scan */

    bool growEnabled;           /* flag to shut off nbatch increases */

    double totalTuples;         /* # tuples obtained from inner plan */
    double partialTuples;       /* # tuples obtained from inner plan by me */
    double skewTuples;          /* # tuples inserted into the skew hashtable */

    /*
     * These arrays are allocated for the life of the hash join, but only if
     * nbatch > 1. A file is opened only when we first write a tuple into it
     * (otherwise its pointer remains NULL). Note that the zero'th array
     * elements never get used, since we will process rather than dump out
     * any tuples of batch zero.
     */
    BufFile **innerBatchFile;   /* buffered virtual temp file per batch */
    BufFile **outerBatchFile;   /* buffered virtual temp file per batch */

    /*
     * Info about the datatype-specific hash functions for the datatypes being
     * hashed. These are arrays of the same length as the number of hash join
     * clauses (hash keys).
     */
    FmgrInfo *outer_hashfunctions;  /* lookup data for hash functions */
    FmgrInfo *inner_hashfunctions;  /* lookup data for hash functions */
    bool *hashStrict;           /* is each hash join operator strict? */
    Oid *collations;

    Size spaceUsed;             /* memory space currently used by tuples */
    Size spaceAllowed;          /* upper limit for space used */
    Size spacePeak;             /* peak space used */
    Size spaceUsedSkew;         /* skew hash table's current space usage */
    Size spaceAllowedSkew;      /* upper limit for skew hashtable */

    MemoryContext hashCxt;      /* context for whole-hash-join storage */
    MemoryContext batchCxt;     /* context for this-batch-only storage */

    /* used for dense allocation of tuples (into linked chunks) */
    HashMemoryChunk chunks;     /* one list for the whole batch */

    /* Shared and private state for Parallel Hash. */
    HashMemoryChunk current_chunk;  /* this backend's current chunk */
    dsa_area *area;             /* DSA area to allocate memory from */
    ParallelHashJoinState *parallel_state;
    ParallelHashJoinBatchAccessor *batches;
    dsa_pointer current_chunk_shared;
} HashJoinTableData;

#endif                          /* HASHJOIN_H */