1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * nodeRecursiveunion.c |
4 | * routines to handle RecursiveUnion nodes. |
5 | * |
6 | * To implement UNION (without ALL), we need a hashtable that stores tuples |
7 | * already seen. The hash key is computed from the grouping columns. |
8 | * |
9 | * |
10 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
11 | * Portions Copyright (c) 1994, Regents of the University of California |
12 | * |
13 | * |
14 | * IDENTIFICATION |
15 | * src/backend/executor/nodeRecursiveunion.c |
16 | * |
17 | *------------------------------------------------------------------------- |
18 | */ |
19 | #include "postgres.h" |
20 | |
21 | #include "executor/execdebug.h" |
22 | #include "executor/nodeRecursiveunion.h" |
23 | #include "miscadmin.h" |
24 | #include "utils/memutils.h" |
25 | |
26 | |
27 | |
28 | /* |
29 | * Initialize the hash table to empty. |
30 | */ |
31 | static void |
32 | build_hash_table(RecursiveUnionState *rustate) |
33 | { |
34 | RecursiveUnion *node = (RecursiveUnion *) rustate->ps.plan; |
35 | TupleDesc desc = ExecGetResultType(outerPlanState(rustate)); |
36 | |
37 | Assert(node->numCols > 0); |
38 | Assert(node->numGroups > 0); |
39 | |
40 | rustate->hashtable = BuildTupleHashTableExt(&rustate->ps, |
41 | desc, |
42 | node->numCols, |
43 | node->dupColIdx, |
44 | rustate->eqfuncoids, |
45 | rustate->hashfunctions, |
46 | node->dupCollations, |
47 | node->numGroups, |
48 | 0, |
49 | rustate->ps.state->es_query_cxt, |
50 | rustate->tableContext, |
51 | rustate->tempContext, |
52 | false); |
53 | } |
54 | |
55 | |
56 | /* ---------------------------------------------------------------- |
57 | * ExecRecursiveUnion(node) |
58 | * |
59 | * Scans the recursive query sequentially and returns the next |
60 | * qualifying tuple. |
61 | * |
62 | * 1. evaluate non recursive term and assign the result to RT |
63 | * |
64 | * 2. execute recursive terms |
65 | * |
66 | * 2.1 WT := RT |
67 | * 2.2 while WT is not empty repeat 2.3 to 2.6. if WT is empty returns RT |
68 | * 2.3 replace the name of recursive term with WT |
69 | * 2.4 evaluate the recursive term and store into WT |
70 | * 2.5 append WT to RT |
71 | * 2.6 go back to 2.2 |
72 | * ---------------------------------------------------------------- |
73 | */ |
74 | static TupleTableSlot * |
75 | ExecRecursiveUnion(PlanState *pstate) |
76 | { |
77 | RecursiveUnionState *node = castNode(RecursiveUnionState, pstate); |
78 | PlanState *outerPlan = outerPlanState(node); |
79 | PlanState *innerPlan = innerPlanState(node); |
80 | RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan; |
81 | TupleTableSlot *slot; |
82 | bool isnew; |
83 | |
84 | CHECK_FOR_INTERRUPTS(); |
85 | |
86 | /* 1. Evaluate non-recursive term */ |
87 | if (!node->recursing) |
88 | { |
89 | for (;;) |
90 | { |
91 | slot = ExecProcNode(outerPlan); |
92 | if (TupIsNull(slot)) |
93 | break; |
94 | if (plan->numCols > 0) |
95 | { |
96 | /* Find or build hashtable entry for this tuple's group */ |
97 | LookupTupleHashEntry(node->hashtable, slot, &isnew); |
98 | /* Must reset temp context after each hashtable lookup */ |
99 | MemoryContextReset(node->tempContext); |
100 | /* Ignore tuple if already seen */ |
101 | if (!isnew) |
102 | continue; |
103 | } |
104 | /* Each non-duplicate tuple goes to the working table ... */ |
105 | tuplestore_puttupleslot(node->working_table, slot); |
106 | /* ... and to the caller */ |
107 | return slot; |
108 | } |
109 | node->recursing = true; |
110 | } |
111 | |
112 | /* 2. Execute recursive term */ |
113 | for (;;) |
114 | { |
115 | slot = ExecProcNode(innerPlan); |
116 | if (TupIsNull(slot)) |
117 | { |
118 | /* Done if there's nothing in the intermediate table */ |
119 | if (node->intermediate_empty) |
120 | break; |
121 | |
122 | /* done with old working table ... */ |
123 | tuplestore_end(node->working_table); |
124 | |
125 | /* intermediate table becomes working table */ |
126 | node->working_table = node->intermediate_table; |
127 | |
128 | /* create new empty intermediate table */ |
129 | node->intermediate_table = tuplestore_begin_heap(false, false, |
130 | work_mem); |
131 | node->intermediate_empty = true; |
132 | |
133 | /* reset the recursive term */ |
134 | innerPlan->chgParam = bms_add_member(innerPlan->chgParam, |
135 | plan->wtParam); |
136 | |
137 | /* and continue fetching from recursive term */ |
138 | continue; |
139 | } |
140 | |
141 | if (plan->numCols > 0) |
142 | { |
143 | /* Find or build hashtable entry for this tuple's group */ |
144 | LookupTupleHashEntry(node->hashtable, slot, &isnew); |
145 | /* Must reset temp context after each hashtable lookup */ |
146 | MemoryContextReset(node->tempContext); |
147 | /* Ignore tuple if already seen */ |
148 | if (!isnew) |
149 | continue; |
150 | } |
151 | |
152 | /* Else, tuple is good; stash it in intermediate table ... */ |
153 | node->intermediate_empty = false; |
154 | tuplestore_puttupleslot(node->intermediate_table, slot); |
155 | /* ... and return it */ |
156 | return slot; |
157 | } |
158 | |
159 | return NULL; |
160 | } |
161 | |
162 | /* ---------------------------------------------------------------- |
163 | * ExecInitRecursiveUnionScan |
164 | * ---------------------------------------------------------------- |
165 | */ |
166 | RecursiveUnionState * |
167 | ExecInitRecursiveUnion(RecursiveUnion *node, EState *estate, int eflags) |
168 | { |
169 | RecursiveUnionState *rustate; |
170 | ParamExecData *prmdata; |
171 | |
172 | /* check for unsupported flags */ |
173 | Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); |
174 | |
175 | /* |
176 | * create state structure |
177 | */ |
178 | rustate = makeNode(RecursiveUnionState); |
179 | rustate->ps.plan = (Plan *) node; |
180 | rustate->ps.state = estate; |
181 | rustate->ps.ExecProcNode = ExecRecursiveUnion; |
182 | |
183 | rustate->eqfuncoids = NULL; |
184 | rustate->hashfunctions = NULL; |
185 | rustate->hashtable = NULL; |
186 | rustate->tempContext = NULL; |
187 | rustate->tableContext = NULL; |
188 | |
189 | /* initialize processing state */ |
190 | rustate->recursing = false; |
191 | rustate->intermediate_empty = true; |
192 | rustate->working_table = tuplestore_begin_heap(false, false, work_mem); |
193 | rustate->intermediate_table = tuplestore_begin_heap(false, false, work_mem); |
194 | |
195 | /* |
196 | * If hashing, we need a per-tuple memory context for comparisons, and a |
197 | * longer-lived context to store the hash table. The table can't just be |
198 | * kept in the per-query context because we want to be able to throw it |
199 | * away when rescanning. |
200 | */ |
201 | if (node->numCols > 0) |
202 | { |
203 | rustate->tempContext = |
204 | AllocSetContextCreate(CurrentMemoryContext, |
205 | "RecursiveUnion" , |
206 | ALLOCSET_DEFAULT_SIZES); |
207 | rustate->tableContext = |
208 | AllocSetContextCreate(CurrentMemoryContext, |
209 | "RecursiveUnion hash table" , |
210 | ALLOCSET_DEFAULT_SIZES); |
211 | } |
212 | |
213 | /* |
214 | * Make the state structure available to descendant WorkTableScan nodes |
215 | * via the Param slot reserved for it. |
216 | */ |
217 | prmdata = &(estate->es_param_exec_vals[node->wtParam]); |
218 | Assert(prmdata->execPlan == NULL); |
219 | prmdata->value = PointerGetDatum(rustate); |
220 | prmdata->isnull = false; |
221 | |
222 | /* |
223 | * Miscellaneous initialization |
224 | * |
225 | * RecursiveUnion plans don't have expression contexts because they never |
226 | * call ExecQual or ExecProject. |
227 | */ |
228 | Assert(node->plan.qual == NIL); |
229 | |
230 | /* |
231 | * RecursiveUnion nodes still have Result slots, which hold pointers to |
232 | * tuples, so we have to initialize them. |
233 | */ |
234 | ExecInitResultTypeTL(&rustate->ps); |
235 | |
236 | /* |
237 | * Initialize result tuple type. (Note: we have to set up the result type |
238 | * before initializing child nodes, because nodeWorktablescan.c expects it |
239 | * to be valid.) |
240 | */ |
241 | rustate->ps.ps_ProjInfo = NULL; |
242 | |
243 | /* |
244 | * initialize child nodes |
245 | */ |
246 | outerPlanState(rustate) = ExecInitNode(outerPlan(node), estate, eflags); |
247 | innerPlanState(rustate) = ExecInitNode(innerPlan(node), estate, eflags); |
248 | |
249 | /* |
250 | * If hashing, precompute fmgr lookup data for inner loop, and create the |
251 | * hash table. |
252 | */ |
253 | if (node->numCols > 0) |
254 | { |
255 | execTuplesHashPrepare(node->numCols, |
256 | node->dupOperators, |
257 | &rustate->eqfuncoids, |
258 | &rustate->hashfunctions); |
259 | build_hash_table(rustate); |
260 | } |
261 | |
262 | return rustate; |
263 | } |
264 | |
265 | /* ---------------------------------------------------------------- |
266 | * ExecEndRecursiveUnionScan |
267 | * |
268 | * frees any storage allocated through C routines. |
269 | * ---------------------------------------------------------------- |
270 | */ |
271 | void |
272 | ExecEndRecursiveUnion(RecursiveUnionState *node) |
273 | { |
274 | /* Release tuplestores */ |
275 | tuplestore_end(node->working_table); |
276 | tuplestore_end(node->intermediate_table); |
277 | |
278 | /* free subsidiary stuff including hashtable */ |
279 | if (node->tempContext) |
280 | MemoryContextDelete(node->tempContext); |
281 | if (node->tableContext) |
282 | MemoryContextDelete(node->tableContext); |
283 | |
284 | /* |
285 | * close down subplans |
286 | */ |
287 | ExecEndNode(outerPlanState(node)); |
288 | ExecEndNode(innerPlanState(node)); |
289 | } |
290 | |
291 | /* ---------------------------------------------------------------- |
292 | * ExecReScanRecursiveUnion |
293 | * |
294 | * Rescans the relation. |
295 | * ---------------------------------------------------------------- |
296 | */ |
297 | void |
298 | ExecReScanRecursiveUnion(RecursiveUnionState *node) |
299 | { |
300 | PlanState *outerPlan = outerPlanState(node); |
301 | PlanState *innerPlan = innerPlanState(node); |
302 | RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan; |
303 | |
304 | /* |
305 | * Set recursive term's chgParam to tell it that we'll modify the working |
306 | * table and therefore it has to rescan. |
307 | */ |
308 | innerPlan->chgParam = bms_add_member(innerPlan->chgParam, plan->wtParam); |
309 | |
310 | /* |
311 | * if chgParam of subnode is not null then plan will be re-scanned by |
312 | * first ExecProcNode. Because of above, we only have to do this to the |
313 | * non-recursive term. |
314 | */ |
315 | if (outerPlan->chgParam == NULL) |
316 | ExecReScan(outerPlan); |
317 | |
318 | /* Release any hashtable storage */ |
319 | if (node->tableContext) |
320 | MemoryContextResetAndDeleteChildren(node->tableContext); |
321 | |
322 | /* Empty hashtable if needed */ |
323 | if (plan->numCols > 0) |
324 | ResetTupleHashTable(node->hashtable); |
325 | |
326 | /* reset processing state */ |
327 | node->recursing = false; |
328 | node->intermediate_empty = true; |
329 | tuplestore_clear(node->working_table); |
330 | tuplestore_clear(node->intermediate_table); |
331 | } |
332 | |