| 1 | /*------------------------------------------------------------------------- |
| 2 | * |
| 3 | * nodeForeignscan.c |
| 4 | * Routines to support scans of foreign tables |
| 5 | * |
| 6 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
| 7 | * Portions Copyright (c) 1994, Regents of the University of California |
| 8 | * |
| 9 | * |
| 10 | * IDENTIFICATION |
| 11 | * src/backend/executor/nodeForeignscan.c |
| 12 | * |
| 13 | *------------------------------------------------------------------------- |
| 14 | */ |
| 15 | /* |
| 16 | * INTERFACE ROUTINES |
| 17 | * |
| 18 | * ExecForeignScan scans a foreign table. |
| 19 | * ExecInitForeignScan creates and initializes state info. |
| 20 | * ExecReScanForeignScan rescans the foreign relation. |
| 21 | * ExecEndForeignScan releases any resources allocated. |
| 22 | */ |
| 23 | #include "postgres.h" |
| 24 | |
| 25 | #include "executor/executor.h" |
| 26 | #include "executor/nodeForeignscan.h" |
| 27 | #include "foreign/fdwapi.h" |
| 28 | #include "utils/memutils.h" |
| 29 | #include "utils/rel.h" |
| 30 | |
| 31 | static TupleTableSlot *ForeignNext(ForeignScanState *node); |
| 32 | static bool ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot); |
| 33 | |
| 34 | |
| 35 | /* ---------------------------------------------------------------- |
| 36 | * ForeignNext |
| 37 | * |
| 38 | * This is a workhorse for ExecForeignScan |
| 39 | * ---------------------------------------------------------------- |
| 40 | */ |
| 41 | static TupleTableSlot * |
| 42 | ForeignNext(ForeignScanState *node) |
| 43 | { |
| 44 | TupleTableSlot *slot; |
| 45 | ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; |
| 46 | ExprContext *econtext = node->ss.ps.ps_ExprContext; |
| 47 | MemoryContext oldcontext; |
| 48 | |
| 49 | /* Call the Iterate function in short-lived context */ |
| 50 | oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); |
| 51 | if (plan->operation != CMD_SELECT) |
| 52 | slot = node->fdwroutine->IterateDirectModify(node); |
| 53 | else |
| 54 | slot = node->fdwroutine->IterateForeignScan(node); |
| 55 | MemoryContextSwitchTo(oldcontext); |
| 56 | |
| 57 | /* |
| 58 | * Insert valid value into tableoid, the only actually-useful system |
| 59 | * column. |
| 60 | */ |
| 61 | if (plan->fsSystemCol && !TupIsNull(slot)) |
| 62 | slot->tts_tableOid = RelationGetRelid(node->ss.ss_currentRelation); |
| 63 | |
| 64 | return slot; |
| 65 | } |
| 66 | |
| 67 | /* |
| 68 | * ForeignRecheck -- access method routine to recheck a tuple in EvalPlanQual |
| 69 | */ |
| 70 | static bool |
| 71 | ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot) |
| 72 | { |
| 73 | FdwRoutine *fdwroutine = node->fdwroutine; |
| 74 | ExprContext *econtext; |
| 75 | |
| 76 | /* |
| 77 | * extract necessary information from foreign scan node |
| 78 | */ |
| 79 | econtext = node->ss.ps.ps_ExprContext; |
| 80 | |
| 81 | /* Does the tuple meet the remote qual condition? */ |
| 82 | econtext->ecxt_scantuple = slot; |
| 83 | |
| 84 | ResetExprContext(econtext); |
| 85 | |
| 86 | /* |
| 87 | * If an outer join is pushed down, RecheckForeignScan may need to store a |
| 88 | * different tuple in the slot, because a different set of columns may go |
| 89 | * to NULL upon recheck. Otherwise, it shouldn't need to change the slot |
| 90 | * contents, just return true or false to indicate whether the quals still |
| 91 | * pass. For simple cases, setting fdw_recheck_quals may be easier than |
| 92 | * providing this callback. |
| 93 | */ |
| 94 | if (fdwroutine->RecheckForeignScan && |
| 95 | !fdwroutine->RecheckForeignScan(node, slot)) |
| 96 | return false; |
| 97 | |
| 98 | return ExecQual(node->fdw_recheck_quals, econtext); |
| 99 | } |
| 100 | |
| 101 | /* ---------------------------------------------------------------- |
| 102 | * ExecForeignScan(node) |
| 103 | * |
| 104 | * Fetches the next tuple from the FDW, checks local quals, and |
| 105 | * returns it. |
| 106 | * We call the ExecScan() routine and pass it the appropriate |
| 107 | * access method functions. |
| 108 | * ---------------------------------------------------------------- |
| 109 | */ |
| 110 | static TupleTableSlot * |
| 111 | ExecForeignScan(PlanState *pstate) |
| 112 | { |
| 113 | ForeignScanState *node = castNode(ForeignScanState, pstate); |
| 114 | |
| 115 | return ExecScan(&node->ss, |
| 116 | (ExecScanAccessMtd) ForeignNext, |
| 117 | (ExecScanRecheckMtd) ForeignRecheck); |
| 118 | } |
| 119 | |
| 120 | |
| 121 | /* ---------------------------------------------------------------- |
| 122 | * ExecInitForeignScan |
| 123 | * ---------------------------------------------------------------- |
| 124 | */ |
| 125 | ForeignScanState * |
| 126 | ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags) |
| 127 | { |
| 128 | ForeignScanState *scanstate; |
| 129 | Relation currentRelation = NULL; |
| 130 | Index scanrelid = node->scan.scanrelid; |
| 131 | Index tlistvarno; |
| 132 | FdwRoutine *fdwroutine; |
| 133 | |
| 134 | /* check for unsupported flags */ |
| 135 | Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); |
| 136 | |
| 137 | /* |
| 138 | * create state structure |
| 139 | */ |
| 140 | scanstate = makeNode(ForeignScanState); |
| 141 | scanstate->ss.ps.plan = (Plan *) node; |
| 142 | scanstate->ss.ps.state = estate; |
| 143 | scanstate->ss.ps.ExecProcNode = ExecForeignScan; |
| 144 | |
| 145 | /* |
| 146 | * Miscellaneous initialization |
| 147 | * |
| 148 | * create expression context for node |
| 149 | */ |
| 150 | ExecAssignExprContext(estate, &scanstate->ss.ps); |
| 151 | |
| 152 | /* |
| 153 | * open the scan relation, if any; also acquire function pointers from the |
| 154 | * FDW's handler |
| 155 | */ |
| 156 | if (scanrelid > 0) |
| 157 | { |
| 158 | currentRelation = ExecOpenScanRelation(estate, scanrelid, eflags); |
| 159 | scanstate->ss.ss_currentRelation = currentRelation; |
| 160 | fdwroutine = GetFdwRoutineForRelation(currentRelation, true); |
| 161 | } |
| 162 | else |
| 163 | { |
| 164 | /* We can't use the relcache, so get fdwroutine the hard way */ |
| 165 | fdwroutine = GetFdwRoutineByServerId(node->fs_server); |
| 166 | } |
| 167 | |
| 168 | /* |
| 169 | * Determine the scan tuple type. If the FDW provided a targetlist |
| 170 | * describing the scan tuples, use that; else use base relation's rowtype. |
| 171 | */ |
| 172 | if (node->fdw_scan_tlist != NIL || currentRelation == NULL) |
| 173 | { |
| 174 | TupleDesc scan_tupdesc; |
| 175 | |
| 176 | scan_tupdesc = ExecTypeFromTL(node->fdw_scan_tlist); |
| 177 | ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc, |
| 178 | &TTSOpsHeapTuple); |
| 179 | /* Node's targetlist will contain Vars with varno = INDEX_VAR */ |
| 180 | tlistvarno = INDEX_VAR; |
| 181 | } |
| 182 | else |
| 183 | { |
| 184 | TupleDesc scan_tupdesc; |
| 185 | |
| 186 | /* don't trust FDWs to return tuples fulfilling NOT NULL constraints */ |
| 187 | scan_tupdesc = CreateTupleDescCopy(RelationGetDescr(currentRelation)); |
| 188 | ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc, |
| 189 | &TTSOpsHeapTuple); |
| 190 | /* Node's targetlist will contain Vars with varno = scanrelid */ |
| 191 | tlistvarno = scanrelid; |
| 192 | } |
| 193 | |
| 194 | /* Don't know what an FDW might return */ |
| 195 | scanstate->ss.ps.scanopsfixed = false; |
| 196 | scanstate->ss.ps.scanopsset = true; |
| 197 | |
| 198 | /* |
| 199 | * Initialize result slot, type and projection. |
| 200 | */ |
| 201 | ExecInitResultTypeTL(&scanstate->ss.ps); |
| 202 | ExecAssignScanProjectionInfoWithVarno(&scanstate->ss, tlistvarno); |
| 203 | |
| 204 | /* |
| 205 | * initialize child expressions |
| 206 | */ |
| 207 | scanstate->ss.ps.qual = |
| 208 | ExecInitQual(node->scan.plan.qual, (PlanState *) scanstate); |
| 209 | scanstate->fdw_recheck_quals = |
| 210 | ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate); |
| 211 | |
| 212 | /* |
| 213 | * Initialize FDW-related state. |
| 214 | */ |
| 215 | scanstate->fdwroutine = fdwroutine; |
| 216 | scanstate->fdw_state = NULL; |
| 217 | |
| 218 | /* Initialize any outer plan. */ |
| 219 | if (outerPlan(node)) |
| 220 | outerPlanState(scanstate) = |
| 221 | ExecInitNode(outerPlan(node), estate, eflags); |
| 222 | |
| 223 | /* |
| 224 | * Tell the FDW to initialize the scan. |
| 225 | */ |
| 226 | if (node->operation != CMD_SELECT) |
| 227 | fdwroutine->BeginDirectModify(scanstate, eflags); |
| 228 | else |
| 229 | fdwroutine->BeginForeignScan(scanstate, eflags); |
| 230 | |
| 231 | return scanstate; |
| 232 | } |
| 233 | |
| 234 | /* ---------------------------------------------------------------- |
| 235 | * ExecEndForeignScan |
| 236 | * |
| 237 | * frees any storage allocated through C routines. |
| 238 | * ---------------------------------------------------------------- |
| 239 | */ |
| 240 | void |
| 241 | ExecEndForeignScan(ForeignScanState *node) |
| 242 | { |
| 243 | ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; |
| 244 | |
| 245 | /* Let the FDW shut down */ |
| 246 | if (plan->operation != CMD_SELECT) |
| 247 | node->fdwroutine->EndDirectModify(node); |
| 248 | else |
| 249 | node->fdwroutine->EndForeignScan(node); |
| 250 | |
| 251 | /* Shut down any outer plan. */ |
| 252 | if (outerPlanState(node)) |
| 253 | ExecEndNode(outerPlanState(node)); |
| 254 | |
| 255 | /* Free the exprcontext */ |
| 256 | ExecFreeExprContext(&node->ss.ps); |
| 257 | |
| 258 | /* clean out the tuple table */ |
| 259 | if (node->ss.ps.ps_ResultTupleSlot) |
| 260 | ExecClearTuple(node->ss.ps.ps_ResultTupleSlot); |
| 261 | ExecClearTuple(node->ss.ss_ScanTupleSlot); |
| 262 | } |
| 263 | |
| 264 | /* ---------------------------------------------------------------- |
| 265 | * ExecReScanForeignScan |
| 266 | * |
| 267 | * Rescans the relation. |
| 268 | * ---------------------------------------------------------------- |
| 269 | */ |
| 270 | void |
| 271 | ExecReScanForeignScan(ForeignScanState *node) |
| 272 | { |
| 273 | PlanState *outerPlan = outerPlanState(node); |
| 274 | |
| 275 | node->fdwroutine->ReScanForeignScan(node); |
| 276 | |
| 277 | /* |
| 278 | * If chgParam of subnode is not null then plan will be re-scanned by |
| 279 | * first ExecProcNode. outerPlan may also be NULL, in which case there is |
| 280 | * nothing to rescan at all. |
| 281 | */ |
| 282 | if (outerPlan != NULL && outerPlan->chgParam == NULL) |
| 283 | ExecReScan(outerPlan); |
| 284 | |
| 285 | ExecScanReScan(&node->ss); |
| 286 | } |
| 287 | |
| 288 | /* ---------------------------------------------------------------- |
| 289 | * ExecForeignScanEstimate |
| 290 | * |
| 291 | * Informs size of the parallel coordination information, if any |
| 292 | * ---------------------------------------------------------------- |
| 293 | */ |
| 294 | void |
| 295 | ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt) |
| 296 | { |
| 297 | FdwRoutine *fdwroutine = node->fdwroutine; |
| 298 | |
| 299 | if (fdwroutine->EstimateDSMForeignScan) |
| 300 | { |
| 301 | node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt); |
| 302 | shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); |
| 303 | shm_toc_estimate_keys(&pcxt->estimator, 1); |
| 304 | } |
| 305 | } |
| 306 | |
| 307 | /* ---------------------------------------------------------------- |
| 308 | * ExecForeignScanInitializeDSM |
| 309 | * |
| 310 | * Initialize the parallel coordination information |
| 311 | * ---------------------------------------------------------------- |
| 312 | */ |
| 313 | void |
| 314 | ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) |
| 315 | { |
| 316 | FdwRoutine *fdwroutine = node->fdwroutine; |
| 317 | |
| 318 | if (fdwroutine->InitializeDSMForeignScan) |
| 319 | { |
| 320 | int plan_node_id = node->ss.ps.plan->plan_node_id; |
| 321 | void *coordinate; |
| 322 | |
| 323 | coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len); |
| 324 | fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate); |
| 325 | shm_toc_insert(pcxt->toc, plan_node_id, coordinate); |
| 326 | } |
| 327 | } |
| 328 | |
| 329 | /* ---------------------------------------------------------------- |
| 330 | * ExecForeignScanReInitializeDSM |
| 331 | * |
| 332 | * Reset shared state before beginning a fresh scan. |
| 333 | * ---------------------------------------------------------------- |
| 334 | */ |
| 335 | void |
| 336 | ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) |
| 337 | { |
| 338 | FdwRoutine *fdwroutine = node->fdwroutine; |
| 339 | |
| 340 | if (fdwroutine->ReInitializeDSMForeignScan) |
| 341 | { |
| 342 | int plan_node_id = node->ss.ps.plan->plan_node_id; |
| 343 | void *coordinate; |
| 344 | |
| 345 | coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false); |
| 346 | fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate); |
| 347 | } |
| 348 | } |
| 349 | |
| 350 | /* ---------------------------------------------------------------- |
| 351 | * ExecForeignScanInitializeWorker |
| 352 | * |
| 353 | * Initialization according to the parallel coordination information |
| 354 | * ---------------------------------------------------------------- |
| 355 | */ |
| 356 | void |
| 357 | ExecForeignScanInitializeWorker(ForeignScanState *node, |
| 358 | ParallelWorkerContext *pwcxt) |
| 359 | { |
| 360 | FdwRoutine *fdwroutine = node->fdwroutine; |
| 361 | |
| 362 | if (fdwroutine->InitializeWorkerForeignScan) |
| 363 | { |
| 364 | int plan_node_id = node->ss.ps.plan->plan_node_id; |
| 365 | void *coordinate; |
| 366 | |
| 367 | coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false); |
| 368 | fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate); |
| 369 | } |
| 370 | } |
| 371 | |
| 372 | /* ---------------------------------------------------------------- |
| 373 | * ExecShutdownForeignScan |
| 374 | * |
| 375 | * Gives FDW chance to stop asynchronous resource consumption |
| 376 | * and release any resources still held. |
| 377 | * ---------------------------------------------------------------- |
| 378 | */ |
| 379 | void |
| 380 | ExecShutdownForeignScan(ForeignScanState *node) |
| 381 | { |
| 382 | FdwRoutine *fdwroutine = node->fdwroutine; |
| 383 | |
| 384 | if (fdwroutine->ShutdownForeignScan) |
| 385 | fdwroutine->ShutdownForeignScan(node); |
| 386 | } |
| 387 | |