/*-------------------------------------------------------------------------
 *
 * nodeGatherMerge.c
 *		Scan a plan in multiple workers, and do order-preserving merge.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeGatherMerge.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/relscan.h"
#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGatherMerge.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "utils/memutils.h"
#include "utils/rel.h"

/*
 * When we read tuples from workers, it's a good idea to read several at once
 * for efficiency when possible: this minimizes context-switching overhead.
 * But reading too many at a time wastes memory without improving performance.
 * We'll read up to MAX_TUPLE_STORE tuples (in addition to the first one).
 */
#define MAX_TUPLE_STORE 10
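
/*
 * Illustrative only: a typical plan shape in which this node appears is
 *
 *     Gather Merge
 *       ->  Sort
 *             Sort Key: x
 *             ->  Parallel Seq Scan on tab
 *
 * (the relation and sort column names here are hypothetical).  Each worker
 * produces its own sorted stream, and this node merges those streams, plus
 * optionally the leader's own stream, while preserving the sort order.
 */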

/*
 * Pending-tuple array for each worker.  This holds additional tuples that
 * we were able to fetch from the worker, but can't process yet.  In addition,
 * this struct holds the "done" flag indicating the worker is known to have
 * no more tuples.  (We do not use this struct for the leader; we don't keep
 * any pending tuples for the leader, and the need_to_scan_locally flag serves
 * as its "done" indicator.)
 */
typedef struct GMReaderTupleBuffer
{
    HeapTuple  *tuple;          /* array of length MAX_TUPLE_STORE */
    int         nTuples;        /* number of tuples currently stored */
    int         readCounter;    /* index of next tuple to extract */
    bool        done;           /* true if reader is known exhausted */
} GMReaderTupleBuffer;
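
/*
 * Within a GMReaderTupleBuffer, the pending tuples are tuple[readCounter]
 * through tuple[nTuples - 1]; once readCounter catches up with nTuples,
 * load_tuple_array() resets both counters to zero before refilling the
 * array.  Buffer i belongs to the reader whose output slot is
 * gm_slots[i + 1] (see gather_merge_setup).
 */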

static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
static int32 heap_compare_slots(Datum a, Datum b, void *arg);
static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
                                   bool nowait, bool *done);
static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
static void gather_merge_setup(GatherMergeState *gm_state);
static void gather_merge_init(GatherMergeState *gm_state);
static void gather_merge_clear_tuples(GatherMergeState *gm_state);
static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
                                  bool nowait);
static void load_tuple_array(GatherMergeState *gm_state, int reader);

/* ----------------------------------------------------------------
 *		ExecInitGatherMerge
 * ----------------------------------------------------------------
 */
GatherMergeState *
ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
{
    GatherMergeState *gm_state;
    Plan       *outerNode;
    TupleDesc   tupDesc;

    /* A Gather Merge node doesn't have an innerPlan. */
    Assert(innerPlan(node) == NULL);

    /*
     * create state structure
     */
    gm_state = makeNode(GatherMergeState);
    gm_state->ps.plan = (Plan *) node;
    gm_state->ps.state = estate;
    gm_state->ps.ExecProcNode = ExecGatherMerge;

    gm_state->initialized = false;
    gm_state->gm_initialized = false;
    gm_state->tuples_needed = -1;

    /*
     * Miscellaneous initialization
     *
     * create expression context for node
     */
    ExecAssignExprContext(estate, &gm_state->ps);

    /*
     * GatherMerge doesn't support checking a qual (it's always more efficient
     * to do it in the child node).
     */
    Assert(!node->plan.qual);

    /*
     * now initialize outer plan
     */
    outerNode = outerPlan(node);
    outerPlanState(gm_state) = ExecInitNode(outerNode, estate, eflags);

    /*
     * The leader may consume tuples either directly from ExecProcNode (if
     * need_to_scan_locally) or from the workers via the tuple queues, so we
     * can't trivially rely on the slot type being fixed for expressions
     * evaluated within this node.
     */
    gm_state->ps.outeropsset = true;
    gm_state->ps.outeropsfixed = false;

    /*
     * Store the tuple descriptor into gather merge state, so we can use it
     * while initializing the gather merge slots.
     */
    tupDesc = ExecGetResultType(outerPlanState(gm_state));
    gm_state->tupDesc = tupDesc;

    /*
     * Initialize result type and projection.
     */
    ExecInitResultTypeTL(&gm_state->ps);
    ExecConditionalAssignProjectionInfo(&gm_state->ps, tupDesc, OUTER_VAR);

    /*
     * Without a projection, the result slot type is not trivially known; see
     * the comment above.
     */
    if (gm_state->ps.ps_ProjInfo == NULL)
    {
        gm_state->ps.resultopsset = true;
        gm_state->ps.resultopsfixed = false;
    }

    /*
     * initialize sort-key information
     */
    if (node->numCols)
    {
        int         i;

        gm_state->gm_nkeys = node->numCols;
        gm_state->gm_sortkeys =
            palloc0(sizeof(SortSupportData) * node->numCols);

        for (i = 0; i < node->numCols; i++)
        {
            SortSupport sortKey = gm_state->gm_sortkeys + i;

            sortKey->ssup_cxt = CurrentMemoryContext;
            sortKey->ssup_collation = node->collations[i];
            sortKey->ssup_nulls_first = node->nullsFirst[i];
            sortKey->ssup_attno = node->sortColIdx[i];

            /*
             * We don't perform abbreviated key conversion here, for the same
             * reasons that it isn't used in MergeAppend
             */
            sortKey->abbreviate = false;

            PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
        }
    }

    /* Now allocate the workspace for gather merge */
    gather_merge_setup(gm_state);

    return gm_state;
}

/* ----------------------------------------------------------------
 *		ExecGatherMerge(node)
 *
 *		Executes the outer plan in one or more workers (and possibly
 *		the leader itself) and returns the next tuple of the merged,
 *		order-preserving output stream.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGatherMerge(PlanState *pstate)
{
    GatherMergeState *node = castNode(GatherMergeState, pstate);
    TupleTableSlot *slot;
    ExprContext *econtext;

    CHECK_FOR_INTERRUPTS();

    /*
     * As with Gather, we don't launch workers until this node is actually
     * executed.
     */
    if (!node->initialized)
    {
        EState     *estate = node->ps.state;
        GatherMerge *gm = castNode(GatherMerge, node->ps.plan);

        /*
         * Sometimes we might have to run without parallelism; but if parallel
         * mode is active then we can try to fire up some workers.
         */
        if (gm->num_workers > 0 && estate->es_use_parallel_mode)
        {
            ParallelContext *pcxt;

            /* Initialize, or re-initialize, shared state needed by workers. */
            if (!node->pei)
                node->pei = ExecInitParallelPlan(node->ps.lefttree,
                                                 estate,
                                                 gm->initParam,
                                                 gm->num_workers,
                                                 node->tuples_needed);
            else
                ExecParallelReinitialize(node->ps.lefttree,
                                         node->pei,
                                         gm->initParam);

            /* Try to launch workers. */
            pcxt = node->pei->pcxt;
            LaunchParallelWorkers(pcxt);
            /* We save # workers launched for the benefit of EXPLAIN */
            node->nworkers_launched = pcxt->nworkers_launched;

            /* Set up tuple queue readers to read the results. */
            if (pcxt->nworkers_launched > 0)
            {
                ExecParallelCreateReaders(node->pei);
                /* Make a working array showing the active readers */
                node->nreaders = pcxt->nworkers_launched;
                node->reader = (TupleQueueReader **)
                    palloc(node->nreaders * sizeof(TupleQueueReader *));
                memcpy(node->reader, node->pei->reader,
                       node->nreaders * sizeof(TupleQueueReader *));
            }
            else
            {
                /* No workers? Then never mind. */
                node->nreaders = 0;
                node->reader = NULL;
            }
        }

        /* allow leader to participate if enabled or no choice */
        if (parallel_leader_participation || node->nreaders == 0)
            node->need_to_scan_locally = true;
        node->initialized = true;
    }

    /*
     * Reset per-tuple memory context to free any expression evaluation
     * storage allocated in the previous tuple cycle.
     */
    econtext = node->ps.ps_ExprContext;
    ResetExprContext(econtext);

    /*
     * Get next tuple, either from one of our workers, or by running the plan
     * ourselves.
     */
    slot = gather_merge_getnext(node);
    if (TupIsNull(slot))
        return NULL;

    /* If no projection is required, we're done. */
    if (node->ps.ps_ProjInfo == NULL)
        return slot;

    /*
     * Form the result tuple using ExecProject(), and return it.
     */
    econtext->ecxt_outertuple = slot;
    return ExecProject(node->ps.ps_ProjInfo);
}

/* ----------------------------------------------------------------
 *		ExecEndGatherMerge
 *
 *		frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGatherMerge(GatherMergeState *node)
{
    ExecEndNode(outerPlanState(node));  /* let children clean up first */
    ExecShutdownGatherMerge(node);
    ExecFreeExprContext(&node->ps);
    if (node->ps.ps_ResultTupleSlot)
        ExecClearTuple(node->ps.ps_ResultTupleSlot);
}

/* ----------------------------------------------------------------
 *		ExecShutdownGatherMerge
 *
 *		Destroy the setup for parallel workers including parallel context.
 * ----------------------------------------------------------------
 */
void
ExecShutdownGatherMerge(GatherMergeState *node)
{
    ExecShutdownGatherMergeWorkers(node);

    /* Now destroy the parallel context. */
    if (node->pei != NULL)
    {
        ExecParallelCleanup(node->pei);
        node->pei = NULL;
    }
}

/* ----------------------------------------------------------------
 *		ExecShutdownGatherMergeWorkers
 *
 *		Stop all the parallel workers.
 * ----------------------------------------------------------------
 */
static void
ExecShutdownGatherMergeWorkers(GatherMergeState *node)
{
    if (node->pei != NULL)
        ExecParallelFinish(node->pei);

    /* Flush local copy of reader array */
    if (node->reader)
        pfree(node->reader);
    node->reader = NULL;
}

/* ----------------------------------------------------------------
 *		ExecReScanGatherMerge
 *
 *		Prepare to re-scan the result of a GatherMerge.
 * ----------------------------------------------------------------
 */
void
ExecReScanGatherMerge(GatherMergeState *node)
{
    GatherMerge *gm = (GatherMerge *) node->ps.plan;
    PlanState  *outerPlan = outerPlanState(node);

    /* Make sure any existing workers are gracefully shut down */
    ExecShutdownGatherMergeWorkers(node);

    /* Free any unused tuples, so we don't leak memory across rescans */
    gather_merge_clear_tuples(node);

    /* Mark node so that shared state will be rebuilt at next call */
    node->initialized = false;
    node->gm_initialized = false;

    /*
     * Set child node's chgParam to tell it that the next scan might deliver a
     * different set of rows within the leader process.  (The overall rowset
     * shouldn't change, but the leader process's subset might; hence nodes
     * between here and the parallel table scan node mustn't optimize on the
     * assumption of an unchanging rowset.)
     */
    if (gm->rescan_param >= 0)
        outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
                                             gm->rescan_param);

    /*
     * If chgParam of subnode is not null then plan will be re-scanned by
     * first ExecProcNode.  Note: because this does nothing if we have a
     * rescan_param, it's currently guaranteed that parallel-aware child nodes
     * will not see a ReScan call until after they get a ReInitializeDSM call.
     * That ordering might not be something to rely on, though.  A good rule
     * of thumb is that ReInitializeDSM should reset only shared state, ReScan
     * should reset only local state, and anything that depends on both of
     * those steps being finished must wait until the first ExecProcNode call.
     */
    if (outerPlan->chgParam == NULL)
        ExecReScan(outerPlan);
}

/*
 * Set up the data structures that we'll need for Gather Merge.
 *
 * We allocate these once on the basis of gm->num_workers, which is an
 * upper bound for the number of workers we'll actually have.  During
 * a rescan, we reset the structures to empty.  This approach makes it
 * simple to avoid leaking memory across rescans.
 *
 * In the gm_slots[] array, index 0 is for the leader, and indexes 1 to n
 * are for workers.  The values placed into gm_heap correspond to indexes
 * in gm_slots[].  The gm_tuple_buffers[] array, however, is indexed from
 * 0 to n-1; it has no entry for the leader.
 */
static void
gather_merge_setup(GatherMergeState *gm_state)
{
    GatherMerge *gm = castNode(GatherMerge, gm_state->ps.plan);
    int         nreaders = gm->num_workers;
    int         i;

    /*
     * Allocate gm_slots for the number of workers + one more slot for leader.
     * Slot 0 is always for the leader. Leader always calls ExecProcNode() to
     * read the tuple, and then stores it directly into its gm_slots entry.
     * For other slots, code below will call ExecInitExtraTupleSlot() to
     * create a slot for the worker's results. Note that during any single
     * scan, we might have fewer than num_workers available workers, in which
     * case the extra array entries go unused.
     */
    gm_state->gm_slots = (TupleTableSlot **)
        palloc0((nreaders + 1) * sizeof(TupleTableSlot *));

    /* Allocate the tuple slot and tuple array for each worker */
    gm_state->gm_tuple_buffers = (GMReaderTupleBuffer *)
        palloc0(nreaders * sizeof(GMReaderTupleBuffer));

    for (i = 0; i < nreaders; i++)
    {
        /* Allocate the tuple array with length MAX_TUPLE_STORE */
        gm_state->gm_tuple_buffers[i].tuple =
            (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);

        /* Initialize tuple slot for worker */
        gm_state->gm_slots[i + 1] =
            ExecInitExtraTupleSlot(gm_state->ps.state, gm_state->tupDesc,
                                   &TTSOpsHeapTuple);
    }

    /* Allocate the resources for the merge */
    gm_state->gm_heap = binaryheap_allocate(nreaders + 1,
                                            heap_compare_slots,
                                            gm_state);
}
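
/*
 * Worked example (illustrative): with gm->num_workers == 2, the arrays set
 * up above are
 *
 *     gm_slots[0]             leader's slot, filled directly by ExecProcNode
 *     gm_slots[1..2]          heap-tuple slots for the two workers
 *     gm_tuple_buffers[0..1]  pending-tuple arrays for the two workers
 *     gm_heap                 binary heap with room for three entries (0..2)
 *
 * If fewer than two workers are actually launched during a given scan, the
 * extra entries simply remain unused.
 */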

/*
 * Initialize the Gather Merge.
 *
 * Reset data structures to ensure they're empty.  Then pull at least one
 * tuple from leader + each worker (or set its "done" indicator), and set up
 * the heap.
 */
static void
gather_merge_init(GatherMergeState *gm_state)
{
    int         nreaders = gm_state->nreaders;
    bool        nowait = true;
    int         i;

    /* Assert that gather_merge_setup made enough space */
    Assert(nreaders <= castNode(GatherMerge, gm_state->ps.plan)->num_workers);

    /* Reset leader's tuple slot to empty */
    gm_state->gm_slots[0] = NULL;

    /* Reset the tuple slot and tuple array for each worker */
    for (i = 0; i < nreaders; i++)
    {
        /* Reset tuple array to empty */
        gm_state->gm_tuple_buffers[i].nTuples = 0;
        gm_state->gm_tuple_buffers[i].readCounter = 0;
        /* Reset done flag to not-done */
        gm_state->gm_tuple_buffers[i].done = false;
        /* Ensure output slot is empty */
        ExecClearTuple(gm_state->gm_slots[i + 1]);
    }

    /* Reset binary heap to empty */
    binaryheap_reset(gm_state->gm_heap);

    /*
     * First, try to read a tuple from each worker (including leader) in
     * nowait mode.  After this, if not all workers were able to produce a
     * tuple (or a "done" indication), then re-read from remaining workers,
     * this time using wait mode.  Add all live readers (those producing at
     * least one tuple) to the heap.
     */
reread:
    for (i = 0; i <= nreaders; i++)
    {
        CHECK_FOR_INTERRUPTS();

        /* skip this source if already known done */
        if ((i == 0) ? gm_state->need_to_scan_locally :
            !gm_state->gm_tuple_buffers[i - 1].done)
        {
            if (TupIsNull(gm_state->gm_slots[i]))
            {
                /* Don't have a tuple yet, try to get one */
                if (gather_merge_readnext(gm_state, i, nowait))
                    binaryheap_add_unordered(gm_state->gm_heap,
                                             Int32GetDatum(i));
            }
            else
            {
                /*
                 * We already got at least one tuple from this worker, but
                 * might as well see if it has any more ready by now.
                 */
                load_tuple_array(gm_state, i);
            }
        }
    }

    /* need not recheck leader, since nowait doesn't matter for it */
    for (i = 1; i <= nreaders; i++)
    {
        if (!gm_state->gm_tuple_buffers[i - 1].done &&
            TupIsNull(gm_state->gm_slots[i]))
        {
            nowait = false;
            goto reread;
        }
    }

    /* Now heapify the heap. */
    binaryheap_build(gm_state->gm_heap);

    gm_state->gm_initialized = true;
}

/*
 * Clear out the tuple table slot, and any unused pending tuples,
 * for each gather merge input.
 */
static void
gather_merge_clear_tuples(GatherMergeState *gm_state)
{
    int         i;

    for (i = 0; i < gm_state->nreaders; i++)
    {
        GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];

        while (tuple_buffer->readCounter < tuple_buffer->nTuples)
            heap_freetuple(tuple_buffer->tuple[tuple_buffer->readCounter++]);

        ExecClearTuple(gm_state->gm_slots[i + 1]);
    }
}

/*
 * Read the next tuple for gather merge.
 *
 * Fetch the sorted tuple out of the heap.
 */
static TupleTableSlot *
gather_merge_getnext(GatherMergeState *gm_state)
{
    int         i;

    if (!gm_state->gm_initialized)
    {
        /*
         * First time through: pull the first tuple from each participant, and
         * set up the heap.
         */
        gather_merge_init(gm_state);
    }
    else
    {
        /*
         * Otherwise, pull the next tuple from whichever participant we
         * returned from last time, and reinsert that participant's index into
         * the heap, because it might now compare differently against the
         * other elements of the heap.
         */
        i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));

        if (gather_merge_readnext(gm_state, i, false))
            binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i));
        else
        {
            /* reader exhausted, remove it from heap */
            (void) binaryheap_remove_first(gm_state->gm_heap);
        }
    }

    if (binaryheap_empty(gm_state->gm_heap))
    {
        /* All the queues are exhausted, and so is the heap */
        gather_merge_clear_tuples(gm_state);
        return NULL;
    }
    else
    {
        /* Return next tuple from whichever participant has the leading one */
        i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
        return gm_state->gm_slots[i];
    }
}
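
/*
 * Illustrative trace (assuming a single ascending sort key): if the current
 * tuples for participants 0, 1 and 2 carry key values 4, 3 and 5, the top of
 * the heap is participant 1, so its tuple is returned first.  On the next
 * call, participant 1's next tuple is fetched and sifted back into the heap,
 * and whichever participant then holds the smallest key supplies the
 * following result, and so on until every participant is exhausted.
 */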

/*
 * Read tuple(s) for given reader in nowait mode, and load into its tuple
 * array, until we have MAX_TUPLE_STORE of them or would have to block.
 */
static void
load_tuple_array(GatherMergeState *gm_state, int reader)
{
    GMReaderTupleBuffer *tuple_buffer;
    int         i;

    /* Don't do anything if this is the leader. */
    if (reader == 0)
        return;

    tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];

    /* If there's nothing in the array, reset the counters to zero. */
    if (tuple_buffer->nTuples == tuple_buffer->readCounter)
        tuple_buffer->nTuples = tuple_buffer->readCounter = 0;

    /* Try to fill additional slots in the array. */
    for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
    {
        HeapTuple   tuple;

        tuple = gm_readnext_tuple(gm_state,
                                  reader,
                                  true,
                                  &tuple_buffer->done);
        if (!HeapTupleIsValid(tuple))
            break;
        tuple_buffer->tuple[i] = tuple;
        tuple_buffer->nTuples++;
    }
}

/*
 * Store the next tuple for a given reader into the appropriate slot.
 *
 * Returns true if successful, false if not (either reader is exhausted,
 * or we didn't want to wait for a tuple).  Sets done flag if reader
 * is found to be exhausted.
 */
static bool
gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
{
    GMReaderTupleBuffer *tuple_buffer;
    HeapTuple   tup;

    /*
     * If we're being asked to generate a tuple from the leader, then we just
     * call ExecProcNode as normal to produce one.
     */
    if (reader == 0)
    {
        if (gm_state->need_to_scan_locally)
        {
            PlanState  *outerPlan = outerPlanState(gm_state);
            TupleTableSlot *outerTupleSlot;
            EState     *estate = gm_state->ps.state;

            /* Install our DSA area while executing the plan. */
            estate->es_query_dsa = gm_state->pei ? gm_state->pei->area : NULL;
            outerTupleSlot = ExecProcNode(outerPlan);
            estate->es_query_dsa = NULL;

            if (!TupIsNull(outerTupleSlot))
            {
                gm_state->gm_slots[0] = outerTupleSlot;
                return true;
            }
            /* need_to_scan_locally serves as "done" flag for leader */
            gm_state->need_to_scan_locally = false;
        }
        return false;
    }

    /* Otherwise, check the state of the relevant tuple buffer. */
    tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];

    if (tuple_buffer->nTuples > tuple_buffer->readCounter)
    {
        /* Return any tuple previously read that is still buffered. */
        tup = tuple_buffer->tuple[tuple_buffer->readCounter++];
    }
    else if (tuple_buffer->done)
    {
        /* Reader is known to be exhausted. */
        return false;
    }
    else
    {
        /* Read and buffer next tuple. */
        tup = gm_readnext_tuple(gm_state,
                                reader,
                                nowait,
                                &tuple_buffer->done);
        if (!HeapTupleIsValid(tup))
            return false;

        /*
         * Attempt to read more tuples in nowait mode and store them in the
         * pending-tuple array for the reader.
         */
        load_tuple_array(gm_state, reader);
    }

    Assert(HeapTupleIsValid(tup));

    /* Build the TupleTableSlot for the given tuple */
    ExecStoreHeapTuple(tup,     /* tuple to store */
                       gm_state->gm_slots[reader], /* slot in which to store
                                                    * the tuple */
                       true);   /* pfree tuple when done with it */

    return true;
}
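
/*
 * Note on the nowait flag: gather_merge_init() first polls every reader with
 * nowait = true and only falls back to blocking reads for readers that still
 * have no tuple, whereas gather_merge_getnext() always passes nowait = false,
 * since it cannot make progress until the heap's top reader either produces
 * its next tuple or reports that it is done.
 */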

/*
 * Attempt to read a tuple from given worker.
 */
static HeapTuple
gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
                  bool *done)
{
    TupleQueueReader *reader;
    HeapTuple   tup;

    /* Check for async events, particularly messages from workers. */
    CHECK_FOR_INTERRUPTS();

    /*
     * Attempt to read a tuple.
     *
     * Note that TupleQueueReaderNext will just return NULL for a worker which
     * fails to initialize.  We'll treat that worker as having produced no
     * tuples; WaitForParallelWorkersToFinish will error out when we get
     * there.
     */
    reader = gm_state->reader[nreader - 1];
    tup = TupleQueueReaderNext(reader, nowait, done);

    return tup;
}

/*
 * We have one slot for each item in the heap array.  We use SlotNumber
 * to store slot indexes.  This doesn't actually provide any formal
 * type-safety, but it makes the code more self-documenting.
 */
typedef int32 SlotNumber;

/*
 * Compare the tuples in the two given slots.
 */
static int32
heap_compare_slots(Datum a, Datum b, void *arg)
{
    GatherMergeState *node = (GatherMergeState *) arg;
    SlotNumber  slot1 = DatumGetInt32(a);
    SlotNumber  slot2 = DatumGetInt32(b);

    TupleTableSlot *s1 = node->gm_slots[slot1];
    TupleTableSlot *s2 = node->gm_slots[slot2];
    int         nkey;

    Assert(!TupIsNull(s1));
    Assert(!TupIsNull(s2));

    for (nkey = 0; nkey < node->gm_nkeys; nkey++)
    {
        SortSupport sortKey = node->gm_sortkeys + nkey;
        AttrNumber  attno = sortKey->ssup_attno;
        Datum       datum1,
                    datum2;
        bool        isNull1,
                    isNull2;
        int         compare;

        datum1 = slot_getattr(s1, attno, &isNull1);
        datum2 = slot_getattr(s2, attno, &isNull2);

        compare = ApplySortComparator(datum1, isNull1,
                                      datum2, isNull2,
                                      sortKey);
        if (compare != 0)
        {
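            /*
             * binaryheap.c gives us a max-heap with respect to this
             * comparator, so invert the result: that puts the tuple with
             * the smallest sort-key value at the top of the heap.
             */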
            INVERT_COMPARE_RESULT(compare);
            return compare;
        }
    }
    return 0;
}
