1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * nodeForeignscan.c |
4 | * Routines to support scans of foreign tables |
5 | * |
6 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
7 | * Portions Copyright (c) 1994, Regents of the University of California |
8 | * |
9 | * |
10 | * IDENTIFICATION |
11 | * src/backend/executor/nodeForeignscan.c |
12 | * |
13 | *------------------------------------------------------------------------- |
14 | */ |
15 | /* |
16 | * INTERFACE ROUTINES |
17 | * |
18 | * ExecForeignScan scans a foreign table. |
19 | * ExecInitForeignScan creates and initializes state info. |
20 | * ExecReScanForeignScan rescans the foreign relation. |
21 | * ExecEndForeignScan releases any resources allocated. |
22 | */ |
23 | #include "postgres.h" |
24 | |
25 | #include "executor/executor.h" |
26 | #include "executor/nodeForeignscan.h" |
27 | #include "foreign/fdwapi.h" |
28 | #include "utils/memutils.h" |
29 | #include "utils/rel.h" |
30 | |
31 | static TupleTableSlot *ForeignNext(ForeignScanState *node); |
32 | static bool ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot); |
33 | |
34 | |
35 | /* ---------------------------------------------------------------- |
36 | * ForeignNext |
37 | * |
38 | * This is a workhorse for ExecForeignScan |
39 | * ---------------------------------------------------------------- |
40 | */ |
41 | static TupleTableSlot * |
42 | ForeignNext(ForeignScanState *node) |
43 | { |
44 | TupleTableSlot *slot; |
45 | ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; |
46 | ExprContext *econtext = node->ss.ps.ps_ExprContext; |
47 | MemoryContext oldcontext; |
48 | |
49 | /* Call the Iterate function in short-lived context */ |
50 | oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); |
51 | if (plan->operation != CMD_SELECT) |
52 | slot = node->fdwroutine->IterateDirectModify(node); |
53 | else |
54 | slot = node->fdwroutine->IterateForeignScan(node); |
55 | MemoryContextSwitchTo(oldcontext); |
56 | |
57 | /* |
58 | * Insert valid value into tableoid, the only actually-useful system |
59 | * column. |
60 | */ |
61 | if (plan->fsSystemCol && !TupIsNull(slot)) |
62 | slot->tts_tableOid = RelationGetRelid(node->ss.ss_currentRelation); |
63 | |
64 | return slot; |
65 | } |
66 | |
67 | /* |
68 | * ForeignRecheck -- access method routine to recheck a tuple in EvalPlanQual |
69 | */ |
70 | static bool |
71 | ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot) |
72 | { |
73 | FdwRoutine *fdwroutine = node->fdwroutine; |
74 | ExprContext *econtext; |
75 | |
76 | /* |
77 | * extract necessary information from foreign scan node |
78 | */ |
79 | econtext = node->ss.ps.ps_ExprContext; |
80 | |
81 | /* Does the tuple meet the remote qual condition? */ |
82 | econtext->ecxt_scantuple = slot; |
83 | |
84 | ResetExprContext(econtext); |
85 | |
86 | /* |
87 | * If an outer join is pushed down, RecheckForeignScan may need to store a |
88 | * different tuple in the slot, because a different set of columns may go |
89 | * to NULL upon recheck. Otherwise, it shouldn't need to change the slot |
90 | * contents, just return true or false to indicate whether the quals still |
91 | * pass. For simple cases, setting fdw_recheck_quals may be easier than |
92 | * providing this callback. |
93 | */ |
94 | if (fdwroutine->RecheckForeignScan && |
95 | !fdwroutine->RecheckForeignScan(node, slot)) |
96 | return false; |
97 | |
98 | return ExecQual(node->fdw_recheck_quals, econtext); |
99 | } |
100 | |
101 | /* ---------------------------------------------------------------- |
102 | * ExecForeignScan(node) |
103 | * |
104 | * Fetches the next tuple from the FDW, checks local quals, and |
105 | * returns it. |
106 | * We call the ExecScan() routine and pass it the appropriate |
107 | * access method functions. |
108 | * ---------------------------------------------------------------- |
109 | */ |
110 | static TupleTableSlot * |
111 | ExecForeignScan(PlanState *pstate) |
112 | { |
113 | ForeignScanState *node = castNode(ForeignScanState, pstate); |
114 | |
115 | return ExecScan(&node->ss, |
116 | (ExecScanAccessMtd) ForeignNext, |
117 | (ExecScanRecheckMtd) ForeignRecheck); |
118 | } |
119 | |
120 | |
121 | /* ---------------------------------------------------------------- |
122 | * ExecInitForeignScan |
123 | * ---------------------------------------------------------------- |
124 | */ |
125 | ForeignScanState * |
126 | ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags) |
127 | { |
128 | ForeignScanState *scanstate; |
129 | Relation currentRelation = NULL; |
130 | Index scanrelid = node->scan.scanrelid; |
131 | Index tlistvarno; |
132 | FdwRoutine *fdwroutine; |
133 | |
134 | /* check for unsupported flags */ |
135 | Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); |
136 | |
137 | /* |
138 | * create state structure |
139 | */ |
140 | scanstate = makeNode(ForeignScanState); |
141 | scanstate->ss.ps.plan = (Plan *) node; |
142 | scanstate->ss.ps.state = estate; |
143 | scanstate->ss.ps.ExecProcNode = ExecForeignScan; |
144 | |
145 | /* |
146 | * Miscellaneous initialization |
147 | * |
148 | * create expression context for node |
149 | */ |
150 | ExecAssignExprContext(estate, &scanstate->ss.ps); |
151 | |
152 | /* |
153 | * open the scan relation, if any; also acquire function pointers from the |
154 | * FDW's handler |
155 | */ |
156 | if (scanrelid > 0) |
157 | { |
158 | currentRelation = ExecOpenScanRelation(estate, scanrelid, eflags); |
159 | scanstate->ss.ss_currentRelation = currentRelation; |
160 | fdwroutine = GetFdwRoutineForRelation(currentRelation, true); |
161 | } |
162 | else |
163 | { |
164 | /* We can't use the relcache, so get fdwroutine the hard way */ |
165 | fdwroutine = GetFdwRoutineByServerId(node->fs_server); |
166 | } |
167 | |
168 | /* |
169 | * Determine the scan tuple type. If the FDW provided a targetlist |
170 | * describing the scan tuples, use that; else use base relation's rowtype. |
171 | */ |
172 | if (node->fdw_scan_tlist != NIL || currentRelation == NULL) |
173 | { |
174 | TupleDesc scan_tupdesc; |
175 | |
176 | scan_tupdesc = ExecTypeFromTL(node->fdw_scan_tlist); |
177 | ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc, |
178 | &TTSOpsHeapTuple); |
179 | /* Node's targetlist will contain Vars with varno = INDEX_VAR */ |
180 | tlistvarno = INDEX_VAR; |
181 | } |
182 | else |
183 | { |
184 | TupleDesc scan_tupdesc; |
185 | |
186 | /* don't trust FDWs to return tuples fulfilling NOT NULL constraints */ |
187 | scan_tupdesc = CreateTupleDescCopy(RelationGetDescr(currentRelation)); |
188 | ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc, |
189 | &TTSOpsHeapTuple); |
190 | /* Node's targetlist will contain Vars with varno = scanrelid */ |
191 | tlistvarno = scanrelid; |
192 | } |
193 | |
194 | /* Don't know what an FDW might return */ |
195 | scanstate->ss.ps.scanopsfixed = false; |
196 | scanstate->ss.ps.scanopsset = true; |
197 | |
198 | /* |
199 | * Initialize result slot, type and projection. |
200 | */ |
201 | ExecInitResultTypeTL(&scanstate->ss.ps); |
202 | ExecAssignScanProjectionInfoWithVarno(&scanstate->ss, tlistvarno); |
203 | |
204 | /* |
205 | * initialize child expressions |
206 | */ |
207 | scanstate->ss.ps.qual = |
208 | ExecInitQual(node->scan.plan.qual, (PlanState *) scanstate); |
209 | scanstate->fdw_recheck_quals = |
210 | ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate); |
211 | |
212 | /* |
213 | * Initialize FDW-related state. |
214 | */ |
215 | scanstate->fdwroutine = fdwroutine; |
216 | scanstate->fdw_state = NULL; |
217 | |
218 | /* Initialize any outer plan. */ |
219 | if (outerPlan(node)) |
220 | outerPlanState(scanstate) = |
221 | ExecInitNode(outerPlan(node), estate, eflags); |
222 | |
223 | /* |
224 | * Tell the FDW to initialize the scan. |
225 | */ |
226 | if (node->operation != CMD_SELECT) |
227 | fdwroutine->BeginDirectModify(scanstate, eflags); |
228 | else |
229 | fdwroutine->BeginForeignScan(scanstate, eflags); |
230 | |
231 | return scanstate; |
232 | } |
233 | |
234 | /* ---------------------------------------------------------------- |
235 | * ExecEndForeignScan |
236 | * |
237 | * frees any storage allocated through C routines. |
238 | * ---------------------------------------------------------------- |
239 | */ |
240 | void |
241 | ExecEndForeignScan(ForeignScanState *node) |
242 | { |
243 | ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; |
244 | |
245 | /* Let the FDW shut down */ |
246 | if (plan->operation != CMD_SELECT) |
247 | node->fdwroutine->EndDirectModify(node); |
248 | else |
249 | node->fdwroutine->EndForeignScan(node); |
250 | |
251 | /* Shut down any outer plan. */ |
252 | if (outerPlanState(node)) |
253 | ExecEndNode(outerPlanState(node)); |
254 | |
255 | /* Free the exprcontext */ |
256 | ExecFreeExprContext(&node->ss.ps); |
257 | |
258 | /* clean out the tuple table */ |
259 | if (node->ss.ps.ps_ResultTupleSlot) |
260 | ExecClearTuple(node->ss.ps.ps_ResultTupleSlot); |
261 | ExecClearTuple(node->ss.ss_ScanTupleSlot); |
262 | } |
263 | |
264 | /* ---------------------------------------------------------------- |
265 | * ExecReScanForeignScan |
266 | * |
267 | * Rescans the relation. |
268 | * ---------------------------------------------------------------- |
269 | */ |
270 | void |
271 | ExecReScanForeignScan(ForeignScanState *node) |
272 | { |
273 | PlanState *outerPlan = outerPlanState(node); |
274 | |
275 | node->fdwroutine->ReScanForeignScan(node); |
276 | |
277 | /* |
278 | * If chgParam of subnode is not null then plan will be re-scanned by |
279 | * first ExecProcNode. outerPlan may also be NULL, in which case there is |
280 | * nothing to rescan at all. |
281 | */ |
282 | if (outerPlan != NULL && outerPlan->chgParam == NULL) |
283 | ExecReScan(outerPlan); |
284 | |
285 | ExecScanReScan(&node->ss); |
286 | } |
287 | |
288 | /* ---------------------------------------------------------------- |
289 | * ExecForeignScanEstimate |
290 | * |
291 | * Informs size of the parallel coordination information, if any |
292 | * ---------------------------------------------------------------- |
293 | */ |
294 | void |
295 | ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt) |
296 | { |
297 | FdwRoutine *fdwroutine = node->fdwroutine; |
298 | |
299 | if (fdwroutine->EstimateDSMForeignScan) |
300 | { |
301 | node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt); |
302 | shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); |
303 | shm_toc_estimate_keys(&pcxt->estimator, 1); |
304 | } |
305 | } |
306 | |
307 | /* ---------------------------------------------------------------- |
308 | * ExecForeignScanInitializeDSM |
309 | * |
310 | * Initialize the parallel coordination information |
311 | * ---------------------------------------------------------------- |
312 | */ |
313 | void |
314 | ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) |
315 | { |
316 | FdwRoutine *fdwroutine = node->fdwroutine; |
317 | |
318 | if (fdwroutine->InitializeDSMForeignScan) |
319 | { |
320 | int plan_node_id = node->ss.ps.plan->plan_node_id; |
321 | void *coordinate; |
322 | |
323 | coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len); |
324 | fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate); |
325 | shm_toc_insert(pcxt->toc, plan_node_id, coordinate); |
326 | } |
327 | } |
328 | |
329 | /* ---------------------------------------------------------------- |
330 | * ExecForeignScanReInitializeDSM |
331 | * |
332 | * Reset shared state before beginning a fresh scan. |
333 | * ---------------------------------------------------------------- |
334 | */ |
335 | void |
336 | ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) |
337 | { |
338 | FdwRoutine *fdwroutine = node->fdwroutine; |
339 | |
340 | if (fdwroutine->ReInitializeDSMForeignScan) |
341 | { |
342 | int plan_node_id = node->ss.ps.plan->plan_node_id; |
343 | void *coordinate; |
344 | |
345 | coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false); |
346 | fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate); |
347 | } |
348 | } |
349 | |
350 | /* ---------------------------------------------------------------- |
351 | * ExecForeignScanInitializeWorker |
352 | * |
353 | * Initialization according to the parallel coordination information |
354 | * ---------------------------------------------------------------- |
355 | */ |
356 | void |
357 | ExecForeignScanInitializeWorker(ForeignScanState *node, |
358 | ParallelWorkerContext *pwcxt) |
359 | { |
360 | FdwRoutine *fdwroutine = node->fdwroutine; |
361 | |
362 | if (fdwroutine->InitializeWorkerForeignScan) |
363 | { |
364 | int plan_node_id = node->ss.ps.plan->plan_node_id; |
365 | void *coordinate; |
366 | |
367 | coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false); |
368 | fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate); |
369 | } |
370 | } |
371 | |
372 | /* ---------------------------------------------------------------- |
373 | * ExecShutdownForeignScan |
374 | * |
375 | * Gives FDW chance to stop asynchronous resource consumption |
376 | * and release any resources still held. |
377 | * ---------------------------------------------------------------- |
378 | */ |
379 | void |
380 | ExecShutdownForeignScan(ForeignScanState *node) |
381 | { |
382 | FdwRoutine *fdwroutine = node->fdwroutine; |
383 | |
384 | if (fdwroutine->ShutdownForeignScan) |
385 | fdwroutine->ShutdownForeignScan(node); |
386 | } |
387 | |