/*-------------------------------------------------------------------------
 *
 * nodeGather.c
 *	  Support routines for scanning a plan via multiple workers.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * A Gather executor launches parallel workers to run multiple copies of a
 * plan.  It can also run the plan itself, if the workers are not available
 * or have not started up yet.  It then merges all of the results it produces
 * and the results from the workers into a single output stream.  Therefore,
 * it will normally be used with a plan where running multiple copies of the
 * same plan does not produce duplicate output, such as parallel-aware
 * SeqScan.
 *
 * Alternatively, a Gather node can be configured to use just one worker
 * and the single-copy flag can be set.  In this case, the Gather node will
 * run the plan in one worker and will not execute the plan itself; it
 * simply returns whatever tuples the worker returned.  If a worker cannot
 * be obtained, then the Gather node will run the plan itself and return
 * the results.  Therefore, a plan used with a single-copy Gather node need
 * not be parallel-aware.
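 *
 * As a rough illustration (the table name below is made up, not taken from
 * any particular query), Gather typically sits directly above a
 * parallel-aware scan in an EXPLAIN tree:
 *
 *		Gather
 *		  Workers Planned: 2
 *		  ->  Parallel Seq Scan on foo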
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeGather.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/relscan.h"
#include "access/xact.h"
#include "executor/execdebug.h"
#include "executor/execParallel.h"
#include "executor/nodeGather.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/rel.h"


static TupleTableSlot *ExecGather(PlanState *pstate);
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static HeapTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);


/* ----------------------------------------------------------------
 *		ExecInitGather
 * ----------------------------------------------------------------
 */
GatherState *
ExecInitGather(Gather *node, EState *estate, int eflags)
{
	GatherState *gatherstate;
	Plan	   *outerNode;
	TupleDesc	tupDesc;

	/* Gather node doesn't have innerPlan node. */
	Assert(innerPlan(node) == NULL);

	/*
	 * create state structure
	 */
	gatherstate = makeNode(GatherState);
	gatherstate->ps.plan = (Plan *) node;
	gatherstate->ps.state = estate;
	gatherstate->ps.ExecProcNode = ExecGather;

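	/*
	 * The leader will normally participate in executing the plan, unless
	 * this is a single-copy Gather or parallel_leader_participation is off.
	 * tuples_needed of -1 means "read all tuples"; ExecSetTupleBound may
	 * later install a smaller bound.
	 */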
	gatherstate->initialized = false;
	gatherstate->need_to_scan_locally =
		!node->single_copy && parallel_leader_participation;
	gatherstate->tuples_needed = -1;

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &gatherstate->ps);

	/*
	 * now initialize outer plan
	 */
	outerNode = outerPlan(node);
	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
	tupDesc = ExecGetResultType(outerPlanState(gatherstate));

	/*
	 * Leader may access ExecProcNode result directly (if
	 * need_to_scan_locally), or from workers via tuple queue.  So we can't
	 * trivially rely on the slot type being fixed for expressions evaluated
	 * within this node.
	 */
	gatherstate->ps.outeropsset = true;
	gatherstate->ps.outeropsfixed = false;

	/*
	 * Initialize result type and projection.
	 */
	ExecInitResultTypeTL(&gatherstate->ps);
	ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);

	/*
	 * Without projections result slot type is not trivially known, see
	 * comment above.
	 */
	if (gatherstate->ps.ps_ProjInfo == NULL)
	{
		gatherstate->ps.resultopsset = true;
		gatherstate->ps.resultopsfixed = false;
	}

	/*
	 * Initialize funnel slot to same tuple descriptor as outer plan.  Use a
	 * heap-tuple slot, since the tuples we read from the workers' queues
	 * arrive as heap tuples (see gather_getnext).
	 */
	gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
													  &TTSOpsHeapTuple);

	/*
	 * Gather doesn't support checking a qual (it's always more efficient to
	 * do it in the child node).
	 */
	Assert(!node->plan.qual);

	return gatherstate;
}

/* ----------------------------------------------------------------
 *		ExecGather(node)
 *
 *		Scans the relation via multiple workers and returns
 *		the next qualifying tuple.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGather(PlanState *pstate)
{
	GatherState *node = castNode(GatherState, pstate);
	TupleTableSlot *slot;
	ExprContext *econtext;

	CHECK_FOR_INTERRUPTS();

	/*
	 * Initialize the parallel context and workers on first execution. We do
	 * this on first execution rather than during node initialization, as it
	 * needs to allocate a large dynamic segment, so it is better to do it
	 * only if it is really needed.
	 */
	if (!node->initialized)
	{
		EState	   *estate = node->ps.state;
		Gather	   *gather = (Gather *) node->ps.plan;

		/*
		 * Sometimes we might have to run without parallelism; but if parallel
		 * mode is active then we can try to fire up some workers.
		 */
		if (gather->num_workers > 0 && estate->es_use_parallel_mode)
		{
			ParallelContext *pcxt;

			/*
			 * Initialize, or re-initialize, shared state needed by workers.
			 * (pei is still set here if this node has been re-scanned;
			 * ExecReScanGather leaves it in place so the parallel machinery
			 * can be reused rather than rebuilt.)
			 */
			if (!node->pei)
				node->pei = ExecInitParallelPlan(node->ps.lefttree,
												 estate,
												 gather->initParam,
												 gather->num_workers,
												 node->tuples_needed);
			else
				ExecParallelReinitialize(node->ps.lefttree,
										 node->pei,
										 gather->initParam);

			/*
			 * Register backend workers. We might not get as many as we
			 * requested, or indeed any at all.
			 */
			pcxt = node->pei->pcxt;
			LaunchParallelWorkers(pcxt);
			/* We save # workers launched for the benefit of EXPLAIN */
			node->nworkers_launched = pcxt->nworkers_launched;

			/* Set up tuple queue readers to read the results. */
			if (pcxt->nworkers_launched > 0)
			{
				ExecParallelCreateReaders(node->pei);
				/* Make a working array showing the active readers */
				node->nreaders = pcxt->nworkers_launched;
				node->reader = (TupleQueueReader **)
					palloc(node->nreaders * sizeof(TupleQueueReader *));
				memcpy(node->reader, node->pei->reader,
					   node->nreaders * sizeof(TupleQueueReader *));
			}
			else
			{
				/* No workers?  Then never mind. */
				node->nreaders = 0;
				node->reader = NULL;
			}
			node->nextreader = 0;
		}

		/*
		 * Run the plan locally if we launched no workers, or if leader
		 * participation is enabled and this isn't a single-copy Gather.
		 */
		node->need_to_scan_locally = (node->nreaders == 0)
			|| (!gather->single_copy && parallel_leader_participation);
		node->initialized = true;
	}

	/*
	 * Reset per-tuple memory context to free any expression evaluation
	 * storage allocated in the previous tuple cycle.
	 */
	econtext = node->ps.ps_ExprContext;
	ResetExprContext(econtext);

	/*
	 * Get next tuple, either from one of our workers, or by running the plan
	 * ourselves.
	 */
	slot = gather_getnext(node);
	if (TupIsNull(slot))
		return NULL;

	/* If no projection is required, we're done. */
	if (node->ps.ps_ProjInfo == NULL)
		return slot;

	/*
	 * Form the result tuple using ExecProject(), and return it.
	 */
	econtext->ecxt_outertuple = slot;
	return ExecProject(node->ps.ps_ProjInfo);
}

/* ----------------------------------------------------------------
 *		ExecEndGather
 *
 *		frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGather(GatherState *node)
{
	ExecEndNode(outerPlanState(node));	/* let children clean up first */
	ExecShutdownGather(node);
	ExecFreeExprContext(&node->ps);
	if (node->ps.ps_ResultTupleSlot)
		ExecClearTuple(node->ps.ps_ResultTupleSlot);
}

/*
 * Read the next tuple.  We might fetch a tuple from one of the tuple queues
 * using gather_readnext, or if no tuple queue contains a tuple and the
 * single_copy flag is not set, we might generate one locally instead.
 */
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
	PlanState  *outerPlan = outerPlanState(gatherstate);
	TupleTableSlot *outerTupleSlot;
	TupleTableSlot *fslot = gatherstate->funnel_slot;
	HeapTuple	tup;

	while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
	{
		CHECK_FOR_INTERRUPTS();

		if (gatherstate->nreaders > 0)
		{
			tup = gather_readnext(gatherstate);

			if (HeapTupleIsValid(tup))
			{
				ExecStoreHeapTuple(tup, /* tuple to store */
								   fslot,	/* slot to store the tuple */
								   true);	/* pfree tuple when done with it */
				return fslot;
			}
		}

		if (gatherstate->need_to_scan_locally)
		{
			EState	   *estate = gatherstate->ps.state;

			/* Install our DSA area while executing the plan. */
			estate->es_query_dsa =
				gatherstate->pei ? gatherstate->pei->area : NULL;
			outerTupleSlot = ExecProcNode(outerPlan);
			estate->es_query_dsa = NULL;

			if (!TupIsNull(outerTupleSlot))
				return outerTupleSlot;

			gatherstate->need_to_scan_locally = false;
		}
	}

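	/* No readers left and no local scan to do: return an empty slot (end of scan). */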
	return ExecClearTuple(fslot);
}

/*
 * Attempt to read a tuple from one of our parallel workers.
 */
static HeapTuple
gather_readnext(GatherState *gatherstate)
{
	int			nvisited = 0;

	for (;;)
	{
		TupleQueueReader *reader;
		HeapTuple	tup;
		bool		readerdone;

		/* Check for async events, particularly messages from workers. */
		CHECK_FOR_INTERRUPTS();

		/*
		 * Attempt to read a tuple, but don't block if none is available.
		 *
		 * Note that TupleQueueReaderNext will just return NULL for a worker
		 * which fails to initialize.  We'll treat that worker as having
		 * produced no tuples; WaitForParallelWorkersToFinish will error out
		 * when we get there.
		 */
		Assert(gatherstate->nextreader < gatherstate->nreaders);
		reader = gatherstate->reader[gatherstate->nextreader];
		tup = TupleQueueReaderNext(reader, true, &readerdone);

		/*
		 * If this reader is done, remove it from our working array of active
		 * readers.  If all readers are done, we're outta here.
		 */
		if (readerdone)
		{
			Assert(!tup);
			--gatherstate->nreaders;
			if (gatherstate->nreaders == 0)
			{
				ExecShutdownGatherWorkers(gatherstate);
				return NULL;
			}
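			/*
			 * Collapse the hole left by the finished reader: shift the
			 * remaining entries down over its slot (nreaders has already
			 * been decremented above).
			 */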
			memmove(&gatherstate->reader[gatherstate->nextreader],
					&gatherstate->reader[gatherstate->nextreader + 1],
					sizeof(TupleQueueReader *)
					* (gatherstate->nreaders - gatherstate->nextreader));
			if (gatherstate->nextreader >= gatherstate->nreaders)
				gatherstate->nextreader = 0;
			continue;
		}

		/* If we got a tuple, return it. */
		if (tup)
			return tup;

		/*
		 * Advance nextreader pointer in round-robin fashion.  Note that we
		 * only reach this code if we weren't able to get a tuple from the
		 * current worker.  We used to advance the nextreader pointer after
		 * every tuple, but it turns out to be much more efficient to keep
		 * reading from the same queue until that would require blocking.
		 */
		gatherstate->nextreader++;
		if (gatherstate->nextreader >= gatherstate->nreaders)
			gatherstate->nextreader = 0;

		/* Have we visited every (surviving) TupleQueueReader? */
		nvisited++;
		if (nvisited >= gatherstate->nreaders)
		{
			/*
			 * If (still) running plan locally, return NULL so caller can
			 * generate another tuple from the local copy of the plan.
			 */
			if (gatherstate->need_to_scan_locally)
				return NULL;

			/*
			 * Nothing to do except wait for developments.  A worker that
			 * writes a tuple to its queue (or detaches from it) will set
			 * our latch and wake us up.
			 */
			(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
							 WAIT_EVENT_EXECUTE_GATHER);
			ResetLatch(MyLatch);
			nvisited = 0;
		}
	}
}

/* ----------------------------------------------------------------
 *		ExecShutdownGatherWorkers
 *
 *		Stop all the parallel workers.
 * ----------------------------------------------------------------
 */
static void
ExecShutdownGatherWorkers(GatherState *node)
{
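	/*
	 * ExecParallelFinish waits for the launched workers to finish; it's a
	 * no-op if they already have.
	 */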
	if (node->pei != NULL)
		ExecParallelFinish(node->pei);

	/* Flush local copy of reader array */
	if (node->reader)
		pfree(node->reader);
	node->reader = NULL;
}

/* ----------------------------------------------------------------
 *		ExecShutdownGather
 *
 *		Destroy the setup for parallel workers including parallel context.
 * ----------------------------------------------------------------
 */
void
ExecShutdownGather(GatherState *node)
{
	ExecShutdownGatherWorkers(node);

	/* Now destroy the parallel context. */
	if (node->pei != NULL)
	{
		ExecParallelCleanup(node->pei);
		node->pei = NULL;
	}
}

/* ----------------------------------------------------------------
 *		Join Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *		ExecReScanGather
 *
 *		Prepare to re-scan the result of a Gather.
 * ----------------------------------------------------------------
 */
void
ExecReScanGather(GatherState *node)
{
	Gather	   *gather = (Gather *) node->ps.plan;
	PlanState  *outerPlan = outerPlanState(node);

	/* Make sure any existing workers are gracefully shut down */
	ExecShutdownGatherWorkers(node);

	/* Mark node so that shared state will be rebuilt at next call */
	node->initialized = false;

	/*
	 * Set child node's chgParam to tell it that the next scan might deliver a
	 * different set of rows within the leader process.  (The overall rowset
	 * shouldn't change, but the leader process's subset might; hence nodes
	 * between here and the parallel table scan node mustn't optimize on the
	 * assumption of an unchanging rowset.)
	 */
	if (gather->rescan_param >= 0)
		outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
											 gather->rescan_param);

	/*
	 * If chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.  Note: because this does nothing if we have a
	 * rescan_param, it's currently guaranteed that parallel-aware child nodes
	 * will not see a ReScan call until after they get a ReInitializeDSM call.
	 * That ordering might not be something to rely on, though.  A good rule
	 * of thumb is that ReInitializeDSM should reset only shared state, ReScan
	 * should reset only local state, and anything that depends on both of
	 * those steps being finished must wait until the first ExecProcNode call.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}