1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * nodeMergeAppend.c |
4 | * routines to handle MergeAppend nodes. |
5 | * |
6 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
7 | * Portions Copyright (c) 1994, Regents of the University of California |
8 | * |
9 | * |
10 | * IDENTIFICATION |
11 | * src/backend/executor/nodeMergeAppend.c |
12 | * |
13 | *------------------------------------------------------------------------- |
14 | */ |
15 | /* INTERFACE ROUTINES |
16 | * ExecInitMergeAppend - initialize the MergeAppend node |
17 | * ExecMergeAppend - retrieve the next tuple from the node |
18 | * ExecEndMergeAppend - shut down the MergeAppend node |
19 | * ExecReScanMergeAppend - rescan the MergeAppend node |
20 | * |
21 | * NOTES |
22 | * A MergeAppend node contains a list of one or more subplans. |
23 | * These are each expected to deliver tuples that are sorted according |
24 | * to a common sort key. The MergeAppend node merges these streams |
25 | * to produce output sorted the same way. |
26 | * |
27 | * MergeAppend nodes don't make use of their left and right |
28 | * subtrees, rather they maintain a list of subplans so |
29 | * a typical MergeAppend node looks like this in the plan tree: |
30 | * |
 *                 ...
 *                 /
 *              MergeAppend---+------+------+--- nil
 *              /   \         |      |      |
 *            nil   nil      ...    ...    ...
 *                               subplans
37 | */ |
38 | |
39 | #include "postgres.h" |
40 | |
41 | #include "executor/execdebug.h" |
42 | #include "executor/execPartition.h" |
43 | #include "executor/nodeMergeAppend.h" |
44 | #include "lib/binaryheap.h" |
45 | #include "miscadmin.h" |
46 | |
/*
 * We have one slot for each item in the heap array.  We use SlotNumber
 * to store slot indexes.  This doesn't actually provide any formal
 * type-safety, but it makes the code more self-documenting.
 *
 * SlotNumber values are what the binary heap stores: they are converted
 * with Int32GetDatum()/DatumGetInt32() on the way in and out, and are used
 * to index both ms_slots[] and mergeplans[].
 */
typedef int32 SlotNumber;

