| 1 | /*------------------------------------------------------------------------- |
| 2 | * |
| 3 | * nodeHash.c |
| 4 | * Routines to hash relations for hashjoin |
| 5 | * |
| 6 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
| 7 | * Portions Copyright (c) 1994, Regents of the University of California |
| 8 | * |
| 9 | * |
| 10 | * IDENTIFICATION |
| 11 | * src/backend/executor/nodeHash.c |
| 12 | * |
| 13 | * See note on parallelism in nodeHashjoin.c. |
| 14 | * |
| 15 | *------------------------------------------------------------------------- |
| 16 | */ |
| 17 | /* |
| 18 | * INTERFACE ROUTINES |
| 19 | * MultiExecHash - generate an in-memory hash table of the relation |
| 20 | * ExecInitHash - initialize node and subnodes |
| 21 | * ExecEndHash - shutdown node and subnodes |
| 22 | */ |
| 23 | |
| 24 | #include "postgres.h" |
| 25 | |
| 26 | #include <math.h> |
| 27 | #include <limits.h> |
| 28 | |
| 29 | #include "access/htup_details.h" |
| 30 | #include "access/parallel.h" |
| 31 | #include "catalog/pg_statistic.h" |
| 32 | #include "commands/tablespace.h" |
| 33 | #include "executor/execdebug.h" |
| 34 | #include "executor/hashjoin.h" |
| 35 | #include "executor/nodeHash.h" |
| 36 | #include "executor/nodeHashjoin.h" |
| 37 | #include "miscadmin.h" |
| 38 | #include "pgstat.h" |
| 39 | #include "port/atomics.h" |
| 40 | #include "utils/dynahash.h" |
| 41 | #include "utils/memutils.h" |
| 42 | #include "utils/lsyscache.h" |
| 43 | #include "utils/syscache.h" |
| 44 | |
| 45 | |
| 46 | static void ExecHashIncreaseNumBatches(HashJoinTable hashtable); |
| 47 | static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable); |
| 48 | static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable); |
| 49 | static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable); |
| 50 | static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, |
| 51 | int mcvsToUse); |
| 52 | static void ExecHashSkewTableInsert(HashJoinTable hashtable, |
| 53 | TupleTableSlot *slot, |
| 54 | uint32 hashvalue, |
| 55 | int bucketNumber); |
| 56 | static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable); |
| 57 | |
| 58 | static void *dense_alloc(HashJoinTable hashtable, Size size); |
| 59 | static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, |
| 60 | size_t size, |
| 61 | dsa_pointer *shared); |
| 62 | static void MultiExecPrivateHash(HashState *node); |
| 63 | static void MultiExecParallelHash(HashState *node); |
| 64 | static inline HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable table, |
| 65 | int bucketno); |
| 66 | static inline HashJoinTuple ExecParallelHashNextTuple(HashJoinTable table, |
| 67 | HashJoinTuple tuple); |
| 68 | static inline void ExecParallelHashPushTuple(dsa_pointer_atomic *head, |
| 69 | HashJoinTuple tuple, |
| 70 | dsa_pointer tuple_shared); |
| 71 | static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch); |
| 72 | static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable); |
| 73 | static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable); |
| 74 | static void ExecParallelHashRepartitionRest(HashJoinTable hashtable); |
| 75 | static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable table, |
| 76 | dsa_pointer *shared); |
| 77 | static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, |
| 78 | int batchno, |
| 79 | size_t size); |
| 80 | static void ExecParallelHashMergeCounters(HashJoinTable hashtable); |
| 81 | static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable); |
| 82 | |
| 83 | |
| 84 | /* ---------------------------------------------------------------- |
| 85 | * ExecHash |
| 86 | * |
| 87 | * stub for pro forma compliance |
| 88 | * ---------------------------------------------------------------- |
| 89 | */ |
| 90 | static TupleTableSlot * |
| 91 | ExecHash(PlanState *pstate) |
| 92 | { |
| 93 | elog(ERROR, "Hash node does not support ExecProcNode call convention" ); |
| 94 | return NULL; |
| 95 | } |
| 96 | |
| 97 | /* ---------------------------------------------------------------- |
| 98 | * MultiExecHash |
| 99 | * |
| 100 | * build hash table for hashjoin, doing partitioning if more |
| 101 | * than one batch is required. |
| 102 | * ---------------------------------------------------------------- |
| 103 | */ |
| 104 | Node * |
| 105 | MultiExecHash(HashState *node) |
| 106 | { |
| 107 | /* must provide our own instrumentation support */ |
| 108 | if (node->ps.instrument) |
| 109 | InstrStartNode(node->ps.instrument); |
| 110 | |
| 111 | if (node->parallel_state != NULL) |
| 112 | MultiExecParallelHash(node); |
| 113 | else |
| 114 | MultiExecPrivateHash(node); |
| 115 | |
| 116 | /* must provide our own instrumentation support */ |
| 117 | if (node->ps.instrument) |
| 118 | InstrStopNode(node->ps.instrument, node->hashtable->partialTuples); |
| 119 | |
| 120 | /* |
| 121 | * We do not return the hash table directly because it's not a subtype of |
| 122 | * Node, and so would violate the MultiExecProcNode API. Instead, our |
| 123 | * parent Hashjoin node is expected to know how to fish it out of our node |
| 124 | * state. Ugly but not really worth cleaning up, since Hashjoin knows |
| 125 | * quite a bit more about Hash besides that. |
| 126 | */ |
| 127 | return NULL; |
| 128 | } |
| 129 | |
| 130 | /* ---------------------------------------------------------------- |
| 131 | * MultiExecPrivateHash |
| 132 | * |
| 133 | * parallel-oblivious version, building a backend-private |
| 134 | * hash table and (if necessary) batch files. |
| 135 | * ---------------------------------------------------------------- |
| 136 | */ |
| 137 | static void |
| 138 | MultiExecPrivateHash(HashState *node) |
| 139 | { |
| 140 | PlanState *outerNode; |
| 141 | List *hashkeys; |
| 142 | HashJoinTable hashtable; |
| 143 | TupleTableSlot *slot; |
| 144 | ExprContext *econtext; |
| 145 | uint32 hashvalue; |
| 146 | |
| 147 | /* |
| 148 | * get state info from node |
| 149 | */ |
| 150 | outerNode = outerPlanState(node); |
| 151 | hashtable = node->hashtable; |
| 152 | |
| 153 | /* |
| 154 | * set expression context |
| 155 | */ |
| 156 | hashkeys = node->hashkeys; |
| 157 | econtext = node->ps.ps_ExprContext; |
| 158 | |
| 159 | /* |
| 160 | * Get all tuples from the node below the Hash node and insert into the |
| 161 | * hash table (or temp files). |
| 162 | */ |
| 163 | for (;;) |
| 164 | { |
| 165 | slot = ExecProcNode(outerNode); |
| 166 | if (TupIsNull(slot)) |
| 167 | break; |
| 168 | /* We have to compute the hash value */ |
| 169 | econtext->ecxt_outertuple = slot; |
| 170 | if (ExecHashGetHashValue(hashtable, econtext, hashkeys, |
| 171 | false, hashtable->keepNulls, |
| 172 | &hashvalue)) |
| 173 | { |
| 174 | int bucketNumber; |
| 175 | |
| 176 | bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue); |
| 177 | if (bucketNumber != INVALID_SKEW_BUCKET_NO) |
| 178 | { |
| 179 | /* It's a skew tuple, so put it into that hash table */ |
| 180 | ExecHashSkewTableInsert(hashtable, slot, hashvalue, |
| 181 | bucketNumber); |
| 182 | hashtable->skewTuples += 1; |
| 183 | } |
| 184 | else |
| 185 | { |
| 186 | /* Not subject to skew optimization, so insert normally */ |
| 187 | ExecHashTableInsert(hashtable, slot, hashvalue); |
| 188 | } |
| 189 | hashtable->totalTuples += 1; |
| 190 | } |
| 191 | } |
| 192 | |
| 193 | /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */ |
| 194 | if (hashtable->nbuckets != hashtable->nbuckets_optimal) |
| 195 | ExecHashIncreaseNumBuckets(hashtable); |
| 196 | |
| 197 | /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */ |
| 198 | hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple); |
| 199 | if (hashtable->spaceUsed > hashtable->spacePeak) |
| 200 | hashtable->spacePeak = hashtable->spaceUsed; |
| 201 | |
| 202 | hashtable->partialTuples = hashtable->totalTuples; |
| 203 | } |
| 204 | |
| 205 | /* ---------------------------------------------------------------- |
| 206 | * MultiExecParallelHash |
| 207 | * |
| 208 | * parallel-aware version, building a shared hash table and |
| 209 | * (if necessary) batch files using the combined effort of |
| 210 | * a set of co-operating backends. |
| 211 | * ---------------------------------------------------------------- |
| 212 | */ |
| 213 | static void |
| 214 | MultiExecParallelHash(HashState *node) |
| 215 | { |
| 216 | ParallelHashJoinState *pstate; |
| 217 | PlanState *outerNode; |
| 218 | List *hashkeys; |
| 219 | HashJoinTable hashtable; |
| 220 | TupleTableSlot *slot; |
| 221 | ExprContext *econtext; |
| 222 | uint32 hashvalue; |
| 223 | Barrier *build_barrier; |
| 224 | int i; |
| 225 | |
| 226 | /* |
| 227 | * get state info from node |
| 228 | */ |
| 229 | outerNode = outerPlanState(node); |
| 230 | hashtable = node->hashtable; |
| 231 | |
| 232 | /* |
| 233 | * set expression context |
| 234 | */ |
| 235 | hashkeys = node->hashkeys; |
| 236 | econtext = node->ps.ps_ExprContext; |
| 237 | |
| 238 | /* |
| 239 | * Synchronize the parallel hash table build. At this stage we know that |
| 240 | * the shared hash table has been or is being set up by |
| 241 | * ExecHashTableCreate(), but we don't know if our peers have returned |
| 242 | * from there or are here in MultiExecParallelHash(), and if so how far |
| 243 | * through they are. To find out, we check the build_barrier phase then |
| 244 | * and jump to the right step in the build algorithm. |
| 245 | */ |
| 246 | pstate = hashtable->parallel_state; |
| 247 | build_barrier = &pstate->build_barrier; |
| 248 | Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATING); |
| 249 | switch (BarrierPhase(build_barrier)) |
| 250 | { |
| 251 | case PHJ_BUILD_ALLOCATING: |
| 252 | |
| 253 | /* |
| 254 | * Either I just allocated the initial hash table in |
| 255 | * ExecHashTableCreate(), or someone else is doing that. Either |
| 256 | * way, wait for everyone to arrive here so we can proceed. |
| 257 | */ |
| 258 | BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATING); |
| 259 | /* Fall through. */ |
| 260 | |
| 261 | case PHJ_BUILD_HASHING_INNER: |
| 262 | |
| 263 | /* |
| 264 | * It's time to begin hashing, or if we just arrived here then |
| 265 | * hashing is already underway, so join in that effort. While |
| 266 | * hashing we have to be prepared to help increase the number of |
| 267 | * batches or buckets at any time, and if we arrived here when |
| 268 | * that was already underway we'll have to help complete that work |
| 269 | * immediately so that it's safe to access batches and buckets |
| 270 | * below. |
| 271 | */ |
| 272 | if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) != |
| 273 | PHJ_GROW_BATCHES_ELECTING) |
| 274 | ExecParallelHashIncreaseNumBatches(hashtable); |
| 275 | if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) != |
| 276 | PHJ_GROW_BUCKETS_ELECTING) |
| 277 | ExecParallelHashIncreaseNumBuckets(hashtable); |
| 278 | ExecParallelHashEnsureBatchAccessors(hashtable); |
| 279 | ExecParallelHashTableSetCurrentBatch(hashtable, 0); |
| 280 | for (;;) |
| 281 | { |
| 282 | slot = ExecProcNode(outerNode); |
| 283 | if (TupIsNull(slot)) |
| 284 | break; |
| 285 | econtext->ecxt_outertuple = slot; |
| 286 | if (ExecHashGetHashValue(hashtable, econtext, hashkeys, |
| 287 | false, hashtable->keepNulls, |
| 288 | &hashvalue)) |
| 289 | ExecParallelHashTableInsert(hashtable, slot, hashvalue); |
| 290 | hashtable->partialTuples++; |
| 291 | } |
| 292 | |
| 293 | /* |
| 294 | * Make sure that any tuples we wrote to disk are visible to |
| 295 | * others before anyone tries to load them. |
| 296 | */ |
| 297 | for (i = 0; i < hashtable->nbatch; ++i) |
| 298 | sts_end_write(hashtable->batches[i].inner_tuples); |
| 299 | |
| 300 | /* |
| 301 | * Update shared counters. We need an accurate total tuple count |
| 302 | * to control the empty table optimization. |
| 303 | */ |
| 304 | ExecParallelHashMergeCounters(hashtable); |
| 305 | |
| 306 | BarrierDetach(&pstate->grow_buckets_barrier); |
| 307 | BarrierDetach(&pstate->grow_batches_barrier); |
| 308 | |
| 309 | /* |
| 310 | * Wait for everyone to finish building and flushing files and |
| 311 | * counters. |
| 312 | */ |
| 313 | if (BarrierArriveAndWait(build_barrier, |
| 314 | WAIT_EVENT_HASH_BUILD_HASHING_INNER)) |
| 315 | { |
| 316 | /* |
| 317 | * Elect one backend to disable any further growth. Batches |
| 318 | * are now fixed. While building them we made sure they'd fit |
| 319 | * in our memory budget when we load them back in later (or we |
| 320 | * tried to do that and gave up because we detected extreme |
| 321 | * skew). |
| 322 | */ |
| 323 | pstate->growth = PHJ_GROWTH_DISABLED; |
| 324 | } |
| 325 | } |
| 326 | |
| 327 | /* |
| 328 | * We're not yet attached to a batch. We all agree on the dimensions and |
| 329 | * number of inner tuples (for the empty table optimization). |
| 330 | */ |
| 331 | hashtable->curbatch = -1; |
| 332 | hashtable->nbuckets = pstate->nbuckets; |
| 333 | hashtable->log2_nbuckets = my_log2(hashtable->nbuckets); |
| 334 | hashtable->totalTuples = pstate->total_tuples; |
| 335 | ExecParallelHashEnsureBatchAccessors(hashtable); |
| 336 | |
| 337 | /* |
| 338 | * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE |
| 339 | * case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't |
| 340 | * there already). |
| 341 | */ |
| 342 | Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER || |
| 343 | BarrierPhase(build_barrier) == PHJ_BUILD_DONE); |
| 344 | } |
| 345 | |
| 346 | /* ---------------------------------------------------------------- |
| 347 | * ExecInitHash |
| 348 | * |
| 349 | * Init routine for Hash node |
| 350 | * ---------------------------------------------------------------- |
| 351 | */ |
| 352 | HashState * |
| 353 | ExecInitHash(Hash *node, EState *estate, int eflags) |
| 354 | { |
| 355 | HashState *hashstate; |
| 356 | |
| 357 | /* check for unsupported flags */ |
| 358 | Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); |
| 359 | |
| 360 | /* |
| 361 | * create state structure |
| 362 | */ |
| 363 | hashstate = makeNode(HashState); |
| 364 | hashstate->ps.plan = (Plan *) node; |
| 365 | hashstate->ps.state = estate; |
| 366 | hashstate->ps.ExecProcNode = ExecHash; |
| 367 | hashstate->hashtable = NULL; |
| 368 | hashstate->hashkeys = NIL; /* will be set by parent HashJoin */ |
| 369 | |
| 370 | /* |
| 371 | * Miscellaneous initialization |
| 372 | * |
| 373 | * create expression context for node |
| 374 | */ |
| 375 | ExecAssignExprContext(estate, &hashstate->ps); |
| 376 | |
| 377 | /* |
| 378 | * initialize child nodes |
| 379 | */ |
| 380 | outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags); |
| 381 | |
| 382 | /* |
| 383 | * initialize our result slot and type. No need to build projection |
| 384 | * because this node doesn't do projections. |
| 385 | */ |
| 386 | ExecInitResultTupleSlotTL(&hashstate->ps, &TTSOpsMinimalTuple); |
| 387 | hashstate->ps.ps_ProjInfo = NULL; |
| 388 | |
| 389 | /* |
| 390 | * initialize child expressions |
| 391 | */ |
| 392 | Assert(node->plan.qual == NIL); |
| 393 | hashstate->hashkeys = |
| 394 | ExecInitExprList(node->hashkeys, (PlanState *) hashstate); |
| 395 | |
| 396 | return hashstate; |
| 397 | } |
| 398 | |
| 399 | /* --------------------------------------------------------------- |
| 400 | * ExecEndHash |
| 401 | * |
| 402 | * clean up routine for Hash node |
| 403 | * ---------------------------------------------------------------- |
| 404 | */ |
| 405 | void |
| 406 | ExecEndHash(HashState *node) |
| 407 | { |
| 408 | PlanState *outerPlan; |
| 409 | |
| 410 | /* |
| 411 | * free exprcontext |
| 412 | */ |
| 413 | ExecFreeExprContext(&node->ps); |
| 414 | |
| 415 | /* |
| 416 | * shut down the subplan |
| 417 | */ |
| 418 | outerPlan = outerPlanState(node); |
| 419 | ExecEndNode(outerPlan); |
| 420 | } |
| 421 | |
| 422 | |
| 423 | /* ---------------------------------------------------------------- |
| 424 | * ExecHashTableCreate |
| 425 | * |
| 426 | * create an empty hashtable data structure for hashjoin. |
| 427 | * ---------------------------------------------------------------- |
| 428 | */ |
| 429 | HashJoinTable |
| 430 | ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, bool keepNulls) |
| 431 | { |
| 432 | Hash *node; |
| 433 | HashJoinTable hashtable; |
| 434 | Plan *outerNode; |
| 435 | size_t space_allowed; |
| 436 | int nbuckets; |
| 437 | int nbatch; |
| 438 | double rows; |
| 439 | int num_skew_mcvs; |
| 440 | int log2_nbuckets; |
| 441 | int nkeys; |
| 442 | int i; |
| 443 | ListCell *ho; |
| 444 | ListCell *hc; |
| 445 | MemoryContext oldcxt; |
| 446 | |
| 447 | /* |
| 448 | * Get information about the size of the relation to be hashed (it's the |
| 449 | * "outer" subtree of this node, but the inner relation of the hashjoin). |
| 450 | * Compute the appropriate size of the hash table. |
| 451 | */ |
| 452 | node = (Hash *) state->ps.plan; |
| 453 | outerNode = outerPlan(node); |
| 454 | |
| 455 | /* |
| 456 | * If this is a shared hash table with a partial plan, then we can't use |
| 457 | * outerNode->plan_rows to estimate its size. We need an estimate of the |
| 458 | * total number of rows across all copies of the partial plan. |
| 459 | */ |
| 460 | rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows; |
| 461 | |
| 462 | ExecChooseHashTableSize(rows, outerNode->plan_width, |
| 463 | OidIsValid(node->skewTable), |
| 464 | state->parallel_state != NULL, |
| 465 | state->parallel_state != NULL ? |
| 466 | state->parallel_state->nparticipants - 1 : 0, |
| 467 | &space_allowed, |
| 468 | &nbuckets, &nbatch, &num_skew_mcvs); |
| 469 | |
| 470 | /* nbuckets must be a power of 2 */ |
| 471 | log2_nbuckets = my_log2(nbuckets); |
| 472 | Assert(nbuckets == (1 << log2_nbuckets)); |
| 473 | |
| 474 | /* |
| 475 | * Initialize the hash table control block. |
| 476 | * |
| 477 | * The hashtable control block is just palloc'd from the executor's |
| 478 | * per-query memory context. Everything else should be kept inside the |
| 479 | * subsidiary hashCxt or batchCxt. |
| 480 | */ |
| 481 | hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData)); |
| 482 | hashtable->nbuckets = nbuckets; |
| 483 | hashtable->nbuckets_original = nbuckets; |
| 484 | hashtable->nbuckets_optimal = nbuckets; |
| 485 | hashtable->log2_nbuckets = log2_nbuckets; |
| 486 | hashtable->log2_nbuckets_optimal = log2_nbuckets; |
| 487 | hashtable->buckets.unshared = NULL; |
| 488 | hashtable->keepNulls = keepNulls; |
| 489 | hashtable->skewEnabled = false; |
| 490 | hashtable->skewBucket = NULL; |
| 491 | hashtable->skewBucketLen = 0; |
| 492 | hashtable->nSkewBuckets = 0; |
| 493 | hashtable->skewBucketNums = NULL; |
| 494 | hashtable->nbatch = nbatch; |
| 495 | hashtable->curbatch = 0; |
| 496 | hashtable->nbatch_original = nbatch; |
| 497 | hashtable->nbatch_outstart = nbatch; |
| 498 | hashtable->growEnabled = true; |
| 499 | hashtable->totalTuples = 0; |
| 500 | hashtable->partialTuples = 0; |
| 501 | hashtable->skewTuples = 0; |
| 502 | hashtable->innerBatchFile = NULL; |
| 503 | hashtable->outerBatchFile = NULL; |
| 504 | hashtable->spaceUsed = 0; |
| 505 | hashtable->spacePeak = 0; |
| 506 | hashtable->spaceAllowed = space_allowed; |
| 507 | hashtable->spaceUsedSkew = 0; |
| 508 | hashtable->spaceAllowedSkew = |
| 509 | hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100; |
| 510 | hashtable->chunks = NULL; |
| 511 | hashtable->current_chunk = NULL; |
| 512 | hashtable->parallel_state = state->parallel_state; |
| 513 | hashtable->area = state->ps.state->es_query_dsa; |
| 514 | hashtable->batches = NULL; |
| 515 | |
| 516 | #ifdef HJDEBUG |
| 517 | printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n" , |
| 518 | hashtable, nbatch, nbuckets); |
| 519 | #endif |
| 520 | |
| 521 | /* |
| 522 | * Create temporary memory contexts in which to keep the hashtable working |
| 523 | * storage. See notes in executor/hashjoin.h. |
| 524 | */ |
| 525 | hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext, |
| 526 | "HashTableContext" , |
| 527 | ALLOCSET_DEFAULT_SIZES); |
| 528 | |
| 529 | hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt, |
| 530 | "HashBatchContext" , |
| 531 | ALLOCSET_DEFAULT_SIZES); |
| 532 | |
| 533 | /* Allocate data that will live for the life of the hashjoin */ |
| 534 | |
| 535 | oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); |
| 536 | |
| 537 | /* |
| 538 | * Get info about the hash functions to be used for each hash key. Also |
| 539 | * remember whether the join operators are strict. |
| 540 | */ |
| 541 | nkeys = list_length(hashOperators); |
| 542 | hashtable->outer_hashfunctions = |
| 543 | (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo)); |
| 544 | hashtable->inner_hashfunctions = |
| 545 | (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo)); |
| 546 | hashtable->hashStrict = (bool *) palloc(nkeys * sizeof(bool)); |
| 547 | hashtable->collations = (Oid *) palloc(nkeys * sizeof(Oid)); |
| 548 | i = 0; |
| 549 | forboth(ho, hashOperators, hc, hashCollations) |
| 550 | { |
| 551 | Oid hashop = lfirst_oid(ho); |
| 552 | Oid left_hashfn; |
| 553 | Oid right_hashfn; |
| 554 | |
| 555 | if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn)) |
| 556 | elog(ERROR, "could not find hash function for hash operator %u" , |
| 557 | hashop); |
| 558 | fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]); |
| 559 | fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]); |
| 560 | hashtable->hashStrict[i] = op_strict(hashop); |
| 561 | hashtable->collations[i] = lfirst_oid(hc); |
| 562 | i++; |
| 563 | } |
| 564 | |
| 565 | if (nbatch > 1 && hashtable->parallel_state == NULL) |
| 566 | { |
| 567 | /* |
| 568 | * allocate and initialize the file arrays in hashCxt (not needed for |
| 569 | * parallel case which uses shared tuplestores instead of raw files) |
| 570 | */ |
| 571 | hashtable->innerBatchFile = (BufFile **) |
| 572 | palloc0(nbatch * sizeof(BufFile *)); |
| 573 | hashtable->outerBatchFile = (BufFile **) |
| 574 | palloc0(nbatch * sizeof(BufFile *)); |
| 575 | /* The files will not be opened until needed... */ |
| 576 | /* ... but make sure we have temp tablespaces established for them */ |
| 577 | PrepareTempTablespaces(); |
| 578 | } |
| 579 | |
| 580 | MemoryContextSwitchTo(oldcxt); |
| 581 | |
| 582 | if (hashtable->parallel_state) |
| 583 | { |
| 584 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
| 585 | Barrier *build_barrier; |
| 586 | |
| 587 | /* |
| 588 | * Attach to the build barrier. The corresponding detach operation is |
| 589 | * in ExecHashTableDetach. Note that we won't attach to the |
| 590 | * batch_barrier for batch 0 yet. We'll attach later and start it out |
| 591 | * in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front |
| 592 | * and then loaded while hashing (the standard hybrid hash join |
| 593 | * algorithm), and we'll coordinate that using build_barrier. |
| 594 | */ |
| 595 | build_barrier = &pstate->build_barrier; |
| 596 | BarrierAttach(build_barrier); |
| 597 | |
| 598 | /* |
| 599 | * So far we have no idea whether there are any other participants, |
| 600 | * and if so, what phase they are working on. The only thing we care |
| 601 | * about at this point is whether someone has already created the |
| 602 | * SharedHashJoinBatch objects and the hash table for batch 0. One |
| 603 | * backend will be elected to do that now if necessary. |
| 604 | */ |
| 605 | if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING && |
| 606 | BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECTING)) |
| 607 | { |
| 608 | pstate->nbatch = nbatch; |
| 609 | pstate->space_allowed = space_allowed; |
| 610 | pstate->growth = PHJ_GROWTH_OK; |
| 611 | |
| 612 | /* Set up the shared state for coordinating batches. */ |
| 613 | ExecParallelHashJoinSetUpBatches(hashtable, nbatch); |
| 614 | |
| 615 | /* |
| 616 | * Allocate batch 0's hash table up front so we can load it |
| 617 | * directly while hashing. |
| 618 | */ |
| 619 | pstate->nbuckets = nbuckets; |
| 620 | ExecParallelHashTableAlloc(hashtable, 0); |
| 621 | } |
| 622 | |
| 623 | /* |
| 624 | * The next Parallel Hash synchronization point is in |
| 625 | * MultiExecParallelHash(), which will progress it all the way to |
| 626 | * PHJ_BUILD_DONE. The caller must not return control from this |
| 627 | * executor node between now and then. |
| 628 | */ |
| 629 | } |
| 630 | else |
| 631 | { |
| 632 | /* |
| 633 | * Prepare context for the first-scan space allocations; allocate the |
| 634 | * hashbucket array therein, and set each bucket "empty". |
| 635 | */ |
| 636 | MemoryContextSwitchTo(hashtable->batchCxt); |
| 637 | |
| 638 | hashtable->buckets.unshared = (HashJoinTuple *) |
| 639 | palloc0(nbuckets * sizeof(HashJoinTuple)); |
| 640 | |
| 641 | /* |
| 642 | * Set up for skew optimization, if possible and there's a need for |
| 643 | * more than one batch. (In a one-batch join, there's no point in |
| 644 | * it.) |
| 645 | */ |
| 646 | if (nbatch > 1) |
| 647 | ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs); |
| 648 | |
| 649 | MemoryContextSwitchTo(oldcxt); |
| 650 | } |
| 651 | |
| 652 | return hashtable; |
| 653 | } |
| 654 | |
| 655 | |
| 656 | /* |
| 657 | * Compute appropriate size for hashtable given the estimated size of the |
| 658 | * relation to be hashed (number of rows and average row width). |
| 659 | * |
| 660 | * This is exported so that the planner's costsize.c can use it. |
| 661 | */ |
| 662 | |
| 663 | /* Target bucket loading (tuples per bucket) */ |
| 664 | #define NTUP_PER_BUCKET 1 |
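| | /* |
| |  * Illustrative sizing, for example only: with NTUP_PER_BUCKET = 1 and an |
| |  * estimate of one million inner tuples, the calculation below aims for |
| |  * 1 << my_log2(1000000) = 2^20 = 1,048,576 bucket headers, subject to the |
| |  * caps derived from work_mem and MaxAllocSize further down. |
| |  */ |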
| 665 | |
| 666 | void |
| 667 | ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, |
| 668 | bool try_combined_work_mem, |
| 669 | int parallel_workers, |
| 670 | size_t *space_allowed, |
| 671 | int *numbuckets, |
| 672 | int *numbatches, |
| 673 | int *num_skew_mcvs) |
| 674 | { |
| 675 | int tupsize; |
| 676 | double inner_rel_bytes; |
| 677 | long bucket_bytes; |
| 678 | long hash_table_bytes; |
| 679 | long skew_table_bytes; |
| 680 | long max_pointers; |
| 681 | long mppow2; |
| 682 | int nbatch = 1; |
| 683 | int nbuckets; |
| 684 | double dbuckets; |
| 685 | |
| 686 | /* Force a plausible relation size if no info */ |
| 687 | if (ntuples <= 0.0) |
| 688 | ntuples = 1000.0; |
| 689 | |
| 690 | /* |
| 691 | * Estimate tupsize based on footprint of tuple in hashtable... note this |
| 692 | * does not allow for any palloc overhead. The manipulations of spaceUsed |
| 693 | * don't count palloc overhead either. |
| 694 | */ |
| 695 | tupsize = HJTUPLE_OVERHEAD + |
| 696 | MAXALIGN(SizeofMinimalTupleHeader) + |
| 697 | MAXALIGN(tupwidth); |
| 698 | inner_rel_bytes = ntuples * tupsize; |
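| | /* |
| |  * Rough illustration: a tuple with tupwidth = 40 carries 40 bytes of user |
| |  * data plus the MAXALIGN'ed MinimalTuple header and HJTUPLE_OVERHEAD, so |
| |  * its in-memory footprint is a few dozen bytes larger than the bare data |
| |  * width; inner_rel_bytes scales that estimate by the expected row count. |
| |  */ |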
| 699 | |
| 700 | /* |
| 701 | * Target in-memory hashtable size is work_mem kilobytes. |
| 702 | */ |
| 703 | hash_table_bytes = work_mem * 1024L; |
| 704 | |
| 705 | /* |
| 706 | * Parallel Hash tries to use the combined work_mem of all workers to |
| 707 | * avoid the need to batch. If that won't work, it falls back to work_mem |
| 708 | * per worker and tries to process batches in parallel. |
| 709 | */ |
| 710 | if (try_combined_work_mem) |
| 711 | hash_table_bytes += hash_table_bytes * parallel_workers; |
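| | /* |
| |  * For example, with work_mem = 4MB and parallel_workers = 2 (i.e. three |
| |  * participants including the leader), the combined single-batch target is |
| |  * 4MB + 2 * 4MB = 12MB, before the skew-table carve-out below. |
| |  */ |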
| 712 | |
| 713 | *space_allowed = hash_table_bytes; |
| 714 | |
| 715 | /* |
| 716 | * If skew optimization is possible, estimate the number of skew buckets |
| 717 | * that will fit in the memory allowed, and decrement the assumed space |
| 718 | * available for the main hash table accordingly. |
| 719 | * |
| 720 | * We make the optimistic assumption that each skew bucket will contain |
| 721 | * one inner-relation tuple. If that turns out to be low, we will recover |
| 722 | * at runtime by reducing the number of skew buckets. |
| 723 | * |
| 724 | * hashtable->skewBucket will have up to 8 times as many HashSkewBucket |
| 725 | * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash |
| 726 | * will round up to the next power of 2 and then multiply by 4 to reduce |
| 727 | * collisions. |
| 728 | */ |
| 729 | if (useskew) |
| 730 | { |
| 731 | skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100; |
| 732 | |
| 733 | /*---------- |
| 734 | * Divisor is: |
| 735 | * size of a hash tuple + |
| 736 | * worst-case size of skewBucket[] per MCV + |
| 737 | * size of skewBucketNums[] entry + |
| 738 | * size of skew bucket struct itself |
| 739 | *---------- |
| 740 | */ |
| 741 | *num_skew_mcvs = skew_table_bytes / (tupsize + |
| 742 | (8 * sizeof(HashSkewBucket *)) + |
| 743 | sizeof(int) + |
| 744 | SKEW_BUCKET_OVERHEAD); |
| 745 | if (*num_skew_mcvs > 0) |
| 746 | hash_table_bytes -= skew_table_bytes; |
| 747 | } |
| 748 | else |
| 749 | *num_skew_mcvs = 0; |
| 750 | |
| 751 | /* |
| 752 | * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when |
| 753 | * memory is filled, assuming a single batch; but limit the value so that |
| 754 | * the pointer arrays we'll try to allocate do not exceed work_mem nor |
| 755 | * MaxAllocSize. |
| 756 | * |
| 757 | * Note that both nbuckets and nbatch must be powers of 2 to make |
| 758 | * ExecHashGetBucketAndBatch fast. |
| 759 | */ |
| 760 | max_pointers = *space_allowed / sizeof(HashJoinTuple); |
| 761 | max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple)); |
| 762 | /* If max_pointers isn't a power of 2, must round it down to one */ |
| 763 | mppow2 = 1L << my_log2(max_pointers); |
| 764 | if (max_pointers != mppow2) |
| 765 | max_pointers = mppow2 / 2; |
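| | /* |
| |  * Example of the rounding: max_pointers = 3,000,000 is not a power of 2, |
| |  * so my_log2() rounds it up to mppow2 = 2^22 = 4,194,304, and we then |
| |  * halve that to 2^21 = 2,097,152, the largest power of 2 within the budget. |
| |  */ |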
| 766 | |
| 767 | /* Also ensure we avoid integer overflow in nbatch and nbuckets */ |
| 768 | /* (this step is redundant given the current value of MaxAllocSize) */ |
| 769 | max_pointers = Min(max_pointers, INT_MAX / 2); |
| 770 | |
| 771 | dbuckets = ceil(ntuples / NTUP_PER_BUCKET); |
| 772 | dbuckets = Min(dbuckets, max_pointers); |
| 773 | nbuckets = (int) dbuckets; |
| 774 | /* don't let nbuckets be really small, though ... */ |
| 775 | nbuckets = Max(nbuckets, 1024); |
| 776 | /* ... and force it to be a power of 2. */ |
| 777 | nbuckets = 1 << my_log2(nbuckets); |
| 778 | |
| 779 | /* |
| 780 | * If there's not enough space to store the projected number of tuples and |
| 781 | * the required bucket headers, we will need multiple batches. |
| 782 | */ |
| 783 | bucket_bytes = sizeof(HashJoinTuple) * nbuckets; |
| 784 | if (inner_rel_bytes + bucket_bytes > hash_table_bytes) |
| 785 | { |
| 786 | /* We'll need multiple batches */ |
| 787 | long lbuckets; |
| 788 | double dbatch; |
| 789 | int minbatch; |
| 790 | long bucket_size; |
| 791 | |
| 792 | /* |
| 793 | * If Parallel Hash with combined work_mem would still need multiple |
| 794 | * batches, we'll have to fall back to the regular work_mem budget. |
| 795 | */ |
| 796 | if (try_combined_work_mem) |
| 797 | { |
| 798 | ExecChooseHashTableSize(ntuples, tupwidth, useskew, |
| 799 | false, parallel_workers, |
| 800 | space_allowed, |
| 801 | numbuckets, |
| 802 | numbatches, |
| 803 | num_skew_mcvs); |
| 804 | return; |
| 805 | } |
| 806 | |
| 807 | /* |
| 808 | * Estimate the number of buckets we'll want to have when work_mem is |
| 809 | * entirely full. Each bucket will contain a bucket pointer plus |
| 810 | * NTUP_PER_BUCKET tuples, whose projected size already includes |
| 811 | * overhead for the hash code, pointer to the next tuple, etc. |
| 812 | */ |
| 813 | bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple)); |
| 814 | lbuckets = 1L << my_log2(hash_table_bytes / bucket_size); |
| 815 | lbuckets = Min(lbuckets, max_pointers); |
| 816 | nbuckets = (int) lbuckets; |
| 817 | nbuckets = 1 << my_log2(nbuckets); |
| 818 | bucket_bytes = nbuckets * sizeof(HashJoinTuple); |
| 819 | |
| 820 | /* |
| 821 | * Buckets are simple pointers to hashjoin tuples, while tupsize |
| 822 | * includes the pointer, hash code, and MinimalTupleData. So buckets |
| 823 | * should never really exceed 25% of work_mem (even for |
| 824 | * NTUP_PER_BUCKET=1); except maybe for work_mem values that are not |
| 825 | * 2^N bytes, where we might get more because of doubling. So let's |
| 826 | * look for 50% here. |
| 827 | */ |
| 828 | Assert(bucket_bytes <= hash_table_bytes / 2); |
| 829 | |
| 830 | /* Calculate required number of batches. */ |
| 831 | dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes)); |
| 832 | dbatch = Min(dbatch, max_pointers); |
| 833 | minbatch = (int) dbatch; |
| 834 | nbatch = 2; |
| 835 | while (nbatch < minbatch) |
| 836 | nbatch <<= 1; |
| 837 | } |
| 838 | |
| 839 | Assert(nbuckets > 0); |
| 840 | Assert(nbatch > 0); |
| 841 | |
| 842 | *numbuckets = nbuckets; |
| 843 | *numbatches = nbatch; |
| 844 | } |
| 845 | |
| 846 | |
| 847 | /* ---------------------------------------------------------------- |
| 848 | * ExecHashTableDestroy |
| 849 | * |
| 850 | * destroy a hash table |
| 851 | * ---------------------------------------------------------------- |
| 852 | */ |
| 853 | void |
| 854 | ExecHashTableDestroy(HashJoinTable hashtable) |
| 855 | { |
| 856 | int i; |
| 857 | |
| 858 | /* |
| 859 | * Make sure all the temp files are closed. We skip batch 0, since it |
| 860 | * can't have any temp files (and the arrays might not even exist if |
| 861 | * nbatch is only 1). Parallel hash joins don't use these files. |
| 862 | */ |
| 863 | if (hashtable->innerBatchFile != NULL) |
| 864 | { |
| 865 | for (i = 1; i < hashtable->nbatch; i++) |
| 866 | { |
| 867 | if (hashtable->innerBatchFile[i]) |
| 868 | BufFileClose(hashtable->innerBatchFile[i]); |
| 869 | if (hashtable->outerBatchFile[i]) |
| 870 | BufFileClose(hashtable->outerBatchFile[i]); |
| 871 | } |
| 872 | } |
| 873 | |
| 874 | /* Release working memory (batchCxt is a child, so it goes away too) */ |
| 875 | MemoryContextDelete(hashtable->hashCxt); |
| 876 | |
| 877 | /* And drop the control block */ |
| 878 | pfree(hashtable); |
| 879 | } |
| 880 | |
| 881 | /* |
| 882 | * ExecHashIncreaseNumBatches |
| 883 | * increase the original number of batches in order to reduce |
| 884 | * current memory consumption |
| 885 | */ |
| 886 | static void |
| 887 | ExecHashIncreaseNumBatches(HashJoinTable hashtable) |
| 888 | { |
| 889 | int oldnbatch = hashtable->nbatch; |
| 890 | int curbatch = hashtable->curbatch; |
| 891 | int nbatch; |
| 892 | MemoryContext oldcxt; |
| 893 | long ninmemory; |
| 894 | long nfreed; |
| 895 | HashMemoryChunk oldchunks; |
| 896 | |
| 897 | /* do nothing if we've decided to shut off growth */ |
| 898 | if (!hashtable->growEnabled) |
| 899 | return; |
| 900 | |
| 901 | /* safety check to avoid overflow */ |
| 902 | if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2))) |
| 903 | return; |
| 904 | |
| 905 | nbatch = oldnbatch * 2; |
| 906 | Assert(nbatch > 1); |
| 907 | |
| 908 | #ifdef HJDEBUG |
| 909 | printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n" , |
| 910 | hashtable, nbatch, hashtable->spaceUsed); |
| 911 | #endif |
| 912 | |
| 913 | oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); |
| 914 | |
| 915 | if (hashtable->innerBatchFile == NULL) |
| 916 | { |
| 917 | /* we had no file arrays before */ |
| 918 | hashtable->innerBatchFile = (BufFile **) |
| 919 | palloc0(nbatch * sizeof(BufFile *)); |
| 920 | hashtable->outerBatchFile = (BufFile **) |
| 921 | palloc0(nbatch * sizeof(BufFile *)); |
| 922 | /* time to establish the temp tablespaces, too */ |
| 923 | PrepareTempTablespaces(); |
| 924 | } |
| 925 | else |
| 926 | { |
| 927 | /* enlarge arrays and zero out added entries */ |
| 928 | hashtable->innerBatchFile = (BufFile **) |
| 929 | repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *)); |
| 930 | hashtable->outerBatchFile = (BufFile **) |
| 931 | repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *)); |
| 932 | MemSet(hashtable->innerBatchFile + oldnbatch, 0, |
| 933 | (nbatch - oldnbatch) * sizeof(BufFile *)); |
| 934 | MemSet(hashtable->outerBatchFile + oldnbatch, 0, |
| 935 | (nbatch - oldnbatch) * sizeof(BufFile *)); |
| 936 | } |
| 937 | |
| 938 | MemoryContextSwitchTo(oldcxt); |
| 939 | |
| 940 | hashtable->nbatch = nbatch; |
| 941 | |
| 942 | /* |
| 943 | * Scan through the existing hash table entries and dump out any that are |
| 944 | * no longer of the current batch. |
| 945 | */ |
| 946 | ninmemory = nfreed = 0; |
| 947 | |
| 948 | /* If we know we need to resize nbuckets, we can do it while rebatching. */ |
| 949 | if (hashtable->nbuckets_optimal != hashtable->nbuckets) |
| 950 | { |
| 951 | /* we never decrease the number of buckets */ |
| 952 | Assert(hashtable->nbuckets_optimal > hashtable->nbuckets); |
| 953 | |
| 954 | hashtable->nbuckets = hashtable->nbuckets_optimal; |
| 955 | hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal; |
| 956 | |
| 957 | hashtable->buckets.unshared = |
| 958 | repalloc(hashtable->buckets.unshared, |
| 959 | sizeof(HashJoinTuple) * hashtable->nbuckets); |
| 960 | } |
| 961 | |
| 962 | /* |
| 963 | * We will scan through the chunks directly, so that we can reset the |
| 964 | * buckets now and not have to keep track which tuples in the buckets have |
| 965 | * already been processed. We will free the old chunks as we go. |
| 966 | */ |
| 967 | memset(hashtable->buckets.unshared, 0, |
| 968 | sizeof(HashJoinTuple) * hashtable->nbuckets); |
| 969 | oldchunks = hashtable->chunks; |
| 970 | hashtable->chunks = NULL; |
| 971 | |
| 972 | /* so, let's scan through the old chunks, and all tuples in each chunk */ |
| 973 | while (oldchunks != NULL) |
| 974 | { |
| 975 | HashMemoryChunk nextchunk = oldchunks->next.unshared; |
| 976 | |
| 977 | /* position within the buffer (up to oldchunks->used) */ |
| 978 | size_t idx = 0; |
| 979 | |
| 980 | /* process all tuples stored in this chunk (and then free it) */ |
| 981 | while (idx < oldchunks->used) |
| 982 | { |
| 983 | HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(oldchunks) + idx); |
| 984 | MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple); |
| 985 | int hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len); |
| 986 | int bucketno; |
| 987 | int batchno; |
| 988 | |
| 989 | ninmemory++; |
| 990 | ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, |
| 991 | &bucketno, &batchno); |
| 992 | |
| 993 | if (batchno == curbatch) |
| 994 | { |
| 995 | /* keep tuple in memory - copy it into the new chunk */ |
| 996 | HashJoinTuple copyTuple; |
| 997 | |
| 998 | copyTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize); |
| 999 | memcpy(copyTuple, hashTuple, hashTupleSize); |
| 1000 | |
| 1001 | /* and add it back to the appropriate bucket */ |
| 1002 | copyTuple->next.unshared = hashtable->buckets.unshared[bucketno]; |
| 1003 | hashtable->buckets.unshared[bucketno] = copyTuple; |
| 1004 | } |
| 1005 | else |
| 1006 | { |
| 1007 | /* dump it out */ |
| 1008 | Assert(batchno > curbatch); |
| 1009 | ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple), |
| 1010 | hashTuple->hashvalue, |
| 1011 | &hashtable->innerBatchFile[batchno]); |
| 1012 | |
| 1013 | hashtable->spaceUsed -= hashTupleSize; |
| 1014 | nfreed++; |
| 1015 | } |
| 1016 | |
| 1017 | /* next tuple in this chunk */ |
| 1018 | idx += MAXALIGN(hashTupleSize); |
| 1019 | |
| 1020 | /* allow this loop to be cancellable */ |
| 1021 | CHECK_FOR_INTERRUPTS(); |
| 1022 | } |
| 1023 | |
| 1024 | /* we're done with this chunk - free it and proceed to the next one */ |
| 1025 | pfree(oldchunks); |
| 1026 | oldchunks = nextchunk; |
| 1027 | } |
| 1028 | |
| 1029 | #ifdef HJDEBUG |
| 1030 | printf("Hashjoin %p: freed %ld of %ld tuples, space now %zu\n" , |
| 1031 | hashtable, nfreed, ninmemory, hashtable->spaceUsed); |
| 1032 | #endif |
| 1033 | |
| 1034 | /* |
| 1035 | * If we dumped out either all or none of the tuples in the table, disable |
| 1036 | * further expansion of nbatch. This situation implies that we have |
| 1037 | * enough tuples of identical hashvalues to overflow spaceAllowed. |
| 1038 | * Increasing nbatch will not fix it since there's no way to subdivide the |
| 1039 | * group any more finely. We have to just gut it out and hope the server |
| 1040 | * has enough RAM. |
| 1041 | */ |
| 1042 | if (nfreed == 0 || nfreed == ninmemory) |
| 1043 | { |
| 1044 | hashtable->growEnabled = false; |
| 1045 | #ifdef HJDEBUG |
| 1046 | printf("Hashjoin %p: disabling further increase of nbatch\n" , |
| 1047 | hashtable); |
| 1048 | #endif |
| 1049 | } |
| 1050 | } |
| 1051 | |
| 1052 | /* |
| 1053 | * ExecParallelHashIncreaseNumBatches |
| 1054 | * Every participant attached to grow_batches_barrier must run this |
| 1055 | * function when it observes growth == PHJ_GROWTH_NEED_MORE_BATCHES. |
| 1056 | */ |
| 1057 | static void |
| 1058 | ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) |
| 1059 | { |
| 1060 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
| 1061 | int i; |
| 1062 | |
| 1063 | Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER); |
| 1064 | |
| 1065 | /* |
| 1066 | * It's unlikely, but we need to be prepared for new participants to show |
| 1067 | * up while we're in the middle of this operation, so we need to switch on |
| 1068 | * the barrier phase here. |
| 1069 | */ |
| 1070 | switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier))) |
| 1071 | { |
| 1072 | case PHJ_GROW_BATCHES_ELECTING: |
| 1073 | |
| 1074 | /* |
| 1075 | * Elect one participant to prepare to grow the number of batches. |
| 1076 | * This involves reallocating or resetting the buckets of batch 0 |
| 1077 | * in preparation for all participants to begin repartitioning the |
| 1078 | * tuples. |
| 1079 | */ |
| 1080 | if (BarrierArriveAndWait(&pstate->grow_batches_barrier, |
| 1081 | WAIT_EVENT_HASH_GROW_BATCHES_ELECTING)) |
| 1082 | { |
| 1083 | dsa_pointer_atomic *buckets; |
| 1084 | ParallelHashJoinBatch *old_batch0; |
| 1085 | int new_nbatch; |
| 1086 | int i; |
| 1087 | |
| 1088 | /* Move the old batch out of the way. */ |
| 1089 | old_batch0 = hashtable->batches[0].shared; |
| 1090 | pstate->old_batches = pstate->batches; |
| 1091 | pstate->old_nbatch = hashtable->nbatch; |
| 1092 | pstate->batches = InvalidDsaPointer; |
| 1093 | |
| 1094 | /* Free this backend's old accessors. */ |
| 1095 | ExecParallelHashCloseBatchAccessors(hashtable); |
| 1096 | |
| 1097 | /* Figure out how many batches to use. */ |
| 1098 | if (hashtable->nbatch == 1) |
| 1099 | { |
| 1100 | /* |
| 1101 | * We are going from single-batch to multi-batch. We need |
| 1102 | * to switch from one large combined memory budget to the |
| 1103 | * regular work_mem budget. |
| 1104 | */ |
| 1105 | pstate->space_allowed = work_mem * 1024L; |
| 1106 | |
| 1107 | /* |
| 1108 | * The combined work_mem of all participants wasn't |
| 1109 | * enough. Therefore one batch per participant would be |
| 1110 | * approximately equivalent and would probably also be |
| 1111 | * insufficient. So try two batches per participant, |
| 1112 | * rounded up to a power of two. |
| 1113 | */ |
| 1114 | new_nbatch = 1 << my_log2(pstate->nparticipants * 2); |
| 1115 | } |
| 1116 | else |
| 1117 | { |
| 1118 | /* |
| 1119 | * We were already multi-batched. Try doubling the number |
| 1120 | * of batches. |
| 1121 | */ |
| 1122 | new_nbatch = hashtable->nbatch * 2; |
| 1123 | } |
| 1124 | |
| 1125 | /* Allocate new larger generation of batches. */ |
| 1126 | Assert(hashtable->nbatch == pstate->nbatch); |
| 1127 | ExecParallelHashJoinSetUpBatches(hashtable, new_nbatch); |
| 1128 | Assert(hashtable->nbatch == pstate->nbatch); |
| 1129 | |
| 1130 | /* Replace or recycle batch 0's bucket array. */ |
| 1131 | if (pstate->old_nbatch == 1) |
| 1132 | { |
| 1133 | double dtuples; |
| 1134 | double dbuckets; |
| 1135 | int new_nbuckets; |
| 1136 | |
| 1137 | /* |
| 1138 | * We probably also need a smaller bucket array. How many |
| 1139 | * tuples do we expect per batch, assuming we have only |
| 1140 | * half of them so far? Normally we don't need to change |
| 1141 | * the bucket array's size, because the size of each batch |
| 1142 | * stays the same as we add more batches, but in this |
| 1143 | * special case we move from a large batch to many smaller |
| 1144 | * batches and it would be wasteful to keep the large |
| 1145 | * array. |
| 1146 | */ |
| 1147 | dtuples = (old_batch0->ntuples * 2.0) / new_nbatch; |
| 1148 | dbuckets = ceil(dtuples / NTUP_PER_BUCKET); |
| 1149 | dbuckets = Min(dbuckets, |
| 1150 | MaxAllocSize / sizeof(dsa_pointer_atomic)); |
| 1151 | new_nbuckets = (int) dbuckets; |
| 1152 | new_nbuckets = Max(new_nbuckets, 1024); |
| 1153 | new_nbuckets = 1 << my_log2(new_nbuckets); |
| 1154 | dsa_free(hashtable->area, old_batch0->buckets); |
| 1155 | hashtable->batches[0].shared->buckets = |
| 1156 | dsa_allocate(hashtable->area, |
| 1157 | sizeof(dsa_pointer_atomic) * new_nbuckets); |
| 1158 | buckets = (dsa_pointer_atomic *) |
| 1159 | dsa_get_address(hashtable->area, |
| 1160 | hashtable->batches[0].shared->buckets); |
| 1161 | for (i = 0; i < new_nbuckets; ++i) |
| 1162 | dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); |
| 1163 | pstate->nbuckets = new_nbuckets; |
| 1164 | } |
| 1165 | else |
| 1166 | { |
| 1167 | /* Recycle the existing bucket array. */ |
| 1168 | hashtable->batches[0].shared->buckets = old_batch0->buckets; |
| 1169 | buckets = (dsa_pointer_atomic *) |
| 1170 | dsa_get_address(hashtable->area, old_batch0->buckets); |
| 1171 | for (i = 0; i < hashtable->nbuckets; ++i) |
| 1172 | dsa_pointer_atomic_write(&buckets[i], InvalidDsaPointer); |
| 1173 | } |
| 1174 | |
| 1175 | /* Move all chunks to the work queue for parallel processing. */ |
| 1176 | pstate->chunk_work_queue = old_batch0->chunks; |
| 1177 | |
| 1178 | /* Disable further growth temporarily while we're growing. */ |
| 1179 | pstate->growth = PHJ_GROWTH_DISABLED; |
| 1180 | } |
| 1181 | else |
| 1182 | { |
| 1183 | /* All other participants just flush their tuples to disk. */ |
| 1184 | ExecParallelHashCloseBatchAccessors(hashtable); |
| 1185 | } |
| 1186 | /* Fall through. */ |
| 1187 | |
| 1188 | case PHJ_GROW_BATCHES_ALLOCATING: |
| 1189 | /* Wait for the above to be finished. */ |
| 1190 | BarrierArriveAndWait(&pstate->grow_batches_barrier, |
| 1191 | WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING); |
| 1192 | /* Fall through. */ |
| 1193 | |
| 1194 | case PHJ_GROW_BATCHES_REPARTITIONING: |
| 1195 | /* Make sure that we have the current dimensions and buckets. */ |
| 1196 | ExecParallelHashEnsureBatchAccessors(hashtable); |
| 1197 | ExecParallelHashTableSetCurrentBatch(hashtable, 0); |
| 1198 | /* Then partition, flush counters. */ |
| 1199 | ExecParallelHashRepartitionFirst(hashtable); |
| 1200 | ExecParallelHashRepartitionRest(hashtable); |
| 1201 | ExecParallelHashMergeCounters(hashtable); |
| 1202 | /* Wait for the above to be finished. */ |
| 1203 | BarrierArriveAndWait(&pstate->grow_batches_barrier, |
| 1204 | WAIT_EVENT_HASH_GROW_BATCHES_REPARTITIONING); |
| 1205 | /* Fall through. */ |
| 1206 | |
| 1207 | case PHJ_GROW_BATCHES_DECIDING: |
| 1208 | |
| 1209 | /* |
| 1210 | * Elect one participant to clean up and decide whether further |
| 1211 | * repartitioning is needed, or should be disabled because it's |
| 1212 | * not helping. |
| 1213 | */ |
| 1214 | if (BarrierArriveAndWait(&pstate->grow_batches_barrier, |
| 1215 | WAIT_EVENT_HASH_GROW_BATCHES_DECIDING)) |
| 1216 | { |
| 1217 | bool space_exhausted = false; |
| 1218 | bool extreme_skew_detected = false; |
| 1219 | |
| 1220 | /* Make sure that we have the current dimensions and buckets. */ |
| 1221 | ExecParallelHashEnsureBatchAccessors(hashtable); |
| 1222 | ExecParallelHashTableSetCurrentBatch(hashtable, 0); |
| 1223 | |
| 1224 | /* Are any of the new generation of batches exhausted? */ |
| 1225 | for (i = 0; i < hashtable->nbatch; ++i) |
| 1226 | { |
| 1227 | ParallelHashJoinBatch *batch = hashtable->batches[i].shared; |
| 1228 | |
| 1229 | if (batch->space_exhausted || |
| 1230 | batch->estimated_size > pstate->space_allowed) |
| 1231 | { |
| 1232 | int parent; |
| 1233 | |
| 1234 | space_exhausted = true; |
| 1235 | |
| 1236 | /* |
| 1237 | * Did this batch receive ALL of the tuples from its |
| 1238 | * parent batch? That would indicate that further |
| 1239 | * repartitioning isn't going to help (the hash values |
| 1240 | * are probably all the same). |
| 1241 | */ |
| 1242 | parent = i % pstate->old_nbatch; |
| 1243 | if (batch->ntuples == hashtable->batches[parent].shared->old_ntuples) |
| 1244 | extreme_skew_detected = true; |
| 1245 | } |
| 1246 | } |
| 1247 | |
| 1248 | /* Don't keep growing if it's not helping or we'd overflow. */ |
| 1249 | if (extreme_skew_detected || hashtable->nbatch >= INT_MAX / 2) |
| 1250 | pstate->growth = PHJ_GROWTH_DISABLED; |
| 1251 | else if (space_exhausted) |
| 1252 | pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; |
| 1253 | else |
| 1254 | pstate->growth = PHJ_GROWTH_OK; |
| 1255 | |
| 1256 | /* Free the old batches in shared memory. */ |
| 1257 | dsa_free(hashtable->area, pstate->old_batches); |
| 1258 | pstate->old_batches = InvalidDsaPointer; |
| 1259 | } |
| 1260 | /* Fall through. */ |
| 1261 | |
| 1262 | case PHJ_GROW_BATCHES_FINISHING: |
| 1263 | /* Wait for the above to complete. */ |
| 1264 | BarrierArriveAndWait(&pstate->grow_batches_barrier, |
| 1265 | WAIT_EVENT_HASH_GROW_BATCHES_FINISHING); |
| 1266 | } |
| 1267 | } |
| 1268 | |
| 1269 | /* |
| 1270 | * Repartition the tuples currently loaded into memory for inner batch 0 |
| 1271 | * because the number of batches has been increased. Some tuples are retained |
| 1272 | * in memory and some are written out to a later batch. |
| 1273 | */ |
| 1274 | static void |
| 1275 | ExecParallelHashRepartitionFirst(HashJoinTable hashtable) |
| 1276 | { |
| 1277 | dsa_pointer chunk_shared; |
| 1278 | HashMemoryChunk chunk; |
| 1279 | |
| 1280 | Assert(hashtable->nbatch == hashtable->parallel_state->nbatch); |
| 1281 | |
| 1282 | while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_shared))) |
| 1283 | { |
| 1284 | size_t idx = 0; |
| 1285 | |
| 1286 | /* Repartition all tuples in this chunk. */ |
| 1287 | while (idx < chunk->used) |
| 1288 | { |
| 1289 | HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx); |
| 1290 | MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple); |
| 1291 | HashJoinTuple copyTuple; |
| 1292 | dsa_pointer shared; |
| 1293 | int bucketno; |
| 1294 | int batchno; |
| 1295 | |
| 1296 | ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, |
| 1297 | &bucketno, &batchno); |
| 1298 | |
| 1299 | Assert(batchno < hashtable->nbatch); |
| 1300 | if (batchno == 0) |
| 1301 | { |
| 1302 | /* It still belongs in batch 0. Copy to a new chunk. */ |
| 1303 | copyTuple = |
| 1304 | ExecParallelHashTupleAlloc(hashtable, |
| 1305 | HJTUPLE_OVERHEAD + tuple->t_len, |
| 1306 | &shared); |
| 1307 | copyTuple->hashvalue = hashTuple->hashvalue; |
| 1308 | memcpy(HJTUPLE_MINTUPLE(copyTuple), tuple, tuple->t_len); |
| 1309 | ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], |
| 1310 | copyTuple, shared); |
| 1311 | } |
| 1312 | else |
| 1313 | { |
| 1314 | size_t tuple_size = |
| 1315 | MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len); |
| 1316 | |
| 1317 | /* It belongs in a later batch. */ |
| 1318 | hashtable->batches[batchno].estimated_size += tuple_size; |
| 1319 | sts_puttuple(hashtable->batches[batchno].inner_tuples, |
| 1320 | &hashTuple->hashvalue, tuple); |
| 1321 | } |
| 1322 | |
| 1323 | /* Count this tuple. */ |
| 1324 | ++hashtable->batches[0].old_ntuples; |
| 1325 | ++hashtable->batches[batchno].ntuples; |
| 1326 | |
| 1327 | idx += MAXALIGN(HJTUPLE_OVERHEAD + |
| 1328 | HJTUPLE_MINTUPLE(hashTuple)->t_len); |
| 1329 | } |
| 1330 | |
| 1331 | /* Free this chunk. */ |
| 1332 | dsa_free(hashtable->area, chunk_shared); |
| 1333 | |
| 1334 | CHECK_FOR_INTERRUPTS(); |
| 1335 | } |
| 1336 | } |
| 1337 | |
| 1338 | /* |
| 1339 | * Help repartition inner batches 1..n. |
| 1340 | */ |
| 1341 | static void |
| 1342 | ExecParallelHashRepartitionRest(HashJoinTable hashtable) |
| 1343 | { |
| 1344 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
| 1345 | int old_nbatch = pstate->old_nbatch; |
| 1346 | SharedTuplestoreAccessor **old_inner_tuples; |
| 1347 | ParallelHashJoinBatch *old_batches; |
| 1348 | int i; |
| 1349 | |
| 1350 | /* Get our hands on the previous generation of batches. */ |
| 1351 | old_batches = (ParallelHashJoinBatch *) |
| 1352 | dsa_get_address(hashtable->area, pstate->old_batches); |
| 1353 | old_inner_tuples = palloc0(sizeof(SharedTuplestoreAccessor *) * old_nbatch); |
| 1354 | for (i = 1; i < old_nbatch; ++i) |
| 1355 | { |
| 1356 | ParallelHashJoinBatch *shared = |
| 1357 | NthParallelHashJoinBatch(old_batches, i); |
| 1358 | |
| 1359 | old_inner_tuples[i] = sts_attach(ParallelHashJoinBatchInner(shared), |
| 1360 | ParallelWorkerNumber + 1, |
| 1361 | &pstate->fileset); |
| 1362 | } |
| 1363 | |
| 1364 | /* Join in the effort to repartition them. */ |
| 1365 | for (i = 1; i < old_nbatch; ++i) |
| 1366 | { |
| 1367 | MinimalTuple tuple; |
| 1368 | uint32 hashvalue; |
| 1369 | |
| 1370 | /* Scan one partition from the previous generation. */ |
| 1371 | sts_begin_parallel_scan(old_inner_tuples[i]); |
| 1372 | while ((tuple = sts_parallel_scan_next(old_inner_tuples[i], &hashvalue))) |
| 1373 | { |
| 1374 | size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len); |
| 1375 | int bucketno; |
| 1376 | int batchno; |
| 1377 | |
| 1378 | /* Decide which partition it goes to in the new generation. */ |
| 1379 | ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, |
| 1380 | &batchno); |
| 1381 | |
| 1382 | hashtable->batches[batchno].estimated_size += tuple_size; |
| 1383 | ++hashtable->batches[batchno].ntuples; |
| 1384 | ++hashtable->batches[i].old_ntuples; |
| 1385 | |
| 1386 | /* Store the tuple in its new batch. */ |
| 1387 | sts_puttuple(hashtable->batches[batchno].inner_tuples, |
| 1388 | &hashvalue, tuple); |
| 1389 | |
| 1390 | CHECK_FOR_INTERRUPTS(); |
| 1391 | } |
| 1392 | sts_end_parallel_scan(old_inner_tuples[i]); |
| 1393 | } |
| 1394 | |
| 1395 | pfree(old_inner_tuples); |
| 1396 | } |
| 1397 | |
| 1398 | /* |
| 1399 | * Transfer the backend-local per-batch counters to the shared totals. |
| 1400 | */ |
| 1401 | static void |
| 1402 | ExecParallelHashMergeCounters(HashJoinTable hashtable) |
| 1403 | { |
| 1404 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
| 1405 | int i; |
| 1406 | |
| 1407 | LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); |
| 1408 | pstate->total_tuples = 0; |
| 1409 | for (i = 0; i < hashtable->nbatch; ++i) |
| 1410 | { |
| 1411 | ParallelHashJoinBatchAccessor *batch = &hashtable->batches[i]; |
| 1412 | |
| 1413 | batch->shared->size += batch->size; |
| 1414 | batch->shared->estimated_size += batch->estimated_size; |
| 1415 | batch->shared->ntuples += batch->ntuples; |
| 1416 | batch->shared->old_ntuples += batch->old_ntuples; |
| 1417 | batch->size = 0; |
| 1418 | batch->estimated_size = 0; |
| 1419 | batch->ntuples = 0; |
| 1420 | batch->old_ntuples = 0; |
| 1421 | pstate->total_tuples += batch->shared->ntuples; |
| 1422 | } |
| 1423 | LWLockRelease(&pstate->lock); |
| 1424 | } |
| 1425 | |
| 1426 | /* |
| 1427 | * ExecHashIncreaseNumBuckets |
| 1428 | * increase the original number of buckets in order to reduce |
| 1429 | * number of tuples per bucket |
| 1430 | */ |
| 1431 | static void |
| 1432 | ExecHashIncreaseNumBuckets(HashJoinTable hashtable) |
| 1433 | { |
| 1434 | HashMemoryChunk chunk; |
| 1435 | |
| 1436 | /* do nothing if not an increase (it's called increase for a reason) */ |
| 1437 | if (hashtable->nbuckets >= hashtable->nbuckets_optimal) |
| 1438 | return; |
| 1439 | |
| 1440 | #ifdef HJDEBUG |
| 1441 | printf("Hashjoin %p: increasing nbuckets %d => %d\n" , |
| 1442 | hashtable, hashtable->nbuckets, hashtable->nbuckets_optimal); |
| 1443 | #endif |
| 1444 | |
| 1445 | hashtable->nbuckets = hashtable->nbuckets_optimal; |
| 1446 | hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal; |
| 1447 | |
| 1448 | Assert(hashtable->nbuckets > 1); |
| 1449 | Assert(hashtable->nbuckets <= (INT_MAX / 2)); |
| 1450 | Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets)); |
| 1451 | |
| 1452 | /* |
| 1453 | * Just reallocate the proper number of buckets - we don't need to walk |
| 1454 | * through them - we can walk the dense-allocated chunks (just like in |
| 1455 | * ExecHashIncreaseNumBatches, but without all the copying into new |
| 1456 | * chunks) |
| 1457 | */ |
| 1458 | hashtable->buckets.unshared = |
| 1459 | (HashJoinTuple *) repalloc(hashtable->buckets.unshared, |
| 1460 | hashtable->nbuckets * sizeof(HashJoinTuple)); |
| 1461 | |
| 1462 | memset(hashtable->buckets.unshared, 0, |
| 1463 | hashtable->nbuckets * sizeof(HashJoinTuple)); |
| 1464 | |
| 1465 | /* scan through all tuples in all chunks to rebuild the hash table */ |
| 1466 | for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next.unshared) |
| 1467 | { |
| 1468 | /* process all tuples stored in this chunk */ |
| 1469 | size_t idx = 0; |
| 1470 | |
| 1471 | while (idx < chunk->used) |
| 1472 | { |
| 1473 | HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx); |
| 1474 | int bucketno; |
| 1475 | int batchno; |
| 1476 | |
| 1477 | ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, |
| 1478 | &bucketno, &batchno); |
| 1479 | |
| 1480 | /* add the tuple to the proper bucket */ |
| 1481 | hashTuple->next.unshared = hashtable->buckets.unshared[bucketno]; |
| 1482 | hashtable->buckets.unshared[bucketno] = hashTuple; |
| 1483 | |
| 1484 | /* advance index past the tuple */ |
| 1485 | idx += MAXALIGN(HJTUPLE_OVERHEAD + |
| 1486 | HJTUPLE_MINTUPLE(hashTuple)->t_len); |
| 1487 | } |
| 1488 | |
| 1489 | /* allow this loop to be cancellable */ |
| 1490 | CHECK_FOR_INTERRUPTS(); |
| 1491 | } |
| 1492 | } |
| 1493 | |
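|  | /* |
|  | * ExecParallelHashIncreaseNumBuckets |
|  | * double the number of buckets for the shared (batch 0) hash table |
|  | * and reinsert all of its tuples, cooperating with other participants |
|  | * via the grow_buckets barrier |
|  | */ |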
| 1494 | static void |
| 1495 | ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable) |
| 1496 | { |
| 1497 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
| 1498 | int i; |
| 1499 | HashMemoryChunk chunk; |
| 1500 | dsa_pointer chunk_s; |
| 1501 | |
| 1502 | Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER); |
| 1503 | |
| 1504 | /* |
| 1505 | * It's unlikely, but we need to be prepared for new participants to show |
| 1506 | * up while we're in the middle of this operation, so we need to switch on |
| 1507 | * the barrier phase here. |
| 1508 | */ |
| 1509 | switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier))) |
| 1510 | { |
| 1511 | case PHJ_GROW_BUCKETS_ELECTING: |
| 1512 | /* Elect one participant to prepare to increase nbuckets. */ |
| 1513 | if (BarrierArriveAndWait(&pstate->grow_buckets_barrier, |
| 1514 | WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING)) |
| 1515 | { |
| 1516 | size_t size; |
| 1517 | dsa_pointer_atomic *buckets; |
| 1518 | |
| 1519 | /* Double the size of the bucket array. */ |
| 1520 | pstate->nbuckets *= 2; |
| 1521 | size = pstate->nbuckets * sizeof(dsa_pointer_atomic); |
| 1522 | hashtable->batches[0].shared->size += size / 2; |
| 1523 | dsa_free(hashtable->area, hashtable->batches[0].shared->buckets); |
| 1524 | hashtable->batches[0].shared->buckets = |
| 1525 | dsa_allocate(hashtable->area, size); |
| 1526 | buckets = (dsa_pointer_atomic *) |
| 1527 | dsa_get_address(hashtable->area, |
| 1528 | hashtable->batches[0].shared->buckets); |
| 1529 | for (i = 0; i < pstate->nbuckets; ++i) |
| 1530 | dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); |
| 1531 | |
| 1532 | /* Put the chunk list onto the work queue. */ |
| 1533 | pstate->chunk_work_queue = hashtable->batches[0].shared->chunks; |
| 1534 | |
| 1535 | /* Clear the flag. */ |
| 1536 | pstate->growth = PHJ_GROWTH_OK; |
| 1537 | } |
| 1538 | /* Fall through. */ |
| 1539 | |
| 1540 | case PHJ_GROW_BUCKETS_ALLOCATING: |
| 1541 | /* Wait for the above to complete. */ |
| 1542 | BarrierArriveAndWait(&pstate->grow_buckets_barrier, |
| 1543 | WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING); |
| 1544 | /* Fall through. */ |
| 1545 | |
| 1546 | case PHJ_GROW_BUCKETS_REINSERTING: |
| 1547 | /* Reinsert all tuples into the hash table. */ |
| 1548 | ExecParallelHashEnsureBatchAccessors(hashtable); |
| 1549 | ExecParallelHashTableSetCurrentBatch(hashtable, 0); |
| 1550 | while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_s))) |
| 1551 | { |
| 1552 | size_t idx = 0; |
| 1553 | |
| 1554 | while (idx < chunk->used) |
| 1555 | { |
| 1556 | HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx); |
| 1557 | dsa_pointer shared = chunk_s + HASH_CHUNK_HEADER_SIZE + idx; |
| 1558 | int bucketno; |
| 1559 | int batchno; |
| 1560 | |
| 1561 | ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, |
| 1562 | &bucketno, &batchno); |
| 1563 | Assert(batchno == 0); |
| 1564 | |
| 1565 | /* add the tuple to the proper bucket */ |
| 1566 | ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], |
| 1567 | hashTuple, shared); |
| 1568 | |
| 1569 | /* advance index past the tuple */ |
| 1570 | idx += MAXALIGN(HJTUPLE_OVERHEAD + |
| 1571 | HJTUPLE_MINTUPLE(hashTuple)->t_len); |
| 1572 | } |
| 1573 | |
| 1574 | /* allow this loop to be cancellable */ |
| 1575 | CHECK_FOR_INTERRUPTS(); |
| 1576 | } |
| 1577 | BarrierArriveAndWait(&pstate->grow_buckets_barrier, |
| 1578 | WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING); |
| 1579 | } |
| 1580 | } |
| 1581 | |
| 1582 | /* |
| 1583 | * ExecHashTableInsert |
| 1584 | * insert a tuple into the hash table; depending on the hash value, |
| 1585 | * it may just go to a temp file for later batches |
| 1586 | * |
| 1587 | * Note: the passed TupleTableSlot may contain a regular, minimal, or virtual |
| 1588 | * tuple; the minimal case in particular is certain to happen while reloading |
| 1589 | * tuples from batch files. We could save some cycles in the regular-tuple |
| 1590 | * case by not forcing the slot contents into minimal form; not clear if it's |
| 1591 | * worth the messiness required. |
| 1592 | */ |
| 1593 | void |
| 1594 | ExecHashTableInsert(HashJoinTable hashtable, |
| 1595 | TupleTableSlot *slot, |
| 1596 | uint32 hashvalue) |
| 1597 | { |
| 1598 | bool shouldFree; |
| 1599 | MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); |
| 1600 | int bucketno; |
| 1601 | int batchno; |
| 1602 | |
| 1603 | ExecHashGetBucketAndBatch(hashtable, hashvalue, |
| 1604 | &bucketno, &batchno); |
| 1605 | |
| 1606 | /* |
| 1607 | * decide whether to put the tuple in the hash table or a temp file |
| 1608 | */ |
| 1609 | if (batchno == hashtable->curbatch) |
| 1610 | { |
| 1611 | /* |
| 1612 | * put the tuple in hash table |
| 1613 | */ |
| 1614 | HashJoinTuple hashTuple; |
| 1615 | int hashTupleSize; |
| 1616 | double ntuples = (hashtable->totalTuples - hashtable->skewTuples); |
| 1617 | |
| 1618 | /* Create the HashJoinTuple */ |
| 1619 | hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len; |
| 1620 | hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize); |
| 1621 | |
| 1622 | hashTuple->hashvalue = hashvalue; |
| 1623 | memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); |
| 1624 | |
| 1625 | /* |
| 1626 | * We always reset the tuple-matched flag on insertion. This is okay |
| 1627 | * even when reloading a tuple from a batch file, since the tuple |
| 1628 | * could not possibly have been matched to an outer tuple before it |
| 1629 | * went into the batch file. |
| 1630 | */ |
| 1631 | HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); |
| 1632 | |
| 1633 | /* Push it onto the front of the bucket's list */ |
| 1634 | hashTuple->next.unshared = hashtable->buckets.unshared[bucketno]; |
| 1635 | hashtable->buckets.unshared[bucketno] = hashTuple; |
| 1636 | |
| 1637 | /* |
| 1638 | * Increase the (optimal) number of buckets if we just exceeded the |
| 1639 | * NTUP_PER_BUCKET threshold, but only when there's still a single |
| 1640 | * batch. |
| 1641 | */ |
| 1642 | if (hashtable->nbatch == 1 && |
| 1643 | ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET)) |
| 1644 | { |
| 1645 | /* Guard against integer overflow and alloc size overflow */ |
| 1646 | if (hashtable->nbuckets_optimal <= INT_MAX / 2 && |
| 1647 | hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple)) |
| 1648 | { |
| 1649 | hashtable->nbuckets_optimal *= 2; |
| 1650 | hashtable->log2_nbuckets_optimal += 1; |
| 1651 | } |
| 1652 | } |
| 1653 | |
| 1654 | /* Account for space used, and back off if we've used too much */ |
| 1655 | hashtable->spaceUsed += hashTupleSize; |
| 1656 | if (hashtable->spaceUsed > hashtable->spacePeak) |
| 1657 | hashtable->spacePeak = hashtable->spaceUsed; |
| 1658 | if (hashtable->spaceUsed + |
| 1659 | hashtable->nbuckets_optimal * sizeof(HashJoinTuple) |
| 1660 | > hashtable->spaceAllowed) |
| 1661 | ExecHashIncreaseNumBatches(hashtable); |
| 1662 | } |
| 1663 | else |
| 1664 | { |
| 1665 | /* |
| 1666 | * put the tuple into a temp file for later batches |
| 1667 | */ |
| 1668 | Assert(batchno > hashtable->curbatch); |
| 1669 | ExecHashJoinSaveTuple(tuple, |
| 1670 | hashvalue, |
| 1671 | &hashtable->innerBatchFile[batchno]); |
| 1672 | } |
| 1673 | |
| 1674 | if (shouldFree) |
| 1675 | heap_free_minimal_tuple(tuple); |
| 1676 | } |
| 1677 | |
| 1678 | /* |
| 1679 | * ExecParallelHashTableInsert |
| 1680 | * insert a tuple into a shared hash table or shared batch tuplestore |
| 1681 | */ |
| 1682 | void |
| 1683 | ExecParallelHashTableInsert(HashJoinTable hashtable, |
| 1684 | TupleTableSlot *slot, |
| 1685 | uint32 hashvalue) |
| 1686 | { |
| 1687 | bool shouldFree; |
| 1688 | MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); |
| 1689 | dsa_pointer shared; |
| 1690 | int bucketno; |
| 1691 | int batchno; |
| 1692 | |
| 1693 | retry: |
| 1694 | ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); |
| 1695 | |
| 1696 | if (batchno == 0) |
| 1697 | { |
| 1698 | HashJoinTuple hashTuple; |
| 1699 | |
| 1700 | /* Try to load it into memory. */ |
| 1701 | Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) == |
| 1702 | PHJ_BUILD_HASHING_INNER); |
| 1703 | hashTuple = ExecParallelHashTupleAlloc(hashtable, |
| 1704 | HJTUPLE_OVERHEAD + tuple->t_len, |
| 1705 | &shared); |
| 1706 | if (hashTuple == NULL) |
| 1707 | goto retry; |
| 1708 | |
| 1709 | /* Store the hash value in the HashJoinTuple header. */ |
| 1710 | hashTuple->hashvalue = hashvalue; |
| 1711 | memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); |
| 1712 | |
| 1713 | /* Push it onto the front of the bucket's list */ |
| 1714 | ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], |
| 1715 | hashTuple, shared); |
| 1716 | } |
| 1717 | else |
| 1718 | { |
| 1719 | size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len); |
| 1720 | |
| 1721 | Assert(batchno > 0); |
| 1722 | |
| 1723 | /* Try to preallocate space in the batch if necessary. */ |
| 1724 | if (hashtable->batches[batchno].preallocated < tuple_size) |
| 1725 | { |
| 1726 | if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size)) |
| 1727 | goto retry; |
| 1728 | } |
| 1729 | |
| 1730 | Assert(hashtable->batches[batchno].preallocated >= tuple_size); |
| 1731 | hashtable->batches[batchno].preallocated -= tuple_size; |
| 1732 | sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue, |
| 1733 | tuple); |
| 1734 | } |
| 1735 | ++hashtable->batches[batchno].ntuples; |
| 1736 | |
| 1737 | if (shouldFree) |
| 1738 | heap_free_minimal_tuple(tuple); |
| 1739 | } |
| 1740 | |
| 1741 | /* |
| 1742 | * Insert a tuple into the current hash table. Unlike |
| 1743 | * ExecParallelHashTableInsert, this version is not prepared to send the tuple |
| 1744 | * to other batches or to run out of memory, and should only be called with |
| 1745 | * tuples that belong in the current batch once growth has been disabled. |
| 1746 | */ |
| 1747 | void |
| 1748 | ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, |
| 1749 | TupleTableSlot *slot, |
| 1750 | uint32 hashvalue) |
| 1751 | { |
| 1752 | bool shouldFree; |
| 1753 | MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); |
| 1754 | HashJoinTuple hashTuple; |
| 1755 | dsa_pointer shared; |
| 1756 | int batchno; |
| 1757 | int bucketno; |
| 1758 | |
| 1759 | ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); |
| 1760 | Assert(batchno == hashtable->curbatch); |
| 1761 | hashTuple = ExecParallelHashTupleAlloc(hashtable, |
| 1762 | HJTUPLE_OVERHEAD + tuple->t_len, |
| 1763 | &shared); |
| 1764 | hashTuple->hashvalue = hashvalue; |
| 1765 | memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); |
| 1766 | HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); |
| 1767 | ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], |
| 1768 | hashTuple, shared); |
| 1769 | |
| 1770 | if (shouldFree) |
| 1771 | heap_free_minimal_tuple(tuple); |
| 1772 | } |
| 1773 | |
| 1774 | /* |
| 1775 | * ExecHashGetHashValue |
| 1776 | * Compute the hash value for a tuple |
| 1777 | * |
| 1778 | * The tuple to be tested must be in econtext->ecxt_outertuple (thus Vars in |
| 1779 | * the hashkeys expressions need to have OUTER_VAR as varno). If outer_tuple |
| 1780 | * is false (meaning it's the HashJoin's inner node, Hash), econtext, |
| 1781 | * hashkeys, and slot need to be from Hash, with hashkeys/slot referencing and |
| 1782 | * being suitable for tuples from the node below the Hash. Conversely, if |
| 1783 | * outer_tuple is true, econtext is from HashJoin, and hashkeys/slot need to |
| 1784 | * be appropriate for tuples from HashJoin's outer node. |
| 1785 | * |
| 1786 | * A true result means the tuple's hash value has been successfully computed |
| 1787 | * and stored at *hashvalue. A false result means the tuple cannot match |
| 1788 | * because it contains a null attribute, and hence it should be discarded |
| 1789 | * immediately. (If keep_nulls is true then false is never returned.) |
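|  | * |
|  | * The per-key hash values are combined by rotating the running hash left |
|  | * one bit before XORing in each key's hash, so for two keys the result is |
|  | * effectively rotl1(h(key1)) ^ h(key2).  (This is just an illustrative |
|  | * restatement of the loop below, not a separate formula used elsewhere.) |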
| 1790 | */ |
| 1791 | bool |
| 1792 | ExecHashGetHashValue(HashJoinTable hashtable, |
| 1793 | ExprContext *econtext, |
| 1794 | List *hashkeys, |
| 1795 | bool outer_tuple, |
| 1796 | bool keep_nulls, |
| 1797 | uint32 *hashvalue) |
| 1798 | { |
| 1799 | uint32 hashkey = 0; |
| 1800 | FmgrInfo *hashfunctions; |
| 1801 | ListCell *hk; |
| 1802 | int i = 0; |
| 1803 | MemoryContext oldContext; |
| 1804 | |
| 1805 | /* |
| 1806 | * We reset the eval context each time to reclaim any memory leaked in the |
| 1807 | * hashkey expressions. |
| 1808 | */ |
| 1809 | ResetExprContext(econtext); |
| 1810 | |
| 1811 | oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); |
| 1812 | |
| 1813 | if (outer_tuple) |
| 1814 | hashfunctions = hashtable->outer_hashfunctions; |
| 1815 | else |
| 1816 | hashfunctions = hashtable->inner_hashfunctions; |
| 1817 | |
| 1818 | foreach(hk, hashkeys) |
| 1819 | { |
| 1820 | ExprState *keyexpr = (ExprState *) lfirst(hk); |
| 1821 | Datum keyval; |
| 1822 | bool isNull; |
| 1823 | |
| 1824 | /* rotate hashkey left 1 bit at each step */ |
| 1825 | hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0); |
| 1826 | |
| 1827 | /* |
| 1828 | * Get the join attribute value of the tuple |
| 1829 | */ |
| 1830 | keyval = ExecEvalExpr(keyexpr, econtext, &isNull); |
| 1831 | |
| 1832 | /* |
| 1833 | * If the attribute is NULL, and the join operator is strict, then |
| 1834 | * this tuple cannot pass the join qual so we can reject it |
| 1835 | * immediately (unless we're scanning the outside of an outer join, in |
| 1836 | * which case we must not reject it). Otherwise we act like the |
| 1837 | * hashcode of NULL is zero (this will support operators that act like |
| 1838 | * IS NOT DISTINCT, though not any more-random behavior). We treat |
| 1839 | * the hash support function as strict even if the operator is not. |
| 1840 | * |
| 1841 | * Note: currently, all hashjoinable operators must be strict since |
| 1842 | * the hash index AM assumes that. However, it takes so little extra |
| 1843 | * code here to allow non-strict that we may as well do it. |
| 1844 | */ |
| 1845 | if (isNull) |
| 1846 | { |
| 1847 | if (hashtable->hashStrict[i] && !keep_nulls) |
| 1848 | { |
| 1849 | MemoryContextSwitchTo(oldContext); |
| 1850 | return false; /* cannot match */ |
| 1851 | } |
| 1852 | /* else, leave hashkey unmodified, equivalent to hashcode 0 */ |
| 1853 | } |
| 1854 | else |
| 1855 | { |
| 1856 | /* Compute the hash function */ |
| 1857 | uint32 hkey; |
| 1858 | |
| 1859 | hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i], hashtable->collations[i], keyval)); |
| 1860 | hashkey ^= hkey; |
| 1861 | } |
| 1862 | |
| 1863 | i++; |
| 1864 | } |
| 1865 | |
| 1866 | MemoryContextSwitchTo(oldContext); |
| 1867 | |
| 1868 | *hashvalue = hashkey; |
| 1869 | return true; |
| 1870 | } |
| 1871 | |
| 1872 | /* |
| 1873 | * ExecHashGetBucketAndBatch |
| 1874 | * Determine the bucket number and batch number for a hash value |
| 1875 | * |
| 1876 | * Note: on-the-fly increases of nbatch must not change the bucket number |
| 1877 | * for a given hash code (since we don't move tuples to different hash |
| 1878 | * chains), and must only cause the batch number to remain the same or |
| 1879 | * increase. Our algorithm is |
| 1880 | * bucketno = hashvalue MOD nbuckets |
| 1881 | * batchno = (hashvalue DIV nbuckets) MOD nbatch |
| 1882 | * where nbuckets and nbatch are both expected to be powers of 2, so we can |
| 1883 | * do the computations by shifting and masking. (This assumes that all hash |
| 1884 | * functions are good about randomizing all their output bits, else we are |
| 1885 | * likely to have very skewed bucket or batch occupancy.) |
| 1886 | * |
| 1887 | * nbuckets and log2_nbuckets may change while nbatch == 1 because of dynamic |
| 1888 | * bucket count growth. Once we start batching, the value is fixed and does |
| 1889 | * not change over the course of the join (making it possible to compute batch |
| 1890 | * number the way we do here). |
| 1891 | * |
| 1892 | * nbatch is always a power of 2; we increase it only by doubling it. This |
| 1893 | * effectively adds one more bit to the top of the batchno. |
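|  | * |
|  | * As a purely illustrative example (values not taken from any real plan): |
|  | * with log2_nbuckets = 10 (nbuckets = 1024) and nbatch = 4, hashvalue |
|  | * 0x12345678 gives |
|  | * bucketno = 0x12345678 & 0x3FF = 0x278 |
|  | * batchno = (0x12345678 >> 10) & 0x3 = 1 |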
| 1894 | */ |
| 1895 | void |
| 1896 | ExecHashGetBucketAndBatch(HashJoinTable hashtable, |
| 1897 | uint32 hashvalue, |
| 1898 | int *bucketno, |
| 1899 | int *batchno) |
| 1900 | { |
| 1901 | uint32 nbuckets = (uint32) hashtable->nbuckets; |
| 1902 | uint32 nbatch = (uint32) hashtable->nbatch; |
| 1903 | |
| 1904 | if (nbatch > 1) |
| 1905 | { |
| 1906 | /* we can do MOD by masking, DIV by shifting */ |
| 1907 | *bucketno = hashvalue & (nbuckets - 1); |
| 1908 | *batchno = (hashvalue >> hashtable->log2_nbuckets) & (nbatch - 1); |
| 1909 | } |
| 1910 | else |
| 1911 | { |
| 1912 | *bucketno = hashvalue & (nbuckets - 1); |
| 1913 | *batchno = 0; |
| 1914 | } |
| 1915 | } |
| 1916 | |
| 1917 | /* |
| 1918 | * ExecScanHashBucket |
| 1919 | * scan a hash bucket for matches to the current outer tuple |
| 1920 | * |
| 1921 | * The current outer tuple must be stored in econtext->ecxt_outertuple. |
| 1922 | * |
| 1923 | * On success, the inner tuple is stored into hjstate->hj_CurTuple and |
| 1924 | * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot |
| 1925 | * for the latter. |
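|  | * |
|  | * This is called repeatedly for the same outer tuple; each call resumes |
|  | * from hj_CurTuple, so successive calls return successive matches from the |
|  | * bucket until false is returned. |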
| 1926 | */ |
| 1927 | bool |
| 1928 | ExecScanHashBucket(HashJoinState *hjstate, |
| 1929 | ExprContext *econtext) |
| 1930 | { |
| 1931 | ExprState *hjclauses = hjstate->hashclauses; |
| 1932 | HashJoinTable hashtable = hjstate->hj_HashTable; |
| 1933 | HashJoinTuple hashTuple = hjstate->hj_CurTuple; |
| 1934 | uint32 hashvalue = hjstate->hj_CurHashValue; |
| 1935 | |
| 1936 | /* |
| 1937 | * hj_CurTuple is the address of the tuple last returned from the current |
| 1938 | * bucket, or NULL if it's time to start scanning a new bucket. |
| 1939 | * |
| 1940 | * If the tuple hashed to a skew bucket then scan the skew bucket |
| 1941 | * otherwise scan the standard hashtable bucket. |
| 1942 | */ |
| 1943 | if (hashTuple != NULL) |
| 1944 | hashTuple = hashTuple->next.unshared; |
| 1945 | else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO) |
| 1946 | hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples; |
| 1947 | else |
| 1948 | hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo]; |
| 1949 | |
| 1950 | while (hashTuple != NULL) |
| 1951 | { |
| 1952 | if (hashTuple->hashvalue == hashvalue) |
| 1953 | { |
| 1954 | TupleTableSlot *inntuple; |
| 1955 | |
| 1956 | /* insert hashtable's tuple into exec slot so ExecQual sees it */ |
| 1957 | inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), |
| 1958 | hjstate->hj_HashTupleSlot, |
| 1959 | false); /* do not pfree */ |
| 1960 | econtext->ecxt_innertuple = inntuple; |
| 1961 | |
| 1962 | if (ExecQualAndReset(hjclauses, econtext)) |
| 1963 | { |
| 1964 | hjstate->hj_CurTuple = hashTuple; |
| 1965 | return true; |
| 1966 | } |
| 1967 | } |
| 1968 | |
| 1969 | hashTuple = hashTuple->next.unshared; |
| 1970 | } |
| 1971 | |
| 1972 | /* |
| 1973 | * no match |
| 1974 | */ |
| 1975 | return false; |
| 1976 | } |
| 1977 | |
| 1978 | /* |
| 1979 | * ExecParallelScanHashBucket |
| 1980 | * scan a hash bucket for matches to the current outer tuple |
| 1981 | * |
| 1982 | * The current outer tuple must be stored in econtext->ecxt_outertuple. |
| 1983 | * |
| 1984 | * On success, the inner tuple is stored into hjstate->hj_CurTuple and |
| 1985 | * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot |
| 1986 | * for the latter. |
| 1987 | */ |
| 1988 | bool |
| 1989 | ExecParallelScanHashBucket(HashJoinState *hjstate, |
| 1990 | ExprContext *econtext) |
| 1991 | { |
| 1992 | ExprState *hjclauses = hjstate->hashclauses; |
| 1993 | HashJoinTable hashtable = hjstate->hj_HashTable; |
| 1994 | HashJoinTuple hashTuple = hjstate->hj_CurTuple; |
| 1995 | uint32 hashvalue = hjstate->hj_CurHashValue; |
| 1996 | |
| 1997 | /* |
| 1998 | * hj_CurTuple is the address of the tuple last returned from the current |
| 1999 | * bucket, or NULL if it's time to start scanning a new bucket. |
| 2000 | */ |
| 2001 | if (hashTuple != NULL) |
| 2002 | hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); |
| 2003 | else |
| 2004 | hashTuple = ExecParallelHashFirstTuple(hashtable, |
| 2005 | hjstate->hj_CurBucketNo); |
| 2006 | |
| 2007 | while (hashTuple != NULL) |
| 2008 | { |
| 2009 | if (hashTuple->hashvalue == hashvalue) |
| 2010 | { |
| 2011 | TupleTableSlot *inntuple; |
| 2012 | |
| 2013 | /* insert hashtable's tuple into exec slot so ExecQual sees it */ |
| 2014 | inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), |
| 2015 | hjstate->hj_HashTupleSlot, |
| 2016 | false); /* do not pfree */ |
| 2017 | econtext->ecxt_innertuple = inntuple; |
| 2018 | |
| 2019 | if (ExecQualAndReset(hjclauses, econtext)) |
| 2020 | { |
| 2021 | hjstate->hj_CurTuple = hashTuple; |
| 2022 | return true; |
| 2023 | } |
| 2024 | } |
| 2025 | |
| 2026 | hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); |
| 2027 | } |
| 2028 | |
| 2029 | /* |
| 2030 | * no match |
| 2031 | */ |
| 2032 | return false; |
| 2033 | } |
| 2034 | |
| 2035 | /* |
| 2036 | * ExecPrepHashTableForUnmatched |
| 2037 | * set up for a series of ExecScanHashTableForUnmatched calls |
| 2038 | */ |
| 2039 | void |
| 2040 | ExecPrepHashTableForUnmatched(HashJoinState *hjstate) |
| 2041 | { |
| 2042 | /*---------- |
| 2043 | * During this scan we use the HashJoinState fields as follows: |
| 2044 | * |
| 2045 | * hj_CurBucketNo: next regular bucket to scan |
| 2046 | * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums) |
| 2047 | * hj_CurTuple: last tuple returned, or NULL to start next bucket |
| 2048 | *---------- |
| 2049 | */ |
| 2050 | hjstate->hj_CurBucketNo = 0; |
| 2051 | hjstate->hj_CurSkewBucketNo = 0; |
| 2052 | hjstate->hj_CurTuple = NULL; |
| 2053 | } |
| 2054 | |
| 2055 | /* |
| 2056 | * ExecScanHashTableForUnmatched |
| 2057 | * scan the hash table for unmatched inner tuples |
| 2058 | * |
| 2059 | * On success, the inner tuple is stored into hjstate->hj_CurTuple and |
| 2060 | * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot |
| 2061 | * for the latter. |
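|  | * |
|  | * As with ExecScanHashBucket, this is called repeatedly; each call resumes |
|  | * from the hj_CurBucketNo/hj_CurSkewBucketNo/hj_CurTuple cursor set up by |
|  | * ExecPrepHashTableForUnmatched, until all buckets have been visited and |
|  | * false is returned. |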
| 2062 | */ |
| 2063 | bool |
| 2064 | ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) |
| 2065 | { |
| 2066 | HashJoinTable hashtable = hjstate->hj_HashTable; |
| 2067 | HashJoinTuple hashTuple = hjstate->hj_CurTuple; |
| 2068 | |
| 2069 | for (;;) |
| 2070 | { |
| 2071 | /* |
| 2072 | * hj_CurTuple is the address of the tuple last returned from the |
| 2073 | * current bucket, or NULL if it's time to start scanning a new |
| 2074 | * bucket. |
| 2075 | */ |
| 2076 | if (hashTuple != NULL) |
| 2077 | hashTuple = hashTuple->next.unshared; |
| 2078 | else if (hjstate->hj_CurBucketNo < hashtable->nbuckets) |
| 2079 | { |
| 2080 | hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo]; |
| 2081 | hjstate->hj_CurBucketNo++; |
| 2082 | } |
| 2083 | else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets) |
| 2084 | { |
| 2085 | int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo]; |
| 2086 | |
| 2087 | hashTuple = hashtable->skewBucket[j]->tuples; |
| 2088 | hjstate->hj_CurSkewBucketNo++; |
| 2089 | } |
| 2090 | else |
| 2091 | break; /* finished all buckets */ |
| 2092 | |
| 2093 | while (hashTuple != NULL) |
| 2094 | { |
| 2095 | if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple))) |
| 2096 | { |
| 2097 | TupleTableSlot *inntuple; |
| 2098 | |
| 2099 | /* insert hashtable's tuple into exec slot */ |
| 2100 | inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), |
| 2101 | hjstate->hj_HashTupleSlot, |
| 2102 | false); /* do not pfree */ |
| 2103 | econtext->ecxt_innertuple = inntuple; |
| 2104 | |
| 2105 | /* |
| 2106 | * Reset temp memory each time; although this function doesn't |
| 2107 | * do any qual eval, the caller will, so let's keep it |
| 2108 | * parallel to ExecScanHashBucket. |
| 2109 | */ |
| 2110 | ResetExprContext(econtext); |
| 2111 | |
| 2112 | hjstate->hj_CurTuple = hashTuple; |
| 2113 | return true; |
| 2114 | } |
| 2115 | |
| 2116 | hashTuple = hashTuple->next.unshared; |
| 2117 | } |
| 2118 | |
| 2119 | /* allow this loop to be cancellable */ |
| 2120 | CHECK_FOR_INTERRUPTS(); |
| 2121 | } |
| 2122 | |
| 2123 | /* |
| 2124 | * no more unmatched tuples |
| 2125 | */ |
| 2126 | return false; |
| 2127 | } |
| 2128 | |
| 2129 | /* |
| 2130 | * ExecHashTableReset |
| 2131 | * |
| 2132 | * reset hash table header for new batch |
| 2133 | */ |
| 2134 | void |
| 2135 | ExecHashTableReset(HashJoinTable hashtable) |
| 2136 | { |
| 2137 | MemoryContext oldcxt; |
| 2138 | int nbuckets = hashtable->nbuckets; |
| 2139 | |
| 2140 | /* |
| 2141 | * Release all the hash buckets and tuples acquired in the prior pass, and |
| 2142 | * reinitialize the context for a new pass. |
| 2143 | */ |
| 2144 | MemoryContextReset(hashtable->batchCxt); |
| 2145 | oldcxt = MemoryContextSwitchTo(hashtable->batchCxt); |
| 2146 | |
| 2147 | /* Reallocate and reinitialize the hash bucket headers. */ |
| 2148 | hashtable->buckets.unshared = (HashJoinTuple *) |
| 2149 | palloc0(nbuckets * sizeof(HashJoinTuple)); |
| 2150 | |
| 2151 | hashtable->spaceUsed = 0; |
| 2152 | |
| 2153 | MemoryContextSwitchTo(oldcxt); |
| 2154 | |
| 2155 | /* Forget the chunks (the memory was freed by the context reset above). */ |
| 2156 | hashtable->chunks = NULL; |
| 2157 | } |
| 2158 | |
| 2159 | /* |
| 2160 | * ExecHashTableResetMatchFlags |
| 2161 | * Clear all the HeapTupleHeaderHasMatch flags in the table |
| 2162 | */ |
| 2163 | void |
| 2164 | ExecHashTableResetMatchFlags(HashJoinTable hashtable) |
| 2165 | { |
| 2166 | HashJoinTuple tuple; |
| 2167 | int i; |
| 2168 | |
| 2169 | /* Reset all flags in the main table ... */ |
| 2170 | for (i = 0; i < hashtable->nbuckets; i++) |
| 2171 | { |
| 2172 | for (tuple = hashtable->buckets.unshared[i]; tuple != NULL; |
| 2173 | tuple = tuple->next.unshared) |
| 2174 | HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple)); |
| 2175 | } |
| 2176 | |
| 2177 | /* ... and the same for the skew buckets, if any */ |
| 2178 | for (i = 0; i < hashtable->nSkewBuckets; i++) |
| 2179 | { |
| 2180 | int j = hashtable->skewBucketNums[i]; |
| 2181 | HashSkewBucket *skewBucket = hashtable->skewBucket[j]; |
| 2182 | |
| 2183 | for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared) |
| 2184 | HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple)); |
| 2185 | } |
| 2186 | } |
| 2187 | |
| 2188 | |
| 2189 | void |
| 2190 | ExecReScanHash(HashState *node) |
| 2191 | { |
| 2192 | /* |
| 2193 | * if chgParam of the subnode is not null then the plan will be re-scanned |
| 2194 | * by the first ExecProcNode. |
| 2195 | */ |
| 2196 | if (node->ps.lefttree->chgParam == NULL) |
| 2197 | ExecReScan(node->ps.lefttree); |
| 2198 | } |
| 2199 | |
| 2200 | |
| 2201 | /* |
| 2202 | * ExecHashBuildSkewHash |
| 2203 | * |
| 2204 | * Set up for skew optimization if we can identify the most common values |
| 2205 | * (MCVs) of the outer relation's join key. We make a skew hash bucket |
| 2206 | * for the hash value of each MCV, up to the number of slots allowed |
| 2207 | * based on available memory. |
| 2208 | */ |
| 2209 | static void |
| 2210 | ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse) |
| 2211 | { |
| 2212 | HeapTupleData *statsTuple; |
| 2213 | AttStatsSlot sslot; |
| 2214 | |
| 2215 | /* Do nothing if planner didn't identify the outer relation's join key */ |
| 2216 | if (!OidIsValid(node->skewTable)) |
| 2217 | return; |
| 2218 | /* Also, do nothing if we don't have room for at least one skew bucket */ |
| 2219 | if (mcvsToUse <= 0) |
| 2220 | return; |
| 2221 | |
| 2222 | /* |
| 2223 | * Try to find the MCV statistics for the outer relation's join key. |
| 2224 | */ |
| 2225 | statsTuple = SearchSysCache3(STATRELATTINH, |
| 2226 | ObjectIdGetDatum(node->skewTable), |
| 2227 | Int16GetDatum(node->skewColumn), |
| 2228 | BoolGetDatum(node->skewInherit)); |
| 2229 | if (!HeapTupleIsValid(statsTuple)) |
| 2230 | return; |
| 2231 | |
| 2232 | if (get_attstatsslot(&sslot, statsTuple, |
| 2233 | STATISTIC_KIND_MCV, InvalidOid, |
| 2234 | ATTSTATSSLOT_VALUES | ATTSTATSSLOT_NUMBERS)) |
| 2235 | { |
| 2236 | double frac; |
| 2237 | int nbuckets; |
| 2238 | FmgrInfo *hashfunctions; |
| 2239 | int i; |
| 2240 | |
| 2241 | if (mcvsToUse > sslot.nvalues) |
| 2242 | mcvsToUse = sslot.nvalues; |
| 2243 | |
| 2244 | /* |
| 2245 | * Calculate the expected fraction of outer relation that will |
| 2246 | * participate in the skew optimization. If this isn't at least |
| 2247 | * SKEW_MIN_OUTER_FRACTION, don't use skew optimization. |
| 2248 | */ |
| 2249 | frac = 0; |
| 2250 | for (i = 0; i < mcvsToUse; i++) |
| 2251 | frac += sslot.numbers[i]; |
| 2252 | if (frac < SKEW_MIN_OUTER_FRACTION) |
| 2253 | { |
| 2254 | free_attstatsslot(&sslot); |
| 2255 | ReleaseSysCache(statsTuple); |
| 2256 | return; |
| 2257 | } |
| 2258 | |
| 2259 | /* |
| 2260 | * Okay, set up the skew hashtable. |
| 2261 | * |
| 2262 | * skewBucket[] is an open addressing hashtable with a power of 2 size |
| 2263 | * that is greater than the number of MCV values. (This ensures there |
| 2264 | * will be at least one null entry, so searches will always |
| 2265 | * terminate.) |
| 2266 | * |
| 2267 | * Note: this code could fail if mcvsToUse exceeds INT_MAX/8 or |
| 2268 | * MaxAllocSize/sizeof(void *)/8, but that is not currently possible |
| 2269 | * since we limit pg_statistic entries to much less than that. |
| 2270 | */ |
| 2271 | nbuckets = 2; |
| 2272 | while (nbuckets <= mcvsToUse) |
| 2273 | nbuckets <<= 1; |
| 2274 | /* use two more bits just to help avoid collisions */ |
| 2275 | nbuckets <<= 2; |
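|  | /* |
|  | * Purely illustrative example: with mcvsToUse = 100 the loop above |
|  | * stops at nbuckets = 128, and the extra shift makes it 512. |
|  | */ |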
| 2276 | |
| 2277 | hashtable->skewEnabled = true; |
| 2278 | hashtable->skewBucketLen = nbuckets; |
| 2279 | |
| 2280 | /* |
| 2281 | * We allocate the bucket memory in the hashtable's batch context. It |
| 2282 | * is only needed during the first batch, and this ensures it will be |
| 2283 | * automatically removed once the first batch is done. |
| 2284 | */ |
| 2285 | hashtable->skewBucket = (HashSkewBucket **) |
| 2286 | MemoryContextAllocZero(hashtable->batchCxt, |
| 2287 | nbuckets * sizeof(HashSkewBucket *)); |
| 2288 | hashtable->skewBucketNums = (int *) |
| 2289 | MemoryContextAllocZero(hashtable->batchCxt, |
| 2290 | mcvsToUse * sizeof(int)); |
| 2291 | |
| 2292 | hashtable->spaceUsed += nbuckets * sizeof(HashSkewBucket *) |
| 2293 | + mcvsToUse * sizeof(int); |
| 2294 | hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *) |
| 2295 | + mcvsToUse * sizeof(int); |
| 2296 | if (hashtable->spaceUsed > hashtable->spacePeak) |
| 2297 | hashtable->spacePeak = hashtable->spaceUsed; |
| 2298 | |
| 2299 | /* |
| 2300 | * Create a skew bucket for each MCV hash value. |
| 2301 | * |
| 2302 | * Note: it is very important that we create the buckets in order of |
| 2303 | * decreasing MCV frequency. If we have to remove some buckets, they |
| 2304 | * must be removed in reverse order of creation (see notes in |
| 2305 | * ExecHashRemoveNextSkewBucket) and we want the least common MCVs to |
| 2306 | * be removed first. |
| 2307 | */ |
| 2308 | hashfunctions = hashtable->outer_hashfunctions; |
| 2309 | |
| 2310 | for (i = 0; i < mcvsToUse; i++) |
| 2311 | { |
| 2312 | uint32 hashvalue; |
| 2313 | int bucket; |
| 2314 | |
| 2315 | hashvalue = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[0], |
| 2316 | hashtable->collations[0], |
| 2317 | sslot.values[i])); |
| 2318 | |
| 2319 | /* |
| 2320 | * While we have not hit a hole in the hashtable and have not hit |
| 2321 | * the desired bucket, we have collided with some previous hash |
| 2322 | * value, so try the next bucket location. NB: this code must |
| 2323 | * match ExecHashGetSkewBucket. |
| 2324 | */ |
| 2325 | bucket = hashvalue & (nbuckets - 1); |
| 2326 | while (hashtable->skewBucket[bucket] != NULL && |
| 2327 | hashtable->skewBucket[bucket]->hashvalue != hashvalue) |
| 2328 | bucket = (bucket + 1) & (nbuckets - 1); |
| 2329 | |
| 2330 | /* |
| 2331 | * If we found an existing bucket with the same hashvalue, leave |
| 2332 | * it alone. It's okay for two MCVs to share a hashvalue. |
| 2333 | */ |
| 2334 | if (hashtable->skewBucket[bucket] != NULL) |
| 2335 | continue; |
| 2336 | |
| 2337 | /* Okay, create a new skew bucket for this hashvalue. */ |
| 2338 | hashtable->skewBucket[bucket] = (HashSkewBucket *) |
| 2339 | MemoryContextAlloc(hashtable->batchCxt, |
| 2340 | sizeof(HashSkewBucket)); |
| 2341 | hashtable->skewBucket[bucket]->hashvalue = hashvalue; |
| 2342 | hashtable->skewBucket[bucket]->tuples = NULL; |
| 2343 | hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket; |
| 2344 | hashtable->nSkewBuckets++; |
| 2345 | hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD; |
| 2346 | hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD; |
| 2347 | if (hashtable->spaceUsed > hashtable->spacePeak) |
| 2348 | hashtable->spacePeak = hashtable->spaceUsed; |
| 2349 | } |
| 2350 | |
| 2351 | free_attstatsslot(&sslot); |
| 2352 | } |
| 2353 | |
| 2354 | ReleaseSysCache(statsTuple); |
| 2355 | } |
| 2356 | |
| 2357 | /* |
| 2358 | * ExecHashGetSkewBucket |
| 2359 | * |
| 2360 | * Returns the index of the skew bucket for this hashvalue, |
| 2361 | * or INVALID_SKEW_BUCKET_NO if the hashvalue is not |
| 2362 | * associated with any active skew bucket. |
| 2363 | */ |
| 2364 | int |
| 2365 | ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue) |
| 2366 | { |
| 2367 | int bucket; |
| 2368 | |
| 2369 | /* |
| 2370 | * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization (in |
| 2371 | * particular, this happens after the initial batch is done). |
| 2372 | */ |
| 2373 | if (!hashtable->skewEnabled) |
| 2374 | return INVALID_SKEW_BUCKET_NO; |
| 2375 | |
| 2376 | /* |
| 2377 | * Since skewBucketLen is a power of 2, we can do a modulo by ANDing. |
| 2378 | */ |
| 2379 | bucket = hashvalue & (hashtable->skewBucketLen - 1); |
| 2380 | |
| 2381 | /* |
| 2382 | * While we have not hit a hole in the hashtable and have not hit the |
| 2383 | * desired bucket, we have collided with some other hash value, so try the |
| 2384 | * next bucket location. |
| 2385 | */ |
| 2386 | while (hashtable->skewBucket[bucket] != NULL && |
| 2387 | hashtable->skewBucket[bucket]->hashvalue != hashvalue) |
| 2388 | bucket = (bucket + 1) & (hashtable->skewBucketLen - 1); |
| 2389 | |
| 2390 | /* |
| 2391 | * Found the desired bucket? |
| 2392 | */ |
| 2393 | if (hashtable->skewBucket[bucket] != NULL) |
| 2394 | return bucket; |
| 2395 | |
| 2396 | /* |
| 2397 | * There must not be any hashtable entry for this hash value. |
| 2398 | */ |
| 2399 | return INVALID_SKEW_BUCKET_NO; |
| 2400 | } |
| 2401 | |
| 2402 | /* |
| 2403 | * ExecHashSkewTableInsert |
| 2404 | * |
| 2405 | * Insert a tuple into the skew hashtable. |
| 2406 | * |
| 2407 | * This should generally match up with the current-batch case in |
| 2408 | * ExecHashTableInsert. |
| 2409 | */ |
| 2410 | static void |
| 2411 | ExecHashSkewTableInsert(HashJoinTable hashtable, |
| 2412 | TupleTableSlot *slot, |
| 2413 | uint32 hashvalue, |
| 2414 | int bucketNumber) |
| 2415 | { |
| 2416 | bool shouldFree; |
| 2417 | MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); |
| 2418 | HashJoinTuple hashTuple; |
| 2419 | int hashTupleSize; |
| 2420 | |
| 2421 | /* Create the HashJoinTuple */ |
| 2422 | hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len; |
| 2423 | hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt, |
| 2424 | hashTupleSize); |
| 2425 | hashTuple->hashvalue = hashvalue; |
| 2426 | memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); |
| 2427 | HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); |
| 2428 | |
| 2429 | /* Push it onto the front of the skew bucket's list */ |
| 2430 | hashTuple->next.unshared = hashtable->skewBucket[bucketNumber]->tuples; |
| 2431 | hashtable->skewBucket[bucketNumber]->tuples = hashTuple; |
| 2432 | Assert(hashTuple != hashTuple->next.unshared); |
| 2433 | |
| 2434 | /* Account for space used, and back off if we've used too much */ |
| 2435 | hashtable->spaceUsed += hashTupleSize; |
| 2436 | hashtable->spaceUsedSkew += hashTupleSize; |
| 2437 | if (hashtable->spaceUsed > hashtable->spacePeak) |
| 2438 | hashtable->spacePeak = hashtable->spaceUsed; |
| 2439 | while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew) |
| 2440 | ExecHashRemoveNextSkewBucket(hashtable); |
| 2441 | |
| 2442 | /* Check we are not over the total spaceAllowed, either */ |
| 2443 | if (hashtable->spaceUsed > hashtable->spaceAllowed) |
| 2444 | ExecHashIncreaseNumBatches(hashtable); |
| 2445 | |
| 2446 | if (shouldFree) |
| 2447 | heap_free_minimal_tuple(tuple); |
| 2448 | } |
| 2449 | |
| 2450 | /* |
| 2451 | * ExecHashRemoveNextSkewBucket |
| 2452 | * |
| 2453 | * Remove the least valuable skew bucket by pushing its tuples into |
| 2454 | * the main hash table. |
| 2455 | */ |
| 2456 | static void |
| 2457 | ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) |
| 2458 | { |
| 2459 | int bucketToRemove; |
| 2460 | HashSkewBucket *bucket; |
| 2461 | uint32 hashvalue; |
| 2462 | int bucketno; |
| 2463 | int batchno; |
| 2464 | HashJoinTuple hashTuple; |
| 2465 | |
| 2466 | /* Locate the bucket to remove */ |
| 2467 | bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1]; |
| 2468 | bucket = hashtable->skewBucket[bucketToRemove]; |
| 2469 | |
| 2470 | /* |
| 2471 | * Calculate which bucket and batch the tuples belong to in the main |
| 2472 | * hashtable. They all have the same hash value, so it's the same for all |
| 2473 | * of them. Also note that it's not possible for nbatch to increase while |
| 2474 | * we are processing the tuples. |
| 2475 | */ |
| 2476 | hashvalue = bucket->hashvalue; |
| 2477 | ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); |
| 2478 | |
| 2479 | /* Process all tuples in the bucket */ |
| 2480 | hashTuple = bucket->tuples; |
| 2481 | while (hashTuple != NULL) |
| 2482 | { |
| 2483 | HashJoinTuple nextHashTuple = hashTuple->next.unshared; |
| 2484 | MinimalTuple tuple; |
| 2485 | Size tupleSize; |
| 2486 | |
| 2487 | /* |
| 2488 | * This code must agree with ExecHashTableInsert. We do not use |
| 2489 | * ExecHashTableInsert directly as ExecHashTableInsert expects a |
| 2490 | * TupleTableSlot while we already have HashJoinTuples. |
| 2491 | */ |
| 2492 | tuple = HJTUPLE_MINTUPLE(hashTuple); |
| 2493 | tupleSize = HJTUPLE_OVERHEAD + tuple->t_len; |
| 2494 | |
| 2495 | /* Decide whether to put the tuple in the hash table or a temp file */ |
| 2496 | if (batchno == hashtable->curbatch) |
| 2497 | { |
| 2498 | /* Move the tuple to the main hash table */ |
| 2499 | HashJoinTuple copyTuple; |
| 2500 | |
| 2501 | /* |
| 2502 | * We must copy the tuple into the dense storage, else it will not |
| 2503 | * be found by, eg, ExecHashIncreaseNumBatches. |
| 2504 | */ |
| 2505 | copyTuple = (HashJoinTuple) dense_alloc(hashtable, tupleSize); |
| 2506 | memcpy(copyTuple, hashTuple, tupleSize); |
| 2507 | pfree(hashTuple); |
| 2508 | |
| 2509 | copyTuple->next.unshared = hashtable->buckets.unshared[bucketno]; |
| 2510 | hashtable->buckets.unshared[bucketno] = copyTuple; |
| 2511 | |
| 2512 | /* We have reduced skew space, but overall space doesn't change */ |
| 2513 | hashtable->spaceUsedSkew -= tupleSize; |
| 2514 | } |
| 2515 | else |
| 2516 | { |
| 2517 | /* Put the tuple into a temp file for later batches */ |
| 2518 | Assert(batchno > hashtable->curbatch); |
| 2519 | ExecHashJoinSaveTuple(tuple, hashvalue, |
| 2520 | &hashtable->innerBatchFile[batchno]); |
| 2521 | pfree(hashTuple); |
| 2522 | hashtable->spaceUsed -= tupleSize; |
| 2523 | hashtable->spaceUsedSkew -= tupleSize; |
| 2524 | } |
| 2525 | |
| 2526 | hashTuple = nextHashTuple; |
| 2527 | |
| 2528 | /* allow this loop to be cancellable */ |
| 2529 | CHECK_FOR_INTERRUPTS(); |
| 2530 | } |
| 2531 | |
| 2532 | /* |
| 2533 | * Free the bucket struct itself and reset the hashtable entry to NULL. |
| 2534 | * |
| 2535 | * NOTE: this is not nearly as simple as it looks on the surface, because |
| 2536 | * of the possibility of collisions in the hashtable. Suppose that hash |
| 2537 | * values A and B collide at a particular hashtable entry, and that A was |
| 2538 | * entered first so B gets shifted to a different table entry. If we were |
| 2539 | * to remove A first then ExecHashGetSkewBucket would mistakenly start |
| 2540 | * reporting that B is not in the hashtable, because it would hit the NULL |
| 2541 | * before finding B. However, we always remove entries in the reverse |
| 2542 | * order of creation, so this failure cannot happen. |
| 2543 | */ |
| 2544 | hashtable->skewBucket[bucketToRemove] = NULL; |
| 2545 | hashtable->nSkewBuckets--; |
| 2546 | pfree(bucket); |
| 2547 | hashtable->spaceUsed -= SKEW_BUCKET_OVERHEAD; |
| 2548 | hashtable->spaceUsedSkew -= SKEW_BUCKET_OVERHEAD; |
| 2549 | |
| 2550 | /* |
| 2551 | * If we have removed all skew buckets then give up on skew optimization. |
| 2552 | * Release the arrays since they aren't useful any more. |
| 2553 | */ |
| 2554 | if (hashtable->nSkewBuckets == 0) |
| 2555 | { |
| 2556 | hashtable->skewEnabled = false; |
| 2557 | pfree(hashtable->skewBucket); |
| 2558 | pfree(hashtable->skewBucketNums); |
| 2559 | hashtable->skewBucket = NULL; |
| 2560 | hashtable->skewBucketNums = NULL; |
| 2561 | hashtable->spaceUsed -= hashtable->spaceUsedSkew; |
| 2562 | hashtable->spaceUsedSkew = 0; |
| 2563 | } |
| 2564 | } |
| 2565 | |
| 2566 | /* |
| 2567 | * Reserve space in the DSM segment for instrumentation data. |
| 2568 | */ |
| 2569 | void |
| 2570 | ExecHashEstimate(HashState *node, ParallelContext *pcxt) |
| 2571 | { |
| 2572 | size_t size; |
| 2573 | |
| 2574 | /* don't need this if not instrumenting or no workers */ |
| 2575 | if (!node->ps.instrument || pcxt->nworkers == 0) |
| 2576 | return; |
| 2577 | |
| 2578 | size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation)); |
| 2579 | size = add_size(size, offsetof(SharedHashInfo, hinstrument)); |
| 2580 | shm_toc_estimate_chunk(&pcxt->estimator, size); |
| 2581 | shm_toc_estimate_keys(&pcxt->estimator, 1); |
| 2582 | } |
| 2583 | |
| 2584 | /* |
| 2585 | * Set up a space in the DSM for all workers to record instrumentation data |
| 2586 | * about their hash table. |
| 2587 | */ |
| 2588 | void |
| 2589 | ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt) |
| 2590 | { |
| 2591 | size_t size; |
| 2592 | |
| 2593 | /* don't need this if not instrumenting or no workers */ |
| 2594 | if (!node->ps.instrument || pcxt->nworkers == 0) |
| 2595 | return; |
| 2596 | |
| 2597 | size = offsetof(SharedHashInfo, hinstrument) + |
| 2598 | pcxt->nworkers * sizeof(HashInstrumentation); |
| 2599 | node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size); |
| 2600 | memset(node->shared_info, 0, size); |
| 2601 | node->shared_info->num_workers = pcxt->nworkers; |
| 2602 | shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, |
| 2603 | node->shared_info); |
| 2604 | } |
| 2605 | |
| 2606 | /* |
| 2607 | * Locate the DSM space for hash table instrumentation data that we'll write |
| 2608 | * to at shutdown time. |
| 2609 | */ |
| 2610 | void |
| 2611 | ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt) |
| 2612 | { |
| 2613 | SharedHashInfo *shared_info; |
| 2614 | |
| 2615 | /* don't need this if not instrumenting */ |
| 2616 | if (!node->ps.instrument) |
| 2617 | return; |
| 2618 | |
| 2619 | shared_info = (SharedHashInfo *) |
| 2620 | shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false); |
| 2621 | node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber]; |
| 2622 | } |
| 2623 | |
| 2624 | /* |
| 2625 | * Copy instrumentation data from this worker's hash table (if it built one) |
| 2626 | * to DSM memory so the leader can retrieve it. This must be done in |
| 2627 | * ExecShutdownHash() rather than ExecEndHash() because the latter runs after |
| 2628 | * we've detached from the DSM segment. |
| 2629 | */ |
| 2630 | void |
| 2631 | ExecShutdownHash(HashState *node) |
| 2632 | { |
| 2633 | if (node->hinstrument && node->hashtable) |
| 2634 | ExecHashGetInstrumentation(node->hinstrument, node->hashtable); |
| 2635 | } |
| 2636 | |
| 2637 | /* |
| 2638 | * Retrieve instrumentation data from workers before the DSM segment is |
| 2639 | * detached, so that EXPLAIN can access it. |
| 2640 | */ |
| 2641 | void |
| 2642 | ExecHashRetrieveInstrumentation(HashState *node) |
| 2643 | { |
| 2644 | SharedHashInfo *shared_info = node->shared_info; |
| 2645 | size_t size; |
| 2646 | |
| 2647 | if (shared_info == NULL) |
| 2648 | return; |
| 2649 | |
| 2650 | /* Replace node->shared_info with a copy in backend-local memory. */ |
| 2651 | size = offsetof(SharedHashInfo, hinstrument) + |
| 2652 | shared_info->num_workers * sizeof(HashInstrumentation); |
| 2653 | node->shared_info = palloc(size); |
| 2654 | memcpy(node->shared_info, shared_info, size); |
| 2655 | } |
| 2656 | |
| 2657 | /* |
| 2658 | * Copy the instrumentation data from 'hashtable' into a HashInstrumentation |
| 2659 | * struct. |
| 2660 | */ |
| 2661 | void |
| 2662 | ExecHashGetInstrumentation(HashInstrumentation *instrument, |
| 2663 | HashJoinTable hashtable) |
| 2664 | { |
| 2665 | instrument->nbuckets = hashtable->nbuckets; |
| 2666 | instrument->nbuckets_original = hashtable->nbuckets_original; |
| 2667 | instrument->nbatch = hashtable->nbatch; |
| 2668 | instrument->nbatch_original = hashtable->nbatch_original; |
| 2669 | instrument->space_peak = hashtable->spacePeak; |
| 2670 | } |
| 2671 | |
| 2672 | /* |
| 2673 | * Allocate 'size' bytes from the currently active HashMemoryChunk |
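|  | * |
|  | * Tuples are packed densely into the current chunk; a request larger than |
|  | * HASH_CHUNK_THRESHOLD gets a chunk of its own so that the regular chunks |
|  | * stay well filled. |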
| 2674 | */ |
| 2675 | static void * |
| 2676 | dense_alloc(HashJoinTable hashtable, Size size) |
| 2677 | { |
| 2678 | HashMemoryChunk newChunk; |
| 2679 | char *ptr; |
| 2680 | |
| 2681 | /* just in case the size is not already aligned properly */ |
| 2682 | size = MAXALIGN(size); |
| 2683 | |
| 2684 | /* |
| 2685 | * If tuple size is larger than threshold, allocate a separate chunk. |
| 2686 | */ |
| 2687 | if (size > HASH_CHUNK_THRESHOLD) |
| 2688 | { |
| 2689 | /* allocate new chunk and put it at the beginning of the list */ |
| 2690 | newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt, |
| 2691 | HASH_CHUNK_HEADER_SIZE + size); |
| 2692 | newChunk->maxlen = size; |
| 2693 | newChunk->used = size; |
| 2694 | newChunk->ntuples = 1; |
| 2695 | |
| 2696 | /* |
| 2697 | * Add this chunk to the list after the first existing chunk, so that |
| 2698 | * we don't lose the remaining space in the "current" chunk. |
| 2699 | */ |
| 2700 | if (hashtable->chunks != NULL) |
| 2701 | { |
| 2702 | newChunk->next = hashtable->chunks->next; |
| 2703 | hashtable->chunks->next.unshared = newChunk; |
| 2704 | } |
| 2705 | else |
| 2706 | { |
| 2707 | newChunk->next.unshared = hashtable->chunks; |
| 2708 | hashtable->chunks = newChunk; |
| 2709 | } |
| 2710 | |
| 2711 | return HASH_CHUNK_DATA(newChunk); |
| 2712 | } |
| 2713 | |
| 2714 | /* |
| 2715 | * See if we have enough space for it in the current chunk (if any). If |
| 2716 | * not, allocate a fresh chunk. |
| 2717 | */ |
| 2718 | if ((hashtable->chunks == NULL) || |
| 2719 | (hashtable->chunks->maxlen - hashtable->chunks->used) < size) |
| 2720 | { |
| 2721 | /* allocate new chunk and put it at the beginning of the list */ |
| 2722 | newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt, |
| 2723 | HASH_CHUNK_HEADER_SIZE + HASH_CHUNK_SIZE); |
| 2724 | |
| 2725 | newChunk->maxlen = HASH_CHUNK_SIZE; |
| 2726 | newChunk->used = size; |
| 2727 | newChunk->ntuples = 1; |
| 2728 | |
| 2729 | newChunk->next.unshared = hashtable->chunks; |
| 2730 | hashtable->chunks = newChunk; |
| 2731 | |
| 2732 | return HASH_CHUNK_DATA(newChunk); |
| 2733 | } |
| 2734 | |
| 2735 | /* There is enough space in the current chunk, let's add the tuple */ |
| 2736 | ptr = HASH_CHUNK_DATA(hashtable->chunks) + hashtable->chunks->used; |
| 2737 | hashtable->chunks->used += size; |
| 2738 | hashtable->chunks->ntuples += 1; |
| 2739 | |
| 2740 | /* return pointer to the start of the tuple memory */ |
| 2741 | return ptr; |
| 2742 | } |
| 2743 | |
| 2744 | /* |
| 2745 | * Allocate space for a tuple in shared dense storage. This is equivalent to |
| 2746 | * dense_alloc but for Parallel Hash using shared memory. |
| 2747 | * |
| 2748 | * While loading a tuple into shared memory, we might run out of memory and |
| 2749 | * decide to repartition, or determine that the load factor is too high and |
| 2750 | * decide to expand the bucket array, or discover that another participant has |
| 2751 | * commanded us to help do that. Return NULL if the number of buckets or |
| 2752 | * batches has changed, indicating that the caller must retry (considering the |
| 2753 | * possibility that the tuple no longer belongs in the same batch). |
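|  | * |
|  | * Callers typically loop on a NULL result, recomputing the bucket and batch |
|  | * numbers for the hash value before retrying (see the retry label in |
|  | * ExecParallelHashTableInsert). |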
| 2754 | */ |
| 2755 | static HashJoinTuple |
| 2756 | ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, |
| 2757 | dsa_pointer *shared) |
| 2758 | { |
| 2759 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
| 2760 | dsa_pointer chunk_shared; |
| 2761 | HashMemoryChunk chunk; |
| 2762 | Size chunk_size; |
| 2763 | HashJoinTuple result; |
| 2764 | int curbatch = hashtable->curbatch; |
| 2765 | |
| 2766 | size = MAXALIGN(size); |
| 2767 | |
| 2768 | /* |
| 2769 | * Fast path: if there is enough space in this backend's current chunk, |
| 2770 | * then we can allocate without any locking. |
| 2771 | */ |
| 2772 | chunk = hashtable->current_chunk; |
| 2773 | if (chunk != NULL && |
| 2774 | size <= HASH_CHUNK_THRESHOLD && |
| 2775 | chunk->maxlen - chunk->used >= size) |
| 2776 | { |
| 2777 | |
| 2778 | chunk_shared = hashtable->current_chunk_shared; |
| 2779 | Assert(chunk == dsa_get_address(hashtable->area, chunk_shared)); |
| 2780 | *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE + chunk->used; |
| 2781 | result = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + chunk->used); |
| 2782 | chunk->used += size; |
| 2783 | |
| 2784 | Assert(chunk->used <= chunk->maxlen); |
| 2785 | Assert(result == dsa_get_address(hashtable->area, *shared)); |
| 2786 | |
| 2787 | return result; |
| 2788 | } |
| 2789 | |
| 2790 | /* Slow path: try to allocate a new chunk. */ |
| 2791 | LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); |
| 2792 | |
| 2793 | /* |
| 2794 | * Check if we need to help increase the number of buckets or batches. |
| 2795 | */ |
| 2796 | if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES || |
| 2797 | pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS) |
| 2798 | { |
| 2799 | ParallelHashGrowth growth = pstate->growth; |
| 2800 | |
| 2801 | hashtable->current_chunk = NULL; |
| 2802 | LWLockRelease(&pstate->lock); |
| 2803 | |
| 2804 | /* Another participant has commanded us to help grow. */ |
| 2805 | if (growth == PHJ_GROWTH_NEED_MORE_BATCHES) |
| 2806 | ExecParallelHashIncreaseNumBatches(hashtable); |
| 2807 | else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS) |
| 2808 | ExecParallelHashIncreaseNumBuckets(hashtable); |
| 2809 | |
| 2810 | /* The caller must retry. */ |
| 2811 | return NULL; |
| 2812 | } |
| 2813 | |
| 2814 | /* Oversized tuples get their own chunk. */ |
| 2815 | if (size > HASH_CHUNK_THRESHOLD) |
| 2816 | chunk_size = size + HASH_CHUNK_HEADER_SIZE; |
| 2817 | else |
| 2818 | chunk_size = HASH_CHUNK_SIZE; |
| 2819 | |
| 2820 | /* Check if it's time to grow batches or buckets. */ |
| 2821 | if (pstate->growth != PHJ_GROWTH_DISABLED) |
| 2822 | { |
| 2823 | Assert(curbatch == 0); |
| 2824 | Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER); |
| 2825 | |
| 2826 | /* |
| 2827 | * Check if our space limit would be exceeded. To avoid choking on |
| 2828 | * very large tuples or a very low work_mem setting, we'll always allow |
| 2829 | * each backend to allocate at least one chunk. |
| 2830 | */ |
| 2831 | if (hashtable->batches[0].at_least_one_chunk && |
| 2832 | hashtable->batches[0].shared->size + |
| 2833 | chunk_size > pstate->space_allowed) |
| 2834 | { |
| 2835 | pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; |
| 2836 | hashtable->batches[0].shared->space_exhausted = true; |
| 2837 | LWLockRelease(&pstate->lock); |
| 2838 | |
| 2839 | return NULL; |
| 2840 | } |
| 2841 | |
| 2842 | /* Check if our load factor limit would be exceeded. */ |
| 2843 | if (hashtable->nbatch == 1) |
| 2844 | { |
| 2845 | hashtable->batches[0].shared->ntuples += hashtable->batches[0].ntuples; |
| 2846 | hashtable->batches[0].ntuples = 0; |
| 2847 | /* Guard against integer overflow and alloc size overflow */ |
| 2848 | if (hashtable->batches[0].shared->ntuples + 1 > |
| 2849 | hashtable->nbuckets * NTUP_PER_BUCKET && |
| 2850 | hashtable->nbuckets < (INT_MAX / 2) && |
| 2851 | hashtable->nbuckets * 2 <= |
| 2852 | MaxAllocSize / sizeof(dsa_pointer_atomic)) |
| 2853 | { |
| 2854 | pstate->growth = PHJ_GROWTH_NEED_MORE_BUCKETS; |
| 2855 | LWLockRelease(&pstate->lock); |
| 2856 | |
| 2857 | return NULL; |
| 2858 | } |
| 2859 | } |
| 2860 | } |
| 2861 | |
| 2862 | /* We are cleared to allocate a new chunk. */ |
| 2863 | chunk_shared = dsa_allocate(hashtable->area, chunk_size); |
| 2864 | hashtable->batches[curbatch].shared->size += chunk_size; |
| 2865 | hashtable->batches[curbatch].at_least_one_chunk = true; |
| 2866 | |
| 2867 | /* Set up the chunk. */ |
| 2868 | chunk = (HashMemoryChunk) dsa_get_address(hashtable->area, chunk_shared); |
| 2869 | *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE; |
| 2870 | chunk->maxlen = chunk_size - HASH_CHUNK_HEADER_SIZE; |
| 2871 | chunk->used = size; |
| 2872 | |
| 2873 | /* |
| 2874 | * Push it onto the list of chunks, so that it can be found if we need to |
| 2875 | * increase the number of buckets or batches (batch 0 only) and later for |
| 2876 | * freeing the memory (all batches). |
| 2877 | */ |
| 2878 | chunk->next.shared = hashtable->batches[curbatch].shared->chunks; |
| 2879 | hashtable->batches[curbatch].shared->chunks = chunk_shared; |
| 2880 | |
| 2881 | if (size <= HASH_CHUNK_THRESHOLD) |
| 2882 | { |
| 2883 | /* |
| 2884 | * Make this the current chunk so that we can use the fast path to |
| 2885 | * fill the rest of it up in future calls. |
| 2886 | */ |
| 2887 | hashtable->current_chunk = chunk; |
| 2888 | hashtable->current_chunk_shared = chunk_shared; |
| 2889 | } |
| 2890 | LWLockRelease(&pstate->lock); |
| 2891 | |
| 2892 | Assert(HASH_CHUNK_DATA(chunk) == dsa_get_address(hashtable->area, *shared)); |
| 2893 | result = (HashJoinTuple) HASH_CHUNK_DATA(chunk); |
| 2894 | |
| 2895 | return result; |
| 2896 | } |
| 2897 | |
| 2898 | /* |
| 2899 | * One backend needs to set up the shared batch state, including tuplestores. |
| 2900 | * Other backends will ensure they have correctly configured accessors by |
| 2901 | * calling ExecParallelHashEnsureBatchAccessors(). |
| 2902 | */ |
| 2903 | static void |
| 2904 | ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch) |
| 2905 | { |
| 2906 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
| 2907 | ParallelHashJoinBatch *batches; |
| 2908 | MemoryContext oldcxt; |
| 2909 | int i; |
| 2910 | |
| 2911 | Assert(hashtable->batches == NULL); |
| 2912 | |
| 2913 | /* Allocate space. */ |
| 2914 | pstate->batches = |
| 2915 | dsa_allocate0(hashtable->area, |
| 2916 | EstimateParallelHashJoinBatch(hashtable) * nbatch); |
| 2917 | pstate->nbatch = nbatch; |
| 2918 | batches = dsa_get_address(hashtable->area, pstate->batches); |
| 2919 | |
| 2920 | /* Use hash join memory context. */ |
| 2921 | oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); |
| 2922 | |
| 2923 | /* Allocate this backend's accessor array. */ |
| 2924 | hashtable->nbatch = nbatch; |
| 2925 | hashtable->batches = (ParallelHashJoinBatchAccessor *) |
| 2926 | palloc0(sizeof(ParallelHashJoinBatchAccessor) * hashtable->nbatch); |
| 2927 | |
| 2928 | /* Set up the shared state, tuplestores and backend-local accessors. */ |
| 2929 | for (i = 0; i < hashtable->nbatch; ++i) |
| 2930 | { |
| 2931 | ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i]; |
| 2932 | ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i); |
| 2933 | char name[MAXPGPATH]; |
| 2934 | |
| 2935 | /* |
| 2936 | * All members of shared were zero-initialized. We just need to set |
| 2937 | * up the Barrier. |
| 2938 | */ |
| 2939 | BarrierInit(&shared->batch_barrier, 0); |
| 2940 | if (i == 0) |
| 2941 | { |
| 2942 | /* Batch 0 doesn't need to be loaded. */ |
| 2943 | BarrierAttach(&shared->batch_barrier); |
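| | /* |
| | * Fast-forward the batch barrier to the probing phase: batch 0's hash |
| | * table is built directly in memory, so it is never loaded from disk. |
| | */ |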
| 2944 | while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBING) |
| 2945 | BarrierArriveAndWait(&shared->batch_barrier, 0); |
| 2946 | BarrierDetach(&shared->batch_barrier); |
| 2947 | } |
| 2948 | |
| 2949 | /* Initialize accessor state. All members were zero-initialized. */ |
| 2950 | accessor->shared = shared; |
| 2951 | |
| 2952 | /* Initialize the shared tuplestores. */ |
| 2953 | snprintf(name, sizeof(name), "i%dof%d", i, hashtable->nbatch); |
| 2954 | accessor->inner_tuples = |
| 2955 | sts_initialize(ParallelHashJoinBatchInner(shared), |
| 2956 | pstate->nparticipants, |
| 2957 | ParallelWorkerNumber + 1, |
| 2958 | sizeof(uint32), |
| 2959 | SHARED_TUPLESTORE_SINGLE_PASS, |
| 2960 | &pstate->fileset, |
| 2961 | name); |
| 2962 | snprintf(name, sizeof(name), "o%dof%d", i, hashtable->nbatch); |
| 2963 | accessor->outer_tuples = |
| 2964 | sts_initialize(ParallelHashJoinBatchOuter(shared, |
| 2965 | pstate->nparticipants), |
| 2966 | pstate->nparticipants, |
| 2967 | ParallelWorkerNumber + 1, |
| 2968 | sizeof(uint32), |
| 2969 | SHARED_TUPLESTORE_SINGLE_PASS, |
| 2970 | &pstate->fileset, |
| 2971 | name); |
| 2972 | } |
| 2973 | |
| 2974 | MemoryContextSwitchTo(oldcxt); |
| 2975 | } |
| 2976 | |
| 2977 | /* |
| 2978 | * Free the current set of ParallelHashJoinBatchAccessor objects. |
| 2979 | */ |
| 2980 | static void |
| 2981 | ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable) |
| 2982 | { |
| 2983 | int i; |
| 2984 | |
| 2985 | for (i = 0; i < hashtable->nbatch; ++i) |
| 2986 | { |
| 2987 | /* Make sure no files are left open. */ |
| 2988 | sts_end_write(hashtable->batches[i].inner_tuples); |
| 2989 | sts_end_write(hashtable->batches[i].outer_tuples); |
| 2990 | sts_end_parallel_scan(hashtable->batches[i].inner_tuples); |
| 2991 | sts_end_parallel_scan(hashtable->batches[i].outer_tuples); |
| 2992 | } |
| 2993 | pfree(hashtable->batches); |
| 2994 | hashtable->batches = NULL; |
| 2995 | } |
| 2996 | |
| 2997 | /* |
| 2998 | * Make sure this backend has up-to-date accessors for the current set of |
| 2999 | * batches. |
| 3000 | */ |
| 3001 | static void |
| 3002 | ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable) |
| 3003 | { |
| 3004 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
| 3005 | ParallelHashJoinBatch *batches; |
| 3006 | MemoryContext oldcxt; |
| 3007 | int i; |
| 3008 | |
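| | /* |
| | * If our accessor array already matches the current number of batches, |
| | * there is nothing to do; otherwise discard the stale accessors and |
| | * rebuild them below. |
| | */ |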
| 3009 | if (hashtable->batches != NULL) |
| 3010 | { |
| 3011 | if (hashtable->nbatch == pstate->nbatch) |
| 3012 | return; |
| 3013 | ExecParallelHashCloseBatchAccessors(hashtable); |
| 3014 | } |
| 3015 | |
| 3016 | /* |
| 3017 | * It's possible for a backend to start up very late so that the whole |
| 3018 | * join is finished and the shm state for tracking batches has already |
| 3019 | * been freed by ExecHashTableDetach(). In that case we'll just leave |
| 3020 | * hashtable->batches as NULL so that ExecParallelHashJoinNewBatch() gives |
| 3021 | * up early. |
| 3022 | */ |
| 3023 | if (!DsaPointerIsValid(pstate->batches)) |
| 3024 | return; |
| 3025 | |
| 3026 | /* Use hash join memory context. */ |
| 3027 | oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); |
| 3028 | |
| 3029 | /* Allocate this backend's accessor array. */ |
| 3030 | hashtable->nbatch = pstate->nbatch; |
| 3031 | hashtable->batches = (ParallelHashJoinBatchAccessor *) |
| 3032 | palloc0(sizeof(ParallelHashJoinBatchAccessor) * hashtable->nbatch); |
| 3033 | |
| 3034 | /* Find the base of the pseudo-array of ParallelHashJoinBatch objects. */ |
| 3035 | batches = (ParallelHashJoinBatch *) |
| 3036 | dsa_get_address(hashtable->area, pstate->batches); |
| 3037 | |
| 3038 | /* Set up the accessor array and attach to the tuplestores. */ |
| 3039 | for (i = 0; i < hashtable->nbatch; ++i) |
| 3040 | { |
| 3041 | ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i]; |
| 3042 | ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i); |
| 3043 | |
| 3044 | accessor->shared = shared; |
| 3045 | accessor->preallocated = 0; |
| 3046 | accessor->done = false; |
| 3047 | accessor->inner_tuples = |
| 3048 | sts_attach(ParallelHashJoinBatchInner(shared), |
| 3049 | ParallelWorkerNumber + 1, |
| 3050 | &pstate->fileset); |
| 3051 | accessor->outer_tuples = |
| 3052 | sts_attach(ParallelHashJoinBatchOuter(shared, |
| 3053 | pstate->nparticipants), |
| 3054 | ParallelWorkerNumber + 1, |
| 3055 | &pstate->fileset); |
| 3056 | } |
| 3057 | |
| 3058 | MemoryContextSwitchTo(oldcxt); |
| 3059 | } |
| 3060 | |
| 3061 | /* |
| 3062 | * Allocate an empty shared memory hash table for a given batch. |
| 3063 | */ |
| 3064 | void |
| 3065 | ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno) |
| 3066 | { |
| 3067 | ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared; |
| 3068 | dsa_pointer_atomic *buckets; |
| 3069 | int nbuckets = hashtable->parallel_state->nbuckets; |
| 3070 | int i; |
| 3071 | |
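| | /* Allocate the bucket array in DSA and initialize every bucket to empty. */ |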
| 3072 | batch->buckets = |
| 3073 | dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets); |
| 3074 | buckets = (dsa_pointer_atomic *) |
| 3075 | dsa_get_address(hashtable->area, batch->buckets); |
| 3076 | for (i = 0; i < nbuckets; ++i) |
| 3077 | dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); |
| 3078 | } |
| 3079 | |
| 3080 | /* |
| 3081 | * If we are currently attached to a shared hash join batch, detach. If we |
| 3082 | * are last to detach, clean up. |
| 3083 | */ |
| 3084 | void |
| 3085 | ExecHashTableDetachBatch(HashJoinTable hashtable) |
| 3086 | { |
| 3087 | if (hashtable->parallel_state != NULL && |
| 3088 | hashtable->curbatch >= 0) |
| 3089 | { |
| 3090 | int curbatch = hashtable->curbatch; |
| 3091 | ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; |
| 3092 | |
| 3093 | /* Make sure any temporary files are closed. */ |
| 3094 | sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); |
| 3095 | sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); |
| 3096 | |
| 3097 | /* Detach from the batch we were last working on. */ |
| 3098 | if (BarrierArriveAndDetach(&batch->batch_barrier)) |
| 3099 | { |
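| | /* We were the last to detach, so the cleanup falls to us. */ |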
| 3100 | /* |
| 3101 | * Technically we shouldn't access the barrier because we're no |
| 3102 | * longer attached, but since there is no way it's moving after |
| 3103 | * this point, it seems safe to make the following assertion. |
| 3104 | */ |
| 3105 | Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE); |
| 3106 | |
| 3107 | /* Free shared chunks and buckets. */ |
| 3108 | while (DsaPointerIsValid(batch->chunks)) |
| 3109 | { |
| 3110 | HashMemoryChunk chunk = |
| 3111 | dsa_get_address(hashtable->area, batch->chunks); |
| 3112 | dsa_pointer next = chunk->next.shared; |
| 3113 | |
| 3114 | dsa_free(hashtable->area, batch->chunks); |
| 3115 | batch->chunks = next; |
| 3116 | } |
| 3117 | if (DsaPointerIsValid(batch->buckets)) |
| 3118 | { |
| 3119 | dsa_free(hashtable->area, batch->buckets); |
| 3120 | batch->buckets = InvalidDsaPointer; |
| 3121 | } |
| 3122 | } |
| 3123 | |
| 3124 | /* |
| 3125 | * Track the largest batch we've been attached to. Though each |
| 3126 | * backend might see a different subset of batches, explain.c will |
| 3127 | * scan the results from all backends to find the largest value. |
| 3128 | */ |
| 3129 | hashtable->spacePeak = |
| 3130 | Max(hashtable->spacePeak, |
| 3131 | batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets); |
| 3132 | |
| 3133 | /* Remember that we are not attached to a batch. */ |
| 3134 | hashtable->curbatch = -1; |
| 3135 | } |
| 3136 | } |
| 3137 | |
| 3138 | /* |
| 3139 | * Detach from all shared resources. If we are last to detach, clean up. |
| 3140 | */ |
| 3141 | void |
| 3142 | ExecHashTableDetach(HashJoinTable hashtable) |
| 3143 | { |
| 3144 | if (hashtable->parallel_state) |
| 3145 | { |
| 3146 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
| 3147 | int i; |
| 3148 | |
| 3149 | /* Make sure any temporary files are closed. */ |
| 3150 | if (hashtable->batches) |
| 3151 | { |
| 3152 | for (i = 0; i < hashtable->nbatch; ++i) |
| 3153 | { |
| 3154 | sts_end_write(hashtable->batches[i].inner_tuples); |
| 3155 | sts_end_write(hashtable->batches[i].outer_tuples); |
| 3156 | sts_end_parallel_scan(hashtable->batches[i].inner_tuples); |
| 3157 | sts_end_parallel_scan(hashtable->batches[i].outer_tuples); |
| 3158 | } |
| 3159 | } |
| 3160 | |
| 3161 | /* If we're last to detach, clean up shared memory. */ |
| 3162 | if (BarrierDetach(&pstate->build_barrier)) |
| 3163 | { |
| 3164 | if (DsaPointerIsValid(pstate->batches)) |
| 3165 | { |
| 3166 | dsa_free(hashtable->area, pstate->batches); |
| 3167 | pstate->batches = InvalidDsaPointer; |
| 3168 | } |
| 3169 | } |
| 3170 | |
| 3171 | hashtable->parallel_state = NULL; |
| 3172 | } |
| 3173 | } |
| 3174 | |
| 3175 | /* |
| 3176 | * Get the first tuple in a given bucket identified by number. |
| 3177 | */ |
| 3178 | static inline HashJoinTuple |
| 3179 | ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno) |
| 3180 | { |
| 3181 | HashJoinTuple tuple; |
| 3182 | dsa_pointer p; |
| 3183 | |
| 3184 | Assert(hashtable->parallel_state); |
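| | /* Read the bucket's atomic head pointer and translate it to a local address. */ |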
| 3185 | p = dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]); |
| 3186 | tuple = (HashJoinTuple) dsa_get_address(hashtable->area, p); |
| 3187 | |
| 3188 | return tuple; |
| 3189 | } |
| 3190 | |
| 3191 | /* |
| 3192 | * Get the next tuple in the same bucket as 'tuple'. |
| 3193 | */ |
| 3194 | static inline HashJoinTuple |
| 3195 | ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple) |
| 3196 | { |
| 3197 | HashJoinTuple next; |
| 3198 | |
| 3199 | Assert(hashtable->parallel_state); |
| 3200 | next = (HashJoinTuple) dsa_get_address(hashtable->area, tuple->next.shared); |
| 3201 | |
| 3202 | return next; |
| 3203 | } |
| 3204 | |
| 3205 | /* |
| 3206 | * Insert a tuple at the front of a chain of tuples in DSA memory atomically. |
| 3207 | */ |
| 3208 | static inline void |
| 3209 | ExecParallelHashPushTuple(dsa_pointer_atomic *head, |
| 3210 | HashJoinTuple tuple, |
| 3211 | dsa_pointer tuple_shared) |
| 3212 | { |
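| | /* |
| | * Link our tuple to the current head, then retry the compare-and-swap |
| | * until we successfully install it as the new head of the chain. |
| | */ |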
| 3213 | for (;;) |
| 3214 | { |
| 3215 | tuple->next.shared = dsa_pointer_atomic_read(head); |
| 3216 | if (dsa_pointer_atomic_compare_exchange(head, |
| 3217 | &tuple->next.shared, |
| 3218 | tuple_shared)) |
| 3219 | break; |
| 3220 | } |
| 3221 | } |
| 3222 | |
| 3223 | /* |
| 3224 | * Prepare to work on a given batch. |
| 3225 | */ |
| 3226 | void |
| 3227 | ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno) |
| 3228 | { |
| 3229 | Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer); |
| 3230 | |
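| | /* |
| | * Point this backend's bucket array and bucket count at the batch's |
| | * shared state, and forget any chunk we were previously filling. |
| | */ |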
| 3231 | hashtable->curbatch = batchno; |
| 3232 | hashtable->buckets.shared = (dsa_pointer_atomic *) |
| 3233 | dsa_get_address(hashtable->area, |
| 3234 | hashtable->batches[batchno].shared->buckets); |
| 3235 | hashtable->nbuckets = hashtable->parallel_state->nbuckets; |
| 3236 | hashtable->log2_nbuckets = my_log2(hashtable->nbuckets); |
| 3237 | hashtable->current_chunk = NULL; |
| 3238 | hashtable->current_chunk_shared = InvalidDsaPointer; |
| 3239 | hashtable->batches[batchno].at_least_one_chunk = false; |
| 3240 | } |
| 3241 | |
| 3242 | /* |
| 3243 | * Take the next available chunk from the queue of chunks being worked on in |
| 3244 | * parallel. Return NULL if there are none left. Otherwise return a pointer |
| 3245 | * to the chunk, and set *shared to the DSA pointer to the chunk. |
| 3246 | */ |
| 3247 | static HashMemoryChunk |
| 3248 | ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared) |
| 3249 | { |
| 3250 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
| 3251 | HashMemoryChunk chunk; |
| 3252 | |
| 3253 | LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); |
| 3254 | if (DsaPointerIsValid(pstate->chunk_work_queue)) |
| 3255 | { |
| 3256 | *shared = pstate->chunk_work_queue; |
| 3257 | chunk = (HashMemoryChunk) |
| 3258 | dsa_get_address(hashtable->area, *shared); |
| 3259 | pstate->chunk_work_queue = chunk->next.shared; |
| 3260 | } |
| 3261 | else |
| 3262 | chunk = NULL; |
| 3263 | LWLockRelease(&pstate->lock); |
| 3264 | |
| 3265 | return chunk; |
| 3266 | } |
| 3267 | |
| 3268 | /* |
| 3269 | * Increase the space preallocated in this backend for a given inner batch by |
| 3270 | * at least a given amount. This allows us to track whether a given batch |
| 3271 | * would fit in memory when loaded back in. Also increase the number of |
| 3272 | * batches or buckets if required. |
| 3273 | * |
| 3274 | * This maintains a running estimate of how much space will be taken when we |
| 3275 | * load the batch back into memory by simulating the way chunks will be handed |
| 3276 | * out to workers. It's not perfectly accurate because the tuples will be |
| 3277 | * packed into memory chunks differently by ExecParallelHashTupleAlloc(), but |
| 3278 | * it should be pretty close. It tends to overestimate by a fraction of a |
| 3279 | * chunk per worker since all workers gang up to preallocate during hashing, |
| 3280 | * but workers tend to reload batches alone if there are enough to go around, |
| 3281 | * leaving fewer partially filled chunks. This effect is bounded by |
| 3282 | * nparticipants. |
| 3283 | * |
| 3284 | * Return false if the number of batches or buckets has changed, and the |
| 3285 | * caller should reconsider which batch a given tuple now belongs in and call |
| 3286 | * again. |
| 3287 | */ |
| 3288 | static bool |
| 3289 | ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size) |
| 3290 | { |
| 3291 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
| 3292 | ParallelHashJoinBatchAccessor *batch = &hashtable->batches[batchno]; |
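| | /* Reserve at least a whole chunk's payload, since chunks are doled out whole at load time. */ |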
| 3293 | size_t want = Max(size, HASH_CHUNK_SIZE - HASH_CHUNK_HEADER_SIZE); |
| 3294 | |
| 3295 | Assert(batchno > 0); |
| 3296 | Assert(batchno < hashtable->nbatch); |
| 3297 | Assert(size == MAXALIGN(size)); |
| 3298 | |
| 3299 | LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); |
| 3300 | |
| 3301 | /* Has another participant commanded us to help grow? */ |
| 3302 | if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES || |
| 3303 | pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS) |
| 3304 | { |
| 3305 | ParallelHashGrowth growth = pstate->growth; |
| 3306 | |
| 3307 | LWLockRelease(&pstate->lock); |
| 3308 | if (growth == PHJ_GROWTH_NEED_MORE_BATCHES) |
| 3309 | ExecParallelHashIncreaseNumBatches(hashtable); |
| 3310 | else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS) |
| 3311 | ExecParallelHashIncreaseNumBuckets(hashtable); |
| 3312 | |
| 3313 | return false; |
| 3314 | } |
| 3315 | |
| 3316 | if (pstate->growth != PHJ_GROWTH_DISABLED && |
| 3317 | batch->at_least_one_chunk && |
| 3318 | (batch->shared->estimated_size + want + HASH_CHUNK_HEADER_SIZE |
| 3319 | > pstate->space_allowed)) |
| 3320 | { |
| 3321 | /* |
| 3322 | * We have determined that this batch would exceed the space budget if |
| 3323 | * loaded into memory. Command all participants to help repartition. |
| 3324 | */ |
| 3325 | batch->shared->space_exhausted = true; |
| 3326 | pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; |
| 3327 | LWLockRelease(&pstate->lock); |
| 3328 | |
| 3329 | return false; |
| 3330 | } |
| 3331 | |
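| | /* Success: grow the batch's estimated size and record this backend's preallocation. */ |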
| 3332 | batch->at_least_one_chunk = true; |
| 3333 | batch->shared->estimated_size += want + HASH_CHUNK_HEADER_SIZE; |
| 3334 | batch->preallocated = want; |
| 3335 | LWLockRelease(&pstate->lock); |
| 3336 | |
| 3337 | return true; |
| 3338 | } |
| 3339 | |