1/*-------------------------------------------------------------------------
2 *
3 * nodeMergeAppend.c
4 * routines to handle MergeAppend nodes.
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/executor/nodeMergeAppend.c
12 *
13 *-------------------------------------------------------------------------
14 */
15/* INTERFACE ROUTINES
16 * ExecInitMergeAppend - initialize the MergeAppend node
17 * ExecMergeAppend - retrieve the next tuple from the node
18 * ExecEndMergeAppend - shut down the MergeAppend node
19 * ExecReScanMergeAppend - rescan the MergeAppend node
20 *
21 * NOTES
22 * A MergeAppend node contains a list of one or more subplans.
23 * These are each expected to deliver tuples that are sorted according
24 * to a common sort key. The MergeAppend node merges these streams
25 * to produce output sorted the same way.
26 *
27 * MergeAppend nodes don't make use of their left and right
28 * subtrees, rather they maintain a list of subplans so
29 * a typical MergeAppend node looks like this in the plan tree:
30 *
31 * ...
32 * /
33 * MergeAppend---+------+------+--- nil
34 * / \ | | |
35 * nil nil ... ... ...
36 * subplans
37 */
38
39#include "postgres.h"
40
41#include "executor/execdebug.h"
42#include "executor/execPartition.h"
43#include "executor/nodeMergeAppend.h"
44#include "lib/binaryheap.h"
45#include "miscadmin.h"
46
47/*
48 * We have one slot for each item in the heap array. We use SlotNumber
49 * to store slot indexes. This doesn't actually provide any formal
50 * type-safety, but it makes the code more self-documenting.
51 */
52typedef int32 SlotNumber;
53
54static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
55static int heap_compare_slots(Datum a, Datum b, void *arg);
56
57
58/* ----------------------------------------------------------------
59 * ExecInitMergeAppend
60 *
61 * Begin all of the subscans of the MergeAppend node.
62 * ----------------------------------------------------------------
63 */
64MergeAppendState *
65ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
66{
67 MergeAppendState *mergestate = makeNode(MergeAppendState);
68 PlanState **mergeplanstates;
69 Bitmapset *validsubplans;
70 int nplans;
71 int i,
72 j;
73 ListCell *lc;
74
75 /* check for unsupported flags */
76 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
77
78 /*
79 * create new MergeAppendState for our node
80 */
81 mergestate->ps.plan = (Plan *) node;
82 mergestate->ps.state = estate;
83 mergestate->ps.ExecProcNode = ExecMergeAppend;
84 mergestate->ms_noopscan = false;
85
86 /* If run-time partition pruning is enabled, then set that up now */
87 if (node->part_prune_info != NULL)
88 {
89 PartitionPruneState *prunestate;
90
91 /* We may need an expression context to evaluate partition exprs */
92 ExecAssignExprContext(estate, &mergestate->ps);
93
94 prunestate = ExecCreatePartitionPruneState(&mergestate->ps,
95 node->part_prune_info);
96 mergestate->ms_prune_state = prunestate;
97
98 /* Perform an initial partition prune, if required. */
99 if (prunestate->do_initial_prune)
100 {
101 /* Determine which subplans survive initial pruning */
102 validsubplans = ExecFindInitialMatchingSubPlans(prunestate,
103 list_length(node->mergeplans));
104
105 /*
106 * The case where no subplans survive pruning must be handled
107 * specially. The problem here is that code in explain.c requires
108 * a MergeAppend to have at least one subplan in order for it to
109 * properly determine the Vars in that subplan's targetlist. We
110 * sidestep this issue by just initializing the first subplan and
111 * setting ms_noopscan to true to indicate that we don't really
112 * need to scan any subnodes.
113 */
114 if (bms_is_empty(validsubplans))
115 {
116 mergestate->ms_noopscan = true;
117
118 /* Mark the first as valid so that it's initialized below */
119 validsubplans = bms_make_singleton(0);
120 }
121
122 nplans = bms_num_members(validsubplans);
123 }
124 else
125 {
126 /* We'll need to initialize all subplans */
127 nplans = list_length(node->mergeplans);
128 Assert(nplans > 0);
129 validsubplans = bms_add_range(NULL, 0, nplans - 1);
130 }
131
132 /*
133 * If no runtime pruning is required, we can fill ms_valid_subplans
134 * immediately, preventing later calls to ExecFindMatchingSubPlans.
135 */
136 if (!prunestate->do_exec_prune)
137 {
138 Assert(nplans > 0);
139 mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
140 }
141 }
142 else
143 {
144 nplans = list_length(node->mergeplans);
145
146 /*
147 * When run-time partition pruning is not enabled we can just mark all
148 * subplans as valid; they must also all be initialized.
149 */
150 Assert(nplans > 0);
151 mergestate->ms_valid_subplans = validsubplans =
152 bms_add_range(NULL, 0, nplans - 1);
153 mergestate->ms_prune_state = NULL;
154 }
155
156 mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *));
157 mergestate->mergeplans = mergeplanstates;
158 mergestate->ms_nplans = nplans;
159
160 mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
161 mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
162 mergestate);
163
164 /*
165 * Miscellaneous initialization
166 *
167 * MergeAppend nodes do have Result slots, which hold pointers to tuples,
168 * so we have to initialize them. FIXME
169 */
170 ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual);
171
172 /* node returns slots from each of its subnodes, therefore not fixed */
173 mergestate->ps.resultopsset = true;
174 mergestate->ps.resultopsfixed = false;
175
176 /*
177 * call ExecInitNode on each of the valid plans to be executed and save
178 * the results into the mergeplanstates array.
179 */
180 j = i = 0;
181 foreach(lc, node->mergeplans)
182 {
183 if (bms_is_member(i, validsubplans))
184 {
185 Plan *initNode = (Plan *) lfirst(lc);
186
187 mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
188 }
189 i++;
190 }
191
192 mergestate->ps.ps_ProjInfo = NULL;
193
194 /*
195 * initialize sort-key information
196 */
197 mergestate->ms_nkeys = node->numCols;
198 mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
199
200 for (i = 0; i < node->numCols; i++)
201 {
202 SortSupport sortKey = mergestate->ms_sortkeys + i;
203
204 sortKey->ssup_cxt = CurrentMemoryContext;
205 sortKey->ssup_collation = node->collations[i];
206 sortKey->ssup_nulls_first = node->nullsFirst[i];
207 sortKey->ssup_attno = node->sortColIdx[i];
208
209 /*
210 * It isn't feasible to perform abbreviated key conversion, since
211 * tuples are pulled into mergestate's binary heap as needed. It
212 * would likely be counter-productive to convert tuples into an
213 * abbreviated representation as they're pulled up, so opt out of that
214 * additional optimization entirely.
215 */
216 sortKey->abbreviate = false;
217
218 PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
219 }
220
221 /*
222 * initialize to show we have not run the subplans yet
223 */
224 mergestate->ms_initialized = false;
225
226 return mergestate;
227}
228
229/* ----------------------------------------------------------------
230 * ExecMergeAppend
231 *
232 * Handles iteration over multiple subplans.
233 * ----------------------------------------------------------------
234 */
235static TupleTableSlot *
236ExecMergeAppend(PlanState *pstate)
237{
238 MergeAppendState *node = castNode(MergeAppendState, pstate);
239 TupleTableSlot *result;
240 SlotNumber i;
241
242 CHECK_FOR_INTERRUPTS();
243
244 if (!node->ms_initialized)
245 {
246 /* Nothing to do if all subplans were pruned */
247 if (node->ms_noopscan)
248 return ExecClearTuple(node->ps.ps_ResultTupleSlot);
249
250 /*
251 * If we've yet to determine the valid subplans then do so now. If
252 * run-time pruning is disabled then the valid subplans will always be
253 * set to all subplans.
254 */
255 if (node->ms_valid_subplans == NULL)
256 node->ms_valid_subplans =
257 ExecFindMatchingSubPlans(node->ms_prune_state);
258
259 /*
260 * First time through: pull the first tuple from each valid subplan,
261 * and set up the heap.
262 */
263 i = -1;
264 while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
265 {
266 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
267 if (!TupIsNull(node->ms_slots[i]))
268 binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
269 }
270 binaryheap_build(node->ms_heap);
271 node->ms_initialized = true;
272 }
273 else
274 {
275 /*
276 * Otherwise, pull the next tuple from whichever subplan we returned
277 * from last time, and reinsert the subplan index into the heap,
278 * because it might now compare differently against the existing
279 * elements of the heap. (We could perhaps simplify the logic a bit
280 * by doing this before returning from the prior call, but it's better
281 * to not pull tuples until necessary.)
282 */
283 i = DatumGetInt32(binaryheap_first(node->ms_heap));
284 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
285 if (!TupIsNull(node->ms_slots[i]))
286 binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
287 else
288 (void) binaryheap_remove_first(node->ms_heap);
289 }
290
291 if (binaryheap_empty(node->ms_heap))
292 {
293 /* All the subplans are exhausted, and so is the heap */
294 result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
295 }
296 else
297 {
298 i = DatumGetInt32(binaryheap_first(node->ms_heap));
299 result = node->ms_slots[i];
300 }
301
302 return result;
303}
304
305/*
306 * Compare the tuples in the two given slots.
307 */
308static int32
309heap_compare_slots(Datum a, Datum b, void *arg)
310{
311 MergeAppendState *node = (MergeAppendState *) arg;
312 SlotNumber slot1 = DatumGetInt32(a);
313 SlotNumber slot2 = DatumGetInt32(b);
314
315 TupleTableSlot *s1 = node->ms_slots[slot1];
316 TupleTableSlot *s2 = node->ms_slots[slot2];
317 int nkey;
318
319 Assert(!TupIsNull(s1));
320 Assert(!TupIsNull(s2));
321
322 for (nkey = 0; nkey < node->ms_nkeys; nkey++)
323 {
324 SortSupport sortKey = node->ms_sortkeys + nkey;
325 AttrNumber attno = sortKey->ssup_attno;
326 Datum datum1,
327 datum2;
328 bool isNull1,
329 isNull2;
330 int compare;
331
332 datum1 = slot_getattr(s1, attno, &isNull1);
333 datum2 = slot_getattr(s2, attno, &isNull2);
334
335 compare = ApplySortComparator(datum1, isNull1,
336 datum2, isNull2,
337 sortKey);
338 if (compare != 0)
339 {
340 INVERT_COMPARE_RESULT(compare);
341 return compare;
342 }
343 }
344 return 0;
345}
346
347/* ----------------------------------------------------------------
348 * ExecEndMergeAppend
349 *
350 * Shuts down the subscans of the MergeAppend node.
351 *
352 * Returns nothing of interest.
353 * ----------------------------------------------------------------
354 */
355void
356ExecEndMergeAppend(MergeAppendState *node)
357{
358 PlanState **mergeplans;
359 int nplans;
360 int i;
361
362 /*
363 * get information from the node
364 */
365 mergeplans = node->mergeplans;
366 nplans = node->ms_nplans;
367
368 /*
369 * shut down each of the subscans
370 */
371 for (i = 0; i < nplans; i++)
372 ExecEndNode(mergeplans[i]);
373}
374
375void
376ExecReScanMergeAppend(MergeAppendState *node)
377{
378 int i;
379
380 /*
381 * If any PARAM_EXEC Params used in pruning expressions have changed, then
382 * we'd better unset the valid subplans so that they are reselected for
383 * the new parameter values.
384 */
385 if (node->ms_prune_state &&
386 bms_overlap(node->ps.chgParam,
387 node->ms_prune_state->execparamids))
388 {
389 bms_free(node->ms_valid_subplans);
390 node->ms_valid_subplans = NULL;
391 }
392
393 for (i = 0; i < node->ms_nplans; i++)
394 {
395 PlanState *subnode = node->mergeplans[i];
396
397 /*
398 * ExecReScan doesn't know about my subplans, so I have to do
399 * changed-parameter signaling myself.
400 */
401 if (node->ps.chgParam != NULL)
402 UpdateChangedParamSet(subnode, node->ps.chgParam);
403
404 /*
405 * If chgParam of subnode is not null then plan will be re-scanned by
406 * first ExecProcNode.
407 */
408 if (subnode->chgParam == NULL)
409 ExecReScan(subnode);
410 }
411 binaryheap_reset(node->ms_heap);
412 node->ms_initialized = false;
413}
414