1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * execCurrent.c |
4 | * executor support for WHERE CURRENT OF cursor |
5 | * |
6 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
7 | * Portions Copyright (c) 1994, Regents of the University of California |
8 | * |
9 | * src/backend/executor/execCurrent.c |
10 | * |
11 | *------------------------------------------------------------------------- |
12 | */ |
13 | #include "postgres.h" |
14 | |
15 | #include "access/genam.h" |
16 | #include "access/relscan.h" |
17 | #include "access/sysattr.h" |
18 | #include "catalog/pg_type.h" |
19 | #include "executor/executor.h" |
20 | #include "utils/builtins.h" |
21 | #include "utils/lsyscache.h" |
22 | #include "utils/portal.h" |
23 | #include "utils/rel.h" |
24 | |
25 | |
26 | static char *fetch_cursor_param_value(ExprContext *econtext, int paramId); |
27 | static ScanState *search_plan_tree(PlanState *node, Oid table_oid, |
28 | bool *pending_rescan); |
29 | |
30 | |
31 | /* |
32 | * execCurrentOf |
33 | * |
34 | * Given a CURRENT OF expression and the OID of a table, determine which row |
35 | * of the table is currently being scanned by the cursor named by CURRENT OF, |
36 | * and return the row's TID into *current_tid. |
37 | * |
38 | * Returns true if a row was identified. Returns false if the cursor is valid |
39 | * for the table but is not currently scanning a row of the table (this is a |
40 | * legal situation in inheritance cases). Raises error if cursor is not a |
41 | * valid updatable scan of the specified table. |
42 | */ |
43 | bool |
44 | execCurrentOf(CurrentOfExpr *cexpr, |
45 | ExprContext *econtext, |
46 | Oid table_oid, |
47 | ItemPointer current_tid) |
48 | { |
49 | char *cursor_name; |
50 | char *table_name; |
51 | Portal portal; |
52 | QueryDesc *queryDesc; |
53 | |
54 | /* Get the cursor name --- may have to look up a parameter reference */ |
55 | if (cexpr->cursor_name) |
56 | cursor_name = cexpr->cursor_name; |
57 | else |
58 | cursor_name = fetch_cursor_param_value(econtext, cexpr->cursor_param); |
59 | |
60 | /* Fetch table name for possible use in error messages */ |
61 | table_name = get_rel_name(table_oid); |
62 | if (table_name == NULL) |
63 | elog(ERROR, "cache lookup failed for relation %u" , table_oid); |
64 | |
65 | /* Find the cursor's portal */ |
66 | portal = GetPortalByName(cursor_name); |
67 | if (!PortalIsValid(portal)) |
68 | ereport(ERROR, |
69 | (errcode(ERRCODE_UNDEFINED_CURSOR), |
70 | errmsg("cursor \"%s\" does not exist" , cursor_name))); |
71 | |
72 | /* |
73 | * We have to watch out for non-SELECT queries as well as held cursors, |
74 | * both of which may have null queryDesc. |
75 | */ |
76 | if (portal->strategy != PORTAL_ONE_SELECT) |
77 | ereport(ERROR, |
78 | (errcode(ERRCODE_INVALID_CURSOR_STATE), |
79 | errmsg("cursor \"%s\" is not a SELECT query" , |
80 | cursor_name))); |
81 | queryDesc = portal->queryDesc; |
82 | if (queryDesc == NULL || queryDesc->estate == NULL) |
83 | ereport(ERROR, |
84 | (errcode(ERRCODE_INVALID_CURSOR_STATE), |
85 | errmsg("cursor \"%s\" is held from a previous transaction" , |
86 | cursor_name))); |
87 | |
88 | /* |
89 | * We have two different strategies depending on whether the cursor uses |
90 | * FOR UPDATE/SHARE or not. The reason for supporting both is that the |
91 | * FOR UPDATE code is able to identify a target table in many cases where |
92 | * the other code can't, while the non-FOR-UPDATE case allows use of WHERE |
93 | * CURRENT OF with an insensitive cursor. |
94 | */ |
95 | if (queryDesc->estate->es_rowmarks) |
96 | { |
97 | ExecRowMark *erm; |
98 | Index i; |
99 | |
100 | /* |
101 | * Here, the query must have exactly one FOR UPDATE/SHARE reference to |
102 | * the target table, and we dig the ctid info out of that. |
103 | */ |
104 | erm = NULL; |
105 | for (i = 0; i < queryDesc->estate->es_range_table_size; i++) |
106 | { |
107 | ExecRowMark *thiserm = queryDesc->estate->es_rowmarks[i]; |
108 | |
109 | if (thiserm == NULL || |
110 | !RowMarkRequiresRowShareLock(thiserm->markType)) |
111 | continue; /* ignore non-FOR UPDATE/SHARE items */ |
112 | |
113 | if (thiserm->relid == table_oid) |
114 | { |
115 | if (erm) |
116 | ereport(ERROR, |
117 | (errcode(ERRCODE_INVALID_CURSOR_STATE), |
118 | errmsg("cursor \"%s\" has multiple FOR UPDATE/SHARE references to table \"%s\"" , |
119 | cursor_name, table_name))); |
120 | erm = thiserm; |
121 | } |
122 | } |
123 | |
124 | if (erm == NULL) |
125 | ereport(ERROR, |
126 | (errcode(ERRCODE_INVALID_CURSOR_STATE), |
127 | errmsg("cursor \"%s\" does not have a FOR UPDATE/SHARE reference to table \"%s\"" , |
128 | cursor_name, table_name))); |
129 | |
130 | /* |
131 | * The cursor must have a current result row: per the SQL spec, it's |
132 | * an error if not. |
133 | */ |
134 | if (portal->atStart || portal->atEnd) |
135 | ereport(ERROR, |
136 | (errcode(ERRCODE_INVALID_CURSOR_STATE), |
137 | errmsg("cursor \"%s\" is not positioned on a row" , |
138 | cursor_name))); |
139 | |
140 | /* Return the currently scanned TID, if there is one */ |
141 | if (ItemPointerIsValid(&(erm->curCtid))) |
142 | { |
143 | *current_tid = erm->curCtid; |
144 | return true; |
145 | } |
146 | |
147 | /* |
148 | * This table didn't produce the cursor's current row; some other |
149 | * inheritance child of the same parent must have. Signal caller to |
150 | * do nothing on this table. |
151 | */ |
152 | return false; |
153 | } |
154 | else |
155 | { |
156 | /* |
157 | * Without FOR UPDATE, we dig through the cursor's plan to find the |
158 | * scan node. Fail if it's not there or buried underneath |
159 | * aggregation. |
160 | */ |
161 | ScanState *scanstate; |
162 | bool pending_rescan = false; |
163 | |
164 | scanstate = search_plan_tree(queryDesc->planstate, table_oid, |
165 | &pending_rescan); |
166 | if (!scanstate) |
167 | ereport(ERROR, |
168 | (errcode(ERRCODE_INVALID_CURSOR_STATE), |
169 | errmsg("cursor \"%s\" is not a simply updatable scan of table \"%s\"" , |
170 | cursor_name, table_name))); |
171 | |
172 | /* |
173 | * The cursor must have a current result row: per the SQL spec, it's |
174 | * an error if not. We test this at the top level, rather than at the |
175 | * scan node level, because in inheritance cases any one table scan |
176 | * could easily not be on a row. We want to return false, not raise |
177 | * error, if the passed-in table OID is for one of the inactive scans. |
178 | */ |
179 | if (portal->atStart || portal->atEnd) |
180 | ereport(ERROR, |
181 | (errcode(ERRCODE_INVALID_CURSOR_STATE), |
182 | errmsg("cursor \"%s\" is not positioned on a row" , |
183 | cursor_name))); |
184 | |
185 | /* |
186 | * Now OK to return false if we found an inactive scan. It is |
187 | * inactive either if it's not positioned on a row, or there's a |
188 | * rescan pending for it. |
189 | */ |
190 | if (TupIsNull(scanstate->ss_ScanTupleSlot) || pending_rescan) |
191 | return false; |
192 | |
193 | /* |
194 | * Extract TID of the scan's current row. The mechanism for this is |
195 | * in principle scan-type-dependent, but for most scan types, we can |
196 | * just dig the TID out of the physical scan tuple. |
197 | */ |
198 | if (IsA(scanstate, IndexOnlyScanState)) |
199 | { |
200 | /* |
201 | * For IndexOnlyScan, the tuple stored in ss_ScanTupleSlot may be |
202 | * a virtual tuple that does not have the ctid column, so we have |
203 | * to get the TID from xs_ctup.t_self. |
204 | */ |
205 | IndexScanDesc scan = ((IndexOnlyScanState *) scanstate)->ioss_ScanDesc; |
206 | |
207 | *current_tid = scan->xs_heaptid; |
208 | } |
209 | else |
210 | { |
211 | /* |
212 | * Default case: try to fetch TID from the scan node's current |
213 | * tuple. As an extra cross-check, verify tableoid in the current |
214 | * tuple. If the scan hasn't provided a physical tuple, we have |
215 | * to fail. |
216 | */ |
217 | Datum ldatum; |
218 | bool lisnull; |
219 | ItemPointer tuple_tid; |
220 | |
221 | #ifdef USE_ASSERT_CHECKING |
222 | ldatum = slot_getsysattr(scanstate->ss_ScanTupleSlot, |
223 | TableOidAttributeNumber, |
224 | &lisnull); |
225 | if (lisnull) |
226 | ereport(ERROR, |
227 | (errcode(ERRCODE_INVALID_CURSOR_STATE), |
228 | errmsg("cursor \"%s\" is not a simply updatable scan of table \"%s\"" , |
229 | cursor_name, table_name))); |
230 | Assert(DatumGetObjectId(ldatum) == table_oid); |
231 | #endif |
232 | |
233 | ldatum = slot_getsysattr(scanstate->ss_ScanTupleSlot, |
234 | SelfItemPointerAttributeNumber, |
235 | &lisnull); |
236 | if (lisnull) |
237 | ereport(ERROR, |
238 | (errcode(ERRCODE_INVALID_CURSOR_STATE), |
239 | errmsg("cursor \"%s\" is not a simply updatable scan of table \"%s\"" , |
240 | cursor_name, table_name))); |
241 | tuple_tid = (ItemPointer) DatumGetPointer(ldatum); |
242 | |
243 | *current_tid = *tuple_tid; |
244 | } |
245 | |
246 | Assert(ItemPointerIsValid(current_tid)); |
247 | |
248 | return true; |
249 | } |
250 | } |
251 | |
252 | /* |
253 | * fetch_cursor_param_value |
254 | * |
255 | * Fetch the string value of a param, verifying it is of type REFCURSOR. |
256 | */ |
257 | static char * |
258 | fetch_cursor_param_value(ExprContext *econtext, int paramId) |
259 | { |
260 | ParamListInfo paramInfo = econtext->ecxt_param_list_info; |
261 | |
262 | if (paramInfo && |
263 | paramId > 0 && paramId <= paramInfo->numParams) |
264 | { |
265 | ParamExternData *prm; |
266 | ParamExternData prmdata; |
267 | |
268 | /* give hook a chance in case parameter is dynamic */ |
269 | if (paramInfo->paramFetch != NULL) |
270 | prm = paramInfo->paramFetch(paramInfo, paramId, false, &prmdata); |
271 | else |
272 | prm = ¶mInfo->params[paramId - 1]; |
273 | |
274 | if (OidIsValid(prm->ptype) && !prm->isnull) |
275 | { |
276 | /* safety check in case hook did something unexpected */ |
277 | if (prm->ptype != REFCURSOROID) |
278 | ereport(ERROR, |
279 | (errcode(ERRCODE_DATATYPE_MISMATCH), |
280 | errmsg("type of parameter %d (%s) does not match that when preparing the plan (%s)" , |
281 | paramId, |
282 | format_type_be(prm->ptype), |
283 | format_type_be(REFCURSOROID)))); |
284 | |
285 | /* We know that refcursor uses text's I/O routines */ |
286 | return TextDatumGetCString(prm->value); |
287 | } |
288 | } |
289 | |
290 | ereport(ERROR, |
291 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
292 | errmsg("no value found for parameter %d" , paramId))); |
293 | return NULL; |
294 | } |
295 | |
296 | /* |
297 | * search_plan_tree |
298 | * |
299 | * Search through a PlanState tree for a scan node on the specified table. |
300 | * Return NULL if not found or multiple candidates. |
301 | * |
302 | * If a candidate is found, set *pending_rescan to true if that candidate |
303 | * or any node above it has a pending rescan action, i.e. chgParam != NULL. |
304 | * That indicates that we shouldn't consider the node to be positioned on a |
305 | * valid tuple, even if its own state would indicate that it is. (Caller |
306 | * must initialize *pending_rescan to false, and should not trust its state |
307 | * if multiple candidates are found.) |
308 | */ |
309 | static ScanState * |
310 | search_plan_tree(PlanState *node, Oid table_oid, |
311 | bool *pending_rescan) |
312 | { |
313 | ScanState *result = NULL; |
314 | |
315 | if (node == NULL) |
316 | return NULL; |
317 | switch (nodeTag(node)) |
318 | { |
319 | /* |
320 | * Relation scan nodes can all be treated alike |
321 | */ |
322 | case T_SeqScanState: |
323 | case T_SampleScanState: |
324 | case T_IndexScanState: |
325 | case T_IndexOnlyScanState: |
326 | case T_BitmapHeapScanState: |
327 | case T_TidScanState: |
328 | case T_ForeignScanState: |
329 | case T_CustomScanState: |
330 | { |
331 | ScanState *sstate = (ScanState *) node; |
332 | |
333 | if (RelationGetRelid(sstate->ss_currentRelation) == table_oid) |
334 | result = sstate; |
335 | break; |
336 | } |
337 | |
338 | /* |
339 | * For Append, we must look through the members; watch out for |
340 | * multiple matches (possible if it was from UNION ALL) |
341 | */ |
342 | case T_AppendState: |
343 | { |
344 | AppendState *astate = (AppendState *) node; |
345 | int i; |
346 | |
347 | for (i = 0; i < astate->as_nplans; i++) |
348 | { |
349 | ScanState *elem = search_plan_tree(astate->appendplans[i], |
350 | table_oid, |
351 | pending_rescan); |
352 | |
353 | if (!elem) |
354 | continue; |
355 | if (result) |
356 | return NULL; /* multiple matches */ |
357 | result = elem; |
358 | } |
359 | break; |
360 | } |
361 | |
362 | /* |
363 | * Similarly for MergeAppend |
364 | */ |
365 | case T_MergeAppendState: |
366 | { |
367 | MergeAppendState *mstate = (MergeAppendState *) node; |
368 | int i; |
369 | |
370 | for (i = 0; i < mstate->ms_nplans; i++) |
371 | { |
372 | ScanState *elem = search_plan_tree(mstate->mergeplans[i], |
373 | table_oid, |
374 | pending_rescan); |
375 | |
376 | if (!elem) |
377 | continue; |
378 | if (result) |
379 | return NULL; /* multiple matches */ |
380 | result = elem; |
381 | } |
382 | break; |
383 | } |
384 | |
385 | /* |
386 | * Result and Limit can be descended through (these are safe |
387 | * because they always return their input's current row) |
388 | */ |
389 | case T_ResultState: |
390 | case T_LimitState: |
391 | result = search_plan_tree(node->lefttree, |
392 | table_oid, |
393 | pending_rescan); |
394 | break; |
395 | |
396 | /* |
397 | * SubqueryScan too, but it keeps the child in a different place |
398 | */ |
399 | case T_SubqueryScanState: |
400 | result = search_plan_tree(((SubqueryScanState *) node)->subplan, |
401 | table_oid, |
402 | pending_rescan); |
403 | break; |
404 | |
405 | default: |
406 | /* Otherwise, assume we can't descend through it */ |
407 | break; |
408 | } |
409 | |
410 | /* |
411 | * If we found a candidate at or below this node, then this node's |
412 | * chgParam indicates a pending rescan that will affect the candidate. |
413 | */ |
414 | if (result && node->chgParam != NULL) |
415 | *pending_rescan = true; |
416 | |
417 | return result; |
418 | } |
419 | |