1/*-------------------------------------------------------------------------
2 *
3 * nodeForeignscan.c
4 * Routines to support scans of foreign tables
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/executor/nodeForeignscan.c
12 *
13 *-------------------------------------------------------------------------
14 */
15/*
16 * INTERFACE ROUTINES
17 *
18 * ExecForeignScan scans a foreign table.
19 * ExecInitForeignScan creates and initializes state info.
20 * ExecReScanForeignScan rescans the foreign relation.
21 * ExecEndForeignScan releases any resources allocated.
22 */
23#include "postgres.h"
24
25#include "executor/executor.h"
26#include "executor/nodeForeignscan.h"
27#include "foreign/fdwapi.h"
28#include "utils/memutils.h"
29#include "utils/rel.h"
30
31static TupleTableSlot *ForeignNext(ForeignScanState *node);
32static bool ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot);
33
34
35/* ----------------------------------------------------------------
36 * ForeignNext
37 *
38 * This is a workhorse for ExecForeignScan
39 * ----------------------------------------------------------------
40 */
41static TupleTableSlot *
42ForeignNext(ForeignScanState *node)
43{
44 TupleTableSlot *slot;
45 ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
46 ExprContext *econtext = node->ss.ps.ps_ExprContext;
47 MemoryContext oldcontext;
48
49 /* Call the Iterate function in short-lived context */
50 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
51 if (plan->operation != CMD_SELECT)
52 slot = node->fdwroutine->IterateDirectModify(node);
53 else
54 slot = node->fdwroutine->IterateForeignScan(node);
55 MemoryContextSwitchTo(oldcontext);
56
57 /*
58 * Insert valid value into tableoid, the only actually-useful system
59 * column.
60 */
61 if (plan->fsSystemCol && !TupIsNull(slot))
62 slot->tts_tableOid = RelationGetRelid(node->ss.ss_currentRelation);
63
64 return slot;
65}
66
67/*
68 * ForeignRecheck -- access method routine to recheck a tuple in EvalPlanQual
69 */
70static bool
71ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot)
72{
73 FdwRoutine *fdwroutine = node->fdwroutine;
74 ExprContext *econtext;
75
76 /*
77 * extract necessary information from foreign scan node
78 */
79 econtext = node->ss.ps.ps_ExprContext;
80
81 /* Does the tuple meet the remote qual condition? */
82 econtext->ecxt_scantuple = slot;
83
84 ResetExprContext(econtext);
85
86 /*
87 * If an outer join is pushed down, RecheckForeignScan may need to store a
88 * different tuple in the slot, because a different set of columns may go
89 * to NULL upon recheck. Otherwise, it shouldn't need to change the slot
90 * contents, just return true or false to indicate whether the quals still
91 * pass. For simple cases, setting fdw_recheck_quals may be easier than
92 * providing this callback.
93 */
94 if (fdwroutine->RecheckForeignScan &&
95 !fdwroutine->RecheckForeignScan(node, slot))
96 return false;
97
98 return ExecQual(node->fdw_recheck_quals, econtext);
99}
100
101/* ----------------------------------------------------------------
102 * ExecForeignScan(node)
103 *
104 * Fetches the next tuple from the FDW, checks local quals, and
105 * returns it.
106 * We call the ExecScan() routine and pass it the appropriate
107 * access method functions.
108 * ----------------------------------------------------------------
109 */
110static TupleTableSlot *
111ExecForeignScan(PlanState *pstate)
112{
113 ForeignScanState *node = castNode(ForeignScanState, pstate);
114
115 return ExecScan(&node->ss,
116 (ExecScanAccessMtd) ForeignNext,
117 (ExecScanRecheckMtd) ForeignRecheck);
118}
119
120
121/* ----------------------------------------------------------------
122 * ExecInitForeignScan
123 * ----------------------------------------------------------------
124 */
125ForeignScanState *
126ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
127{
128 ForeignScanState *scanstate;
129 Relation currentRelation = NULL;
130 Index scanrelid = node->scan.scanrelid;
131 Index tlistvarno;
132 FdwRoutine *fdwroutine;
133
134 /* check for unsupported flags */
135 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
136
137 /*
138 * create state structure
139 */
140 scanstate = makeNode(ForeignScanState);
141 scanstate->ss.ps.plan = (Plan *) node;
142 scanstate->ss.ps.state = estate;
143 scanstate->ss.ps.ExecProcNode = ExecForeignScan;
144
145 /*
146 * Miscellaneous initialization
147 *
148 * create expression context for node
149 */
150 ExecAssignExprContext(estate, &scanstate->ss.ps);
151
152 /*
153 * open the scan relation, if any; also acquire function pointers from the
154 * FDW's handler
155 */
156 if (scanrelid > 0)
157 {
158 currentRelation = ExecOpenScanRelation(estate, scanrelid, eflags);
159 scanstate->ss.ss_currentRelation = currentRelation;
160 fdwroutine = GetFdwRoutineForRelation(currentRelation, true);
161 }
162 else
163 {
164 /* We can't use the relcache, so get fdwroutine the hard way */
165 fdwroutine = GetFdwRoutineByServerId(node->fs_server);
166 }
167
168 /*
169 * Determine the scan tuple type. If the FDW provided a targetlist
170 * describing the scan tuples, use that; else use base relation's rowtype.
171 */
172 if (node->fdw_scan_tlist != NIL || currentRelation == NULL)
173 {
174 TupleDesc scan_tupdesc;
175
176 scan_tupdesc = ExecTypeFromTL(node->fdw_scan_tlist);
177 ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc,
178 &TTSOpsHeapTuple);
179 /* Node's targetlist will contain Vars with varno = INDEX_VAR */
180 tlistvarno = INDEX_VAR;
181 }
182 else
183 {
184 TupleDesc scan_tupdesc;
185
186 /* don't trust FDWs to return tuples fulfilling NOT NULL constraints */
187 scan_tupdesc = CreateTupleDescCopy(RelationGetDescr(currentRelation));
188 ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc,
189 &TTSOpsHeapTuple);
190 /* Node's targetlist will contain Vars with varno = scanrelid */
191 tlistvarno = scanrelid;
192 }
193
194 /* Don't know what an FDW might return */
195 scanstate->ss.ps.scanopsfixed = false;
196 scanstate->ss.ps.scanopsset = true;
197
198 /*
199 * Initialize result slot, type and projection.
200 */
201 ExecInitResultTypeTL(&scanstate->ss.ps);
202 ExecAssignScanProjectionInfoWithVarno(&scanstate->ss, tlistvarno);
203
204 /*
205 * initialize child expressions
206 */
207 scanstate->ss.ps.qual =
208 ExecInitQual(node->scan.plan.qual, (PlanState *) scanstate);
209 scanstate->fdw_recheck_quals =
210 ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate);
211
212 /*
213 * Initialize FDW-related state.
214 */
215 scanstate->fdwroutine = fdwroutine;
216 scanstate->fdw_state = NULL;
217
218 /* Initialize any outer plan. */
219 if (outerPlan(node))
220 outerPlanState(scanstate) =
221 ExecInitNode(outerPlan(node), estate, eflags);
222
223 /*
224 * Tell the FDW to initialize the scan.
225 */
226 if (node->operation != CMD_SELECT)
227 fdwroutine->BeginDirectModify(scanstate, eflags);
228 else
229 fdwroutine->BeginForeignScan(scanstate, eflags);
230
231 return scanstate;
232}
233
234/* ----------------------------------------------------------------
235 * ExecEndForeignScan
236 *
237 * frees any storage allocated through C routines.
238 * ----------------------------------------------------------------
239 */
240void
241ExecEndForeignScan(ForeignScanState *node)
242{
243 ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
244
245 /* Let the FDW shut down */
246 if (plan->operation != CMD_SELECT)
247 node->fdwroutine->EndDirectModify(node);
248 else
249 node->fdwroutine->EndForeignScan(node);
250
251 /* Shut down any outer plan. */
252 if (outerPlanState(node))
253 ExecEndNode(outerPlanState(node));
254
255 /* Free the exprcontext */
256 ExecFreeExprContext(&node->ss.ps);
257
258 /* clean out the tuple table */
259 if (node->ss.ps.ps_ResultTupleSlot)
260 ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
261 ExecClearTuple(node->ss.ss_ScanTupleSlot);
262}
263
264/* ----------------------------------------------------------------
265 * ExecReScanForeignScan
266 *
267 * Rescans the relation.
268 * ----------------------------------------------------------------
269 */
270void
271ExecReScanForeignScan(ForeignScanState *node)
272{
273 PlanState *outerPlan = outerPlanState(node);
274
275 node->fdwroutine->ReScanForeignScan(node);
276
277 /*
278 * If chgParam of subnode is not null then plan will be re-scanned by
279 * first ExecProcNode. outerPlan may also be NULL, in which case there is
280 * nothing to rescan at all.
281 */
282 if (outerPlan != NULL && outerPlan->chgParam == NULL)
283 ExecReScan(outerPlan);
284
285 ExecScanReScan(&node->ss);
286}
287
288/* ----------------------------------------------------------------
289 * ExecForeignScanEstimate
290 *
291 * Informs size of the parallel coordination information, if any
292 * ----------------------------------------------------------------
293 */
294void
295ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
296{
297 FdwRoutine *fdwroutine = node->fdwroutine;
298
299 if (fdwroutine->EstimateDSMForeignScan)
300 {
301 node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt);
302 shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
303 shm_toc_estimate_keys(&pcxt->estimator, 1);
304 }
305}
306
307/* ----------------------------------------------------------------
308 * ExecForeignScanInitializeDSM
309 *
310 * Initialize the parallel coordination information
311 * ----------------------------------------------------------------
312 */
313void
314ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
315{
316 FdwRoutine *fdwroutine = node->fdwroutine;
317
318 if (fdwroutine->InitializeDSMForeignScan)
319 {
320 int plan_node_id = node->ss.ps.plan->plan_node_id;
321 void *coordinate;
322
323 coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
324 fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate);
325 shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
326 }
327}
328
329/* ----------------------------------------------------------------
330 * ExecForeignScanReInitializeDSM
331 *
332 * Reset shared state before beginning a fresh scan.
333 * ----------------------------------------------------------------
334 */
335void
336ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
337{
338 FdwRoutine *fdwroutine = node->fdwroutine;
339
340 if (fdwroutine->ReInitializeDSMForeignScan)
341 {
342 int plan_node_id = node->ss.ps.plan->plan_node_id;
343 void *coordinate;
344
345 coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
346 fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate);
347 }
348}
349
350/* ----------------------------------------------------------------
351 * ExecForeignScanInitializeWorker
352 *
353 * Initialization according to the parallel coordination information
354 * ----------------------------------------------------------------
355 */
356void
357ExecForeignScanInitializeWorker(ForeignScanState *node,
358 ParallelWorkerContext *pwcxt)
359{
360 FdwRoutine *fdwroutine = node->fdwroutine;
361
362 if (fdwroutine->InitializeWorkerForeignScan)
363 {
364 int plan_node_id = node->ss.ps.plan->plan_node_id;
365 void *coordinate;
366
367 coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false);
368 fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate);
369 }
370}
371
372/* ----------------------------------------------------------------
373 * ExecShutdownForeignScan
374 *
375 * Gives FDW chance to stop asynchronous resource consumption
376 * and release any resources still held.
377 * ----------------------------------------------------------------
378 */
379void
380ExecShutdownForeignScan(ForeignScanState *node)
381{
382 FdwRoutine *fdwroutine = node->fdwroutine;
383
384 if (fdwroutine->ShutdownForeignScan)
385 fdwroutine->ShutdownForeignScan(node);
386}
387