1/*-------------------------------------------------------------------------
2 *
3 * nodeRecursiveunion.c
4 * routines to handle RecursiveUnion nodes.
5 *
6 * To implement UNION (without ALL), we need a hashtable that stores tuples
7 * already seen. The hash key is computed from the grouping columns.
8 *
9 *
10 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
11 * Portions Copyright (c) 1994, Regents of the University of California
12 *
13 *
14 * IDENTIFICATION
15 * src/backend/executor/nodeRecursiveunion.c
16 *
17 *-------------------------------------------------------------------------
18 */
19#include "postgres.h"
20
21#include "executor/execdebug.h"
22#include "executor/nodeRecursiveunion.h"
23#include "miscadmin.h"
24#include "utils/memutils.h"
25
26
27
28/*
29 * Initialize the hash table to empty.
30 */
31static void
32build_hash_table(RecursiveUnionState *rustate)
33{
34 RecursiveUnion *node = (RecursiveUnion *) rustate->ps.plan;
35 TupleDesc desc = ExecGetResultType(outerPlanState(rustate));
36
37 Assert(node->numCols > 0);
38 Assert(node->numGroups > 0);
39
40 rustate->hashtable = BuildTupleHashTableExt(&rustate->ps,
41 desc,
42 node->numCols,
43 node->dupColIdx,
44 rustate->eqfuncoids,
45 rustate->hashfunctions,
46 node->dupCollations,
47 node->numGroups,
48 0,
49 rustate->ps.state->es_query_cxt,
50 rustate->tableContext,
51 rustate->tempContext,
52 false);
53}
54
55
56/* ----------------------------------------------------------------
57 * ExecRecursiveUnion(node)
58 *
59 * Scans the recursive query sequentially and returns the next
60 * qualifying tuple.
61 *
62 * 1. evaluate non recursive term and assign the result to RT
63 *
64 * 2. execute recursive terms
65 *
66 * 2.1 WT := RT
67 * 2.2 while WT is not empty repeat 2.3 to 2.6. if WT is empty returns RT
68 * 2.3 replace the name of recursive term with WT
69 * 2.4 evaluate the recursive term and store into WT
70 * 2.5 append WT to RT
71 * 2.6 go back to 2.2
72 * ----------------------------------------------------------------
73 */
74static TupleTableSlot *
75ExecRecursiveUnion(PlanState *pstate)
76{
77 RecursiveUnionState *node = castNode(RecursiveUnionState, pstate);
78 PlanState *outerPlan = outerPlanState(node);
79 PlanState *innerPlan = innerPlanState(node);
80 RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan;
81 TupleTableSlot *slot;
82 bool isnew;
83
84 CHECK_FOR_INTERRUPTS();
85
86 /* 1. Evaluate non-recursive term */
87 if (!node->recursing)
88 {
89 for (;;)
90 {
91 slot = ExecProcNode(outerPlan);
92 if (TupIsNull(slot))
93 break;
94 if (plan->numCols > 0)
95 {
96 /* Find or build hashtable entry for this tuple's group */
97 LookupTupleHashEntry(node->hashtable, slot, &isnew);
98 /* Must reset temp context after each hashtable lookup */
99 MemoryContextReset(node->tempContext);
100 /* Ignore tuple if already seen */
101 if (!isnew)
102 continue;
103 }
104 /* Each non-duplicate tuple goes to the working table ... */
105 tuplestore_puttupleslot(node->working_table, slot);
106 /* ... and to the caller */
107 return slot;
108 }
109 node->recursing = true;
110 }
111
112 /* 2. Execute recursive term */
113 for (;;)
114 {
115 slot = ExecProcNode(innerPlan);
116 if (TupIsNull(slot))
117 {
118 /* Done if there's nothing in the intermediate table */
119 if (node->intermediate_empty)
120 break;
121
122 /* done with old working table ... */
123 tuplestore_end(node->working_table);
124
125 /* intermediate table becomes working table */
126 node->working_table = node->intermediate_table;
127
128 /* create new empty intermediate table */
129 node->intermediate_table = tuplestore_begin_heap(false, false,
130 work_mem);
131 node->intermediate_empty = true;
132
133 /* reset the recursive term */
134 innerPlan->chgParam = bms_add_member(innerPlan->chgParam,
135 plan->wtParam);
136
137 /* and continue fetching from recursive term */
138 continue;
139 }
140
141 if (plan->numCols > 0)
142 {
143 /* Find or build hashtable entry for this tuple's group */
144 LookupTupleHashEntry(node->hashtable, slot, &isnew);
145 /* Must reset temp context after each hashtable lookup */
146 MemoryContextReset(node->tempContext);
147 /* Ignore tuple if already seen */
148 if (!isnew)
149 continue;
150 }
151
152 /* Else, tuple is good; stash it in intermediate table ... */
153 node->intermediate_empty = false;
154 tuplestore_puttupleslot(node->intermediate_table, slot);
155 /* ... and return it */
156 return slot;
157 }
158
159 return NULL;
160}
161
162/* ----------------------------------------------------------------
163 * ExecInitRecursiveUnionScan
164 * ----------------------------------------------------------------
165 */
166RecursiveUnionState *
167ExecInitRecursiveUnion(RecursiveUnion *node, EState *estate, int eflags)
168{
169 RecursiveUnionState *rustate;
170 ParamExecData *prmdata;
171
172 /* check for unsupported flags */
173 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
174
175 /*
176 * create state structure
177 */
178 rustate = makeNode(RecursiveUnionState);
179 rustate->ps.plan = (Plan *) node;
180 rustate->ps.state = estate;
181 rustate->ps.ExecProcNode = ExecRecursiveUnion;
182
183 rustate->eqfuncoids = NULL;
184 rustate->hashfunctions = NULL;
185 rustate->hashtable = NULL;
186 rustate->tempContext = NULL;
187 rustate->tableContext = NULL;
188
189 /* initialize processing state */
190 rustate->recursing = false;
191 rustate->intermediate_empty = true;
192 rustate->working_table = tuplestore_begin_heap(false, false, work_mem);
193 rustate->intermediate_table = tuplestore_begin_heap(false, false, work_mem);
194
195 /*
196 * If hashing, we need a per-tuple memory context for comparisons, and a
197 * longer-lived context to store the hash table. The table can't just be
198 * kept in the per-query context because we want to be able to throw it
199 * away when rescanning.
200 */
201 if (node->numCols > 0)
202 {
203 rustate->tempContext =
204 AllocSetContextCreate(CurrentMemoryContext,
205 "RecursiveUnion",
206 ALLOCSET_DEFAULT_SIZES);
207 rustate->tableContext =
208 AllocSetContextCreate(CurrentMemoryContext,
209 "RecursiveUnion hash table",
210 ALLOCSET_DEFAULT_SIZES);
211 }
212
213 /*
214 * Make the state structure available to descendant WorkTableScan nodes
215 * via the Param slot reserved for it.
216 */
217 prmdata = &(estate->es_param_exec_vals[node->wtParam]);
218 Assert(prmdata->execPlan == NULL);
219 prmdata->value = PointerGetDatum(rustate);
220 prmdata->isnull = false;
221
222 /*
223 * Miscellaneous initialization
224 *
225 * RecursiveUnion plans don't have expression contexts because they never
226 * call ExecQual or ExecProject.
227 */
228 Assert(node->plan.qual == NIL);
229
230 /*
231 * RecursiveUnion nodes still have Result slots, which hold pointers to
232 * tuples, so we have to initialize them.
233 */
234 ExecInitResultTypeTL(&rustate->ps);
235
236 /*
237 * Initialize result tuple type. (Note: we have to set up the result type
238 * before initializing child nodes, because nodeWorktablescan.c expects it
239 * to be valid.)
240 */
241 rustate->ps.ps_ProjInfo = NULL;
242
243 /*
244 * initialize child nodes
245 */
246 outerPlanState(rustate) = ExecInitNode(outerPlan(node), estate, eflags);
247 innerPlanState(rustate) = ExecInitNode(innerPlan(node), estate, eflags);
248
249 /*
250 * If hashing, precompute fmgr lookup data for inner loop, and create the
251 * hash table.
252 */
253 if (node->numCols > 0)
254 {
255 execTuplesHashPrepare(node->numCols,
256 node->dupOperators,
257 &rustate->eqfuncoids,
258 &rustate->hashfunctions);
259 build_hash_table(rustate);
260 }
261
262 return rustate;
263}
264
265/* ----------------------------------------------------------------
266 * ExecEndRecursiveUnionScan
267 *
268 * frees any storage allocated through C routines.
269 * ----------------------------------------------------------------
270 */
271void
272ExecEndRecursiveUnion(RecursiveUnionState *node)
273{
274 /* Release tuplestores */
275 tuplestore_end(node->working_table);
276 tuplestore_end(node->intermediate_table);
277
278 /* free subsidiary stuff including hashtable */
279 if (node->tempContext)
280 MemoryContextDelete(node->tempContext);
281 if (node->tableContext)
282 MemoryContextDelete(node->tableContext);
283
284 /*
285 * close down subplans
286 */
287 ExecEndNode(outerPlanState(node));
288 ExecEndNode(innerPlanState(node));
289}
290
291/* ----------------------------------------------------------------
292 * ExecReScanRecursiveUnion
293 *
294 * Rescans the relation.
295 * ----------------------------------------------------------------
296 */
297void
298ExecReScanRecursiveUnion(RecursiveUnionState *node)
299{
300 PlanState *outerPlan = outerPlanState(node);
301 PlanState *innerPlan = innerPlanState(node);
302 RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan;
303
304 /*
305 * Set recursive term's chgParam to tell it that we'll modify the working
306 * table and therefore it has to rescan.
307 */
308 innerPlan->chgParam = bms_add_member(innerPlan->chgParam, plan->wtParam);
309
310 /*
311 * if chgParam of subnode is not null then plan will be re-scanned by
312 * first ExecProcNode. Because of above, we only have to do this to the
313 * non-recursive term.
314 */
315 if (outerPlan->chgParam == NULL)
316 ExecReScan(outerPlan);
317
318 /* Release any hashtable storage */
319 if (node->tableContext)
320 MemoryContextResetAndDeleteChildren(node->tableContext);
321
322 /* Empty hashtable if needed */
323 if (plan->numCols > 0)
324 ResetTupleHashTable(node->hashtable);
325
326 /* reset processing state */
327 node->recursing = false;
328 node->intermediate_empty = true;
329 tuplestore_clear(node->working_table);
330 tuplestore_clear(node->intermediate_table);
331}
332