1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * nodeHash.c |
4 | * Routines to hash relations for hashjoin |
5 | * |
6 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
7 | * Portions Copyright (c) 1994, Regents of the University of California |
8 | * |
9 | * |
10 | * IDENTIFICATION |
11 | * src/backend/executor/nodeHash.c |
12 | * |
13 | * See note on parallelism in nodeHashjoin.c. |
14 | * |
15 | *------------------------------------------------------------------------- |
16 | */ |
17 | /* |
18 | * INTERFACE ROUTINES |
19 | * MultiExecHash - generate an in-memory hash table of the relation |
20 | * ExecInitHash - initialize node and subnodes |
21 | * ExecEndHash - shutdown node and subnodes |
22 | */ |
23 | |
24 | #include "postgres.h" |
25 | |
26 | #include <math.h> |
27 | #include <limits.h> |
28 | |
29 | #include "access/htup_details.h" |
30 | #include "access/parallel.h" |
31 | #include "catalog/pg_statistic.h" |
32 | #include "commands/tablespace.h" |
33 | #include "executor/execdebug.h" |
34 | #include "executor/hashjoin.h" |
35 | #include "executor/nodeHash.h" |
36 | #include "executor/nodeHashjoin.h" |
37 | #include "miscadmin.h" |
38 | #include "pgstat.h" |
39 | #include "port/atomics.h" |
40 | #include "utils/dynahash.h" |
41 | #include "utils/memutils.h" |
42 | #include "utils/lsyscache.h" |
43 | #include "utils/syscache.h" |
44 | |
45 | |
46 | static void ExecHashIncreaseNumBatches(HashJoinTable hashtable); |
47 | static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable); |
48 | static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable); |
49 | static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable); |
50 | static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, |
51 | int mcvsToUse); |
52 | static void ExecHashSkewTableInsert(HashJoinTable hashtable, |
53 | TupleTableSlot *slot, |
54 | uint32 hashvalue, |
55 | int bucketNumber); |
56 | static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable); |
57 | |
58 | static void *dense_alloc(HashJoinTable hashtable, Size size); |
59 | static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, |
60 | size_t size, |
61 | dsa_pointer *shared); |
62 | static void MultiExecPrivateHash(HashState *node); |
63 | static void MultiExecParallelHash(HashState *node); |
64 | static inline HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable table, |
65 | int bucketno); |
66 | static inline HashJoinTuple ExecParallelHashNextTuple(HashJoinTable table, |
67 | HashJoinTuple tuple); |
68 | static inline void ExecParallelHashPushTuple(dsa_pointer_atomic *head, |
69 | HashJoinTuple tuple, |
70 | dsa_pointer tuple_shared); |
71 | static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch); |
72 | static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable); |
73 | static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable); |
74 | static void ExecParallelHashRepartitionRest(HashJoinTable hashtable); |
75 | static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable table, |
76 | dsa_pointer *shared); |
77 | static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, |
78 | int batchno, |
79 | size_t size); |
80 | static void ExecParallelHashMergeCounters(HashJoinTable hashtable); |
81 | static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable); |
82 | |
83 | |
84 | /* ---------------------------------------------------------------- |
85 | * ExecHash |
86 | * |
87 | * stub for pro forma compliance |
88 | * ---------------------------------------------------------------- |
89 | */ |
90 | static TupleTableSlot * |
91 | ExecHash(PlanState *pstate) |
92 | { |
	elog(ERROR, "Hash node does not support ExecProcNode call convention");
94 | return NULL; |
95 | } |
96 | |
97 | /* ---------------------------------------------------------------- |
98 | * MultiExecHash |
99 | * |
100 | * build hash table for hashjoin, doing partitioning if more |
101 | * than one batch is required. |
102 | * ---------------------------------------------------------------- |
103 | */ |
104 | Node * |
105 | MultiExecHash(HashState *node) |
106 | { |
107 | /* must provide our own instrumentation support */ |
108 | if (node->ps.instrument) |
109 | InstrStartNode(node->ps.instrument); |
110 | |
111 | if (node->parallel_state != NULL) |
112 | MultiExecParallelHash(node); |
113 | else |
114 | MultiExecPrivateHash(node); |
115 | |
116 | /* must provide our own instrumentation support */ |
117 | if (node->ps.instrument) |
118 | InstrStopNode(node->ps.instrument, node->hashtable->partialTuples); |
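	/* (partialTuples is just this process's own contribution to the build) */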
119 | |
120 | /* |
121 | * We do not return the hash table directly because it's not a subtype of |
122 | * Node, and so would violate the MultiExecProcNode API. Instead, our |
123 | * parent Hashjoin node is expected to know how to fish it out of our node |
124 | * state. Ugly but not really worth cleaning up, since Hashjoin knows |
125 | * quite a bit more about Hash besides that. |
126 | */ |
127 | return NULL; |
128 | } |
129 | |
130 | /* ---------------------------------------------------------------- |
131 | * MultiExecPrivateHash |
132 | * |
133 | * parallel-oblivious version, building a backend-private |
134 | * hash table and (if necessary) batch files. |
135 | * ---------------------------------------------------------------- |
136 | */ |
137 | static void |
138 | MultiExecPrivateHash(HashState *node) |
139 | { |
140 | PlanState *outerNode; |
141 | List *hashkeys; |
142 | HashJoinTable hashtable; |
143 | TupleTableSlot *slot; |
144 | ExprContext *econtext; |
145 | uint32 hashvalue; |
146 | |
147 | /* |
148 | * get state info from node |
149 | */ |
150 | outerNode = outerPlanState(node); |
151 | hashtable = node->hashtable; |
152 | |
153 | /* |
154 | * set expression context |
155 | */ |
156 | hashkeys = node->hashkeys; |
157 | econtext = node->ps.ps_ExprContext; |
158 | |
159 | /* |
160 | * Get all tuples from the node below the Hash node and insert into the |
161 | * hash table (or temp files). |
162 | */ |
163 | for (;;) |
164 | { |
165 | slot = ExecProcNode(outerNode); |
166 | if (TupIsNull(slot)) |
167 | break; |
168 | /* We have to compute the hash value */ |
169 | econtext->ecxt_outertuple = slot; |
170 | if (ExecHashGetHashValue(hashtable, econtext, hashkeys, |
171 | false, hashtable->keepNulls, |
172 | &hashvalue)) |
173 | { |
174 | int bucketNumber; |
175 | |
176 | bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue); |
177 | if (bucketNumber != INVALID_SKEW_BUCKET_NO) |
178 | { |
179 | /* It's a skew tuple, so put it into that hash table */ |
180 | ExecHashSkewTableInsert(hashtable, slot, hashvalue, |
181 | bucketNumber); |
182 | hashtable->skewTuples += 1; |
183 | } |
184 | else |
185 | { |
186 | /* Not subject to skew optimization, so insert normally */ |
187 | ExecHashTableInsert(hashtable, slot, hashvalue); |
188 | } |
189 | hashtable->totalTuples += 1; |
190 | } |
191 | } |
192 | |
193 | /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */ |
194 | if (hashtable->nbuckets != hashtable->nbuckets_optimal) |
195 | ExecHashIncreaseNumBuckets(hashtable); |
196 | |
197 | /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */ |
198 | hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple); |
199 | if (hashtable->spaceUsed > hashtable->spacePeak) |
200 | hashtable->spacePeak = hashtable->spaceUsed; |
201 | |
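	/* In the serial case this backend hashed every tuple itself. */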
202 | hashtable->partialTuples = hashtable->totalTuples; |
203 | } |
204 | |
205 | /* ---------------------------------------------------------------- |
206 | * MultiExecParallelHash |
207 | * |
208 | * parallel-aware version, building a shared hash table and |
209 | * (if necessary) batch files using the combined effort of |
210 | * a set of co-operating backends. |
211 | * ---------------------------------------------------------------- |
212 | */ |
213 | static void |
214 | MultiExecParallelHash(HashState *node) |
215 | { |
216 | ParallelHashJoinState *pstate; |
217 | PlanState *outerNode; |
218 | List *hashkeys; |
219 | HashJoinTable hashtable; |
220 | TupleTableSlot *slot; |
221 | ExprContext *econtext; |
222 | uint32 hashvalue; |
223 | Barrier *build_barrier; |
224 | int i; |
225 | |
226 | /* |
227 | * get state info from node |
228 | */ |
229 | outerNode = outerPlanState(node); |
230 | hashtable = node->hashtable; |
231 | |
232 | /* |
233 | * set expression context |
234 | */ |
235 | hashkeys = node->hashkeys; |
236 | econtext = node->ps.ps_ExprContext; |
237 | |
238 | /* |
239 | * Synchronize the parallel hash table build. At this stage we know that |
240 | * the shared hash table has been or is being set up by |
241 | * ExecHashTableCreate(), but we don't know if our peers have returned |
242 | * from there or are here in MultiExecParallelHash(), and if so how far |
243 | * through they are. To find out, we check the build_barrier phase then |
244 | * and jump to the right step in the build algorithm. |
245 | */ |
246 | pstate = hashtable->parallel_state; |
247 | build_barrier = &pstate->build_barrier; |
248 | Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATING); |
249 | switch (BarrierPhase(build_barrier)) |
250 | { |
251 | case PHJ_BUILD_ALLOCATING: |
252 | |
253 | /* |
254 | * Either I just allocated the initial hash table in |
255 | * ExecHashTableCreate(), or someone else is doing that. Either |
256 | * way, wait for everyone to arrive here so we can proceed. |
257 | */ |
258 | BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATING); |
259 | /* Fall through. */ |
260 | |
261 | case PHJ_BUILD_HASHING_INNER: |
262 | |
263 | /* |
264 | * It's time to begin hashing, or if we just arrived here then |
265 | * hashing is already underway, so join in that effort. While |
266 | * hashing we have to be prepared to help increase the number of |
267 | * batches or buckets at any time, and if we arrived here when |
268 | * that was already underway we'll have to help complete that work |
269 | * immediately so that it's safe to access batches and buckets |
270 | * below. |
271 | */ |
272 | if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) != |
273 | PHJ_GROW_BATCHES_ELECTING) |
274 | ExecParallelHashIncreaseNumBatches(hashtable); |
275 | if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) != |
276 | PHJ_GROW_BUCKETS_ELECTING) |
277 | ExecParallelHashIncreaseNumBuckets(hashtable); |
278 | ExecParallelHashEnsureBatchAccessors(hashtable); |
279 | ExecParallelHashTableSetCurrentBatch(hashtable, 0); |
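			/*
			 * Tuples hashing to batch 0 go straight into the shared
			 * in-memory hash table; tuples for later batches are appended
			 * to the corresponding batch's shared tuplestore.
			 */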
280 | for (;;) |
281 | { |
282 | slot = ExecProcNode(outerNode); |
283 | if (TupIsNull(slot)) |
284 | break; |
285 | econtext->ecxt_outertuple = slot; |
286 | if (ExecHashGetHashValue(hashtable, econtext, hashkeys, |
287 | false, hashtable->keepNulls, |
288 | &hashvalue)) |
289 | ExecParallelHashTableInsert(hashtable, slot, hashvalue); |
290 | hashtable->partialTuples++; |
291 | } |
292 | |
293 | /* |
294 | * Make sure that any tuples we wrote to disk are visible to |
295 | * others before anyone tries to load them. |
296 | */ |
297 | for (i = 0; i < hashtable->nbatch; ++i) |
298 | sts_end_write(hashtable->batches[i].inner_tuples); |
299 | |
300 | /* |
301 | * Update shared counters. We need an accurate total tuple count |
302 | * to control the empty table optimization. |
303 | */ |
304 | ExecParallelHashMergeCounters(hashtable); |
305 | |
306 | BarrierDetach(&pstate->grow_buckets_barrier); |
307 | BarrierDetach(&pstate->grow_batches_barrier); |
308 | |
309 | /* |
310 | * Wait for everyone to finish building and flushing files and |
311 | * counters. |
312 | */ |
313 | if (BarrierArriveAndWait(build_barrier, |
314 | WAIT_EVENT_HASH_BUILD_HASHING_INNER)) |
315 | { |
316 | /* |
317 | * Elect one backend to disable any further growth. Batches |
318 | * are now fixed. While building them we made sure they'd fit |
319 | * in our memory budget when we load them back in later (or we |
320 | * tried to do that and gave up because we detected extreme |
321 | * skew). |
322 | */ |
323 | pstate->growth = PHJ_GROWTH_DISABLED; |
324 | } |
325 | } |
326 | |
327 | /* |
328 | * We're not yet attached to a batch. We all agree on the dimensions and |
329 | * number of inner tuples (for the empty table optimization). |
330 | */ |
331 | hashtable->curbatch = -1; |
332 | hashtable->nbuckets = pstate->nbuckets; |
333 | hashtable->log2_nbuckets = my_log2(hashtable->nbuckets); |
334 | hashtable->totalTuples = pstate->total_tuples; |
335 | ExecParallelHashEnsureBatchAccessors(hashtable); |
336 | |
337 | /* |
338 | * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE |
339 | * case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't |
340 | * there already). |
341 | */ |
342 | Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER || |
343 | BarrierPhase(build_barrier) == PHJ_BUILD_DONE); |
344 | } |
345 | |
346 | /* ---------------------------------------------------------------- |
347 | * ExecInitHash |
348 | * |
349 | * Init routine for Hash node |
350 | * ---------------------------------------------------------------- |
351 | */ |
352 | HashState * |
353 | ExecInitHash(Hash *node, EState *estate, int eflags) |
354 | { |
355 | HashState *hashstate; |
356 | |
357 | /* check for unsupported flags */ |
358 | Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); |
359 | |
360 | /* |
361 | * create state structure |
362 | */ |
363 | hashstate = makeNode(HashState); |
364 | hashstate->ps.plan = (Plan *) node; |
365 | hashstate->ps.state = estate; |
366 | hashstate->ps.ExecProcNode = ExecHash; |
367 | hashstate->hashtable = NULL; |
368 | hashstate->hashkeys = NIL; /* will be set by parent HashJoin */ |
369 | |
370 | /* |
371 | * Miscellaneous initialization |
372 | * |
373 | * create expression context for node |
374 | */ |
375 | ExecAssignExprContext(estate, &hashstate->ps); |
376 | |
377 | /* |
378 | * initialize child nodes |
379 | */ |
380 | outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags); |
381 | |
382 | /* |
383 | * initialize our result slot and type. No need to build projection |
384 | * because this node doesn't do projections. |
385 | */ |
386 | ExecInitResultTupleSlotTL(&hashstate->ps, &TTSOpsMinimalTuple); |
387 | hashstate->ps.ps_ProjInfo = NULL; |
388 | |
389 | /* |
390 | * initialize child expressions |
391 | */ |
392 | Assert(node->plan.qual == NIL); |
393 | hashstate->hashkeys = |
394 | ExecInitExprList(node->hashkeys, (PlanState *) hashstate); |
395 | |
396 | return hashstate; |
397 | } |
398 | |
399 | /* --------------------------------------------------------------- |
400 | * ExecEndHash |
401 | * |
402 | * clean up routine for Hash node |
403 | * ---------------------------------------------------------------- |
404 | */ |
405 | void |
406 | ExecEndHash(HashState *node) |
407 | { |
408 | PlanState *outerPlan; |
409 | |
410 | /* |
411 | * free exprcontext |
412 | */ |
413 | ExecFreeExprContext(&node->ps); |
414 | |
415 | /* |
416 | * shut down the subplan |
417 | */ |
418 | outerPlan = outerPlanState(node); |
419 | ExecEndNode(outerPlan); |
420 | } |
421 | |
422 | |
423 | /* ---------------------------------------------------------------- |
424 | * ExecHashTableCreate |
425 | * |
426 | * create an empty hashtable data structure for hashjoin. |
427 | * ---------------------------------------------------------------- |
428 | */ |
429 | HashJoinTable |
430 | ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, bool keepNulls) |
431 | { |
432 | Hash *node; |
433 | HashJoinTable hashtable; |
434 | Plan *outerNode; |
435 | size_t space_allowed; |
436 | int nbuckets; |
437 | int nbatch; |
438 | double rows; |
439 | int num_skew_mcvs; |
440 | int log2_nbuckets; |
441 | int nkeys; |
442 | int i; |
443 | ListCell *ho; |
444 | ListCell *hc; |
445 | MemoryContext oldcxt; |
446 | |
447 | /* |
448 | * Get information about the size of the relation to be hashed (it's the |
449 | * "outer" subtree of this node, but the inner relation of the hashjoin). |
450 | * Compute the appropriate size of the hash table. |
451 | */ |
452 | node = (Hash *) state->ps.plan; |
453 | outerNode = outerPlan(node); |
454 | |
	/*
	 * If this is a shared hash table built from a partial plan, then we
	 * can't use outerNode->plan_rows to estimate its size.  We need an
	 * estimate of the total number of rows across all copies of the partial
	 * plan.
	 */
460 | rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows; |
461 | |
462 | ExecChooseHashTableSize(rows, outerNode->plan_width, |
463 | OidIsValid(node->skewTable), |
464 | state->parallel_state != NULL, |
465 | state->parallel_state != NULL ? |
466 | state->parallel_state->nparticipants - 1 : 0, |
467 | &space_allowed, |
468 | &nbuckets, &nbatch, &num_skew_mcvs); |
469 | |
470 | /* nbuckets must be a power of 2 */ |
471 | log2_nbuckets = my_log2(nbuckets); |
472 | Assert(nbuckets == (1 << log2_nbuckets)); |
473 | |
474 | /* |
475 | * Initialize the hash table control block. |
476 | * |
477 | * The hashtable control block is just palloc'd from the executor's |
478 | * per-query memory context. Everything else should be kept inside the |
479 | * subsidiary hashCxt or batchCxt. |
480 | */ |
481 | hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData)); |
482 | hashtable->nbuckets = nbuckets; |
483 | hashtable->nbuckets_original = nbuckets; |
484 | hashtable->nbuckets_optimal = nbuckets; |
485 | hashtable->log2_nbuckets = log2_nbuckets; |
486 | hashtable->log2_nbuckets_optimal = log2_nbuckets; |
487 | hashtable->buckets.unshared = NULL; |
488 | hashtable->keepNulls = keepNulls; |
489 | hashtable->skewEnabled = false; |
490 | hashtable->skewBucket = NULL; |
491 | hashtable->skewBucketLen = 0; |
492 | hashtable->nSkewBuckets = 0; |
493 | hashtable->skewBucketNums = NULL; |
494 | hashtable->nbatch = nbatch; |
495 | hashtable->curbatch = 0; |
496 | hashtable->nbatch_original = nbatch; |
497 | hashtable->nbatch_outstart = nbatch; |
498 | hashtable->growEnabled = true; |
499 | hashtable->totalTuples = 0; |
500 | hashtable->partialTuples = 0; |
501 | hashtable->skewTuples = 0; |
502 | hashtable->innerBatchFile = NULL; |
503 | hashtable->outerBatchFile = NULL; |
504 | hashtable->spaceUsed = 0; |
505 | hashtable->spacePeak = 0; |
506 | hashtable->spaceAllowed = space_allowed; |
507 | hashtable->spaceUsedSkew = 0; |
508 | hashtable->spaceAllowedSkew = |
509 | hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100; |
510 | hashtable->chunks = NULL; |
511 | hashtable->current_chunk = NULL; |
512 | hashtable->parallel_state = state->parallel_state; |
513 | hashtable->area = state->ps.state->es_query_dsa; |
514 | hashtable->batches = NULL; |
515 | |
516 | #ifdef HJDEBUG |
	printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
		   hashtable, nbatch, nbuckets);
519 | #endif |
520 | |
521 | /* |
522 | * Create temporary memory contexts in which to keep the hashtable working |
523 | * storage. See notes in executor/hashjoin.h. |
524 | */ |
	hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
											   "HashTableContext",
											   ALLOCSET_DEFAULT_SIZES);
528 | |
	hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
												"HashBatchContext",
												ALLOCSET_DEFAULT_SIZES);
532 | |
533 | /* Allocate data that will live for the life of the hashjoin */ |
534 | |
535 | oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); |
536 | |
537 | /* |
538 | * Get info about the hash functions to be used for each hash key. Also |
539 | * remember whether the join operators are strict. |
540 | */ |
541 | nkeys = list_length(hashOperators); |
542 | hashtable->outer_hashfunctions = |
543 | (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo)); |
544 | hashtable->inner_hashfunctions = |
545 | (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo)); |
546 | hashtable->hashStrict = (bool *) palloc(nkeys * sizeof(bool)); |
547 | hashtable->collations = (Oid *) palloc(nkeys * sizeof(Oid)); |
548 | i = 0; |
549 | forboth(ho, hashOperators, hc, hashCollations) |
550 | { |
551 | Oid hashop = lfirst_oid(ho); |
552 | Oid left_hashfn; |
553 | Oid right_hashfn; |
554 | |
555 | if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn)) |
			elog(ERROR, "could not find hash function for hash operator %u",
				 hashop);
558 | fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]); |
559 | fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]); |
560 | hashtable->hashStrict[i] = op_strict(hashop); |
561 | hashtable->collations[i] = lfirst_oid(hc); |
562 | i++; |
563 | } |
564 | |
565 | if (nbatch > 1 && hashtable->parallel_state == NULL) |
566 | { |
567 | /* |
568 | * allocate and initialize the file arrays in hashCxt (not needed for |
569 | * parallel case which uses shared tuplestores instead of raw files) |
570 | */ |
571 | hashtable->innerBatchFile = (BufFile **) |
572 | palloc0(nbatch * sizeof(BufFile *)); |
573 | hashtable->outerBatchFile = (BufFile **) |
574 | palloc0(nbatch * sizeof(BufFile *)); |
575 | /* The files will not be opened until needed... */ |
576 | /* ... but make sure we have temp tablespaces established for them */ |
577 | PrepareTempTablespaces(); |
578 | } |
579 | |
580 | MemoryContextSwitchTo(oldcxt); |
581 | |
582 | if (hashtable->parallel_state) |
583 | { |
584 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
585 | Barrier *build_barrier; |
586 | |
587 | /* |
588 | * Attach to the build barrier. The corresponding detach operation is |
589 | * in ExecHashTableDetach. Note that we won't attach to the |
590 | * batch_barrier for batch 0 yet. We'll attach later and start it out |
591 | * in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front |
592 | * and then loaded while hashing (the standard hybrid hash join |
593 | * algorithm), and we'll coordinate that using build_barrier. |
594 | */ |
595 | build_barrier = &pstate->build_barrier; |
596 | BarrierAttach(build_barrier); |
597 | |
598 | /* |
599 | * So far we have no idea whether there are any other participants, |
600 | * and if so, what phase they are working on. The only thing we care |
601 | * about at this point is whether someone has already created the |
602 | * SharedHashJoinBatch objects and the hash table for batch 0. One |
603 | * backend will be elected to do that now if necessary. |
604 | */ |
605 | if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING && |
606 | BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECTING)) |
607 | { |
608 | pstate->nbatch = nbatch; |
609 | pstate->space_allowed = space_allowed; |
610 | pstate->growth = PHJ_GROWTH_OK; |
611 | |
612 | /* Set up the shared state for coordinating batches. */ |
613 | ExecParallelHashJoinSetUpBatches(hashtable, nbatch); |
614 | |
615 | /* |
616 | * Allocate batch 0's hash table up front so we can load it |
617 | * directly while hashing. |
618 | */ |
619 | pstate->nbuckets = nbuckets; |
620 | ExecParallelHashTableAlloc(hashtable, 0); |
621 | } |
622 | |
623 | /* |
624 | * The next Parallel Hash synchronization point is in |
625 | * MultiExecParallelHash(), which will progress it all the way to |
626 | * PHJ_BUILD_DONE. The caller must not return control from this |
627 | * executor node between now and then. |
628 | */ |
629 | } |
630 | else |
631 | { |
632 | /* |
633 | * Prepare context for the first-scan space allocations; allocate the |
634 | * hashbucket array therein, and set each bucket "empty". |
635 | */ |
636 | MemoryContextSwitchTo(hashtable->batchCxt); |
637 | |
638 | hashtable->buckets.unshared = (HashJoinTuple *) |
639 | palloc0(nbuckets * sizeof(HashJoinTuple)); |
640 | |
641 | /* |
642 | * Set up for skew optimization, if possible and there's a need for |
643 | * more than one batch. (In a one-batch join, there's no point in |
644 | * it.) |
645 | */ |
646 | if (nbatch > 1) |
647 | ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs); |
648 | |
649 | MemoryContextSwitchTo(oldcxt); |
650 | } |
651 | |
652 | return hashtable; |
653 | } |
654 | |
655 | |
656 | /* |
657 | * Compute appropriate size for hashtable given the estimated size of the |
658 | * relation to be hashed (number of rows and average row width). |
659 | * |
660 | * This is exported so that the planner's costsize.c can use it. |
661 | */ |
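
/*
 * For example, a caller might invoke it along these lines (argument values
 * and output variable names here are purely illustrative):
 *
 *		ExecChooseHashTableSize(10000.0, 40, true, false, 0,
 *								&space_allowed, &numbuckets,
 *								&numbatches, &num_skew_mcvs);
 */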
662 | |
663 | /* Target bucket loading (tuples per bucket) */ |
664 | #define NTUP_PER_BUCKET 1 |
665 | |
666 | void |
667 | ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, |
668 | bool try_combined_work_mem, |
669 | int parallel_workers, |
670 | size_t *space_allowed, |
671 | int *numbuckets, |
672 | int *numbatches, |
673 | int *num_skew_mcvs) |
674 | { |
675 | int tupsize; |
676 | double inner_rel_bytes; |
677 | long bucket_bytes; |
678 | long hash_table_bytes; |
679 | long skew_table_bytes; |
680 | long max_pointers; |
681 | long mppow2; |
682 | int nbatch = 1; |
683 | int nbuckets; |
684 | double dbuckets; |
685 | |
686 | /* Force a plausible relation size if no info */ |
687 | if (ntuples <= 0.0) |
688 | ntuples = 1000.0; |
689 | |
690 | /* |
691 | * Estimate tupsize based on footprint of tuple in hashtable... note this |
692 | * does not allow for any palloc overhead. The manipulations of spaceUsed |
693 | * don't count palloc overhead either. |
694 | */ |
695 | tupsize = HJTUPLE_OVERHEAD + |
696 | MAXALIGN(SizeofMinimalTupleHeader) + |
697 | MAXALIGN(tupwidth); |
698 | inner_rel_bytes = ntuples * tupsize; |
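	/*
	 * For instance, with tupwidth = 40 tupsize typically works out to
	 * roughly 80 bytes on a 64-bit build (illustrative figure only).
	 */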
699 | |
700 | /* |
701 | * Target in-memory hashtable size is work_mem kilobytes. |
702 | */ |
703 | hash_table_bytes = work_mem * 1024L; |
704 | |
705 | /* |
706 | * Parallel Hash tries to use the combined work_mem of all workers to |
707 | * avoid the need to batch. If that won't work, it falls back to work_mem |
708 | * per worker and tries to process batches in parallel. |
709 | */ |
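	/* (e.g., a leader plus two workers may use 3 * work_mem in total) */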
710 | if (try_combined_work_mem) |
711 | hash_table_bytes += hash_table_bytes * parallel_workers; |
712 | |
713 | *space_allowed = hash_table_bytes; |
714 | |
715 | /* |
716 | * If skew optimization is possible, estimate the number of skew buckets |
717 | * that will fit in the memory allowed, and decrement the assumed space |
718 | * available for the main hash table accordingly. |
719 | * |
720 | * We make the optimistic assumption that each skew bucket will contain |
721 | * one inner-relation tuple. If that turns out to be low, we will recover |
722 | * at runtime by reducing the number of skew buckets. |
723 | * |
724 | * hashtable->skewBucket will have up to 8 times as many HashSkewBucket |
725 | * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash |
726 | * will round up to the next power of 2 and then multiply by 4 to reduce |
727 | * collisions. |
728 | */ |
729 | if (useskew) |
730 | { |
731 | skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100; |
732 | |
733 | /*---------- |
734 | * Divisor is: |
735 | * size of a hash tuple + |
736 | * worst-case size of skewBucket[] per MCV + |
737 | * size of skewBucketNums[] entry + |
738 | * size of skew bucket struct itself |
739 | *---------- |
740 | */ |
741 | *num_skew_mcvs = skew_table_bytes / (tupsize + |
742 | (8 * sizeof(HashSkewBucket *)) + |
743 | sizeof(int) + |
744 | SKEW_BUCKET_OVERHEAD); |
745 | if (*num_skew_mcvs > 0) |
746 | hash_table_bytes -= skew_table_bytes; |
747 | } |
748 | else |
749 | *num_skew_mcvs = 0; |
750 | |
751 | /* |
752 | * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when |
753 | * memory is filled, assuming a single batch; but limit the value so that |
754 | * the pointer arrays we'll try to allocate do not exceed work_mem nor |
755 | * MaxAllocSize. |
756 | * |
757 | * Note that both nbuckets and nbatch must be powers of 2 to make |
758 | * ExecHashGetBucketAndBatch fast. |
759 | */ |
760 | max_pointers = *space_allowed / sizeof(HashJoinTuple); |
761 | max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple)); |
762 | /* If max_pointers isn't a power of 2, must round it down to one */ |
763 | mppow2 = 1L << my_log2(max_pointers); |
764 | if (max_pointers != mppow2) |
765 | max_pointers = mppow2 / 2; |
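	/* e.g. my_log2(1000) == 10, so mppow2 == 1024 and max_pointers becomes 512 */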
766 | |
767 | /* Also ensure we avoid integer overflow in nbatch and nbuckets */ |
768 | /* (this step is redundant given the current value of MaxAllocSize) */ |
769 | max_pointers = Min(max_pointers, INT_MAX / 2); |
770 | |
771 | dbuckets = ceil(ntuples / NTUP_PER_BUCKET); |
772 | dbuckets = Min(dbuckets, max_pointers); |
773 | nbuckets = (int) dbuckets; |
774 | /* don't let nbuckets be really small, though ... */ |
775 | nbuckets = Max(nbuckets, 1024); |
776 | /* ... and force it to be a power of 2. */ |
777 | nbuckets = 1 << my_log2(nbuckets); |
778 | |
779 | /* |
780 | * If there's not enough space to store the projected number of tuples and |
781 | * the required bucket headers, we will need multiple batches. |
782 | */ |
783 | bucket_bytes = sizeof(HashJoinTuple) * nbuckets; |
784 | if (inner_rel_bytes + bucket_bytes > hash_table_bytes) |
785 | { |
786 | /* We'll need multiple batches */ |
787 | long lbuckets; |
788 | double dbatch; |
789 | int minbatch; |
790 | long bucket_size; |
791 | |
792 | /* |
793 | * If Parallel Hash with combined work_mem would still need multiple |
794 | * batches, we'll have to fall back to regular work_mem budget. |
795 | */ |
796 | if (try_combined_work_mem) |
797 | { |
798 | ExecChooseHashTableSize(ntuples, tupwidth, useskew, |
799 | false, parallel_workers, |
800 | space_allowed, |
801 | numbuckets, |
802 | numbatches, |
803 | num_skew_mcvs); |
804 | return; |
805 | } |
806 | |
807 | /* |
808 | * Estimate the number of buckets we'll want to have when work_mem is |
809 | * entirely full. Each bucket will contain a bucket pointer plus |
810 | * NTUP_PER_BUCKET tuples, whose projected size already includes |
811 | * overhead for the hash code, pointer to the next tuple, etc. |
812 | */ |
813 | bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple)); |
814 | lbuckets = 1L << my_log2(hash_table_bytes / bucket_size); |
815 | lbuckets = Min(lbuckets, max_pointers); |
816 | nbuckets = (int) lbuckets; |
817 | nbuckets = 1 << my_log2(nbuckets); |
818 | bucket_bytes = nbuckets * sizeof(HashJoinTuple); |
819 | |
820 | /* |
821 | * Buckets are simple pointers to hashjoin tuples, while tupsize |
822 | * includes the pointer, hash code, and MinimalTupleData. So buckets |
823 | * should never really exceed 25% of work_mem (even for |
824 | * NTUP_PER_BUCKET=1); except maybe for work_mem values that are not |
825 | * 2^N bytes, where we might get more because of doubling. So let's |
826 | * look for 50% here. |
827 | */ |
828 | Assert(bucket_bytes <= hash_table_bytes / 2); |
829 | |
830 | /* Calculate required number of batches. */ |
831 | dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes)); |
832 | dbatch = Min(dbatch, max_pointers); |
833 | minbatch = (int) dbatch; |
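		/* round up to a power of two, e.g. minbatch = 5 gives nbatch = 8 */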
834 | nbatch = 2; |
835 | while (nbatch < minbatch) |
836 | nbatch <<= 1; |
837 | } |
838 | |
839 | Assert(nbuckets > 0); |
840 | Assert(nbatch > 0); |
841 | |
842 | *numbuckets = nbuckets; |
843 | *numbatches = nbatch; |
844 | } |
845 | |
846 | |
847 | /* ---------------------------------------------------------------- |
848 | * ExecHashTableDestroy |
849 | * |
850 | * destroy a hash table |
851 | * ---------------------------------------------------------------- |
852 | */ |
853 | void |
854 | ExecHashTableDestroy(HashJoinTable hashtable) |
855 | { |
856 | int i; |
857 | |
858 | /* |
859 | * Make sure all the temp files are closed. We skip batch 0, since it |
860 | * can't have any temp files (and the arrays might not even exist if |
861 | * nbatch is only 1). Parallel hash joins don't use these files. |
862 | */ |
863 | if (hashtable->innerBatchFile != NULL) |
864 | { |
865 | for (i = 1; i < hashtable->nbatch; i++) |
866 | { |
867 | if (hashtable->innerBatchFile[i]) |
868 | BufFileClose(hashtable->innerBatchFile[i]); |
869 | if (hashtable->outerBatchFile[i]) |
870 | BufFileClose(hashtable->outerBatchFile[i]); |
871 | } |
872 | } |
873 | |
874 | /* Release working memory (batchCxt is a child, so it goes away too) */ |
875 | MemoryContextDelete(hashtable->hashCxt); |
876 | |
877 | /* And drop the control block */ |
878 | pfree(hashtable); |
879 | } |
880 | |
881 | /* |
882 | * ExecHashIncreaseNumBatches |
883 | * increase the original number of batches in order to reduce |
884 | * current memory consumption |
885 | */ |
886 | static void |
887 | ExecHashIncreaseNumBatches(HashJoinTable hashtable) |
888 | { |
889 | int oldnbatch = hashtable->nbatch; |
890 | int curbatch = hashtable->curbatch; |
891 | int nbatch; |
892 | MemoryContext oldcxt; |
893 | long ninmemory; |
894 | long nfreed; |
895 | HashMemoryChunk oldchunks; |
896 | |
897 | /* do nothing if we've decided to shut off growth */ |
898 | if (!hashtable->growEnabled) |
899 | return; |
900 | |
901 | /* safety check to avoid overflow */ |
902 | if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2))) |
903 | return; |
904 | |
905 | nbatch = oldnbatch * 2; |
906 | Assert(nbatch > 1); |
907 | |
908 | #ifdef HJDEBUG |
	printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n",
		   hashtable, nbatch, hashtable->spaceUsed);
911 | #endif |
912 | |
913 | oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); |
914 | |
915 | if (hashtable->innerBatchFile == NULL) |
916 | { |
917 | /* we had no file arrays before */ |
918 | hashtable->innerBatchFile = (BufFile **) |
919 | palloc0(nbatch * sizeof(BufFile *)); |
920 | hashtable->outerBatchFile = (BufFile **) |
921 | palloc0(nbatch * sizeof(BufFile *)); |
922 | /* time to establish the temp tablespaces, too */ |
923 | PrepareTempTablespaces(); |
924 | } |
925 | else |
926 | { |
927 | /* enlarge arrays and zero out added entries */ |
928 | hashtable->innerBatchFile = (BufFile **) |
929 | repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *)); |
930 | hashtable->outerBatchFile = (BufFile **) |
931 | repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *)); |
932 | MemSet(hashtable->innerBatchFile + oldnbatch, 0, |
933 | (nbatch - oldnbatch) * sizeof(BufFile *)); |
934 | MemSet(hashtable->outerBatchFile + oldnbatch, 0, |
935 | (nbatch - oldnbatch) * sizeof(BufFile *)); |
936 | } |
937 | |
938 | MemoryContextSwitchTo(oldcxt); |
939 | |
940 | hashtable->nbatch = nbatch; |
941 | |
942 | /* |
943 | * Scan through the existing hash table entries and dump out any that are |
944 | * no longer of the current batch. |
945 | */ |
946 | ninmemory = nfreed = 0; |
947 | |
	/* If we know we need to resize nbuckets, we can do it while rebatching. */
949 | if (hashtable->nbuckets_optimal != hashtable->nbuckets) |
950 | { |
951 | /* we never decrease the number of buckets */ |
952 | Assert(hashtable->nbuckets_optimal > hashtable->nbuckets); |
953 | |
954 | hashtable->nbuckets = hashtable->nbuckets_optimal; |
955 | hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal; |
956 | |
957 | hashtable->buckets.unshared = |
958 | repalloc(hashtable->buckets.unshared, |
959 | sizeof(HashJoinTuple) * hashtable->nbuckets); |
960 | } |
961 | |
962 | /* |
963 | * We will scan through the chunks directly, so that we can reset the |
964 | * buckets now and not have to keep track which tuples in the buckets have |
965 | * already been processed. We will free the old chunks as we go. |
966 | */ |
967 | memset(hashtable->buckets.unshared, 0, |
968 | sizeof(HashJoinTuple) * hashtable->nbuckets); |
969 | oldchunks = hashtable->chunks; |
970 | hashtable->chunks = NULL; |
971 | |
972 | /* so, let's scan through the old chunks, and all tuples in each chunk */ |
973 | while (oldchunks != NULL) |
974 | { |
975 | HashMemoryChunk nextchunk = oldchunks->next.unshared; |
976 | |
977 | /* position within the buffer (up to oldchunks->used) */ |
978 | size_t idx = 0; |
979 | |
980 | /* process all tuples stored in this chunk (and then free it) */ |
981 | while (idx < oldchunks->used) |
982 | { |
983 | HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(oldchunks) + idx); |
984 | MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple); |
985 | int hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len); |
986 | int bucketno; |
987 | int batchno; |
988 | |
989 | ninmemory++; |
990 | ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, |
991 | &bucketno, &batchno); |
992 | |
993 | if (batchno == curbatch) |
994 | { |
995 | /* keep tuple in memory - copy it into the new chunk */ |
996 | HashJoinTuple copyTuple; |
997 | |
998 | copyTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize); |
999 | memcpy(copyTuple, hashTuple, hashTupleSize); |
1000 | |
1001 | /* and add it back to the appropriate bucket */ |
1002 | copyTuple->next.unshared = hashtable->buckets.unshared[bucketno]; |
1003 | hashtable->buckets.unshared[bucketno] = copyTuple; |
1004 | } |
1005 | else |
1006 | { |
1007 | /* dump it out */ |
1008 | Assert(batchno > curbatch); |
1009 | ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple), |
1010 | hashTuple->hashvalue, |
1011 | &hashtable->innerBatchFile[batchno]); |
1012 | |
1013 | hashtable->spaceUsed -= hashTupleSize; |
1014 | nfreed++; |
1015 | } |
1016 | |
1017 | /* next tuple in this chunk */ |
1018 | idx += MAXALIGN(hashTupleSize); |
1019 | |
1020 | /* allow this loop to be cancellable */ |
1021 | CHECK_FOR_INTERRUPTS(); |
1022 | } |
1023 | |
1024 | /* we're done with this chunk - free it and proceed to the next one */ |
1025 | pfree(oldchunks); |
1026 | oldchunks = nextchunk; |
1027 | } |
1028 | |
1029 | #ifdef HJDEBUG |
	printf("Hashjoin %p: freed %ld of %ld tuples, space now %zu\n",
		   hashtable, nfreed, ninmemory, hashtable->spaceUsed);
1032 | #endif |
1033 | |
1034 | /* |
1035 | * If we dumped out either all or none of the tuples in the table, disable |
1036 | * further expansion of nbatch. This situation implies that we have |
1037 | * enough tuples of identical hashvalues to overflow spaceAllowed. |
1038 | * Increasing nbatch will not fix it since there's no way to subdivide the |
1039 | * group any more finely. We have to just gut it out and hope the server |
1040 | * has enough RAM. |
1041 | */ |
1042 | if (nfreed == 0 || nfreed == ninmemory) |
1043 | { |
1044 | hashtable->growEnabled = false; |
1045 | #ifdef HJDEBUG |
		printf("Hashjoin %p: disabling further increase of nbatch\n",
			   hashtable);
1048 | #endif |
1049 | } |
1050 | } |
1051 | |
1052 | /* |
1053 | * ExecParallelHashIncreaseNumBatches |
1054 | * Every participant attached to grow_batches_barrier must run this |
1055 | * function when it observes growth == PHJ_GROWTH_NEED_MORE_BATCHES. |
1056 | */ |
1057 | static void |
1058 | ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) |
1059 | { |
1060 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
1061 | int i; |
1062 | |
1063 | Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER); |
1064 | |
1065 | /* |
1066 | * It's unlikely, but we need to be prepared for new participants to show |
1067 | * up while we're in the middle of this operation so we need to switch on |
1068 | * barrier phase here. |
1069 | */ |
1070 | switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier))) |
1071 | { |
1072 | case PHJ_GROW_BATCHES_ELECTING: |
1073 | |
1074 | /* |
1075 | * Elect one participant to prepare to grow the number of batches. |
1076 | * This involves reallocating or resetting the buckets of batch 0 |
1077 | * in preparation for all participants to begin repartitioning the |
1078 | * tuples. |
1079 | */ |
1080 | if (BarrierArriveAndWait(&pstate->grow_batches_barrier, |
1081 | WAIT_EVENT_HASH_GROW_BATCHES_ELECTING)) |
1082 | { |
1083 | dsa_pointer_atomic *buckets; |
1084 | ParallelHashJoinBatch *old_batch0; |
1085 | int new_nbatch; |
1086 | int i; |
1087 | |
1088 | /* Move the old batch out of the way. */ |
1089 | old_batch0 = hashtable->batches[0].shared; |
1090 | pstate->old_batches = pstate->batches; |
1091 | pstate->old_nbatch = hashtable->nbatch; |
1092 | pstate->batches = InvalidDsaPointer; |
1093 | |
1094 | /* Free this backend's old accessors. */ |
1095 | ExecParallelHashCloseBatchAccessors(hashtable); |
1096 | |
1097 | /* Figure out how many batches to use. */ |
1098 | if (hashtable->nbatch == 1) |
1099 | { |
1100 | /* |
1101 | * We are going from single-batch to multi-batch. We need |
1102 | * to switch from one large combined memory budget to the |
1103 | * regular work_mem budget. |
1104 | */ |
1105 | pstate->space_allowed = work_mem * 1024L; |
1106 | |
1107 | /* |
1108 | * The combined work_mem of all participants wasn't |
1109 | * enough. Therefore one batch per participant would be |
1110 | * approximately equivalent and would probably also be |
1111 | * insufficient. So try two batches per participant, |
1112 | * rounded up to a power of two. |
1113 | */ |
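					/* e.g. 3 participants: my_log2(6) == 3, so 8 batches */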
1114 | new_nbatch = 1 << my_log2(pstate->nparticipants * 2); |
1115 | } |
1116 | else |
1117 | { |
1118 | /* |
1119 | * We were already multi-batched. Try doubling the number |
1120 | * of batches. |
1121 | */ |
1122 | new_nbatch = hashtable->nbatch * 2; |
1123 | } |
1124 | |
1125 | /* Allocate new larger generation of batches. */ |
1126 | Assert(hashtable->nbatch == pstate->nbatch); |
1127 | ExecParallelHashJoinSetUpBatches(hashtable, new_nbatch); |
1128 | Assert(hashtable->nbatch == pstate->nbatch); |
1129 | |
1130 | /* Replace or recycle batch 0's bucket array. */ |
1131 | if (pstate->old_nbatch == 1) |
1132 | { |
1133 | double dtuples; |
1134 | double dbuckets; |
1135 | int new_nbuckets; |
1136 | |
1137 | /* |
1138 | * We probably also need a smaller bucket array. How many |
1139 | * tuples do we expect per batch, assuming we have only |
1140 | * half of them so far? Normally we don't need to change |
1141 | * the bucket array's size, because the size of each batch |
1142 | * stays the same as we add more batches, but in this |
1143 | * special case we move from a large batch to many smaller |
1144 | * batches and it would be wasteful to keep the large |
1145 | * array. |
1146 | */ |
1147 | dtuples = (old_batch0->ntuples * 2.0) / new_nbatch; |
1148 | dbuckets = ceil(dtuples / NTUP_PER_BUCKET); |
1149 | dbuckets = Min(dbuckets, |
1150 | MaxAllocSize / sizeof(dsa_pointer_atomic)); |
1151 | new_nbuckets = (int) dbuckets; |
1152 | new_nbuckets = Max(new_nbuckets, 1024); |
1153 | new_nbuckets = 1 << my_log2(new_nbuckets); |
1154 | dsa_free(hashtable->area, old_batch0->buckets); |
1155 | hashtable->batches[0].shared->buckets = |
1156 | dsa_allocate(hashtable->area, |
1157 | sizeof(dsa_pointer_atomic) * new_nbuckets); |
1158 | buckets = (dsa_pointer_atomic *) |
1159 | dsa_get_address(hashtable->area, |
1160 | hashtable->batches[0].shared->buckets); |
1161 | for (i = 0; i < new_nbuckets; ++i) |
1162 | dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); |
1163 | pstate->nbuckets = new_nbuckets; |
1164 | } |
1165 | else |
1166 | { |
1167 | /* Recycle the existing bucket array. */ |
1168 | hashtable->batches[0].shared->buckets = old_batch0->buckets; |
1169 | buckets = (dsa_pointer_atomic *) |
1170 | dsa_get_address(hashtable->area, old_batch0->buckets); |
1171 | for (i = 0; i < hashtable->nbuckets; ++i) |
1172 | dsa_pointer_atomic_write(&buckets[i], InvalidDsaPointer); |
1173 | } |
1174 | |
1175 | /* Move all chunks to the work queue for parallel processing. */ |
1176 | pstate->chunk_work_queue = old_batch0->chunks; |
1177 | |
1178 | /* Disable further growth temporarily while we're growing. */ |
1179 | pstate->growth = PHJ_GROWTH_DISABLED; |
1180 | } |
1181 | else |
1182 | { |
1183 | /* All other participants just flush their tuples to disk. */ |
1184 | ExecParallelHashCloseBatchAccessors(hashtable); |
1185 | } |
1186 | /* Fall through. */ |
1187 | |
1188 | case PHJ_GROW_BATCHES_ALLOCATING: |
1189 | /* Wait for the above to be finished. */ |
1190 | BarrierArriveAndWait(&pstate->grow_batches_barrier, |
1191 | WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING); |
1192 | /* Fall through. */ |
1193 | |
1194 | case PHJ_GROW_BATCHES_REPARTITIONING: |
1195 | /* Make sure that we have the current dimensions and buckets. */ |
1196 | ExecParallelHashEnsureBatchAccessors(hashtable); |
1197 | ExecParallelHashTableSetCurrentBatch(hashtable, 0); |
1198 | /* Then partition, flush counters. */ |
1199 | ExecParallelHashRepartitionFirst(hashtable); |
1200 | ExecParallelHashRepartitionRest(hashtable); |
1201 | ExecParallelHashMergeCounters(hashtable); |
1202 | /* Wait for the above to be finished. */ |
1203 | BarrierArriveAndWait(&pstate->grow_batches_barrier, |
1204 | WAIT_EVENT_HASH_GROW_BATCHES_REPARTITIONING); |
1205 | /* Fall through. */ |
1206 | |
1207 | case PHJ_GROW_BATCHES_DECIDING: |
1208 | |
1209 | /* |
1210 | * Elect one participant to clean up and decide whether further |
1211 | * repartitioning is needed, or should be disabled because it's |
1212 | * not helping. |
1213 | */ |
1214 | if (BarrierArriveAndWait(&pstate->grow_batches_barrier, |
1215 | WAIT_EVENT_HASH_GROW_BATCHES_DECIDING)) |
1216 | { |
1217 | bool space_exhausted = false; |
1218 | bool extreme_skew_detected = false; |
1219 | |
1220 | /* Make sure that we have the current dimensions and buckets. */ |
1221 | ExecParallelHashEnsureBatchAccessors(hashtable); |
1222 | ExecParallelHashTableSetCurrentBatch(hashtable, 0); |
1223 | |
1224 | /* Are any of the new generation of batches exhausted? */ |
1225 | for (i = 0; i < hashtable->nbatch; ++i) |
1226 | { |
1227 | ParallelHashJoinBatch *batch = hashtable->batches[i].shared; |
1228 | |
1229 | if (batch->space_exhausted || |
1230 | batch->estimated_size > pstate->space_allowed) |
1231 | { |
1232 | int parent; |
1233 | |
1234 | space_exhausted = true; |
1235 | |
1236 | /* |
1237 | * Did this batch receive ALL of the tuples from its |
1238 | * parent batch? That would indicate that further |
1239 | * repartitioning isn't going to help (the hash values |
1240 | * are probably all the same). |
1241 | */ |
1242 | parent = i % pstate->old_nbatch; |
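							/*
							 * (The extra batch-number bits are high-order,
							 * so i % old_nbatch identifies the parent.)
							 */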
1243 | if (batch->ntuples == hashtable->batches[parent].shared->old_ntuples) |
1244 | extreme_skew_detected = true; |
1245 | } |
1246 | } |
1247 | |
1248 | /* Don't keep growing if it's not helping or we'd overflow. */ |
1249 | if (extreme_skew_detected || hashtable->nbatch >= INT_MAX / 2) |
1250 | pstate->growth = PHJ_GROWTH_DISABLED; |
1251 | else if (space_exhausted) |
1252 | pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; |
1253 | else |
1254 | pstate->growth = PHJ_GROWTH_OK; |
1255 | |
1256 | /* Free the old batches in shared memory. */ |
1257 | dsa_free(hashtable->area, pstate->old_batches); |
1258 | pstate->old_batches = InvalidDsaPointer; |
1259 | } |
1260 | /* Fall through. */ |
1261 | |
1262 | case PHJ_GROW_BATCHES_FINISHING: |
1263 | /* Wait for the above to complete. */ |
1264 | BarrierArriveAndWait(&pstate->grow_batches_barrier, |
1265 | WAIT_EVENT_HASH_GROW_BATCHES_FINISHING); |
1266 | } |
1267 | } |
1268 | |
1269 | /* |
1270 | * Repartition the tuples currently loaded into memory for inner batch 0 |
1271 | * because the number of batches has been increased. Some tuples are retained |
1272 | * in memory and some are written out to a later batch. |
1273 | */ |
1274 | static void |
1275 | ExecParallelHashRepartitionFirst(HashJoinTable hashtable) |
1276 | { |
1277 | dsa_pointer chunk_shared; |
1278 | HashMemoryChunk chunk; |
1279 | |
1280 | Assert(hashtable->nbatch == hashtable->parallel_state->nbatch); |
1281 | |
1282 | while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_shared))) |
1283 | { |
1284 | size_t idx = 0; |
1285 | |
1286 | /* Repartition all tuples in this chunk. */ |
1287 | while (idx < chunk->used) |
1288 | { |
1289 | HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx); |
1290 | MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple); |
1291 | HashJoinTuple copyTuple; |
1292 | dsa_pointer shared; |
1293 | int bucketno; |
1294 | int batchno; |
1295 | |
1296 | ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, |
1297 | &bucketno, &batchno); |
1298 | |
1299 | Assert(batchno < hashtable->nbatch); |
1300 | if (batchno == 0) |
1301 | { |
1302 | /* It still belongs in batch 0. Copy to a new chunk. */ |
1303 | copyTuple = |
1304 | ExecParallelHashTupleAlloc(hashtable, |
1305 | HJTUPLE_OVERHEAD + tuple->t_len, |
1306 | &shared); |
1307 | copyTuple->hashvalue = hashTuple->hashvalue; |
1308 | memcpy(HJTUPLE_MINTUPLE(copyTuple), tuple, tuple->t_len); |
1309 | ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], |
1310 | copyTuple, shared); |
1311 | } |
1312 | else |
1313 | { |
1314 | size_t tuple_size = |
1315 | MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len); |
1316 | |
1317 | /* It belongs in a later batch. */ |
1318 | hashtable->batches[batchno].estimated_size += tuple_size; |
1319 | sts_puttuple(hashtable->batches[batchno].inner_tuples, |
1320 | &hashTuple->hashvalue, tuple); |
1321 | } |
1322 | |
1323 | /* Count this tuple. */ |
1324 | ++hashtable->batches[0].old_ntuples; |
1325 | ++hashtable->batches[batchno].ntuples; |
1326 | |
1327 | idx += MAXALIGN(HJTUPLE_OVERHEAD + |
1328 | HJTUPLE_MINTUPLE(hashTuple)->t_len); |
1329 | } |
1330 | |
1331 | /* Free this chunk. */ |
1332 | dsa_free(hashtable->area, chunk_shared); |
1333 | |
1334 | CHECK_FOR_INTERRUPTS(); |
1335 | } |
1336 | } |
1337 | |
1338 | /* |
1339 | * Help repartition inner batches 1..n. |
1340 | */ |
1341 | static void |
1342 | ExecParallelHashRepartitionRest(HashJoinTable hashtable) |
1343 | { |
1344 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
1345 | int old_nbatch = pstate->old_nbatch; |
1346 | SharedTuplestoreAccessor **old_inner_tuples; |
1347 | ParallelHashJoinBatch *old_batches; |
1348 | int i; |
1349 | |
1350 | /* Get our hands on the previous generation of batches. */ |
1351 | old_batches = (ParallelHashJoinBatch *) |
1352 | dsa_get_address(hashtable->area, pstate->old_batches); |
1353 | old_inner_tuples = palloc0(sizeof(SharedTuplestoreAccessor *) * old_nbatch); |
1354 | for (i = 1; i < old_nbatch; ++i) |
1355 | { |
1356 | ParallelHashJoinBatch *shared = |
1357 | NthParallelHashJoinBatch(old_batches, i); |
1358 | |
1359 | old_inner_tuples[i] = sts_attach(ParallelHashJoinBatchInner(shared), |
1360 | ParallelWorkerNumber + 1, |
1361 | &pstate->fileset); |
1362 | } |
1363 | |
1364 | /* Join in the effort to repartition them. */ |
1365 | for (i = 1; i < old_nbatch; ++i) |
1366 | { |
1367 | MinimalTuple tuple; |
1368 | uint32 hashvalue; |
1369 | |
1370 | /* Scan one partition from the previous generation. */ |
1371 | sts_begin_parallel_scan(old_inner_tuples[i]); |
1372 | while ((tuple = sts_parallel_scan_next(old_inner_tuples[i], &hashvalue))) |
1373 | { |
1374 | size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len); |
1375 | int bucketno; |
1376 | int batchno; |
1377 | |
1378 | /* Decide which partition it goes to in the new generation. */ |
1379 | ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, |
1380 | &batchno); |
1381 | |
1382 | hashtable->batches[batchno].estimated_size += tuple_size; |
1383 | ++hashtable->batches[batchno].ntuples; |
1384 | ++hashtable->batches[i].old_ntuples; |
1385 | |
			/* Store the tuple in its new batch. */
1387 | sts_puttuple(hashtable->batches[batchno].inner_tuples, |
1388 | &hashvalue, tuple); |
1389 | |
1390 | CHECK_FOR_INTERRUPTS(); |
1391 | } |
1392 | sts_end_parallel_scan(old_inner_tuples[i]); |
1393 | } |
1394 | |
1395 | pfree(old_inner_tuples); |
1396 | } |
1397 | |
1398 | /* |
1399 | * Transfer the backend-local per-batch counters to the shared totals. |
1400 | */ |
1401 | static void |
1402 | ExecParallelHashMergeCounters(HashJoinTable hashtable) |
1403 | { |
1404 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
1405 | int i; |
1406 | |
1407 | LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); |
1408 | pstate->total_tuples = 0; |
1409 | for (i = 0; i < hashtable->nbatch; ++i) |
1410 | { |
1411 | ParallelHashJoinBatchAccessor *batch = &hashtable->batches[i]; |
1412 | |
1413 | batch->shared->size += batch->size; |
1414 | batch->shared->estimated_size += batch->estimated_size; |
1415 | batch->shared->ntuples += batch->ntuples; |
1416 | batch->shared->old_ntuples += batch->old_ntuples; |
1417 | batch->size = 0; |
1418 | batch->estimated_size = 0; |
1419 | batch->ntuples = 0; |
1420 | batch->old_ntuples = 0; |
1421 | pstate->total_tuples += batch->shared->ntuples; |
1422 | } |
1423 | LWLockRelease(&pstate->lock); |
1424 | } |
1425 | |
1426 | /* |
1427 | * ExecHashIncreaseNumBuckets |
1428 | * increase the original number of buckets in order to reduce |
1429 | * number of tuples per bucket |
1430 | */ |
1431 | static void |
1432 | ExecHashIncreaseNumBuckets(HashJoinTable hashtable) |
1433 | { |
1434 | HashMemoryChunk chunk; |
1435 | |
1436 | /* do nothing if not an increase (it's called increase for a reason) */ |
1437 | if (hashtable->nbuckets >= hashtable->nbuckets_optimal) |
1438 | return; |
1439 | |
1440 | #ifdef HJDEBUG |
	printf("Hashjoin %p: increasing nbuckets %d => %d\n",
		   hashtable, hashtable->nbuckets, hashtable->nbuckets_optimal);
1443 | #endif |
1444 | |
1445 | hashtable->nbuckets = hashtable->nbuckets_optimal; |
1446 | hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal; |
1447 | |
1448 | Assert(hashtable->nbuckets > 1); |
1449 | Assert(hashtable->nbuckets <= (INT_MAX / 2)); |
1450 | Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets)); |
1451 | |
1452 | /* |
1453 | * Just reallocate the proper number of buckets - we don't need to walk |
1454 | * through them - we can walk the dense-allocated chunks (just like in |
1455 | * ExecHashIncreaseNumBatches, but without all the copying into new |
1456 | * chunks) |
1457 | */ |
1458 | hashtable->buckets.unshared = |
1459 | (HashJoinTuple *) repalloc(hashtable->buckets.unshared, |
1460 | hashtable->nbuckets * sizeof(HashJoinTuple)); |
1461 | |
1462 | memset(hashtable->buckets.unshared, 0, |
1463 | hashtable->nbuckets * sizeof(HashJoinTuple)); |
1464 | |
1465 | /* scan through all tuples in all chunks to rebuild the hash table */ |
1466 | for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next.unshared) |
1467 | { |
1468 | /* process all tuples stored in this chunk */ |
1469 | size_t idx = 0; |
1470 | |
1471 | while (idx < chunk->used) |
1472 | { |
1473 | HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx); |
1474 | int bucketno; |
1475 | int batchno; |
1476 | |
1477 | ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, |
1478 | &bucketno, &batchno); |
1479 | |
1480 | /* add the tuple to the proper bucket */ |
1481 | hashTuple->next.unshared = hashtable->buckets.unshared[bucketno]; |
1482 | hashtable->buckets.unshared[bucketno] = hashTuple; |
1483 | |
1484 | /* advance index past the tuple */ |
1485 | idx += MAXALIGN(HJTUPLE_OVERHEAD + |
1486 | HJTUPLE_MINTUPLE(hashTuple)->t_len); |
1487 | } |
1488 | |
1489 | /* allow this loop to be cancellable */ |
1490 | CHECK_FOR_INTERRUPTS(); |
1491 | } |
1492 | } |
1493 | |
1494 | static void |
1495 | ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable) |
1496 | { |
1497 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
1498 | int i; |
1499 | HashMemoryChunk chunk; |
1500 | dsa_pointer chunk_s; |
1501 | |
1502 | Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER); |
1503 | |
1504 | /* |
1505 | * It's unlikely, but we need to be prepared for new participants to show |
1506 | * up while we're in the middle of this operation so we need to switch on |
1507 | * barrier phase here. |
1508 | */ |
1509 | switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier))) |
1510 | { |
1511 | case PHJ_GROW_BUCKETS_ELECTING: |
1512 | /* Elect one participant to prepare to increase nbuckets. */ |
1513 | if (BarrierArriveAndWait(&pstate->grow_buckets_barrier, |
1514 | WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING)) |
1515 | { |
1516 | size_t size; |
1517 | dsa_pointer_atomic *buckets; |
1518 | |
1519 | /* Double the size of the bucket array. */ |
1520 | pstate->nbuckets *= 2; |
1521 | size = pstate->nbuckets * sizeof(dsa_pointer_atomic); |
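				/* charge only the extra half, since the array is doubling */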
1522 | hashtable->batches[0].shared->size += size / 2; |
1523 | dsa_free(hashtable->area, hashtable->batches[0].shared->buckets); |
1524 | hashtable->batches[0].shared->buckets = |
1525 | dsa_allocate(hashtable->area, size); |
1526 | buckets = (dsa_pointer_atomic *) |
1527 | dsa_get_address(hashtable->area, |
1528 | hashtable->batches[0].shared->buckets); |
1529 | for (i = 0; i < pstate->nbuckets; ++i) |
1530 | dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); |
1531 | |
1532 | /* Put the chunk list onto the work queue. */ |
1533 | pstate->chunk_work_queue = hashtable->batches[0].shared->chunks; |
1534 | |
1535 | /* Clear the flag. */ |
1536 | pstate->growth = PHJ_GROWTH_OK; |
1537 | } |
1538 | /* Fall through. */ |
1539 | |
1540 | case PHJ_GROW_BUCKETS_ALLOCATING: |
1541 | /* Wait for the above to complete. */ |
1542 | BarrierArriveAndWait(&pstate->grow_buckets_barrier, |
1543 | WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING); |
1544 | /* Fall through. */ |
1545 | |
1546 | case PHJ_GROW_BUCKETS_REINSERTING: |
1547 | /* Reinsert all tuples into the hash table. */ |
1548 | ExecParallelHashEnsureBatchAccessors(hashtable); |
1549 | ExecParallelHashTableSetCurrentBatch(hashtable, 0); |
1550 | while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_s))) |
1551 | { |
1552 | size_t idx = 0; |
1553 | |
1554 | while (idx < chunk->used) |
1555 | { |
1556 | HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx); |
1557 | dsa_pointer shared = chunk_s + HASH_CHUNK_HEADER_SIZE + idx; |
1558 | int bucketno; |
1559 | int batchno; |
1560 | |
1561 | ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, |
1562 | &bucketno, &batchno); |
1563 | Assert(batchno == 0); |
1564 | |
1565 | /* add the tuple to the proper bucket */ |
1566 | ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], |
1567 | hashTuple, shared); |
1568 | |
1569 | /* advance index past the tuple */ |
1570 | idx += MAXALIGN(HJTUPLE_OVERHEAD + |
1571 | HJTUPLE_MINTUPLE(hashTuple)->t_len); |
1572 | } |
1573 | |
1574 | /* allow this loop to be cancellable */ |
1575 | CHECK_FOR_INTERRUPTS(); |
1576 | } |
1577 | BarrierArriveAndWait(&pstate->grow_buckets_barrier, |
1578 | WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING); |
1579 | } |
1580 | } |
1581 | |
1582 | /* |
1583 | * ExecHashTableInsert |
 *		insert a tuple into the hash table; depending on its hash value,
 *		it may instead go to a temp file to be processed in a later batch
1586 | * |
1587 | * Note: the passed TupleTableSlot may contain a regular, minimal, or virtual |
1588 | * tuple; the minimal case in particular is certain to happen while reloading |
1589 | * tuples from batch files. We could save some cycles in the regular-tuple |
1590 | * case by not forcing the slot contents into minimal form; not clear if it's |
1591 | * worth the messiness required. |
1592 | */ |
1593 | void |
1594 | ExecHashTableInsert(HashJoinTable hashtable, |
1595 | TupleTableSlot *slot, |
1596 | uint32 hashvalue) |
1597 | { |
1598 | bool shouldFree; |
1599 | MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); |
1600 | int bucketno; |
1601 | int batchno; |
1602 | |
1603 | ExecHashGetBucketAndBatch(hashtable, hashvalue, |
1604 | &bucketno, &batchno); |
1605 | |
1606 | /* |
1607 | * decide whether to put the tuple in the hash table or a temp file |
1608 | */ |
1609 | if (batchno == hashtable->curbatch) |
1610 | { |
1611 | /* |
1612 | * put the tuple in hash table |
1613 | */ |
1614 | HashJoinTuple hashTuple; |
1615 | int hashTupleSize; |
1616 | double ntuples = (hashtable->totalTuples - hashtable->skewTuples); |
1617 | |
1618 | /* Create the HashJoinTuple */ |
1619 | hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len; |
1620 | hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize); |
1621 | |
1622 | hashTuple->hashvalue = hashvalue; |
1623 | memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); |
1624 | |
1625 | /* |
1626 | * We always reset the tuple-matched flag on insertion. This is okay |
1627 | * even when reloading a tuple from a batch file, since the tuple |
1628 | * could not possibly have been matched to an outer tuple before it |
1629 | * went into the batch file. |
1630 | */ |
1631 | HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); |
1632 | |
1633 | /* Push it onto the front of the bucket's list */ |
1634 | hashTuple->next.unshared = hashtable->buckets.unshared[bucketno]; |
1635 | hashtable->buckets.unshared[bucketno] = hashTuple; |
1636 | |
1637 | /* |
1638 | * Increase the (optimal) number of buckets if we just exceeded the |
1639 | * NTUP_PER_BUCKET threshold, but only when there's still a single |
1640 | * batch. |
1641 | */ |
1642 | if (hashtable->nbatch == 1 && |
1643 | ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET)) |
1644 | { |
1645 | /* Guard against integer overflow and alloc size overflow */ |
1646 | if (hashtable->nbuckets_optimal <= INT_MAX / 2 && |
1647 | hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple)) |
1648 | { |
1649 | hashtable->nbuckets_optimal *= 2; |
1650 | hashtable->log2_nbuckets_optimal += 1; |
1651 | } |
1652 | } |
1653 | |
1654 | /* Account for space used, and back off if we've used too much */ |
1655 | hashtable->spaceUsed += hashTupleSize; |
1656 | if (hashtable->spaceUsed > hashtable->spacePeak) |
1657 | hashtable->spacePeak = hashtable->spaceUsed; |
1658 | if (hashtable->spaceUsed + |
1659 | hashtable->nbuckets_optimal * sizeof(HashJoinTuple) |
1660 | > hashtable->spaceAllowed) |
1661 | ExecHashIncreaseNumBatches(hashtable); |
1662 | } |
1663 | else |
1664 | { |
1665 | /* |
1666 | * put the tuple into a temp file for later batches |
1667 | */ |
1668 | Assert(batchno > hashtable->curbatch); |
1669 | ExecHashJoinSaveTuple(tuple, |
1670 | hashvalue, |
1671 | &hashtable->innerBatchFile[batchno]); |
1672 | } |
1673 | |
1674 | if (shouldFree) |
1675 | heap_free_minimal_tuple(tuple); |
1676 | } |
1677 | |
1678 | /* |
1679 | * ExecParallelHashTableInsert |
1680 | * insert a tuple into a shared hash table or shared batch tuplestore |
1681 | */ |
1682 | void |
1683 | ExecParallelHashTableInsert(HashJoinTable hashtable, |
1684 | TupleTableSlot *slot, |
1685 | uint32 hashvalue) |
1686 | { |
1687 | bool shouldFree; |
1688 | MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); |
1689 | dsa_pointer shared; |
1690 | int bucketno; |
1691 | int batchno; |
1692 | |
1693 | retry: |
1694 | ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); |
1695 | |
1696 | if (batchno == 0) |
1697 | { |
1698 | HashJoinTuple hashTuple; |
1699 | |
1700 | /* Try to load it into memory. */ |
1701 | Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) == |
1702 | PHJ_BUILD_HASHING_INNER); |
1703 | hashTuple = ExecParallelHashTupleAlloc(hashtable, |
1704 | HJTUPLE_OVERHEAD + tuple->t_len, |
1705 | &shared); |
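/*
 * A NULL result means that the number of batches or buckets has just
 * changed, so recompute the target bucket and batch and retry.
 */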
1706 | if (hashTuple == NULL) |
1707 | goto retry; |
1708 | |
1709 | /* Store the hash value in the HashJoinTuple header. */ |
1710 | hashTuple->hashvalue = hashvalue; |
1711 | memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); |
1712 | |
1713 | /* Push it onto the front of the bucket's list */ |
1714 | ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], |
1715 | hashTuple, shared); |
1716 | } |
1717 | else |
1718 | { |
1719 | size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len); |
1720 | |
1721 | Assert(batchno > 0); |
1722 | |
1723 | /* Try to preallocate space in the batch if necessary. */ |
1724 | if (hashtable->batches[batchno].preallocated < tuple_size) |
1725 | { |
1726 | if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size)) |
1727 | goto retry; |
1728 | } |
1729 | |
1730 | Assert(hashtable->batches[batchno].preallocated >= tuple_size); |
1731 | hashtable->batches[batchno].preallocated -= tuple_size; |
1732 | sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue, |
1733 | tuple); |
1734 | } |
1735 | ++hashtable->batches[batchno].ntuples; |
1736 | |
1737 | if (shouldFree) |
1738 | heap_free_minimal_tuple(tuple); |
1739 | } |
1740 | |
1741 | /* |
1742 | * Insert a tuple into the current hash table. Unlike |
1743 | * ExecParallelHashTableInsert, this version is not prepared to send the tuple |
1744 | * to other batches or to run out of memory, and should only be called with |
1745 | * tuples that belong in the current batch once growth has been disabled. |
1746 | */ |
1747 | void |
1748 | ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, |
1749 | TupleTableSlot *slot, |
1750 | uint32 hashvalue) |
1751 | { |
1752 | bool shouldFree; |
1753 | MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); |
1754 | HashJoinTuple hashTuple; |
1755 | dsa_pointer shared; |
1756 | int batchno; |
1757 | int bucketno; |
1758 | |
1759 | ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); |
1760 | Assert(batchno == hashtable->curbatch); |
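/*
 * Growth has been disabled by the time this is called, so the allocation
 * below cannot return NULL (which would require a retry) or move the tuple
 * to a different batch.
 */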
1761 | hashTuple = ExecParallelHashTupleAlloc(hashtable, |
1762 | HJTUPLE_OVERHEAD + tuple->t_len, |
1763 | &shared); |
1764 | hashTuple->hashvalue = hashvalue; |
1765 | memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); |
1766 | HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); |
1767 | ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], |
1768 | hashTuple, shared); |
1769 | |
1770 | if (shouldFree) |
1771 | heap_free_minimal_tuple(tuple); |
1772 | } |
1773 | |
1774 | /* |
1775 | * ExecHashGetHashValue |
1776 | * Compute the hash value for a tuple |
1777 | * |
1778 | * The tuple to be tested must be in econtext->ecxt_outertuple (thus Vars in |
1779 | * the hashkeys expressions need to have OUTER_VAR as varno). If outer_tuple |
1780 | * is false (meaning it's the HashJoin's inner node, Hash), econtext, |
1781 | * hashkeys, and slot need to be from Hash, with hashkeys/slot referencing and |
1782 | * being suitable for tuples from the node below the Hash. Conversely, if |
1783 | * outer_tuple is true, econtext is from HashJoin, and hashkeys/slot need to |
1784 | * be appropriate for tuples from HashJoin's outer node. |
1785 | * |
1786 | * A true result means the tuple's hash value has been successfully computed |
1787 | * and stored at *hashvalue. A false result means the tuple cannot match |
1788 | * because it contains a null attribute, and hence it should be discarded |
1789 | * immediately. (If keep_nulls is true then false is never returned.) |
1790 | */ |
1791 | bool |
1792 | ExecHashGetHashValue(HashJoinTable hashtable, |
1793 | ExprContext *econtext, |
1794 | List *hashkeys, |
1795 | bool outer_tuple, |
1796 | bool keep_nulls, |
1797 | uint32 *hashvalue) |
1798 | { |
1799 | uint32 hashkey = 0; |
1800 | FmgrInfo *hashfunctions; |
1801 | ListCell *hk; |
1802 | int i = 0; |
1803 | MemoryContext oldContext; |
1804 | |
1805 | /* |
1806 | * We reset the eval context each time to reclaim any memory leaked in the |
1807 | * hashkey expressions. |
1808 | */ |
1809 | ResetExprContext(econtext); |
1810 | |
1811 | oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); |
1812 | |
1813 | if (outer_tuple) |
1814 | hashfunctions = hashtable->outer_hashfunctions; |
1815 | else |
1816 | hashfunctions = hashtable->inner_hashfunctions; |
1817 | |
1818 | foreach(hk, hashkeys) |
1819 | { |
1820 | ExprState *keyexpr = (ExprState *) lfirst(hk); |
1821 | Datum keyval; |
1822 | bool isNull; |
1823 | |
1824 | /* rotate hashkey left 1 bit at each step */ |
1825 | hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0); |
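/*
 * (The rotation makes the combination order-sensitive, so identical
 * values in different key columns don't simply cancel out in the XOR
 * below.)
 */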
1826 | |
1827 | /* |
1828 | * Get the join attribute value of the tuple |
1829 | */ |
1830 | keyval = ExecEvalExpr(keyexpr, econtext, &isNull); |
1831 | |
1832 | /* |
1833 | * If the attribute is NULL, and the join operator is strict, then |
1834 | * this tuple cannot pass the join qual so we can reject it |
1835 | * immediately (unless we're scanning the outside of an outer join, in |
1836 | * which case we must not reject it). Otherwise we act like the |
1837 | * hashcode of NULL is zero (this will support operators that act like |
1838 | * IS NOT DISTINCT, though not any more-random behavior). We treat |
1839 | * the hash support function as strict even if the operator is not. |
1840 | * |
1841 | * Note: currently, all hashjoinable operators must be strict since |
1842 | * the hash index AM assumes that. However, it takes so little extra |
1843 | * code here to allow non-strict that we may as well do it. |
1844 | */ |
1845 | if (isNull) |
1846 | { |
1847 | if (hashtable->hashStrict[i] && !keep_nulls) |
1848 | { |
1849 | MemoryContextSwitchTo(oldContext); |
1850 | return false; /* cannot match */ |
1851 | } |
1852 | /* else, leave hashkey unmodified, equivalent to hashcode 0 */ |
1853 | } |
1854 | else |
1855 | { |
1856 | /* Compute the hash function */ |
1857 | uint32 hkey; |
1858 | |
1859 | hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i], hashtable->collations[i], keyval)); |
1860 | hashkey ^= hkey; |
1861 | } |
1862 | |
1863 | i++; |
1864 | } |
1865 | |
1866 | MemoryContextSwitchTo(oldContext); |
1867 | |
1868 | *hashvalue = hashkey; |
1869 | return true; |
1870 | } |
1871 | |
1872 | /* |
1873 | * ExecHashGetBucketAndBatch |
1874 | * Determine the bucket number and batch number for a hash value |
1875 | * |
1876 | * Note: on-the-fly increases of nbatch must not change the bucket number |
1877 | * for a given hash code (since we don't move tuples to different hash |
1878 | * chains), and must only cause the batch number to remain the same or |
1879 | * increase. Our algorithm is |
1880 | * bucketno = hashvalue MOD nbuckets |
1881 | * batchno = (hashvalue DIV nbuckets) MOD nbatch |
1882 | * where nbuckets and nbatch are both expected to be powers of 2, so we can |
1883 | * do the computations by shifting and masking. (This assumes that all hash |
1884 | * functions are good about randomizing all their output bits, else we are |
1885 | * likely to have very skewed bucket or batch occupancy.) |
1886 | * |
1887 | * nbuckets and log2_nbuckets may change while nbatch == 1 because of dynamic |
1888 | * bucket count growth. Once we start batching, the value is fixed and does |
1889 | * not change over the course of the join (making it possible to compute batch |
1890 | * number the way we do here). |
1891 | * |
1892 | * nbatch is always a power of 2; we increase it only by doubling it. This |
1893 | * effectively adds one more bit to the top of the batchno. |
1894 | */ |
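/*
 * Worked example (illustrative values only): with nbuckets = 1024
 * (log2_nbuckets = 10) and nbatch = 4, the low 10 bits of the hash value
 * select the bucket and the next 2 bits select the batch, so hashvalue
 * 0x2ABC gives bucketno = 0x2BC and batchno = 2.
 */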
1895 | void |
1896 | ExecHashGetBucketAndBatch(HashJoinTable hashtable, |
1897 | uint32 hashvalue, |
1898 | int *bucketno, |
1899 | int *batchno) |
1900 | { |
1901 | uint32 nbuckets = (uint32) hashtable->nbuckets; |
1902 | uint32 nbatch = (uint32) hashtable->nbatch; |
1903 | |
1904 | if (nbatch > 1) |
1905 | { |
1906 | /* we can do MOD by masking, DIV by shifting */ |
1907 | *bucketno = hashvalue & (nbuckets - 1); |
1908 | *batchno = (hashvalue >> hashtable->log2_nbuckets) & (nbatch - 1); |
1909 | } |
1910 | else |
1911 | { |
1912 | *bucketno = hashvalue & (nbuckets - 1); |
1913 | *batchno = 0; |
1914 | } |
1915 | } |
1916 | |
1917 | /* |
1918 | * ExecScanHashBucket |
1919 | * scan a hash bucket for matches to the current outer tuple |
1920 | * |
1921 | * The current outer tuple must be stored in econtext->ecxt_outertuple. |
1922 | * |
1923 | * On success, the inner tuple is stored into hjstate->hj_CurTuple and |
1924 | * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot |
1925 | * for the latter. |
1926 | */ |
1927 | bool |
1928 | ExecScanHashBucket(HashJoinState *hjstate, |
1929 | ExprContext *econtext) |
1930 | { |
1931 | ExprState *hjclauses = hjstate->hashclauses; |
1932 | HashJoinTable hashtable = hjstate->hj_HashTable; |
1933 | HashJoinTuple hashTuple = hjstate->hj_CurTuple; |
1934 | uint32 hashvalue = hjstate->hj_CurHashValue; |
1935 | |
1936 | /* |
1937 | * hj_CurTuple is the address of the tuple last returned from the current |
1938 | * bucket, or NULL if it's time to start scanning a new bucket. |
1939 | * |
1940 | * If the tuple hashed to a skew bucket then scan the skew bucket |
1941 | * otherwise scan the standard hashtable bucket. |
1942 | */ |
1943 | if (hashTuple != NULL) |
1944 | hashTuple = hashTuple->next.unshared; |
1945 | else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO) |
1946 | hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples; |
1947 | else |
1948 | hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo]; |
1949 | |
1950 | while (hashTuple != NULL) |
1951 | { |
1952 | if (hashTuple->hashvalue == hashvalue) |
1953 | { |
1954 | TupleTableSlot *inntuple; |
1955 | |
1956 | /* insert hashtable's tuple into exec slot so ExecQual sees it */ |
1957 | inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), |
1958 | hjstate->hj_HashTupleSlot, |
1959 | false); /* do not pfree */ |
1960 | econtext->ecxt_innertuple = inntuple; |
1961 | |
1962 | if (ExecQualAndReset(hjclauses, econtext)) |
1963 | { |
1964 | hjstate->hj_CurTuple = hashTuple; |
1965 | return true; |
1966 | } |
1967 | } |
1968 | |
1969 | hashTuple = hashTuple->next.unshared; |
1970 | } |
1971 | |
1972 | /* |
1973 | * no match |
1974 | */ |
1975 | return false; |
1976 | } |
1977 | |
1978 | /* |
1979 | * ExecParallelScanHashBucket |
1980 | * scan a hash bucket for matches to the current outer tuple |
1981 | * |
1982 | * The current outer tuple must be stored in econtext->ecxt_outertuple. |
1983 | * |
1984 | * On success, the inner tuple is stored into hjstate->hj_CurTuple and |
1985 | * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot |
1986 | * for the latter. |
1987 | */ |
1988 | bool |
1989 | ExecParallelScanHashBucket(HashJoinState *hjstate, |
1990 | ExprContext *econtext) |
1991 | { |
1992 | ExprState *hjclauses = hjstate->hashclauses; |
1993 | HashJoinTable hashtable = hjstate->hj_HashTable; |
1994 | HashJoinTuple hashTuple = hjstate->hj_CurTuple; |
1995 | uint32 hashvalue = hjstate->hj_CurHashValue; |
1996 | |
1997 | /* |
1998 | * hj_CurTuple is the address of the tuple last returned from the current |
1999 | * bucket, or NULL if it's time to start scanning a new bucket. |
2000 | */ |
2001 | if (hashTuple != NULL) |
2002 | hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); |
2003 | else |
2004 | hashTuple = ExecParallelHashFirstTuple(hashtable, |
2005 | hjstate->hj_CurBucketNo); |
2006 | |
2007 | while (hashTuple != NULL) |
2008 | { |
2009 | if (hashTuple->hashvalue == hashvalue) |
2010 | { |
2011 | TupleTableSlot *inntuple; |
2012 | |
2013 | /* insert hashtable's tuple into exec slot so ExecQual sees it */ |
2014 | inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), |
2015 | hjstate->hj_HashTupleSlot, |
2016 | false); /* do not pfree */ |
2017 | econtext->ecxt_innertuple = inntuple; |
2018 | |
2019 | if (ExecQualAndReset(hjclauses, econtext)) |
2020 | { |
2021 | hjstate->hj_CurTuple = hashTuple; |
2022 | return true; |
2023 | } |
2024 | } |
2025 | |
2026 | hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); |
2027 | } |
2028 | |
2029 | /* |
2030 | * no match |
2031 | */ |
2032 | return false; |
2033 | } |
2034 | |
2035 | /* |
2036 | * ExecPrepHashTableForUnmatched |
2037 | * set up for a series of ExecScanHashTableForUnmatched calls |
2038 | */ |
2039 | void |
2040 | ExecPrepHashTableForUnmatched(HashJoinState *hjstate) |
2041 | { |
2042 | /*---------- |
2043 | * During this scan we use the HashJoinState fields as follows: |
2044 | * |
2045 | * hj_CurBucketNo: next regular bucket to scan |
2046 | * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums) |
2047 | * hj_CurTuple: last tuple returned, or NULL to start next bucket |
2048 | *---------- |
2049 | */ |
2050 | hjstate->hj_CurBucketNo = 0; |
2051 | hjstate->hj_CurSkewBucketNo = 0; |
2052 | hjstate->hj_CurTuple = NULL; |
2053 | } |
2054 | |
2055 | /* |
2056 | * ExecScanHashTableForUnmatched |
2057 | * scan the hash table for unmatched inner tuples |
2058 | * |
2059 | * On success, the inner tuple is stored into hjstate->hj_CurTuple and |
2060 | * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot |
2061 | * for the latter. |
2062 | */ |
2063 | bool |
2064 | ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) |
2065 | { |
2066 | HashJoinTable hashtable = hjstate->hj_HashTable; |
2067 | HashJoinTuple hashTuple = hjstate->hj_CurTuple; |
2068 | |
2069 | for (;;) |
2070 | { |
2071 | /* |
2072 | * hj_CurTuple is the address of the tuple last returned from the |
2073 | * current bucket, or NULL if it's time to start scanning a new |
2074 | * bucket. |
2075 | */ |
2076 | if (hashTuple != NULL) |
2077 | hashTuple = hashTuple->next.unshared; |
2078 | else if (hjstate->hj_CurBucketNo < hashtable->nbuckets) |
2079 | { |
2080 | hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo]; |
2081 | hjstate->hj_CurBucketNo++; |
2082 | } |
2083 | else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets) |
2084 | { |
2085 | int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo]; |
2086 | |
2087 | hashTuple = hashtable->skewBucket[j]->tuples; |
2088 | hjstate->hj_CurSkewBucketNo++; |
2089 | } |
2090 | else |
2091 | break; /* finished all buckets */ |
2092 | |
2093 | while (hashTuple != NULL) |
2094 | { |
2095 | if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple))) |
2096 | { |
2097 | TupleTableSlot *inntuple; |
2098 | |
2099 | /* insert hashtable's tuple into exec slot */ |
2100 | inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), |
2101 | hjstate->hj_HashTupleSlot, |
2102 | false); /* do not pfree */ |
2103 | econtext->ecxt_innertuple = inntuple; |
2104 | |
2105 | /* |
2106 | * Reset temp memory each time; although this function doesn't |
2107 | * do any qual eval, the caller will, so let's keep it |
2108 | * parallel to ExecScanHashBucket. |
2109 | */ |
2110 | ResetExprContext(econtext); |
2111 | |
2112 | hjstate->hj_CurTuple = hashTuple; |
2113 | return true; |
2114 | } |
2115 | |
2116 | hashTuple = hashTuple->next.unshared; |
2117 | } |
2118 | |
2119 | /* allow this loop to be cancellable */ |
2120 | CHECK_FOR_INTERRUPTS(); |
2121 | } |
2122 | |
2123 | /* |
2124 | * no more unmatched tuples |
2125 | */ |
2126 | return false; |
2127 | } |
2128 | |
2129 | /* |
2130 | * ExecHashTableReset |
2131 | * |
2132 | * reset hash table header for new batch |
2133 | */ |
2134 | void |
2135 | ExecHashTableReset(HashJoinTable hashtable) |
2136 | { |
2137 | MemoryContext oldcxt; |
2138 | int nbuckets = hashtable->nbuckets; |
2139 | |
2140 | /* |
2141 | * Release all the hash buckets and tuples acquired in the prior pass, and |
2142 | * reinitialize the context for a new pass. |
2143 | */ |
2144 | MemoryContextReset(hashtable->batchCxt); |
2145 | oldcxt = MemoryContextSwitchTo(hashtable->batchCxt); |
2146 | |
2147 | /* Reallocate and reinitialize the hash bucket headers. */ |
2148 | hashtable->buckets.unshared = (HashJoinTuple *) |
2149 | palloc0(nbuckets * sizeof(HashJoinTuple)); |
2150 | |
2151 | hashtable->spaceUsed = 0; |
2152 | |
2153 | MemoryContextSwitchTo(oldcxt); |
2154 | |
2155 | /* Forget the chunks (the memory was freed by the context reset above). */ |
2156 | hashtable->chunks = NULL; |
2157 | } |
2158 | |
2159 | /* |
2160 | * ExecHashTableResetMatchFlags |
2161 | * Clear all the HeapTupleHeaderHasMatch flags in the table |
2162 | */ |
2163 | void |
2164 | ExecHashTableResetMatchFlags(HashJoinTable hashtable) |
2165 | { |
2166 | HashJoinTuple tuple; |
2167 | int i; |
2168 | |
2169 | /* Reset all flags in the main table ... */ |
2170 | for (i = 0; i < hashtable->nbuckets; i++) |
2171 | { |
2172 | for (tuple = hashtable->buckets.unshared[i]; tuple != NULL; |
2173 | tuple = tuple->next.unshared) |
2174 | HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple)); |
2175 | } |
2176 | |
2177 | /* ... and the same for the skew buckets, if any */ |
2178 | for (i = 0; i < hashtable->nSkewBuckets; i++) |
2179 | { |
2180 | int j = hashtable->skewBucketNums[i]; |
2181 | HashSkewBucket *skewBucket = hashtable->skewBucket[j]; |
2182 | |
2183 | for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared) |
2184 | HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple)); |
2185 | } |
2186 | } |
2187 | |
2188 | |
2189 | void |
2190 | ExecReScanHash(HashState *node) |
2191 | { |
2192 | /* |
 * If chgParam of the subnode is not null, the plan will be re-scanned
 * by the first ExecProcNode, so we only need to force a rescan here
 * when it is null.
2195 | */ |
2196 | if (node->ps.lefttree->chgParam == NULL) |
2197 | ExecReScan(node->ps.lefttree); |
2198 | } |
2199 | |
2200 | |
2201 | /* |
2202 | * ExecHashBuildSkewHash |
2203 | * |
2204 | * Set up for skew optimization if we can identify the most common values |
2205 | * (MCVs) of the outer relation's join key. We make a skew hash bucket |
2206 | * for the hash value of each MCV, up to the number of slots allowed |
2207 | * based on available memory. |
2208 | */ |
2209 | static void |
2210 | ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse) |
2211 | { |
HeapTuple	statsTuple;
2213 | AttStatsSlot sslot; |
2214 | |
2215 | /* Do nothing if planner didn't identify the outer relation's join key */ |
2216 | if (!OidIsValid(node->skewTable)) |
2217 | return; |
2218 | /* Also, do nothing if we don't have room for at least one skew bucket */ |
2219 | if (mcvsToUse <= 0) |
2220 | return; |
2221 | |
2222 | /* |
2223 | * Try to find the MCV statistics for the outer relation's join key. |
2224 | */ |
2225 | statsTuple = SearchSysCache3(STATRELATTINH, |
2226 | ObjectIdGetDatum(node->skewTable), |
2227 | Int16GetDatum(node->skewColumn), |
2228 | BoolGetDatum(node->skewInherit)); |
2229 | if (!HeapTupleIsValid(statsTuple)) |
2230 | return; |
2231 | |
2232 | if (get_attstatsslot(&sslot, statsTuple, |
2233 | STATISTIC_KIND_MCV, InvalidOid, |
2234 | ATTSTATSSLOT_VALUES | ATTSTATSSLOT_NUMBERS)) |
2235 | { |
2236 | double frac; |
2237 | int nbuckets; |
2238 | FmgrInfo *hashfunctions; |
2239 | int i; |
2240 | |
2241 | if (mcvsToUse > sslot.nvalues) |
2242 | mcvsToUse = sslot.nvalues; |
2243 | |
2244 | /* |
2245 | * Calculate the expected fraction of outer relation that will |
2246 | * participate in the skew optimization. If this isn't at least |
2247 | * SKEW_MIN_OUTER_FRACTION, don't use skew optimization. |
2248 | */ |
2249 | frac = 0; |
2250 | for (i = 0; i < mcvsToUse; i++) |
2251 | frac += sslot.numbers[i]; |
2252 | if (frac < SKEW_MIN_OUTER_FRACTION) |
2253 | { |
2254 | free_attstatsslot(&sslot); |
2255 | ReleaseSysCache(statsTuple); |
2256 | return; |
2257 | } |
2258 | |
2259 | /* |
2260 | * Okay, set up the skew hashtable. |
2261 | * |
2262 | * skewBucket[] is an open addressing hashtable with a power of 2 size |
2263 | * that is greater than the number of MCV values. (This ensures there |
2264 | * will be at least one null entry, so searches will always |
2265 | * terminate.) |
2266 | * |
2267 | * Note: this code could fail if mcvsToUse exceeds INT_MAX/8 or |
2268 | * MaxAllocSize/sizeof(void *)/8, but that is not currently possible |
2269 | * since we limit pg_statistic entries to much less than that. |
2270 | */ |
2271 | nbuckets = 2; |
2272 | while (nbuckets <= mcvsToUse) |
2273 | nbuckets <<= 1; |
2274 | /* use two more bits just to help avoid collisions */ |
2275 | nbuckets <<= 2; |
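/*
 * For example, with mcvsToUse = 100 the loop above stops at 128 and the
 * extra two bits yield a 512-entry skew hashtable.
 */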
2276 | |
2277 | hashtable->skewEnabled = true; |
2278 | hashtable->skewBucketLen = nbuckets; |
2279 | |
2280 | /* |
2281 | * We allocate the bucket memory in the hashtable's batch context. It |
2282 | * is only needed during the first batch, and this ensures it will be |
2283 | * automatically removed once the first batch is done. |
2284 | */ |
2285 | hashtable->skewBucket = (HashSkewBucket **) |
2286 | MemoryContextAllocZero(hashtable->batchCxt, |
2287 | nbuckets * sizeof(HashSkewBucket *)); |
2288 | hashtable->skewBucketNums = (int *) |
2289 | MemoryContextAllocZero(hashtable->batchCxt, |
2290 | mcvsToUse * sizeof(int)); |
2291 | |
2292 | hashtable->spaceUsed += nbuckets * sizeof(HashSkewBucket *) |
2293 | + mcvsToUse * sizeof(int); |
2294 | hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *) |
2295 | + mcvsToUse * sizeof(int); |
2296 | if (hashtable->spaceUsed > hashtable->spacePeak) |
2297 | hashtable->spacePeak = hashtable->spaceUsed; |
2298 | |
2299 | /* |
2300 | * Create a skew bucket for each MCV hash value. |
2301 | * |
2302 | * Note: it is very important that we create the buckets in order of |
2303 | * decreasing MCV frequency. If we have to remove some buckets, they |
2304 | * must be removed in reverse order of creation (see notes in |
2305 | * ExecHashRemoveNextSkewBucket) and we want the least common MCVs to |
2306 | * be removed first. |
2307 | */ |
2308 | hashfunctions = hashtable->outer_hashfunctions; |
2309 | |
2310 | for (i = 0; i < mcvsToUse; i++) |
2311 | { |
2312 | uint32 hashvalue; |
2313 | int bucket; |
2314 | |
2315 | hashvalue = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[0], |
2316 | hashtable->collations[0], |
2317 | sslot.values[i])); |
2318 | |
2319 | /* |
2320 | * While we have not hit a hole in the hashtable and have not hit |
2321 | * the desired bucket, we have collided with some previous hash |
2322 | * value, so try the next bucket location. NB: this code must |
2323 | * match ExecHashGetSkewBucket. |
2324 | */ |
2325 | bucket = hashvalue & (nbuckets - 1); |
2326 | while (hashtable->skewBucket[bucket] != NULL && |
2327 | hashtable->skewBucket[bucket]->hashvalue != hashvalue) |
2328 | bucket = (bucket + 1) & (nbuckets - 1); |
2329 | |
2330 | /* |
2331 | * If we found an existing bucket with the same hashvalue, leave |
2332 | * it alone. It's okay for two MCVs to share a hashvalue. |
2333 | */ |
2334 | if (hashtable->skewBucket[bucket] != NULL) |
2335 | continue; |
2336 | |
2337 | /* Okay, create a new skew bucket for this hashvalue. */ |
2338 | hashtable->skewBucket[bucket] = (HashSkewBucket *) |
2339 | MemoryContextAlloc(hashtable->batchCxt, |
2340 | sizeof(HashSkewBucket)); |
2341 | hashtable->skewBucket[bucket]->hashvalue = hashvalue; |
2342 | hashtable->skewBucket[bucket]->tuples = NULL; |
2343 | hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket; |
2344 | hashtable->nSkewBuckets++; |
2345 | hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD; |
2346 | hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD; |
2347 | if (hashtable->spaceUsed > hashtable->spacePeak) |
2348 | hashtable->spacePeak = hashtable->spaceUsed; |
2349 | } |
2350 | |
2351 | free_attstatsslot(&sslot); |
2352 | } |
2353 | |
2354 | ReleaseSysCache(statsTuple); |
2355 | } |
2356 | |
2357 | /* |
2358 | * ExecHashGetSkewBucket |
2359 | * |
2360 | * Returns the index of the skew bucket for this hashvalue, |
2361 | * or INVALID_SKEW_BUCKET_NO if the hashvalue is not |
2362 | * associated with any active skew bucket. |
2363 | */ |
2364 | int |
2365 | ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue) |
2366 | { |
2367 | int bucket; |
2368 | |
2369 | /* |
2370 | * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization (in |
2371 | * particular, this happens after the initial batch is done). |
2372 | */ |
2373 | if (!hashtable->skewEnabled) |
2374 | return INVALID_SKEW_BUCKET_NO; |
2375 | |
2376 | /* |
2377 | * Since skewBucketLen is a power of 2, we can do a modulo by ANDing. |
2378 | */ |
2379 | bucket = hashvalue & (hashtable->skewBucketLen - 1); |
2380 | |
2381 | /* |
2382 | * While we have not hit a hole in the hashtable and have not hit the |
2383 | * desired bucket, we have collided with some other hash value, so try the |
2384 | * next bucket location. |
2385 | */ |
2386 | while (hashtable->skewBucket[bucket] != NULL && |
2387 | hashtable->skewBucket[bucket]->hashvalue != hashvalue) |
2388 | bucket = (bucket + 1) & (hashtable->skewBucketLen - 1); |
2389 | |
2390 | /* |
2391 | * Found the desired bucket? |
2392 | */ |
2393 | if (hashtable->skewBucket[bucket] != NULL) |
2394 | return bucket; |
2395 | |
2396 | /* |
2397 | * There must not be any hashtable entry for this hash value. |
2398 | */ |
2399 | return INVALID_SKEW_BUCKET_NO; |
2400 | } |
2401 | |
2402 | /* |
2403 | * ExecHashSkewTableInsert |
2404 | * |
2405 | * Insert a tuple into the skew hashtable. |
2406 | * |
2407 | * This should generally match up with the current-batch case in |
2408 | * ExecHashTableInsert. |
2409 | */ |
2410 | static void |
2411 | ExecHashSkewTableInsert(HashJoinTable hashtable, |
2412 | TupleTableSlot *slot, |
2413 | uint32 hashvalue, |
2414 | int bucketNumber) |
2415 | { |
2416 | bool shouldFree; |
2417 | MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); |
2418 | HashJoinTuple hashTuple; |
2419 | int hashTupleSize; |
2420 | |
2421 | /* Create the HashJoinTuple */ |
2422 | hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len; |
2423 | hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt, |
2424 | hashTupleSize); |
2425 | hashTuple->hashvalue = hashvalue; |
2426 | memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); |
2427 | HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); |
2428 | |
2429 | /* Push it onto the front of the skew bucket's list */ |
2430 | hashTuple->next.unshared = hashtable->skewBucket[bucketNumber]->tuples; |
2431 | hashtable->skewBucket[bucketNumber]->tuples = hashTuple; |
2432 | Assert(hashTuple != hashTuple->next.unshared); |
2433 | |
2434 | /* Account for space used, and back off if we've used too much */ |
2435 | hashtable->spaceUsed += hashTupleSize; |
2436 | hashtable->spaceUsedSkew += hashTupleSize; |
2437 | if (hashtable->spaceUsed > hashtable->spacePeak) |
2438 | hashtable->spacePeak = hashtable->spaceUsed; |
2439 | while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew) |
2440 | ExecHashRemoveNextSkewBucket(hashtable); |
2441 | |
2442 | /* Check we are not over the total spaceAllowed, either */ |
2443 | if (hashtable->spaceUsed > hashtable->spaceAllowed) |
2444 | ExecHashIncreaseNumBatches(hashtable); |
2445 | |
2446 | if (shouldFree) |
2447 | heap_free_minimal_tuple(tuple); |
2448 | } |
2449 | |
2450 | /* |
2451 | * ExecHashRemoveNextSkewBucket |
2452 | * |
2453 | * Remove the least valuable skew bucket by pushing its tuples into |
2454 | * the main hash table. |
2455 | */ |
2456 | static void |
2457 | ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) |
2458 | { |
2459 | int bucketToRemove; |
2460 | HashSkewBucket *bucket; |
2461 | uint32 hashvalue; |
2462 | int bucketno; |
2463 | int batchno; |
2464 | HashJoinTuple hashTuple; |
2465 | |
2466 | /* Locate the bucket to remove */ |
2467 | bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1]; |
2468 | bucket = hashtable->skewBucket[bucketToRemove]; |
2469 | |
2470 | /* |
2471 | * Calculate which bucket and batch the tuples belong to in the main |
2472 | * hashtable. They all have the same hash value, so it's the same for all |
2473 | * of them. Also note that it's not possible for nbatch to increase while |
2474 | * we are processing the tuples. |
2475 | */ |
2476 | hashvalue = bucket->hashvalue; |
2477 | ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); |
2478 | |
2479 | /* Process all tuples in the bucket */ |
2480 | hashTuple = bucket->tuples; |
2481 | while (hashTuple != NULL) |
2482 | { |
2483 | HashJoinTuple nextHashTuple = hashTuple->next.unshared; |
2484 | MinimalTuple tuple; |
2485 | Size tupleSize; |
2486 | |
2487 | /* |
2488 | * This code must agree with ExecHashTableInsert. We do not use |
2489 | * ExecHashTableInsert directly as ExecHashTableInsert expects a |
2490 | * TupleTableSlot while we already have HashJoinTuples. |
2491 | */ |
2492 | tuple = HJTUPLE_MINTUPLE(hashTuple); |
2493 | tupleSize = HJTUPLE_OVERHEAD + tuple->t_len; |
2494 | |
2495 | /* Decide whether to put the tuple in the hash table or a temp file */ |
2496 | if (batchno == hashtable->curbatch) |
2497 | { |
2498 | /* Move the tuple to the main hash table */ |
2499 | HashJoinTuple copyTuple; |
2500 | |
2501 | /* |
2502 | * We must copy the tuple into the dense storage, else it will not |
2503 | * be found by, eg, ExecHashIncreaseNumBatches. |
2504 | */ |
2505 | copyTuple = (HashJoinTuple) dense_alloc(hashtable, tupleSize); |
2506 | memcpy(copyTuple, hashTuple, tupleSize); |
2507 | pfree(hashTuple); |
2508 | |
2509 | copyTuple->next.unshared = hashtable->buckets.unshared[bucketno]; |
2510 | hashtable->buckets.unshared[bucketno] = copyTuple; |
2511 | |
2512 | /* We have reduced skew space, but overall space doesn't change */ |
2513 | hashtable->spaceUsedSkew -= tupleSize; |
2514 | } |
2515 | else |
2516 | { |
2517 | /* Put the tuple into a temp file for later batches */ |
2518 | Assert(batchno > hashtable->curbatch); |
2519 | ExecHashJoinSaveTuple(tuple, hashvalue, |
2520 | &hashtable->innerBatchFile[batchno]); |
2521 | pfree(hashTuple); |
2522 | hashtable->spaceUsed -= tupleSize; |
2523 | hashtable->spaceUsedSkew -= tupleSize; |
2524 | } |
2525 | |
2526 | hashTuple = nextHashTuple; |
2527 | |
2528 | /* allow this loop to be cancellable */ |
2529 | CHECK_FOR_INTERRUPTS(); |
2530 | } |
2531 | |
2532 | /* |
2533 | * Free the bucket struct itself and reset the hashtable entry to NULL. |
2534 | * |
2535 | * NOTE: this is not nearly as simple as it looks on the surface, because |
2536 | * of the possibility of collisions in the hashtable. Suppose that hash |
2537 | * values A and B collide at a particular hashtable entry, and that A was |
2538 | * entered first so B gets shifted to a different table entry. If we were |
2539 | * to remove A first then ExecHashGetSkewBucket would mistakenly start |
2540 | * reporting that B is not in the hashtable, because it would hit the NULL |
2541 | * before finding B. However, we always remove entries in the reverse |
2542 | * order of creation, so this failure cannot happen. |
2543 | */ |
2544 | hashtable->skewBucket[bucketToRemove] = NULL; |
2545 | hashtable->nSkewBuckets--; |
2546 | pfree(bucket); |
2547 | hashtable->spaceUsed -= SKEW_BUCKET_OVERHEAD; |
2548 | hashtable->spaceUsedSkew -= SKEW_BUCKET_OVERHEAD; |
2549 | |
2550 | /* |
2551 | * If we have removed all skew buckets then give up on skew optimization. |
2552 | * Release the arrays since they aren't useful any more. |
2553 | */ |
2554 | if (hashtable->nSkewBuckets == 0) |
2555 | { |
2556 | hashtable->skewEnabled = false; |
2557 | pfree(hashtable->skewBucket); |
2558 | pfree(hashtable->skewBucketNums); |
2559 | hashtable->skewBucket = NULL; |
2560 | hashtable->skewBucketNums = NULL; |
2561 | hashtable->spaceUsed -= hashtable->spaceUsedSkew; |
2562 | hashtable->spaceUsedSkew = 0; |
2563 | } |
2564 | } |
2565 | |
2566 | /* |
2567 | * Reserve space in the DSM segment for instrumentation data. |
2568 | */ |
2569 | void |
2570 | ExecHashEstimate(HashState *node, ParallelContext *pcxt) |
2571 | { |
2572 | size_t size; |
2573 | |
2574 | /* don't need this if not instrumenting or no workers */ |
2575 | if (!node->ps.instrument || pcxt->nworkers == 0) |
2576 | return; |
2577 | |
2578 | size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation)); |
2579 | size = add_size(size, offsetof(SharedHashInfo, hinstrument)); |
2580 | shm_toc_estimate_chunk(&pcxt->estimator, size); |
2581 | shm_toc_estimate_keys(&pcxt->estimator, 1); |
2582 | } |
2583 | |
2584 | /* |
2585 | * Set up a space in the DSM for all workers to record instrumentation data |
2586 | * about their hash table. |
2587 | */ |
2588 | void |
2589 | ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt) |
2590 | { |
2591 | size_t size; |
2592 | |
2593 | /* don't need this if not instrumenting or no workers */ |
2594 | if (!node->ps.instrument || pcxt->nworkers == 0) |
2595 | return; |
2596 | |
2597 | size = offsetof(SharedHashInfo, hinstrument) + |
2598 | pcxt->nworkers * sizeof(HashInstrumentation); |
2599 | node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size); |
2600 | memset(node->shared_info, 0, size); |
2601 | node->shared_info->num_workers = pcxt->nworkers; |
2602 | shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, |
2603 | node->shared_info); |
2604 | } |
2605 | |
2606 | /* |
2607 | * Locate the DSM space for hash table instrumentation data that we'll write |
2608 | * to at shutdown time. |
2609 | */ |
2610 | void |
2611 | ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt) |
2612 | { |
2613 | SharedHashInfo *shared_info; |
2614 | |
2615 | /* don't need this if not instrumenting */ |
2616 | if (!node->ps.instrument) |
2617 | return; |
2618 | |
2619 | shared_info = (SharedHashInfo *) |
2620 | shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false); |
2621 | node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber]; |
2622 | } |
2623 | |
2624 | /* |
2625 | * Copy instrumentation data from this worker's hash table (if it built one) |
 * to DSM memory so the leader can retrieve it. This must be done in
 * ExecShutdownHash() rather than ExecEndHash() because the latter runs after
2628 | * we've detached from the DSM segment. |
2629 | */ |
2630 | void |
2631 | ExecShutdownHash(HashState *node) |
2632 | { |
2633 | if (node->hinstrument && node->hashtable) |
2634 | ExecHashGetInstrumentation(node->hinstrument, node->hashtable); |
2635 | } |
2636 | |
2637 | /* |
2638 | * Retrieve instrumentation data from workers before the DSM segment is |
2639 | * detached, so that EXPLAIN can access it. |
2640 | */ |
2641 | void |
2642 | ExecHashRetrieveInstrumentation(HashState *node) |
2643 | { |
2644 | SharedHashInfo *shared_info = node->shared_info; |
2645 | size_t size; |
2646 | |
2647 | if (shared_info == NULL) |
2648 | return; |
2649 | |
2650 | /* Replace node->shared_info with a copy in backend-local memory. */ |
2651 | size = offsetof(SharedHashInfo, hinstrument) + |
2652 | shared_info->num_workers * sizeof(HashInstrumentation); |
2653 | node->shared_info = palloc(size); |
2654 | memcpy(node->shared_info, shared_info, size); |
2655 | } |
2656 | |
2657 | /* |
2658 | * Copy the instrumentation data from 'hashtable' into a HashInstrumentation |
2659 | * struct. |
2660 | */ |
2661 | void |
2662 | ExecHashGetInstrumentation(HashInstrumentation *instrument, |
2663 | HashJoinTable hashtable) |
2664 | { |
2665 | instrument->nbuckets = hashtable->nbuckets; |
2666 | instrument->nbuckets_original = hashtable->nbuckets_original; |
2667 | instrument->nbatch = hashtable->nbatch; |
2668 | instrument->nbatch_original = hashtable->nbatch_original; |
2669 | instrument->space_peak = hashtable->spacePeak; |
2670 | } |
2671 | |
2672 | /* |
2673 | * Allocate 'size' bytes from the currently active HashMemoryChunk |
2674 | */ |
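/*
 * Tuples are packed densely into chunks of HASH_CHUNK_SIZE kept on the
 * hashtable->chunks list, which is what lets ExecHashIncreaseNumBatches
 * and ExecHashIncreaseNumBuckets walk the chunks instead of the buckets.
 * Requests larger than HASH_CHUNK_THRESHOLD get a dedicated chunk of
 * their own.
 */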
2675 | static void * |
2676 | dense_alloc(HashJoinTable hashtable, Size size) |
2677 | { |
2678 | HashMemoryChunk newChunk; |
2679 | char *ptr; |
2680 | |
2681 | /* just in case the size is not already aligned properly */ |
2682 | size = MAXALIGN(size); |
2683 | |
2684 | /* |
2685 | * If tuple size is larger than threshold, allocate a separate chunk. |
2686 | */ |
2687 | if (size > HASH_CHUNK_THRESHOLD) |
2688 | { |
2689 | /* allocate new chunk and put it at the beginning of the list */ |
2690 | newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt, |
2691 | HASH_CHUNK_HEADER_SIZE + size); |
2692 | newChunk->maxlen = size; |
2693 | newChunk->used = size; |
2694 | newChunk->ntuples = 1; |
2695 | |
2696 | /* |
2697 | * Add this chunk to the list after the first existing chunk, so that |
2698 | * we don't lose the remaining space in the "current" chunk. |
2699 | */ |
2700 | if (hashtable->chunks != NULL) |
2701 | { |
2702 | newChunk->next = hashtable->chunks->next; |
2703 | hashtable->chunks->next.unshared = newChunk; |
2704 | } |
2705 | else |
2706 | { |
2707 | newChunk->next.unshared = hashtable->chunks; |
2708 | hashtable->chunks = newChunk; |
2709 | } |
2710 | |
2711 | return HASH_CHUNK_DATA(newChunk); |
2712 | } |
2713 | |
2714 | /* |
2715 | * See if we have enough space for it in the current chunk (if any). If |
2716 | * not, allocate a fresh chunk. |
2717 | */ |
2718 | if ((hashtable->chunks == NULL) || |
2719 | (hashtable->chunks->maxlen - hashtable->chunks->used) < size) |
2720 | { |
2721 | /* allocate new chunk and put it at the beginning of the list */ |
2722 | newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt, |
2723 | HASH_CHUNK_HEADER_SIZE + HASH_CHUNK_SIZE); |
2724 | |
2725 | newChunk->maxlen = HASH_CHUNK_SIZE; |
2726 | newChunk->used = size; |
2727 | newChunk->ntuples = 1; |
2728 | |
2729 | newChunk->next.unshared = hashtable->chunks; |
2730 | hashtable->chunks = newChunk; |
2731 | |
2732 | return HASH_CHUNK_DATA(newChunk); |
2733 | } |
2734 | |
2735 | /* There is enough space in the current chunk, let's add the tuple */ |
2736 | ptr = HASH_CHUNK_DATA(hashtable->chunks) + hashtable->chunks->used; |
2737 | hashtable->chunks->used += size; |
2738 | hashtable->chunks->ntuples += 1; |
2739 | |
2740 | /* return pointer to the start of the tuple memory */ |
2741 | return ptr; |
2742 | } |
2743 | |
2744 | /* |
2745 | * Allocate space for a tuple in shared dense storage. This is equivalent to |
2746 | * dense_alloc but for Parallel Hash using shared memory. |
2747 | * |
2748 | * While loading a tuple into shared memory, we might run out of memory and |
2749 | * decide to repartition, or determine that the load factor is too high and |
2750 | * decide to expand the bucket array, or discover that another participant has |
2751 | * commanded us to help do that. Return NULL if number of buckets or batches |
2752 | * has changed, indicating that the caller must retry (considering the |
2753 | * possibility that the tuple no longer belongs in the same batch). |
2754 | */ |
2755 | static HashJoinTuple |
2756 | ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, |
2757 | dsa_pointer *shared) |
2758 | { |
2759 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
2760 | dsa_pointer chunk_shared; |
2761 | HashMemoryChunk chunk; |
2762 | Size chunk_size; |
2763 | HashJoinTuple result; |
2764 | int curbatch = hashtable->curbatch; |
2765 | |
2766 | size = MAXALIGN(size); |
2767 | |
2768 | /* |
2769 | * Fast path: if there is enough space in this backend's current chunk, |
2770 | * then we can allocate without any locking. |
2771 | */ |
2772 | chunk = hashtable->current_chunk; |
2773 | if (chunk != NULL && |
2774 | size <= HASH_CHUNK_THRESHOLD && |
2775 | chunk->maxlen - chunk->used >= size) |
2776 | { |
2778 | chunk_shared = hashtable->current_chunk_shared; |
2779 | Assert(chunk == dsa_get_address(hashtable->area, chunk_shared)); |
2780 | *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE + chunk->used; |
2781 | result = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + chunk->used); |
2782 | chunk->used += size; |
2783 | |
2784 | Assert(chunk->used <= chunk->maxlen); |
2785 | Assert(result == dsa_get_address(hashtable->area, *shared)); |
2786 | |
2787 | return result; |
2788 | } |
2789 | |
2790 | /* Slow path: try to allocate a new chunk. */ |
2791 | LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); |
2792 | |
2793 | /* |
2794 | * Check if we need to help increase the number of buckets or batches. |
2795 | */ |
2796 | if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES || |
2797 | pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS) |
2798 | { |
2799 | ParallelHashGrowth growth = pstate->growth; |
2800 | |
2801 | hashtable->current_chunk = NULL; |
2802 | LWLockRelease(&pstate->lock); |
2803 | |
2804 | /* Another participant has commanded us to help grow. */ |
2805 | if (growth == PHJ_GROWTH_NEED_MORE_BATCHES) |
2806 | ExecParallelHashIncreaseNumBatches(hashtable); |
2807 | else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS) |
2808 | ExecParallelHashIncreaseNumBuckets(hashtable); |
2809 | |
2810 | /* The caller must retry. */ |
2811 | return NULL; |
2812 | } |
2813 | |
2814 | /* Oversized tuples get their own chunk. */ |
2815 | if (size > HASH_CHUNK_THRESHOLD) |
2816 | chunk_size = size + HASH_CHUNK_HEADER_SIZE; |
2817 | else |
2818 | chunk_size = HASH_CHUNK_SIZE; |
2819 | |
2820 | /* Check if it's time to grow batches or buckets. */ |
2821 | if (pstate->growth != PHJ_GROWTH_DISABLED) |
2822 | { |
2823 | Assert(curbatch == 0); |
2824 | Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER); |
2825 | |
2826 | /* |
2827 | * Check if our space limit would be exceeded. To avoid choking on |
2828 | * very large tuples or very low work_mem setting, we'll always allow |
2829 | * each backend to allocate at least one chunk. |
2830 | */ |
2831 | if (hashtable->batches[0].at_least_one_chunk && |
2832 | hashtable->batches[0].shared->size + |
2833 | chunk_size > pstate->space_allowed) |
2834 | { |
2835 | pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; |
2836 | hashtable->batches[0].shared->space_exhausted = true; |
2837 | LWLockRelease(&pstate->lock); |
2838 | |
2839 | return NULL; |
2840 | } |
2841 | |
2842 | /* Check if our load factor limit would be exceeded. */ |
2843 | if (hashtable->nbatch == 1) |
2844 | { |
2845 | hashtable->batches[0].shared->ntuples += hashtable->batches[0].ntuples; |
2846 | hashtable->batches[0].ntuples = 0; |
2847 | /* Guard against integer overflow and alloc size overflow */ |
2848 | if (hashtable->batches[0].shared->ntuples + 1 > |
2849 | hashtable->nbuckets * NTUP_PER_BUCKET && |
2850 | hashtable->nbuckets < (INT_MAX / 2) && |
2851 | hashtable->nbuckets * 2 <= |
2852 | MaxAllocSize / sizeof(dsa_pointer_atomic)) |
2853 | { |
2854 | pstate->growth = PHJ_GROWTH_NEED_MORE_BUCKETS; |
2855 | LWLockRelease(&pstate->lock); |
2856 | |
2857 | return NULL; |
2858 | } |
2859 | } |
2860 | } |
2861 | |
2862 | /* We are cleared to allocate a new chunk. */ |
2863 | chunk_shared = dsa_allocate(hashtable->area, chunk_size); |
2864 | hashtable->batches[curbatch].shared->size += chunk_size; |
2865 | hashtable->batches[curbatch].at_least_one_chunk = true; |
2866 | |
2867 | /* Set up the chunk. */ |
2868 | chunk = (HashMemoryChunk) dsa_get_address(hashtable->area, chunk_shared); |
2869 | *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE; |
2870 | chunk->maxlen = chunk_size - HASH_CHUNK_HEADER_SIZE; |
2871 | chunk->used = size; |
2872 | |
2873 | /* |
2874 | * Push it onto the list of chunks, so that it can be found if we need to |
2875 | * increase the number of buckets or batches (batch 0 only) and later for |
2876 | * freeing the memory (all batches). |
2877 | */ |
2878 | chunk->next.shared = hashtable->batches[curbatch].shared->chunks; |
2879 | hashtable->batches[curbatch].shared->chunks = chunk_shared; |
2880 | |
2881 | if (size <= HASH_CHUNK_THRESHOLD) |
2882 | { |
2883 | /* |
2884 | * Make this the current chunk so that we can use the fast path to |
2885 | * fill the rest of it up in future calls. |
2886 | */ |
2887 | hashtable->current_chunk = chunk; |
2888 | hashtable->current_chunk_shared = chunk_shared; |
2889 | } |
2890 | LWLockRelease(&pstate->lock); |
2891 | |
2892 | Assert(HASH_CHUNK_DATA(chunk) == dsa_get_address(hashtable->area, *shared)); |
2893 | result = (HashJoinTuple) HASH_CHUNK_DATA(chunk); |
2894 | |
2895 | return result; |
2896 | } |
2897 | |
2898 | /* |
2899 | * One backend needs to set up the shared batch state including tuplestores. |
 * Other backends will ensure they have correctly configured accessors by
 * calling ExecParallelHashEnsureBatchAccessors().
2902 | */ |
2903 | static void |
2904 | ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch) |
2905 | { |
2906 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
2907 | ParallelHashJoinBatch *batches; |
2908 | MemoryContext oldcxt; |
2909 | int i; |
2910 | |
2911 | Assert(hashtable->batches == NULL); |
2912 | |
2913 | /* Allocate space. */ |
2914 | pstate->batches = |
2915 | dsa_allocate0(hashtable->area, |
2916 | EstimateParallelHashJoinBatch(hashtable) * nbatch); |
2917 | pstate->nbatch = nbatch; |
2918 | batches = dsa_get_address(hashtable->area, pstate->batches); |
2919 | |
2920 | /* Use hash join memory context. */ |
2921 | oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); |
2922 | |
2923 | /* Allocate this backend's accessor array. */ |
2924 | hashtable->nbatch = nbatch; |
2925 | hashtable->batches = (ParallelHashJoinBatchAccessor *) |
2926 | palloc0(sizeof(ParallelHashJoinBatchAccessor) * hashtable->nbatch); |
2927 | |
2928 | /* Set up the shared state, tuplestores and backend-local accessors. */ |
2929 | for (i = 0; i < hashtable->nbatch; ++i) |
2930 | { |
2931 | ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i]; |
2932 | ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i); |
2933 | char name[MAXPGPATH]; |
2934 | |
2935 | /* |
2936 | * All members of shared were zero-initialized. We just need to set |
2937 | * up the Barrier. |
2938 | */ |
2939 | BarrierInit(&shared->batch_barrier, 0); |
2940 | if (i == 0) |
2941 | { |
2942 | /* Batch 0 doesn't need to be loaded. */ |
2943 | BarrierAttach(&shared->batch_barrier); |
2944 | while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBING) |
2945 | BarrierArriveAndWait(&shared->batch_barrier, 0); |
2946 | BarrierDetach(&shared->batch_barrier); |
2947 | } |
2948 | |
2949 | /* Initialize accessor state. All members were zero-initialized. */ |
2950 | accessor->shared = shared; |
2951 | |
2952 | /* Initialize the shared tuplestores. */ |
snprintf(name, sizeof(name), "i%dof%d", i, hashtable->nbatch);
2954 | accessor->inner_tuples = |
2955 | sts_initialize(ParallelHashJoinBatchInner(shared), |
2956 | pstate->nparticipants, |
2957 | ParallelWorkerNumber + 1, |
2958 | sizeof(uint32), |
2959 | SHARED_TUPLESTORE_SINGLE_PASS, |
2960 | &pstate->fileset, |
2961 | name); |
snprintf(name, sizeof(name), "o%dof%d", i, hashtable->nbatch);
2963 | accessor->outer_tuples = |
2964 | sts_initialize(ParallelHashJoinBatchOuter(shared, |
2965 | pstate->nparticipants), |
2966 | pstate->nparticipants, |
2967 | ParallelWorkerNumber + 1, |
2968 | sizeof(uint32), |
2969 | SHARED_TUPLESTORE_SINGLE_PASS, |
2970 | &pstate->fileset, |
2971 | name); |
2972 | } |
2973 | |
2974 | MemoryContextSwitchTo(oldcxt); |
2975 | } |
2976 | |
2977 | /* |
2978 | * Free the current set of ParallelHashJoinBatchAccessor objects. |
2979 | */ |
2980 | static void |
2981 | ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable) |
2982 | { |
2983 | int i; |
2984 | |
2985 | for (i = 0; i < hashtable->nbatch; ++i) |
2986 | { |
2987 | /* Make sure no files are left open. */ |
2988 | sts_end_write(hashtable->batches[i].inner_tuples); |
2989 | sts_end_write(hashtable->batches[i].outer_tuples); |
2990 | sts_end_parallel_scan(hashtable->batches[i].inner_tuples); |
2991 | sts_end_parallel_scan(hashtable->batches[i].outer_tuples); |
2992 | } |
2993 | pfree(hashtable->batches); |
2994 | hashtable->batches = NULL; |
2995 | } |
2996 | |
2997 | /* |
2998 | * Make sure this backend has up-to-date accessors for the current set of |
2999 | * batches. |
3000 | */ |
3001 | static void |
3002 | ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable) |
3003 | { |
3004 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
3005 | ParallelHashJoinBatch *batches; |
3006 | MemoryContext oldcxt; |
3007 | int i; |
3008 | |
3009 | if (hashtable->batches != NULL) |
3010 | { |
3011 | if (hashtable->nbatch == pstate->nbatch) |
3012 | return; |
3013 | ExecParallelHashCloseBatchAccessors(hashtable); |
3014 | } |
3015 | |
3016 | /* |
3017 | * It's possible for a backend to start up very late so that the whole |
3018 | * join is finished and the shm state for tracking batches has already |
3019 | * been freed by ExecHashTableDetach(). In that case we'll just leave |
3020 | * hashtable->batches as NULL so that ExecParallelHashJoinNewBatch() gives |
3021 | * up early. |
3022 | */ |
3023 | if (!DsaPointerIsValid(pstate->batches)) |
3024 | return; |
3025 | |
3026 | /* Use hash join memory context. */ |
3027 | oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); |
3028 | |
3029 | /* Allocate this backend's accessor array. */ |
3030 | hashtable->nbatch = pstate->nbatch; |
3031 | hashtable->batches = (ParallelHashJoinBatchAccessor *) |
3032 | palloc0(sizeof(ParallelHashJoinBatchAccessor) * hashtable->nbatch); |
3033 | |
3034 | /* Find the base of the pseudo-array of ParallelHashJoinBatch objects. */ |
3035 | batches = (ParallelHashJoinBatch *) |
3036 | dsa_get_address(hashtable->area, pstate->batches); |
3037 | |
3038 | /* Set up the accessor array and attach to the tuplestores. */ |
3039 | for (i = 0; i < hashtable->nbatch; ++i) |
3040 | { |
3041 | ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i]; |
3042 | ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i); |
3043 | |
3044 | accessor->shared = shared; |
3045 | accessor->preallocated = 0; |
3046 | accessor->done = false; |
3047 | accessor->inner_tuples = |
3048 | sts_attach(ParallelHashJoinBatchInner(shared), |
3049 | ParallelWorkerNumber + 1, |
3050 | &pstate->fileset); |
3051 | accessor->outer_tuples = |
3052 | sts_attach(ParallelHashJoinBatchOuter(shared, |
3053 | pstate->nparticipants), |
3054 | ParallelWorkerNumber + 1, |
3055 | &pstate->fileset); |
3056 | } |
3057 | |
3058 | MemoryContextSwitchTo(oldcxt); |
3059 | } |
3060 | |
3061 | /* |
3062 | * Allocate an empty shared memory hash table for a given batch. |
3063 | */ |
3064 | void |
3065 | ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno) |
3066 | { |
3067 | ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared; |
3068 | dsa_pointer_atomic *buckets; |
3069 | int nbuckets = hashtable->parallel_state->nbuckets; |
3070 | int i; |
3071 | |
3072 | batch->buckets = |
3073 | dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets); |
3074 | buckets = (dsa_pointer_atomic *) |
3075 | dsa_get_address(hashtable->area, batch->buckets); |
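	/* Start every bucket out as an empty chain. */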
3076 | for (i = 0; i < nbuckets; ++i) |
3077 | dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); |
3078 | } |
3079 | |
3080 | /* |
3081 | * If we are currently attached to a shared hash join batch, detach. If we |
3082 | * are last to detach, clean up. |
3083 | */ |
3084 | void |
3085 | ExecHashTableDetachBatch(HashJoinTable hashtable) |
3086 | { |
3087 | if (hashtable->parallel_state != NULL && |
3088 | hashtable->curbatch >= 0) |
3089 | { |
3090 | int curbatch = hashtable->curbatch; |
3091 | ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; |
3092 | |
3093 | /* Make sure any temporary files are closed. */ |
3094 | sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); |
3095 | sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); |
3096 | |
3097 | /* Detach from the batch we were last working on. */ |
3098 | if (BarrierArriveAndDetach(&batch->batch_barrier)) |
3099 | { |
3100 | /* |
3101 | * Technically we shouldn't access the barrier because we're no |
			 * longer attached, but since its phase can no longer advance once
			 * the last participant has detached, it seems safe to make the
			 * following assertion.
3104 | */ |
3105 | Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE); |
3106 | |
3107 | /* Free shared chunks and buckets. */ |
3108 | while (DsaPointerIsValid(batch->chunks)) |
3109 | { |
3110 | HashMemoryChunk chunk = |
3111 | dsa_get_address(hashtable->area, batch->chunks); |
3112 | dsa_pointer next = chunk->next.shared; |
3113 | |
3114 | dsa_free(hashtable->area, batch->chunks); |
3115 | batch->chunks = next; |
3116 | } |
3117 | if (DsaPointerIsValid(batch->buckets)) |
3118 | { |
3119 | dsa_free(hashtable->area, batch->buckets); |
3120 | batch->buckets = InvalidDsaPointer; |
3121 | } |
3122 | } |
3123 | |
3124 | /* |
3125 | * Track the largest batch we've been attached to. Though each |
3126 | * backend might see a different subset of batches, explain.c will |
3127 | * scan the results from all backends to find the largest value. |
3128 | */ |
3129 | hashtable->spacePeak = |
3130 | Max(hashtable->spacePeak, |
3131 | batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets); |
3132 | |
3133 | /* Remember that we are not attached to a batch. */ |
3134 | hashtable->curbatch = -1; |
3135 | } |
3136 | } |
3137 | |
3138 | /* |
3139 | * Detach from all shared resources. If we are last to detach, clean up. |
3140 | */ |
3141 | void |
3142 | ExecHashTableDetach(HashJoinTable hashtable) |
3143 | { |
3144 | if (hashtable->parallel_state) |
3145 | { |
3146 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
3147 | int i; |
3148 | |
3149 | /* Make sure any temporary files are closed. */ |
3150 | if (hashtable->batches) |
3151 | { |
3152 | for (i = 0; i < hashtable->nbatch; ++i) |
3153 | { |
3154 | sts_end_write(hashtable->batches[i].inner_tuples); |
3155 | sts_end_write(hashtable->batches[i].outer_tuples); |
3156 | sts_end_parallel_scan(hashtable->batches[i].inner_tuples); |
3157 | sts_end_parallel_scan(hashtable->batches[i].outer_tuples); |
3158 | } |
3159 | } |
3160 | |
3161 | /* If we're last to detach, clean up shared memory. */ |
3162 | if (BarrierDetach(&pstate->build_barrier)) |
3163 | { |
3164 | if (DsaPointerIsValid(pstate->batches)) |
3165 | { |
3166 | dsa_free(hashtable->area, pstate->batches); |
3167 | pstate->batches = InvalidDsaPointer; |
3168 | } |
3169 | } |
3170 | |
3171 | hashtable->parallel_state = NULL; |
3172 | } |
3173 | } |
3174 | |
3175 | /* |
 * Get the first tuple in the bucket identified by the given bucket number.
3177 | */ |
3178 | static inline HashJoinTuple |
3179 | ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno) |
3180 | { |
3181 | HashJoinTuple tuple; |
3182 | dsa_pointer p; |
3183 | |
3184 | Assert(hashtable->parallel_state); |
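	/*
	 * Read the bucket head atomically and translate the DSA pointer into a
	 * backend-local address; dsa_get_address() returns NULL for an empty
	 * bucket (InvalidDsaPointer).
	 */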
3185 | p = dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]); |
3186 | tuple = (HashJoinTuple) dsa_get_address(hashtable->area, p); |
3187 | |
3188 | return tuple; |
3189 | } |
3190 | |
3191 | /* |
3192 | * Get the next tuple in the same bucket as 'tuple'. |
3193 | */ |
3194 | static inline HashJoinTuple |
3195 | ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple) |
3196 | { |
3197 | HashJoinTuple next; |
3198 | |
3199 | Assert(hashtable->parallel_state); |
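	/* Follow the shared 'next' link; NULL marks the end of the chain. */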
3200 | next = (HashJoinTuple) dsa_get_address(hashtable->area, tuple->next.shared); |
3201 | |
3202 | return next; |
3203 | } |
3204 | |
3205 | /* |
 * Atomically insert a tuple at the front of a chain of tuples in DSA memory.
3207 | */ |
3208 | static inline void |
3209 | ExecParallelHashPushTuple(dsa_pointer_atomic *head, |
3210 | HashJoinTuple tuple, |
3211 | dsa_pointer tuple_shared) |
3212 | { |
3213 | for (;;) |
3214 | { |
3215 | tuple->next.shared = dsa_pointer_atomic_read(head); |
3216 | if (dsa_pointer_atomic_compare_exchange(head, |
3217 | &tuple->next.shared, |
3218 | tuple_shared)) |
3219 | break; |
3220 | } |
3221 | } |
3222 | |
3223 | /* |
3224 | * Prepare to work on a given batch. |
3225 | */ |
3226 | void |
3227 | ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno) |
3228 | { |
3229 | Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer); |
3230 | |
3231 | hashtable->curbatch = batchno; |
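	/* Map the batch's shared bucket array into this backend's address space. */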
3232 | hashtable->buckets.shared = (dsa_pointer_atomic *) |
3233 | dsa_get_address(hashtable->area, |
3234 | hashtable->batches[batchno].shared->buckets); |
3235 | hashtable->nbuckets = hashtable->parallel_state->nbuckets; |
3236 | hashtable->log2_nbuckets = my_log2(hashtable->nbuckets); |
3237 | hashtable->current_chunk = NULL; |
3238 | hashtable->current_chunk_shared = InvalidDsaPointer; |
3239 | hashtable->batches[batchno].at_least_one_chunk = false; |
3240 | } |
3241 | |
3242 | /* |
3243 | * Take the next available chunk from the queue of chunks being worked on in |
3244 | * parallel. Return NULL if there are none left. Otherwise return a pointer |
3245 | * to the chunk, and set *shared to the DSA pointer to the chunk. |
3246 | */ |
3247 | static HashMemoryChunk |
3248 | ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared) |
3249 | { |
3250 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
3251 | HashMemoryChunk chunk; |
3252 | |
3253 | LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); |
3254 | if (DsaPointerIsValid(pstate->chunk_work_queue)) |
3255 | { |
3256 | *shared = pstate->chunk_work_queue; |
3257 | chunk = (HashMemoryChunk) |
3258 | dsa_get_address(hashtable->area, *shared); |
3259 | pstate->chunk_work_queue = chunk->next.shared; |
3260 | } |
3261 | else |
3262 | chunk = NULL; |
3263 | LWLockRelease(&pstate->lock); |
3264 | |
3265 | return chunk; |
3266 | } |
3267 | |
3268 | /* |
3269 | * Increase the space preallocated in this backend for a given inner batch by |
3270 | * at least a given amount. This allows us to track whether a given batch |
3271 | * would fit in memory when loaded back in. Also increase the number of |
3272 | * batches or buckets if required. |
3273 | * |
 * This maintains a running estimate of how much space will be taken when we
3275 | * load the batch back into memory by simulating the way chunks will be handed |
3276 | * out to workers. It's not perfectly accurate because the tuples will be |
3277 | * packed into memory chunks differently by ExecParallelHashTupleAlloc(), but |
3278 | * it should be pretty close. It tends to overestimate by a fraction of a |
3279 | * chunk per worker since all workers gang up to preallocate during hashing, |
3280 | * but workers tend to reload batches alone if there are enough to go around, |
3281 | * leaving fewer partially filled chunks. This effect is bounded by |
3282 | * nparticipants. |
3283 | * |
 * Returns false if the number of batches or buckets has changed, in which
 * case the caller should reconsider which batch a given tuple now belongs in
 * and call again.
3287 | */ |
3288 | static bool |
3289 | ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size) |
3290 | { |
3291 | ParallelHashJoinState *pstate = hashtable->parallel_state; |
3292 | ParallelHashJoinBatchAccessor *batch = &hashtable->batches[batchno]; |
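	/*
	 * Reserve at least a whole chunk's worth of usable space, mirroring the
	 * chunk-at-a-time way memory will actually be allocated when the batch
	 * is loaded back in.
	 */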
3293 | size_t want = Max(size, HASH_CHUNK_SIZE - HASH_CHUNK_HEADER_SIZE); |
3294 | |
3295 | Assert(batchno > 0); |
3296 | Assert(batchno < hashtable->nbatch); |
3297 | Assert(size == MAXALIGN(size)); |
3298 | |
3299 | LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); |
3300 | |
3301 | /* Has another participant commanded us to help grow? */ |
3302 | if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES || |
3303 | pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS) |
3304 | { |
3305 | ParallelHashGrowth growth = pstate->growth; |
3306 | |
3307 | LWLockRelease(&pstate->lock); |
3308 | if (growth == PHJ_GROWTH_NEED_MORE_BATCHES) |
3309 | ExecParallelHashIncreaseNumBatches(hashtable); |
3310 | else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS) |
3311 | ExecParallelHashIncreaseNumBuckets(hashtable); |
3312 | |
3313 | return false; |
3314 | } |
3315 | |
3316 | if (pstate->growth != PHJ_GROWTH_DISABLED && |
3317 | batch->at_least_one_chunk && |
3318 | (batch->shared->estimated_size + want + HASH_CHUNK_HEADER_SIZE |
3319 | > pstate->space_allowed)) |
3320 | { |
3321 | /* |
3322 | * We have determined that this batch would exceed the space budget if |
3323 | * loaded into memory. Command all participants to help repartition. |
3324 | */ |
3325 | batch->shared->space_exhausted = true; |
3326 | pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; |
3327 | LWLockRelease(&pstate->lock); |
3328 | |
3329 | return false; |
3330 | } |
3331 | |
3332 | batch->at_least_one_chunk = true; |
3333 | batch->shared->estimated_size += want + HASH_CHUNK_HEADER_SIZE; |
3334 | batch->preallocated = want; |
3335 | LWLockRelease(&pstate->lock); |
3336 | |
3337 | return true; |
3338 | } |
3339 | |