/*-------------------------------------------------------------------------
 *
 * nodeHashjoin.c
 *	  Routines to handle hash join nodes
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeHashjoin.c
 *
 * PARALLELISM
 *
 * Hash joins can participate in parallel query execution in several ways. A
 * parallel-oblivious hash join is one where the node is unaware that it is
 * part of a parallel plan. In this case, a copy of the inner plan is used to
 * build a copy of the hash table in every backend, and the outer plan could
 * either be built from a partial or complete path, so that the results of the
 * hash join are correspondingly either partial or complete. A parallel-aware
 * hash join is one that behaves differently, coordinating work between
 * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel
 * Hash Join always appears with a Parallel Hash node.
 *
 * Parallel-aware hash joins use the same per-backend state machine to track
 * progress through the hash join algorithm as parallel-oblivious hash joins.
 * In a parallel-aware hash join, there is also a shared state machine that
 * co-operating backends use to synchronize their local state machines and
 * program counters. The shared state machine is managed with a Barrier IPC
 * primitive. When all attached participants arrive at a barrier, the phase
 * advances and all waiting participants are released.
 *
 * When a participant begins working on a parallel hash join, it must first
 * figure out how much progress has already been made, because participants
 * don't wait for each other to begin. For this reason there are switch
 * statements at key points in the code where we have to synchronize our local
 * state machine with the phase, and then jump to the correct part of the
 * algorithm so that we can get started.
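 *
 * As a sketch only (the switch statements in this file and in nodeHash.c
 * are authoritative), the pattern looks like this:
 *
 *     switch (BarrierAttach(batch_barrier))
 *     {
 *         case PHJ_BATCH_ELECTING:
 *             ... one backend is elected to allocate ...
 *             (fall through)
 *         case PHJ_BATCH_ALLOCATING:
 *             ... wait for the elected backend to finish ...
 *             (fall through)
 *         case PHJ_BATCH_LOADING:
 *             ... all help load the hash table ...
 *             (fall through)
 *         case PHJ_BATCH_PROBING:
 *             ... begin probing ...
 *     }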
 *
 * One barrier called build_barrier is used to coordinate the hashing phases.
 * The phase is represented by an integer which begins at zero and increments
 * one by one, but in the code it is referred to by symbolic names as follows:
 *
 *   PHJ_BUILD_ELECTING              -- initial state
 *   PHJ_BUILD_ALLOCATING            -- one sets up the batches and table 0
 *   PHJ_BUILD_HASHING_INNER         -- all hash the inner rel
 *   PHJ_BUILD_HASHING_OUTER         -- (multi-batch only) all hash the outer
 *   PHJ_BUILD_DONE                  -- building done, probing can begin
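 *
 * (In hashjoin.h these phase names are defined as consecutive integers,
 * approximately:
 *
 *     #define PHJ_BUILD_ELECTING       0
 *     #define PHJ_BUILD_ALLOCATING     1
 *     #define PHJ_BUILD_HASHING_INNER  2
 *     #define PHJ_BUILD_HASHING_OUTER  3
 *     #define PHJ_BUILD_DONE           4
 *
 * so the current value of BarrierPhase(build_barrier) can be compared
 * against them directly.)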
 *
 * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
 * be used repeatedly as required to coordinate expansions in the number of
 * batches or buckets. Their phases are as follows:
 *
 *   PHJ_GROW_BATCHES_ELECTING       -- initial state
 *   PHJ_GROW_BATCHES_ALLOCATING     -- one allocates new batches
 *   PHJ_GROW_BATCHES_REPARTITIONING -- all repartition
 *   PHJ_GROW_BATCHES_FINISHING      -- one cleans up, detects skew
 *
 *   PHJ_GROW_BUCKETS_ELECTING       -- initial state
 *   PHJ_GROW_BUCKETS_ALLOCATING     -- one allocates new buckets
 *   PHJ_GROW_BUCKETS_REINSERTING    -- all insert tuples
 *
 * If the planner got the number of batches and buckets right, those won't be
 * necessary, but on the other hand we might finish up needing to expand the
 * buckets or batches multiple times while hashing the inner relation to stay
 * within our memory budget and load factor target. For that reason it's a
 * separate pair of barriers using circular phases.
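 *
 * (A Barrier's phase number only ever increases, so the circularity is
 * obtained by reducing it modulo the number of states; hashjoin.h provides
 * macros for that, approximately:
 *
 *     #define PHJ_GROW_BUCKETS_PHASE(n) ((n) % 3)
 *
 * and similarly for the grow-batches barrier.)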
 *
 * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins,
 * because we need to divide the outer relation into batches up front in order
 * to be able to process batches entirely independently. In contrast, the
 * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
 * batches whenever it encounters them while scanning and probing, which it
 * can do because it processes batches in serial order.
 *
 * Once PHJ_BUILD_DONE is reached, backends then split up and process
 * different batches, or gang up and work together on probing batches if there
 * aren't enough to go around. For each batch there is a separate barrier
 * with the following phases:
 *
 *   PHJ_BATCH_ELECTING              -- initial state
 *   PHJ_BATCH_ALLOCATING            -- one allocates buckets
 *   PHJ_BATCH_LOADING               -- all load the hash table from disk
 *   PHJ_BATCH_PROBING               -- all probe
 *   PHJ_BATCH_DONE                  -- end
 *
 * Batch 0 is a special case, because it starts out in phase
 * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
 * PHJ_BUILD_HASHING_INNER so we can skip loading.
 *
 * Initially we try to plan for a single-batch hash join using the combined
 * work_mem of all participants to create a large shared hash table. If that
 * turns out either at planning or execution time to be impossible then we
 * fall back to regular work_mem sized hash tables.
 *
 * To avoid deadlocks, we never wait for any barrier unless it is known that
 * all other backends attached to it are actively executing the node or have
 * already arrived. Practically, that means that we never return a tuple
 * while attached to a barrier, unless the barrier has reached its final
 * state. In the slightly special case of the per-batch barrier, we return
 * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
 * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
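 *
 * In code form, a probing participant's exit protocol is roughly (a sketch
 * only; see ExecHashTableDetachBatch() in nodeHash.c for the real thing):
 *
 *     if (BarrierArriveAndDetach(batch_barrier))
 *     {
 *         ... we were the last attached participant, so the phase is now
 *         PHJ_BATCH_DONE and it is safe to free the batch's resources ...
 *     }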
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/htup_details.h"
#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/sharedtuplestore.h"


/*
 * States of the ExecHashJoin state machine
 */
#define HJ_BUILD_HASHTABLE      1
#define HJ_NEED_NEW_OUTER       2
#define HJ_SCAN_BUCKET          3
#define HJ_FILL_OUTER_TUPLE     4
#define HJ_FILL_INNER_TUPLES    5
#define HJ_NEED_NEW_BATCH       6

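/*
 * Rough sketch of the transitions (the state machine loop in
 * ExecHashJoinImpl() is authoritative):
 *
 *  HJ_BUILD_HASHTABLE   -> HJ_NEED_NEW_OUTER (parallel: HJ_NEED_NEW_BATCH)
 *  HJ_NEED_NEW_OUTER    -> HJ_SCAN_BUCKET, or at end of batch
 *                          HJ_FILL_INNER_TUPLES / HJ_NEED_NEW_BATCH
 *  HJ_SCAN_BUCKET       -> HJ_FILL_OUTER_TUPLE when out of matches
 *  HJ_FILL_OUTER_TUPLE  -> HJ_NEED_NEW_OUTER
 *  HJ_FILL_INNER_TUPLES -> HJ_NEED_NEW_BATCH when no unmatched tuples remain
 *  HJ_NEED_NEW_BATCH    -> HJ_NEED_NEW_OUTER, or the join is complete
 */
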
/* Returns true if doing null-fill on outer relation */
#define HJ_FILL_OUTER(hjstate)  ((hjstate)->hj_NullInnerTupleSlot != NULL)
/* Returns true if doing null-fill on inner relation */
#define HJ_FILL_INNER(hjstate)  ((hjstate)->hj_NullOuterTupleSlot != NULL)

static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
                                                 HashJoinState *hjstate,
                                                 uint32 *hashvalue);
static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
                                                         HashJoinState *hjstate,
                                                         uint32 *hashvalue);
static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
                                                 BufFile *file,
                                                 uint32 *hashvalue,
                                                 TupleTableSlot *tupleSlot);
static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
static void ExecParallelHashJoinPartitionOuter(HashJoinState *node);


/* ----------------------------------------------------------------
 *    ExecHashJoinImpl
 *
 * This function implements the Hybrid Hashjoin algorithm. It is marked
 * with an always-inline attribute so that ExecHashJoin() and
 * ExecParallelHashJoin() can inline it. Compilers that respect the
 * attribute should create versions specialized for parallel == true and
 * parallel == false with unnecessary branches removed.
 *
 * Note: the relation we build hash table on is the "inner"
 *       the other one is "outer".
 * ----------------------------------------------------------------
 */
static pg_attribute_always_inline TupleTableSlot *
ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
    HashJoinState *node = castNode(HashJoinState, pstate);
    PlanState  *outerNode;
    HashState  *hashNode;
    ExprState  *joinqual;
    ExprState  *otherqual;
    ExprContext *econtext;
    HashJoinTable hashtable;
    TupleTableSlot *outerTupleSlot;
    uint32      hashvalue;
    int         batchno;
    ParallelHashJoinState *parallel_state;

    /*
     * get information from HashJoin node
     */
    joinqual = node->js.joinqual;
    otherqual = node->js.ps.qual;
    hashNode = (HashState *) innerPlanState(node);
    outerNode = outerPlanState(node);
    hashtable = node->hj_HashTable;
    econtext = node->js.ps.ps_ExprContext;
    parallel_state = hashNode->parallel_state;

    /*
     * Reset per-tuple memory context to free any expression evaluation
     * storage allocated in the previous tuple cycle.
     */
    ResetExprContext(econtext);

    /*
     * run the hash join state machine
     */
    for (;;)
    {
        /*
         * It's possible to iterate this loop many times before returning a
         * tuple, in some pathological cases such as needing to move much of
         * the current batch to a later batch. So let's check for interrupts
         * each time through.
         */
        CHECK_FOR_INTERRUPTS();

        switch (node->hj_JoinState)
        {
            case HJ_BUILD_HASHTABLE:

                /*
                 * First time through: build hash table for inner relation.
                 */
                Assert(hashtable == NULL);

                /*
                 * If the outer relation is completely empty, and it's not
                 * right/full join, we can quit without building the hash
                 * table. However, for an inner join it is only a win to
                 * check this when the outer relation's startup cost is less
                 * than the projected cost of building the hash table.
                 * Otherwise it's best to build the hash table first and see
                 * if the inner relation is empty. (When it's a left join, we
                 * should always make this check, since we aren't going to be
                 * able to skip the join on the strength of an empty inner
                 * relation anyway.)
                 *
                 * If we are rescanning the join, we make use of information
                 * gained on the previous scan: don't bother to try the
                 * prefetch if the previous scan found the outer relation
                 * nonempty. This is not 100% reliable since with new
                 * parameters the outer relation might yield different
                 * results, but it's a good heuristic.
                 *
                 * The only way to make the check is to try to fetch a tuple
                 * from the outer plan node. If we succeed, we have to stash
                 * it away for later consumption by ExecHashJoinOuterGetTuple.
                 */
                if (HJ_FILL_INNER(node))
                {
                    /* no chance to not build the hash table */
                    node->hj_FirstOuterTupleSlot = NULL;
                }
                else if (parallel)
                {
                    /*
                     * The empty-outer optimization is not implemented for
                     * shared hash tables, because no one participant can
                     * determine that there are no outer tuples, and it's not
                     * yet clear that it's worth the synchronization overhead
                     * of reaching consensus to figure that out. So we have
                     * to build the hash table.
                     */
                    node->hj_FirstOuterTupleSlot = NULL;
                }
                else if (HJ_FILL_OUTER(node) ||
                         (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
                          !node->hj_OuterNotEmpty))
                {
                    node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
                    if (TupIsNull(node->hj_FirstOuterTupleSlot))
                    {
                        node->hj_OuterNotEmpty = false;
                        return NULL;
                    }
                    else
                        node->hj_OuterNotEmpty = true;
                }
                else
                    node->hj_FirstOuterTupleSlot = NULL;

                /*
                 * Create the hash table. If using Parallel Hash, then
                 * whoever gets here first will create the hash table and any
                 * later arrivals will merely attach to it.
                 */
                hashtable = ExecHashTableCreate(hashNode,
                                                node->hj_HashOperators,
                                                node->hj_Collations,
                                                HJ_FILL_INNER(node));
                node->hj_HashTable = hashtable;

                /*
                 * Execute the Hash node, to build the hash table. If using
                 * Parallel Hash, then we'll try to help hashing unless we
                 * arrived too late.
                 */
                hashNode->hashtable = hashtable;
                (void) MultiExecProcNode((PlanState *) hashNode);

                /*
                 * If the inner relation is completely empty, and we're not
                 * doing a left outer join, we can quit without scanning the
                 * outer relation.
                 */
                if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
                    return NULL;

                /*
                 * need to remember whether nbatch has increased since we
                 * began scanning the outer relation
                 */
                hashtable->nbatch_outstart = hashtable->nbatch;

                /*
                 * Reset OuterNotEmpty for scan. (It's OK if we fetched a
                 * tuple above, because ExecHashJoinOuterGetTuple will
                 * immediately set it again.)
                 */
                node->hj_OuterNotEmpty = false;

                if (parallel)
                {
                    Barrier    *build_barrier;

                    build_barrier = &parallel_state->build_barrier;
                    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
                           BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
                    if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
                    {
                        /*
                         * If multi-batch, we need to hash the outer relation
                         * up front.
                         */
                        if (hashtable->nbatch > 1)
                            ExecParallelHashJoinPartitionOuter(node);
                        BarrierArriveAndWait(build_barrier,
                                             WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
                    }
                    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);

                    /* Each backend should now select a batch to work on. */
                    hashtable->curbatch = -1;
                    node->hj_JoinState = HJ_NEED_NEW_BATCH;

                    continue;
                }
                else
                    node->hj_JoinState = HJ_NEED_NEW_OUTER;

                /* FALL THRU */

            case HJ_NEED_NEW_OUTER:

                /*
                 * We don't have an outer tuple, try to get the next one
                 */
                if (parallel)
                    outerTupleSlot =
                        ExecParallelHashJoinOuterGetTuple(outerNode, node,
                                                          &hashvalue);
                else
                    outerTupleSlot =
                        ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);

                if (TupIsNull(outerTupleSlot))
                {
                    /* end of batch, or maybe whole join */
                    if (HJ_FILL_INNER(node))
                    {
                        /* set up to scan for unmatched inner tuples */
                        ExecPrepHashTableForUnmatched(node);
                        node->hj_JoinState = HJ_FILL_INNER_TUPLES;
                    }
                    else
                        node->hj_JoinState = HJ_NEED_NEW_BATCH;
                    continue;
                }

                econtext->ecxt_outertuple = outerTupleSlot;
                node->hj_MatchedOuter = false;

                /*
                 * Find the corresponding bucket for this tuple in the main
                 * hash table or skew hash table.
                 */
                node->hj_CurHashValue = hashvalue;
                ExecHashGetBucketAndBatch(hashtable, hashvalue,
                                          &node->hj_CurBucketNo, &batchno);
                node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
                                                                 hashvalue);
                node->hj_CurTuple = NULL;

                /*
                 * The tuple might not belong to the current batch (where
                 * "current batch" includes the skew buckets if any).
                 */
                if (batchno != hashtable->curbatch &&
                    node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
                {
                    bool        shouldFree;
                    MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
                                                                      &shouldFree);

                    /*
                     * Need to postpone this outer tuple to a later batch.
                     * Save it in the corresponding outer-batch file.
                     */
                    Assert(parallel_state == NULL);
                    Assert(batchno > hashtable->curbatch);
                    ExecHashJoinSaveTuple(mintuple, hashvalue,
                                          &hashtable->outerBatchFile[batchno]);

                    if (shouldFree)
                        heap_free_minimal_tuple(mintuple);

                    /* Loop around, staying in HJ_NEED_NEW_OUTER state */
                    continue;
                }

                /* OK, let's scan the bucket for matches */
                node->hj_JoinState = HJ_SCAN_BUCKET;

                /* FALL THRU */

            case HJ_SCAN_BUCKET:

                /*
                 * Scan the selected hash bucket for matches to current outer
                 */
                if (parallel)
                {
                    if (!ExecParallelScanHashBucket(node, econtext))
                    {
                        /* out of matches; check for possible outer-join fill */
                        node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
                        continue;
                    }
                }
                else
                {
                    if (!ExecScanHashBucket(node, econtext))
                    {
                        /* out of matches; check for possible outer-join fill */
                        node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
                        continue;
                    }
                }

                /*
                 * We've got a match, but still need to test non-hashed quals.
                 * ExecScanHashBucket already set up all the state needed to
                 * call ExecQual.
                 *
                 * If we pass the qual, then save state for next call and have
                 * ExecProject form the projection, store it in the tuple
                 * table, and return the slot.
                 *
                 * Only the joinquals determine tuple match status, but all
                 * quals must pass to actually return the tuple.
                 */
                if (joinqual == NULL || ExecQual(joinqual, econtext))
                {
                    node->hj_MatchedOuter = true;
                    HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));

                    /* In an antijoin, we never return a matched tuple */
                    if (node->js.jointype == JOIN_ANTI)
                    {
                        node->hj_JoinState = HJ_NEED_NEW_OUTER;
                        continue;
                    }

                    /*
                     * If we only need to join to the first matching inner
                     * tuple, then consider returning this one, but after that
                     * continue with next outer tuple.
                     */
                    if (node->js.single_match)
                        node->hj_JoinState = HJ_NEED_NEW_OUTER;

                    if (otherqual == NULL || ExecQual(otherqual, econtext))
                        return ExecProject(node->js.ps.ps_ProjInfo);
                    else
                        InstrCountFiltered2(node, 1);
                }
                else
                    InstrCountFiltered1(node, 1);
                break;

            case HJ_FILL_OUTER_TUPLE:

                /*
                 * The current outer tuple has run out of matches, so check
                 * whether to emit a dummy outer-join tuple. Whether we emit
                 * one or not, the next state is NEED_NEW_OUTER.
                 */
                node->hj_JoinState = HJ_NEED_NEW_OUTER;

                if (!node->hj_MatchedOuter &&
                    HJ_FILL_OUTER(node))
                {
                    /*
                     * Generate a fake join tuple with nulls for the inner
                     * tuple, and return it if it passes the non-join quals.
                     */
                    econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;

                    if (otherqual == NULL || ExecQual(otherqual, econtext))
                        return ExecProject(node->js.ps.ps_ProjInfo);
                    else
                        InstrCountFiltered2(node, 1);
                }
                break;

            case HJ_FILL_INNER_TUPLES:

                /*
                 * We have finished a batch, but we are doing right/full join,
                 * so any unmatched inner tuples in the hashtable have to be
                 * emitted before we continue to the next batch.
                 */
                if (!ExecScanHashTableForUnmatched(node, econtext))
                {
                    /* no more unmatched tuples */
                    node->hj_JoinState = HJ_NEED_NEW_BATCH;
                    continue;
                }

                /*
                 * Generate a fake join tuple with nulls for the outer tuple,
                 * and return it if it passes the non-join quals.
                 */
                econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;

                if (otherqual == NULL || ExecQual(otherqual, econtext))
                    return ExecProject(node->js.ps.ps_ProjInfo);
                else
                    InstrCountFiltered2(node, 1);
                break;

            case HJ_NEED_NEW_BATCH:

                /*
                 * Try to advance to next batch. Done if there are no more.
                 */
                if (parallel)
                {
                    if (!ExecParallelHashJoinNewBatch(node))
                        return NULL;    /* end of parallel-aware join */
                }
                else
                {
                    if (!ExecHashJoinNewBatch(node))
                        return NULL;    /* end of parallel-oblivious join */
                }
                node->hj_JoinState = HJ_NEED_NEW_OUTER;
                break;

            default:
                elog(ERROR, "unrecognized hashjoin state: %d",
                     (int) node->hj_JoinState);
        }
    }
}

/* ----------------------------------------------------------------
 *    ExecHashJoin
 *
 *    Parallel-oblivious version.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *         /* return: a tuple or NULL */
ExecHashJoin(PlanState *pstate)
{
    /*
     * On sufficiently smart compilers this should be inlined with the
     * parallel-aware branches removed.
     */
    return ExecHashJoinImpl(pstate, false);
}

/* ----------------------------------------------------------------
 *    ExecParallelHashJoin
 *
 *    Parallel-aware version.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *         /* return: a tuple or NULL */
ExecParallelHashJoin(PlanState *pstate)
{
    /*
     * On sufficiently smart compilers this should be inlined with the
     * parallel-oblivious branches removed.
     */
    return ExecHashJoinImpl(pstate, true);
}

/* ----------------------------------------------------------------
 *    ExecInitHashJoin
 *
 *    Init routine for HashJoin node.
 * ----------------------------------------------------------------
 */
HashJoinState *
ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
{
    HashJoinState *hjstate;
    Plan       *outerNode;
    Hash       *hashNode;
    TupleDesc   outerDesc,
                innerDesc;
    const TupleTableSlotOps *ops;

    /* check for unsupported flags */
    Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

    /*
     * create state structure
     */
    hjstate = makeNode(HashJoinState);
    hjstate->js.ps.plan = (Plan *) node;
    hjstate->js.ps.state = estate;

    /*
     * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
     * where this function may be replaced with a parallel version, if we
     * managed to launch a parallel query.
     */
    hjstate->js.ps.ExecProcNode = ExecHashJoin;
    hjstate->js.jointype = node->join.jointype;

    /*
     * Miscellaneous initialization
     *
     * create expression context for node
     */
    ExecAssignExprContext(estate, &hjstate->js.ps);

    /*
     * initialize child nodes
     *
     * Note: we could suppress the REWIND flag for the inner input, which
     * would amount to betting that the hash will be a single batch. Not
     * clear if this would be a win or not.
     */
    outerNode = outerPlan(node);
    hashNode = (Hash *) innerPlan(node);

    outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
    outerDesc = ExecGetResultType(outerPlanState(hjstate));
    innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
    innerDesc = ExecGetResultType(innerPlanState(hjstate));

    /*
     * Initialize result slot, type and projection.
     */
    ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
    ExecAssignProjectionInfo(&hjstate->js.ps, NULL);

    /*
     * tuple table initialization
     */
    ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
    hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
                                                        ops);

    /*
     * detect whether we need only consider the first matching inner tuple
     */
    hjstate->js.single_match = (node->join.inner_unique ||
                                node->join.jointype == JOIN_SEMI);

    /* set up null tuples for outer joins, if needed */
    switch (node->join.jointype)
    {
        case JOIN_INNER:
        case JOIN_SEMI:
            break;
        case JOIN_LEFT:
        case JOIN_ANTI:
            hjstate->hj_NullInnerTupleSlot =
                ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
            break;
        case JOIN_RIGHT:
            hjstate->hj_NullOuterTupleSlot =
                ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
            break;
        case JOIN_FULL:
            hjstate->hj_NullOuterTupleSlot =
                ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
            hjstate->hj_NullInnerTupleSlot =
                ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
            break;
        default:
            elog(ERROR, "unrecognized join type: %d",
                 (int) node->join.jointype);
    }

    /*
     * now for some voodoo. our temporary tuple slot is actually the result
     * tuple slot of the Hash node (which is our inner plan). we can do this
     * because Hash nodes don't return tuples via ExecProcNode() -- instead
     * the hash join node uses ExecScanHashBucket() to get at the contents of
     * the hash table. -cim 6/9/91
     */
    {
        HashState  *hashstate = (HashState *) innerPlanState(hjstate);
        TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;

        hjstate->hj_HashTupleSlot = slot;
    }

    /*
     * initialize child expressions
     */
    hjstate->js.ps.qual =
        ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
    hjstate->js.joinqual =
        ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
    hjstate->hashclauses =
        ExecInitQual(node->hashclauses, (PlanState *) hjstate);

    /*
     * initialize hash-specific info
     */
    hjstate->hj_HashTable = NULL;
    hjstate->hj_FirstOuterTupleSlot = NULL;

    hjstate->hj_CurHashValue = 0;
    hjstate->hj_CurBucketNo = 0;
    hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
    hjstate->hj_CurTuple = NULL;

    hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys,
                                                 (PlanState *) hjstate);
    hjstate->hj_HashOperators = node->hashoperators;
    hjstate->hj_Collations = node->hashcollations;

    hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
    hjstate->hj_MatchedOuter = false;
    hjstate->hj_OuterNotEmpty = false;

    return hjstate;
}

/* ----------------------------------------------------------------
 *    ExecEndHashJoin
 *
 *    clean up routine for HashJoin node
 * ----------------------------------------------------------------
 */
void
ExecEndHashJoin(HashJoinState *node)
{
    /*
     * Free hash table
     */
    if (node->hj_HashTable)
    {
        ExecHashTableDestroy(node->hj_HashTable);
        node->hj_HashTable = NULL;
    }

    /*
     * Free the exprcontext
     */
    ExecFreeExprContext(&node->js.ps);

    /*
     * clean out the tuple table
     */
    ExecClearTuple(node->js.ps.ps_ResultTupleSlot);
    ExecClearTuple(node->hj_OuterTupleSlot);
    ExecClearTuple(node->hj_HashTupleSlot);

    /*
     * clean up subtrees
     */
    ExecEndNode(outerPlanState(node));
    ExecEndNode(innerPlanState(node));
}

/*
 * ExecHashJoinOuterGetTuple
 *
 *    get the next outer tuple for a parallel oblivious hashjoin: either by
 *    executing the outer plan node in the first pass, or from the temp
 *    files for the hashjoin batches.
 *
 * Returns a null slot if no more outer tuples (within the current batch).
 *
 * On success, the tuple's hash value is stored at *hashvalue --- this is
 * either originally computed, or re-read from the temp file.
 */
static TupleTableSlot *
ExecHashJoinOuterGetTuple(PlanState *outerNode,
                          HashJoinState *hjstate,
                          uint32 *hashvalue)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         curbatch = hashtable->curbatch;
    TupleTableSlot *slot;

    if (curbatch == 0)          /* if it is the first pass */
    {
        /*
         * Check to see if first outer tuple was already fetched by
         * ExecHashJoin() and not used yet.
         */
        slot = hjstate->hj_FirstOuterTupleSlot;
        if (!TupIsNull(slot))
            hjstate->hj_FirstOuterTupleSlot = NULL;
        else
            slot = ExecProcNode(outerNode);

        while (!TupIsNull(slot))
        {
            /*
             * We have to compute the tuple's hash value.
             */
            ExprContext *econtext = hjstate->js.ps.ps_ExprContext;

            econtext->ecxt_outertuple = slot;
            if (ExecHashGetHashValue(hashtable, econtext,
                                     hjstate->hj_OuterHashKeys,
                                     true,  /* outer tuple */
                                     HJ_FILL_OUTER(hjstate),
                                     hashvalue))
            {
                /* remember outer relation is not empty for possible rescan */
                hjstate->hj_OuterNotEmpty = true;

                return slot;
            }

            /*
             * That tuple couldn't match because of a NULL, so discard it and
             * continue with the next one.
             */
            slot = ExecProcNode(outerNode);
        }
    }
    else if (curbatch < hashtable->nbatch)
    {
        BufFile    *file = hashtable->outerBatchFile[curbatch];

        /*
         * In outer-join cases, we could get here even though the batch file
         * is empty.
         */
        if (file == NULL)
            return NULL;

        slot = ExecHashJoinGetSavedTuple(hjstate,
                                         file,
                                         hashvalue,
                                         hjstate->hj_OuterTupleSlot);
        if (!TupIsNull(slot))
            return slot;
    }

    /* End of this batch */
    return NULL;
}

/*
 * ExecHashJoinOuterGetTuple variant for the parallel case.
 */
static TupleTableSlot *
ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
                                  HashJoinState *hjstate,
                                  uint32 *hashvalue)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         curbatch = hashtable->curbatch;
    TupleTableSlot *slot;

    /*
     * In the Parallel Hash case we only run the outer plan directly for
     * single-batch hash joins. Otherwise we have to go to batch files, even
     * for batch 0.
     */
    if (curbatch == 0 && hashtable->nbatch == 1)
    {
        slot = ExecProcNode(outerNode);

        while (!TupIsNull(slot))
        {
            ExprContext *econtext = hjstate->js.ps.ps_ExprContext;

            econtext->ecxt_outertuple = slot;
            if (ExecHashGetHashValue(hashtable, econtext,
                                     hjstate->hj_OuterHashKeys,
                                     true,  /* outer tuple */
                                     HJ_FILL_OUTER(hjstate),
                                     hashvalue))
                return slot;

            /*
             * That tuple couldn't match because of a NULL, so discard it and
             * continue with the next one.
             */
            slot = ExecProcNode(outerNode);
        }
    }
    else if (curbatch < hashtable->nbatch)
    {
        MinimalTuple tuple;

        tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
                                       hashvalue);
        if (tuple != NULL)
        {
            ExecForceStoreMinimalTuple(tuple,
                                       hjstate->hj_OuterTupleSlot,
                                       false);
            slot = hjstate->hj_OuterTupleSlot;
            return slot;
        }
        else
            ExecClearTuple(hjstate->hj_OuterTupleSlot);
    }

    /* End of this batch */
    return NULL;
}

/*
 * ExecHashJoinNewBatch
 *    switch to a new hashjoin batch
 *
 * Returns true if successful, false if there are no more batches.
 */
static bool
ExecHashJoinNewBatch(HashJoinState *hjstate)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         nbatch;
    int         curbatch;
    BufFile    *innerFile;
    TupleTableSlot *slot;
    uint32      hashvalue;

    nbatch = hashtable->nbatch;
    curbatch = hashtable->curbatch;

    if (curbatch > 0)
    {
        /*
         * We no longer need the previous outer batch file; close it right
         * away to free disk space.
         */
        if (hashtable->outerBatchFile[curbatch])
            BufFileClose(hashtable->outerBatchFile[curbatch]);
        hashtable->outerBatchFile[curbatch] = NULL;
    }
    else                        /* we just finished the first batch */
    {
        /*
         * Reset some of the skew optimization state variables, since we no
         * longer need to consider skew tuples after the first batch. The
         * memory context reset we are about to do will release the skew
         * hashtable itself.
         */
        hashtable->skewEnabled = false;
        hashtable->skewBucket = NULL;
        hashtable->skewBucketNums = NULL;
        hashtable->nSkewBuckets = 0;
        hashtable->spaceUsedSkew = 0;
    }

    /*
     * We can always skip over any batches that are completely empty on both
     * sides. We can sometimes skip over batches that are empty on only one
     * side, but there are exceptions:
     *
     * 1. In a left/full outer join, we have to process outer batches even if
     * the inner batch is empty. Similarly, in a right/full outer join, we
     * have to process inner batches even if the outer batch is empty.
     *
     * 2. If we have increased nbatch since the initial estimate, we have to
     * scan inner batches since they might contain tuples that need to be
     * reassigned to later inner batches.
     *
     * 3. Similarly, if we have increased nbatch since starting the outer
     * scan, we have to rescan outer batches in case they contain tuples that
     * need to be reassigned.
     */
    curbatch++;
    while (curbatch < nbatch &&
           (hashtable->outerBatchFile[curbatch] == NULL ||
            hashtable->innerBatchFile[curbatch] == NULL))
    {
        if (hashtable->outerBatchFile[curbatch] &&
            HJ_FILL_OUTER(hjstate))
            break;              /* must process due to rule 1 */
        if (hashtable->innerBatchFile[curbatch] &&
            HJ_FILL_INNER(hjstate))
            break;              /* must process due to rule 1 */
        if (hashtable->innerBatchFile[curbatch] &&
            nbatch != hashtable->nbatch_original)
            break;              /* must process due to rule 2 */
        if (hashtable->outerBatchFile[curbatch] &&
            nbatch != hashtable->nbatch_outstart)
            break;              /* must process due to rule 3 */
        /* We can ignore this batch. */
        /* Release associated temp files right away. */
        if (hashtable->innerBatchFile[curbatch])
            BufFileClose(hashtable->innerBatchFile[curbatch]);
        hashtable->innerBatchFile[curbatch] = NULL;
        if (hashtable->outerBatchFile[curbatch])
            BufFileClose(hashtable->outerBatchFile[curbatch]);
        hashtable->outerBatchFile[curbatch] = NULL;
        curbatch++;
    }

    if (curbatch >= nbatch)
        return false;           /* no more batches */

    hashtable->curbatch = curbatch;

    /*
     * Reload the hash table with the new inner batch (which could be empty)
     */
    ExecHashTableReset(hashtable);

    innerFile = hashtable->innerBatchFile[curbatch];

    if (innerFile != NULL)
    {
        if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not rewind hash-join temporary file: %m")));

        while ((slot = ExecHashJoinGetSavedTuple(hjstate,
                                                 innerFile,
                                                 &hashvalue,
                                                 hjstate->hj_HashTupleSlot)))
        {
            /*
             * NOTE: some tuples may be sent to future batches. Also, it is
             * possible for hashtable->nbatch to be increased here!
             */
            ExecHashTableInsert(hashtable, slot, hashvalue);
        }

        /*
         * after we build the hash table, the inner batch file is no longer
         * needed
         */
        BufFileClose(innerFile);
        hashtable->innerBatchFile[curbatch] = NULL;
    }

    /*
     * Rewind outer batch file (if present), so that we can start reading it.
     */
    if (hashtable->outerBatchFile[curbatch] != NULL)
    {
        if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not rewind hash-join temporary file: %m")));
    }

    return true;
}

/*
 * Choose a batch to work on, and attach to it. Returns true if successful,
 * false if there are no more batches.
 */
static bool
ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         start_batchno;
    int         batchno;

    /*
     * If we started up so late that the batch tracking array has been freed
     * already by ExecHashTableDetach(), then we are finished. See also
     * ExecParallelHashEnsureBatchAccessors().
     */
    if (hashtable->batches == NULL)
        return false;

    /*
     * If we were already attached to a batch, remember not to bother checking
     * it again, and detach from it (possibly freeing the hash table if we are
     * last to detach).
     */
    if (hashtable->curbatch >= 0)
    {
        hashtable->batches[hashtable->curbatch].done = true;
        ExecHashTableDetachBatch(hashtable);
    }

    /*
     * Search for a batch that isn't done. We use an atomic counter to start
     * our search at a different batch in every participant when there are
     * more batches than participants.
     */
    batchno = start_batchno =
        pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
        hashtable->nbatch;
    do
    {
        uint32      hashvalue;
        MinimalTuple tuple;
        TupleTableSlot *slot;

        if (!hashtable->batches[batchno].done)
        {
            SharedTuplestoreAccessor *inner_tuples;
            Barrier    *batch_barrier =
            &hashtable->batches[batchno].shared->batch_barrier;

            switch (BarrierAttach(batch_barrier))
            {
                case PHJ_BATCH_ELECTING:

                    /* One backend allocates the hash table. */
                    if (BarrierArriveAndWait(batch_barrier,
                                             WAIT_EVENT_HASH_BATCH_ELECTING))
                        ExecParallelHashTableAlloc(hashtable, batchno);
                    /* Fall through. */

                case PHJ_BATCH_ALLOCATING:
                    /* Wait for allocation to complete. */
                    BarrierArriveAndWait(batch_barrier,
                                         WAIT_EVENT_HASH_BATCH_ALLOCATING);
                    /* Fall through. */

                case PHJ_BATCH_LOADING:
                    /* Start (or join in) loading tuples. */
                    ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
                    inner_tuples = hashtable->batches[batchno].inner_tuples;
                    sts_begin_parallel_scan(inner_tuples);
                    while ((tuple = sts_parallel_scan_next(inner_tuples,
                                                           &hashvalue)))
                    {
                        ExecForceStoreMinimalTuple(tuple,
                                                   hjstate->hj_HashTupleSlot,
                                                   false);
                        slot = hjstate->hj_HashTupleSlot;
                        ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
                                                                hashvalue);
                    }
                    sts_end_parallel_scan(inner_tuples);
                    BarrierArriveAndWait(batch_barrier,
                                         WAIT_EVENT_HASH_BATCH_LOADING);
                    /* Fall through. */

                case PHJ_BATCH_PROBING:

                    /*
                     * This batch is ready to probe. Return control to
                     * caller. We stay attached to batch_barrier so that the
                     * hash table stays alive until everyone's finished
                     * probing it, but no participant is allowed to wait at
                     * this barrier again (or else a deadlock could occur).
                     * All attached participants must eventually call
                     * BarrierArriveAndDetach() so that the final phase
                     * PHJ_BATCH_DONE can be reached.
                     */
                    ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
                    sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
                    return true;

                case PHJ_BATCH_DONE:

                    /*
                     * Already done. Detach and go around again (if any
                     * remain).
                     */
                    BarrierDetach(batch_barrier);
                    hashtable->batches[batchno].done = true;
                    hashtable->curbatch = -1;
                    break;

                default:
                    elog(ERROR, "unexpected batch phase %d",
                         BarrierPhase(batch_barrier));
            }
        }
        batchno = (batchno + 1) % hashtable->nbatch;
    } while (batchno != start_batchno);

    return false;
}

/*
 * ExecHashJoinSaveTuple
 *    save a tuple to a batch file.
 *
 * The data recorded in the file for each tuple is its hash value,
 * then the tuple in MinimalTuple format.
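 *
 * Concretely, each record looks like this (a sketch; note that a
 * MinimalTuple itself begins with a uint32 t_len covering the whole tuple):
 *
 *     +------------------+--------------------------------------+
 *     | uint32 hashvalue | MinimalTuple (t_len bytes in total)  |
 *     +------------------+--------------------------------------+
 *
 * which is why ExecHashJoinGetSavedTuple() can read the hash value and the
 * tuple length as two consecutive uint32s.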
 *
 * Note: it is important always to call this in the regular executor
 * context, not in a shorter-lived context; else the temp file buffers
 * will get messed up.
 */
void
ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
                      BufFile **fileptr)
{
    BufFile    *file = *fileptr;
    size_t      written;

    if (file == NULL)
    {
        /* First write to this batch file, so open it. */
        file = BufFileCreateTemp(false);
        *fileptr = file;
    }

    written = BufFileWrite(file, (void *) &hashvalue, sizeof(uint32));
    if (written != sizeof(uint32))
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not write to hash-join temporary file: %m")));

    written = BufFileWrite(file, (void *) tuple, tuple->t_len);
    if (written != tuple->t_len)
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not write to hash-join temporary file: %m")));
}

/*
 * ExecHashJoinGetSavedTuple
 *    read the next tuple from a batch file. Return NULL if no more.
 *
 * On success, *hashvalue is set to the tuple's hash value, and the tuple
 * itself is stored in the given slot.
 */
static TupleTableSlot *
ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
                          BufFile *file,
                          uint32 *hashvalue,
                          TupleTableSlot *tupleSlot)
{
    uint32      header[2];
    size_t      nread;
    MinimalTuple tuple;

    /*
     * We check for interrupts here because this is typically taken as an
     * alternative code path to an ExecProcNode() call, which would include
     * such a check.
     */
    CHECK_FOR_INTERRUPTS();

    /*
     * Since both the hash value and the MinimalTuple length word are uint32,
     * we can read them both in one BufFileRead() call without any type
     * cheating.
     */
    nread = BufFileRead(file, (void *) header, sizeof(header));
    if (nread == 0)             /* end of file */
    {
        ExecClearTuple(tupleSlot);
        return NULL;
    }
    if (nread != sizeof(header))
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not read from hash-join temporary file: %m")));
    *hashvalue = header[0];
    tuple = (MinimalTuple) palloc(header[1]);
    tuple->t_len = header[1];
    nread = BufFileRead(file,
                        (void *) ((char *) tuple + sizeof(uint32)),
                        header[1] - sizeof(uint32));
    if (nread != header[1] - sizeof(uint32))
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not read from hash-join temporary file: %m")));
    ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
    return tupleSlot;
}


void
ExecReScanHashJoin(HashJoinState *node)
{
    /*
     * In a multi-batch join, we currently have to do rescans the hard way,
     * primarily because batch temp files may have already been released. But
     * if it's a single-batch join, and there is no parameter change for the
     * inner subnode, then we can just re-use the existing hash table without
     * rebuilding it.
     */
    if (node->hj_HashTable != NULL)
    {
        if (node->hj_HashTable->nbatch == 1 &&
            node->js.ps.righttree->chgParam == NULL)
        {
            /*
             * Okay to reuse the hash table; needn't rescan inner, either.
             *
             * However, if it's a right/full join, we'd better reset the
             * inner-tuple match flags contained in the table.
             */
            if (HJ_FILL_INNER(node))
                ExecHashTableResetMatchFlags(node->hj_HashTable);

            /*
             * Also, we need to reset our state about the emptiness of the
             * outer relation, so that the new scan of the outer will update
             * it correctly if it turns out to be empty this time. (There's no
             * harm in clearing it now because ExecHashJoin won't need the
             * info. In the other cases, where the hash table doesn't exist
             * or we are destroying it, we leave this state alone because
             * ExecHashJoin will need it the first time through.)
             */
            node->hj_OuterNotEmpty = false;

            /* ExecHashJoin can skip the BUILD_HASHTABLE step */
            node->hj_JoinState = HJ_NEED_NEW_OUTER;
        }
        else
        {
            /* must destroy and rebuild hash table */
            ExecHashTableDestroy(node->hj_HashTable);
            node->hj_HashTable = NULL;
            node->hj_JoinState = HJ_BUILD_HASHTABLE;

            /*
             * if chgParam of subnode is not null then plan will be re-scanned
             * by first ExecProcNode.
             */
            if (node->js.ps.righttree->chgParam == NULL)
                ExecReScan(node->js.ps.righttree);
        }
    }

    /* Always reset intra-tuple state */
    node->hj_CurHashValue = 0;
    node->hj_CurBucketNo = 0;
    node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
    node->hj_CurTuple = NULL;

    node->hj_MatchedOuter = false;
    node->hj_FirstOuterTupleSlot = NULL;

    /*
     * if chgParam of subnode is not null then plan will be re-scanned by
     * first ExecProcNode.
     */
    if (node->js.ps.lefttree->chgParam == NULL)
        ExecReScan(node->js.ps.lefttree);
}

void
ExecShutdownHashJoin(HashJoinState *node)
{
    if (node->hj_HashTable)
    {
        /*
         * Detach from shared state before DSM memory goes away. This makes
         * sure that we don't have any pointers into DSM memory by the time
         * ExecEndHashJoin runs.
         */
        ExecHashTableDetachBatch(node->hj_HashTable);
        ExecHashTableDetach(node->hj_HashTable);
    }
}

static void
ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
{
    PlanState  *outerState = outerPlanState(hjstate);
    ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
    HashJoinTable hashtable = hjstate->hj_HashTable;
    TupleTableSlot *slot;
    uint32      hashvalue;
    int         i;

    Assert(hjstate->hj_FirstOuterTupleSlot == NULL);

    /* Execute outer plan, writing all tuples to shared tuplestores. */
    for (;;)
    {
        slot = ExecProcNode(outerState);
        if (TupIsNull(slot))
            break;
        econtext->ecxt_outertuple = slot;
        if (ExecHashGetHashValue(hashtable, econtext,
                                 hjstate->hj_OuterHashKeys,
                                 true,  /* outer tuple */
                                 HJ_FILL_OUTER(hjstate),
                                 &hashvalue))
        {
            int         batchno;
            int         bucketno;
            bool        shouldFree;
            MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);

            ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
                                      &batchno);
            sts_puttuple(hashtable->batches[batchno].outer_tuples,
                         &hashvalue, mintup);

            if (shouldFree)
                heap_free_minimal_tuple(mintup);
        }
        CHECK_FOR_INTERRUPTS();
    }

    /* Make sure all outer partitions are readable by any backend. */
    for (i = 0; i < hashtable->nbatch; ++i)
        sts_end_write(hashtable->batches[i].outer_tuples);
}

void
ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
{
    shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}

void
ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
{
    int         plan_node_id = state->js.ps.plan->plan_node_id;
    HashState  *hashNode;
    ParallelHashJoinState *pstate;

    /*
     * Disable shared hash table mode if we failed to create a real DSM
     * segment, because that means that we don't have a DSA area to work with.
     */
    if (pcxt->seg == NULL)
        return;

    ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);

    /*
     * Set up the state needed to coordinate access to the shared hash
     * table(s), using the plan node ID as the toc key.
     */
    pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
    shm_toc_insert(pcxt->toc, plan_node_id, pstate);

    /*
     * Set up the shared hash join state with no batches initially.
     * ExecHashTableCreate() will prepare at least one later and set nbatch
     * and space_allowed.
     */
    pstate->nbatch = 0;
    pstate->space_allowed = 0;
    pstate->batches = InvalidDsaPointer;
    pstate->old_batches = InvalidDsaPointer;
    pstate->nbuckets = 0;
    pstate->growth = PHJ_GROWTH_OK;
    pstate->chunk_work_queue = InvalidDsaPointer;
    pg_atomic_init_u32(&pstate->distributor, 0);
    pstate->nparticipants = pcxt->nworkers + 1;
    pstate->total_tuples = 0;
    LWLockInitialize(&pstate->lock,
                     LWTRANCHE_PARALLEL_HASH_JOIN);
    BarrierInit(&pstate->build_barrier, 0);
    BarrierInit(&pstate->grow_batches_barrier, 0);
    BarrierInit(&pstate->grow_buckets_barrier, 0);

    /* Set up the space we'll use for shared temporary files. */
    SharedFileSetInit(&pstate->fileset, pcxt->seg);

    /* Initialize the shared state in the hash node. */
    hashNode = (HashState *) innerPlanState(state);
    hashNode->parallel_state = pstate;
}

/* ----------------------------------------------------------------
 *    ExecHashJoinReInitializeDSM
 *
 *    Reset shared state before beginning a fresh scan.
 * ----------------------------------------------------------------
 */
void
ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
{
    int         plan_node_id = state->js.ps.plan->plan_node_id;
    ParallelHashJoinState *pstate =
    shm_toc_lookup(cxt->toc, plan_node_id, false);

    /*
     * It would be possible to reuse the shared hash table in single-batch
     * cases by resetting and then fast-forwarding build_barrier to
     * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but
     * currently shared hash tables are already freed by now (by the last
     * participant to detach from the batch). We could consider keeping it
     * around for single-batch joins. We'd also need to adjust
     * finalize_plan() so that it doesn't record a dummy dependency for
     * Parallel Hash nodes, preventing the rescan optimization. For now we
     * don't try.
     */

    /* Detach, freeing any remaining shared memory. */
    if (state->hj_HashTable != NULL)
    {
        ExecHashTableDetachBatch(state->hj_HashTable);
        ExecHashTableDetach(state->hj_HashTable);
    }

    /* Clear any shared batch files. */
    SharedFileSetDeleteAll(&pstate->fileset);

    /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */
    BarrierInit(&pstate->build_barrier, 0);
}

void
ExecHashJoinInitializeWorker(HashJoinState *state,
                             ParallelWorkerContext *pwcxt)
{
    HashState  *hashNode;
    int         plan_node_id = state->js.ps.plan->plan_node_id;
    ParallelHashJoinState *pstate =
    shm_toc_lookup(pwcxt->toc, plan_node_id, false);

    /* Attach to the space for shared temporary files. */
    SharedFileSetAttach(&pstate->fileset, pwcxt->seg);

    /* Attach to the shared state in the hash node. */
    hashNode = (HashState *) innerPlanState(state);
    hashNode->parallel_state = pstate;

    ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}