/* Per-tuple entry point; ExecInitMergeAppend installs it as ps.ExecProcNode */
static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
/* Heap comparator passed to binaryheap_allocate(); arg is the MergeAppendState */
static int heap_compare_slots(Datum a, Datum b, void *arg);
56 | |
57 | |
58 | /* ---------------------------------------------------------------- |
59 | * ExecInitMergeAppend |
60 | * |
61 | * Begin all of the subscans of the MergeAppend node. |
62 | * ---------------------------------------------------------------- |
63 | */ |
64 | MergeAppendState * |
65 | ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags) |
66 | { |
67 | MergeAppendState *mergestate = makeNode(MergeAppendState); |
68 | PlanState **mergeplanstates; |
69 | Bitmapset *validsubplans; |
70 | int nplans; |
71 | int i, |
72 | j; |
73 | ListCell *lc; |
74 | |
75 | /* check for unsupported flags */ |
76 | Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); |
77 | |
78 | /* |
79 | * create new MergeAppendState for our node |
80 | */ |
81 | mergestate->ps.plan = (Plan *) node; |
82 | mergestate->ps.state = estate; |
83 | mergestate->ps.ExecProcNode = ExecMergeAppend; |
84 | mergestate->ms_noopscan = false; |
85 | |
86 | /* If run-time partition pruning is enabled, then set that up now */ |
87 | if (node->part_prune_info != NULL) |
88 | { |
89 | PartitionPruneState *prunestate; |
90 | |
91 | /* We may need an expression context to evaluate partition exprs */ |
92 | ExecAssignExprContext(estate, &mergestate->ps); |
93 | |
94 | prunestate = ExecCreatePartitionPruneState(&mergestate->ps, |
95 | node->part_prune_info); |
96 | mergestate->ms_prune_state = prunestate; |
97 | |
98 | /* Perform an initial partition prune, if required. */ |
99 | if (prunestate->do_initial_prune) |
100 | { |
101 | /* Determine which subplans survive initial pruning */ |
102 | validsubplans = ExecFindInitialMatchingSubPlans(prunestate, |
103 | list_length(node->mergeplans)); |
104 | |
105 | /* |
106 | * The case where no subplans survive pruning must be handled |
107 | * specially. The problem here is that code in explain.c requires |
108 | * a MergeAppend to have at least one subplan in order for it to |
109 | * properly determine the Vars in that subplan's targetlist. We |
110 | * sidestep this issue by just initializing the first subplan and |
111 | * setting ms_noopscan to true to indicate that we don't really |
112 | * need to scan any subnodes. |
113 | */ |
114 | if (bms_is_empty(validsubplans)) |
115 | { |
116 | mergestate->ms_noopscan = true; |
117 | |
118 | /* Mark the first as valid so that it's initialized below */ |
119 | validsubplans = bms_make_singleton(0); |
120 | } |
121 | |
122 | nplans = bms_num_members(validsubplans); |
123 | } |
124 | else |
125 | { |
126 | /* We'll need to initialize all subplans */ |
127 | nplans = list_length(node->mergeplans); |
128 | Assert(nplans > 0); |
129 | validsubplans = bms_add_range(NULL, 0, nplans - 1); |
130 | } |
131 | |
132 | /* |
133 | * If no runtime pruning is required, we can fill ms_valid_subplans |
134 | * immediately, preventing later calls to ExecFindMatchingSubPlans. |
135 | */ |
136 | if (!prunestate->do_exec_prune) |
137 | { |
138 | Assert(nplans > 0); |
139 | mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1); |
140 | } |
141 | } |
142 | else |
143 | { |
144 | nplans = list_length(node->mergeplans); |
145 | |
146 | /* |
147 | * When run-time partition pruning is not enabled we can just mark all |
148 | * subplans as valid; they must also all be initialized. |
149 | */ |
150 | Assert(nplans > 0); |
151 | mergestate->ms_valid_subplans = validsubplans = |
152 | bms_add_range(NULL, 0, nplans - 1); |
153 | mergestate->ms_prune_state = NULL; |
154 | } |
155 | |
156 | mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *)); |
157 | mergestate->mergeplans = mergeplanstates; |
158 | mergestate->ms_nplans = nplans; |
159 | |
160 | mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans); |
161 | mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots, |
162 | mergestate); |
163 | |
164 | /* |
165 | * Miscellaneous initialization |
166 | * |
167 | * MergeAppend nodes do have Result slots, which hold pointers to tuples, |
168 | * so we have to initialize them. FIXME |
169 | */ |
170 | ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual); |
171 | |
172 | /* node returns slots from each of its subnodes, therefore not fixed */ |
173 | mergestate->ps.resultopsset = true; |
174 | mergestate->ps.resultopsfixed = false; |
175 | |
176 | /* |
177 | * call ExecInitNode on each of the valid plans to be executed and save |
178 | * the results into the mergeplanstates array. |
179 | */ |
180 | j = i = 0; |
181 | foreach(lc, node->mergeplans) |
182 | { |
183 | if (bms_is_member(i, validsubplans)) |
184 | { |
185 | Plan *initNode = (Plan *) lfirst(lc); |
186 | |
187 | mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags); |
188 | } |
189 | i++; |
190 | } |
191 | |
192 | mergestate->ps.ps_ProjInfo = NULL; |
193 | |
194 | /* |
195 | * initialize sort-key information |
196 | */ |
197 | mergestate->ms_nkeys = node->numCols; |
198 | mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols); |
199 | |
200 | for (i = 0; i < node->numCols; i++) |
201 | { |
202 | SortSupport sortKey = mergestate->ms_sortkeys + i; |
203 | |
204 | sortKey->ssup_cxt = CurrentMemoryContext; |
205 | sortKey->ssup_collation = node->collations[i]; |
206 | sortKey->ssup_nulls_first = node->nullsFirst[i]; |
207 | sortKey->ssup_attno = node->sortColIdx[i]; |
208 | |
209 | /* |
210 | * It isn't feasible to perform abbreviated key conversion, since |
211 | * tuples are pulled into mergestate's binary heap as needed. It |
212 | * would likely be counter-productive to convert tuples into an |
213 | * abbreviated representation as they're pulled up, so opt out of that |
214 | * additional optimization entirely. |
215 | */ |
216 | sortKey->abbreviate = false; |
217 | |
218 | PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey); |
219 | } |
220 | |
221 | /* |
222 | * initialize to show we have not run the subplans yet |
223 | */ |
224 | mergestate->ms_initialized = false; |
225 | |
226 | return mergestate; |
227 | } |
228 | |
229 | /* ---------------------------------------------------------------- |
230 | * ExecMergeAppend |
231 | * |
232 | * Handles iteration over multiple subplans. |
233 | * ---------------------------------------------------------------- |
234 | */ |
235 | static TupleTableSlot * |
236 | ExecMergeAppend(PlanState *pstate) |
237 | { |
238 | MergeAppendState *node = castNode(MergeAppendState, pstate); |
239 | TupleTableSlot *result; |
240 | SlotNumber i; |
241 | |
242 | CHECK_FOR_INTERRUPTS(); |
243 | |
244 | if (!node->ms_initialized) |
245 | { |
246 | /* Nothing to do if all subplans were pruned */ |
247 | if (node->ms_noopscan) |
248 | return ExecClearTuple(node->ps.ps_ResultTupleSlot); |
249 | |
250 | /* |
251 | * If we've yet to determine the valid subplans then do so now. If |
252 | * run-time pruning is disabled then the valid subplans will always be |
253 | * set to all subplans. |
254 | */ |
255 | if (node->ms_valid_subplans == NULL) |
256 | node->ms_valid_subplans = |
257 | ExecFindMatchingSubPlans(node->ms_prune_state); |
258 | |
259 | /* |
260 | * First time through: pull the first tuple from each valid subplan, |
261 | * and set up the heap. |
262 | */ |
263 | i = -1; |
264 | while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0) |
265 | { |
266 | node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); |
267 | if (!TupIsNull(node->ms_slots[i])) |
268 | binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i)); |
269 | } |
270 | binaryheap_build(node->ms_heap); |
271 | node->ms_initialized = true; |
272 | } |
273 | else |
274 | { |
275 | /* |
276 | * Otherwise, pull the next tuple from whichever subplan we returned |
277 | * from last time, and reinsert the subplan index into the heap, |
278 | * because it might now compare differently against the existing |
279 | * elements of the heap. (We could perhaps simplify the logic a bit |
280 | * by doing this before returning from the prior call, but it's better |
281 | * to not pull tuples until necessary.) |
282 | */ |
283 | i = DatumGetInt32(binaryheap_first(node->ms_heap)); |
284 | node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); |
285 | if (!TupIsNull(node->ms_slots[i])) |
286 | binaryheap_replace_first(node->ms_heap, Int32GetDatum(i)); |
287 | else |
288 | (void) binaryheap_remove_first(node->ms_heap); |
289 | } |
290 | |
291 | if (binaryheap_empty(node->ms_heap)) |
292 | { |
293 | /* All the subplans are exhausted, and so is the heap */ |
294 | result = ExecClearTuple(node->ps.ps_ResultTupleSlot); |
295 | } |
296 | else |
297 | { |
298 | i = DatumGetInt32(binaryheap_first(node->ms_heap)); |
299 | result = node->ms_slots[i]; |
300 | } |
301 | |
302 | return result; |
303 | } |
304 | |
305 | /* |
306 | * Compare the tuples in the two given slots. |
307 | */ |
308 | static int32 |
309 | heap_compare_slots(Datum a, Datum b, void *arg) |
310 | { |
311 | MergeAppendState *node = (MergeAppendState *) arg; |
312 | SlotNumber slot1 = DatumGetInt32(a); |
313 | SlotNumber slot2 = DatumGetInt32(b); |
314 | |
315 | TupleTableSlot *s1 = node->ms_slots[slot1]; |
316 | TupleTableSlot *s2 = node->ms_slots[slot2]; |
317 | int nkey; |
318 | |
319 | Assert(!TupIsNull(s1)); |
320 | Assert(!TupIsNull(s2)); |
321 | |
322 | for (nkey = 0; nkey < node->ms_nkeys; nkey++) |
323 | { |
324 | SortSupport sortKey = node->ms_sortkeys + nkey; |
325 | AttrNumber attno = sortKey->ssup_attno; |
326 | Datum datum1, |
327 | datum2; |
328 | bool isNull1, |
329 | isNull2; |
330 | int compare; |
331 | |
332 | datum1 = slot_getattr(s1, attno, &isNull1); |
333 | datum2 = slot_getattr(s2, attno, &isNull2); |
334 | |
335 | compare = ApplySortComparator(datum1, isNull1, |
336 | datum2, isNull2, |
337 | sortKey); |
338 | if (compare != 0) |
339 | { |
340 | INVERT_COMPARE_RESULT(compare); |
341 | return compare; |
342 | } |
343 | } |
344 | return 0; |
345 | } |
346 | |
347 | /* ---------------------------------------------------------------- |
348 | * ExecEndMergeAppend |
349 | * |
350 | * Shuts down the subscans of the MergeAppend node. |
351 | * |
352 | * Returns nothing of interest. |
353 | * ---------------------------------------------------------------- |
354 | */ |
355 | void |
356 | ExecEndMergeAppend(MergeAppendState *node) |
357 | { |
358 | PlanState **mergeplans; |
359 | int nplans; |
360 | int i; |
361 | |
362 | /* |
363 | * get information from the node |
364 | */ |
365 | mergeplans = node->mergeplans; |
366 | nplans = node->ms_nplans; |
367 | |
368 | /* |
369 | * shut down each of the subscans |
370 | */ |
371 | for (i = 0; i < nplans; i++) |
372 | ExecEndNode(mergeplans[i]); |
373 | } |
374 | |
375 | void |
376 | ExecReScanMergeAppend(MergeAppendState *node) |
377 | { |
378 | int i; |
379 | |
380 | /* |
381 | * If any PARAM_EXEC Params used in pruning expressions have changed, then |
382 | * we'd better unset the valid subplans so that they are reselected for |
383 | * the new parameter values. |
384 | */ |
385 | if (node->ms_prune_state && |
386 | bms_overlap(node->ps.chgParam, |
387 | node->ms_prune_state->execparamids)) |
388 | { |
389 | bms_free(node->ms_valid_subplans); |
390 | node->ms_valid_subplans = NULL; |
391 | } |
392 | |
393 | for (i = 0; i < node->ms_nplans; i++) |
394 | { |
395 | PlanState *subnode = node->mergeplans[i]; |
396 | |
397 | /* |
398 | * ExecReScan doesn't know about my subplans, so I have to do |
399 | * changed-parameter signaling myself. |
400 | */ |
401 | if (node->ps.chgParam != NULL) |
402 | UpdateChangedParamSet(subnode, node->ps.chgParam); |
403 | |
404 | /* |
405 | * If chgParam of subnode is not null then plan will be re-scanned by |
406 | * first ExecProcNode. |
407 | */ |
408 | if (subnode->chgParam == NULL) |
409 | ExecReScan(subnode); |
410 | } |
411 | binaryheap_reset(node->ms_heap); |
412 | node->ms_initialized = false; |
413 | } |
414 | |