/*-------------------------------------------------------------------------
 *
 * pquery.c
 *		POSTGRES process query command code
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *		src/backend/tcop/pquery.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <limits.h>

#include "access/xact.h"
#include "commands/prepare.h"
#include "executor/tstoreReceiver.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"


/*
 * ActivePortal is the currently executing Portal (the most closely nested,
 * if there are several).
 */
Portal		ActivePortal = NULL;


static void ProcessQuery(PlannedStmt *plan,
                         const char *sourceText,
                         ParamListInfo params,
                         QueryEnvironment *queryEnv,
                         DestReceiver *dest,
                         char *completionTag);
static void FillPortalStore(Portal portal, bool isTopLevel);
static uint64 RunFromStore(Portal portal, ScanDirection direction, uint64 count,
                           DestReceiver *dest);
static uint64 PortalRunSelect(Portal portal, bool forward, long count,
                              DestReceiver *dest);
static void PortalRunUtility(Portal portal, PlannedStmt *pstmt,
                             bool isTopLevel, bool setHoldSnapshot,
                             DestReceiver *dest, char *completionTag);
static void PortalRunMulti(Portal portal,
                           bool isTopLevel, bool setHoldSnapshot,
                           DestReceiver *dest, DestReceiver *altdest,
                           char *completionTag);
static uint64 DoPortalRunFetch(Portal portal,
                               FetchDirection fdirection,
                               long count,
                               DestReceiver *dest);
static void DoPortalRewind(Portal portal);


/*
 * CreateQueryDesc
 *		Allocate and initialize a QueryDesc for the given plan tree.
 */
QueryDesc *
CreateQueryDesc(PlannedStmt *plannedstmt,
                const char *sourceText,
                Snapshot snapshot,
                Snapshot crosscheck_snapshot,
                DestReceiver *dest,
                ParamListInfo params,
                QueryEnvironment *queryEnv,
                int instrument_options)
{
    QueryDesc  *qd = (QueryDesc *) palloc(sizeof(QueryDesc));

    qd->operation = plannedstmt->commandType;	/* operation */
    qd->plannedstmt = plannedstmt;	/* plan */
    qd->sourceText = sourceText;	/* query text */
    qd->snapshot = RegisterSnapshot(snapshot);	/* snapshot */
    /* RI check snapshot */
    qd->crosscheck_snapshot = RegisterSnapshot(crosscheck_snapshot);
    qd->dest = dest;			/* output dest */
    qd->params = params;		/* parameter values passed into query */
    qd->queryEnv = queryEnv;
    qd->instrument_options = instrument_options;	/* instrumentation wanted? */

    /* null these fields until set by ExecutorStart */
    qd->tupDesc = NULL;
    qd->estate = NULL;
    qd->planstate = NULL;
    qd->totaltime = NULL;

    /* not yet executed */
    qd->already_executed = false;

    return qd;
}

/*
 * FreeQueryDesc
 *		Release a QueryDesc and the snapshots it registered.
 */
void
FreeQueryDesc(QueryDesc *qdesc)
{
    /* Can't be a live query */
    Assert(qdesc->estate == NULL);

    /* forget our snapshots */
    UnregisterSnapshot(qdesc->snapshot);
    UnregisterSnapshot(qdesc->crosscheck_snapshot);

    /* Only the QueryDesc itself need be freed */
    pfree(qdesc);
}


/*
 * ProcessQuery
 *		Execute a single plannable query within a PORTAL_MULTI_QUERY,
 *		PORTAL_ONE_RETURNING, or PORTAL_ONE_MOD_WITH portal
 *
 * plan: the plan tree for the query
 * sourceText: the source text of the query
 * params: any parameters needed
 * dest: where to send results
 * completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
 *		in which to store a command completion status string.
 *
 * completionTag may be NULL if caller doesn't want a status string.
 *
 * Must be called in a memory context that will be reset or deleted on
 * error; otherwise the executor's memory usage will be leaked.
 */
static void
ProcessQuery(PlannedStmt *plan,
             const char *sourceText,
             ParamListInfo params,
             QueryEnvironment *queryEnv,
             DestReceiver *dest,
             char *completionTag)
{
    QueryDesc  *queryDesc;

    /*
     * Create the QueryDesc object
     */
    queryDesc = CreateQueryDesc(plan, sourceText,
                                GetActiveSnapshot(), InvalidSnapshot,
                                dest, params, queryEnv, 0);

    /*
     * Call ExecutorStart to prepare the plan for execution
     */
    ExecutorStart(queryDesc, 0);

    /*
     * Run the plan to completion.
     */
    ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);

    /*
     * Build command completion status string, if caller wants one.
     */
    if (completionTag)
    {
        Oid         lastOid;

        switch (queryDesc->operation)
        {
            case CMD_SELECT:
                snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
                         "SELECT " UINT64_FORMAT,
                         queryDesc->estate->es_processed);
                break;
            case CMD_INSERT:
                /* lastoid doesn't exist anymore */
                lastOid = InvalidOid;
                snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
                         "INSERT %u " UINT64_FORMAT,
                         lastOid, queryDesc->estate->es_processed);
                break;
            case CMD_UPDATE:
                snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
                         "UPDATE " UINT64_FORMAT,
                         queryDesc->estate->es_processed);
                break;
            case CMD_DELETE:
                snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
                         "DELETE " UINT64_FORMAT,
                         queryDesc->estate->es_processed);
                break;
            default:
                strcpy(completionTag, "???");
                break;
        }
    }

    /*
     * Now, we close down all the scans and free allocated resources.
     */
    ExecutorFinish(queryDesc);
    ExecutorEnd(queryDesc);

    FreeQueryDesc(queryDesc);
}

/*
 * ChoosePortalStrategy
 *		Select portal execution strategy given the intended statement list.
 *
 * The list elements can be Querys or PlannedStmts.
 * That's more general than portals need, but plancache.c uses this too.
 *
 * See the comments in portal.h.
 */
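/*
 * For illustration only (this comment is a reader's aid, not exhaustive):
 * some typical single-parsetree inputs and the strategy chosen below.
 *
 *		SELECT ... FROM tab						-> PORTAL_ONE_SELECT
 *		WITH t AS (DELETE ... RETURNING *)
 *			SELECT * FROM t						-> PORTAL_ONE_MOD_WITH
 *		INSERT/UPDATE/DELETE ... RETURNING *	-> PORTAL_ONE_RETURNING
 *		EXPLAIN, SHOW, FETCH (return tuples)	-> PORTAL_UTIL_SELECT
 *		plain INSERT/UPDATE/DELETE, DDL, or
 *		rule-expanded statement lists			-> PORTAL_MULTI_QUERY
 */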
PortalStrategy
ChoosePortalStrategy(List *stmts)
{
    int         nSetTag;
    ListCell   *lc;

    /*
     * PORTAL_ONE_SELECT and PORTAL_UTIL_SELECT need only consider the
     * single-statement case, since there are no rewrite rules that can add
     * auxiliary queries to a SELECT or a utility command. PORTAL_ONE_MOD_WITH
     * likewise allows only one top-level statement.
     */
    if (list_length(stmts) == 1)
    {
        Node       *stmt = (Node *) linitial(stmts);

        if (IsA(stmt, Query))
        {
            Query      *query = (Query *) stmt;

            if (query->canSetTag)
            {
                if (query->commandType == CMD_SELECT)
                {
                    if (query->hasModifyingCTE)
                        return PORTAL_ONE_MOD_WITH;
                    else
                        return PORTAL_ONE_SELECT;
                }
                if (query->commandType == CMD_UTILITY)
                {
                    if (UtilityReturnsTuples(query->utilityStmt))
                        return PORTAL_UTIL_SELECT;
                    /* it can't be ONE_RETURNING, so give up */
                    return PORTAL_MULTI_QUERY;
                }
            }
        }
        else if (IsA(stmt, PlannedStmt))
        {
            PlannedStmt *pstmt = (PlannedStmt *) stmt;

            if (pstmt->canSetTag)
            {
                if (pstmt->commandType == CMD_SELECT)
                {
                    if (pstmt->hasModifyingCTE)
                        return PORTAL_ONE_MOD_WITH;
                    else
                        return PORTAL_ONE_SELECT;
                }
                if (pstmt->commandType == CMD_UTILITY)
                {
                    if (UtilityReturnsTuples(pstmt->utilityStmt))
                        return PORTAL_UTIL_SELECT;
                    /* it can't be ONE_RETURNING, so give up */
                    return PORTAL_MULTI_QUERY;
                }
            }
        }
        else
            elog(ERROR, "unrecognized node type: %d", (int) nodeTag(stmt));
    }

    /*
     * PORTAL_ONE_RETURNING has to allow auxiliary queries added by rewrite.
     * Choose PORTAL_ONE_RETURNING if there is exactly one canSetTag query and
     * it has a RETURNING list.
     */
    nSetTag = 0;
    foreach(lc, stmts)
    {
        Node       *stmt = (Node *) lfirst(lc);

        if (IsA(stmt, Query))
        {
            Query      *query = (Query *) stmt;

            if (query->canSetTag)
            {
                if (++nSetTag > 1)
                    return PORTAL_MULTI_QUERY;	/* no need to look further */
                if (query->commandType == CMD_UTILITY ||
                    query->returningList == NIL)
                    return PORTAL_MULTI_QUERY;	/* no need to look further */
            }
        }
        else if (IsA(stmt, PlannedStmt))
        {
            PlannedStmt *pstmt = (PlannedStmt *) stmt;

            if (pstmt->canSetTag)
            {
                if (++nSetTag > 1)
                    return PORTAL_MULTI_QUERY;	/* no need to look further */
                if (pstmt->commandType == CMD_UTILITY ||
                    !pstmt->hasReturning)
                    return PORTAL_MULTI_QUERY;	/* no need to look further */
            }
        }
        else
            elog(ERROR, "unrecognized node type: %d", (int) nodeTag(stmt));
    }
    if (nSetTag == 1)
        return PORTAL_ONE_RETURNING;

    /* Else, it's the general case... */
    return PORTAL_MULTI_QUERY;
}

/*
 * FetchPortalTargetList
 *		Given a portal that returns tuples, extract the query targetlist.
 *		Returns NIL if the portal doesn't have a determinable targetlist.
 *
 * Note: do not modify the result.
 */
List *
FetchPortalTargetList(Portal portal)
{
    /* no point in looking if we determined it doesn't return tuples */
    if (portal->strategy == PORTAL_MULTI_QUERY)
        return NIL;
    /* get the primary statement and find out what it returns */
    return FetchStatementTargetList((Node *) PortalGetPrimaryStmt(portal));
}

/*
 * FetchStatementTargetList
 *		Given a statement that returns tuples, extract the query targetlist.
 *		Returns NIL if the statement doesn't have a determinable targetlist.
 *
 * This can be applied to a Query or a PlannedStmt.
 * That's more general than portals need, but plancache.c uses this too.
 *
 * Note: do not modify the result.
 *
 * XXX be careful to keep this in sync with UtilityReturnsTuples.
 */
List *
FetchStatementTargetList(Node *stmt)
{
    if (stmt == NULL)
        return NIL;
    if (IsA(stmt, Query))
    {
        Query      *query = (Query *) stmt;

        if (query->commandType == CMD_UTILITY)
        {
            /* transfer attention to utility statement */
            stmt = query->utilityStmt;
        }
        else
        {
            if (query->commandType == CMD_SELECT)
                return query->targetList;
            if (query->returningList)
                return query->returningList;
            return NIL;
        }
    }
    if (IsA(stmt, PlannedStmt))
    {
        PlannedStmt *pstmt = (PlannedStmt *) stmt;

        if (pstmt->commandType == CMD_UTILITY)
        {
            /* transfer attention to utility statement */
            stmt = pstmt->utilityStmt;
        }
        else
        {
            if (pstmt->commandType == CMD_SELECT)
                return pstmt->planTree->targetlist;
            if (pstmt->hasReturning)
                return pstmt->planTree->targetlist;
            return NIL;
        }
    }
    if (IsA(stmt, FetchStmt))
    {
        FetchStmt  *fstmt = (FetchStmt *) stmt;
        Portal      subportal;

        Assert(!fstmt->ismove);
        subportal = GetPortalByName(fstmt->portalname);
        Assert(PortalIsValid(subportal));
        return FetchPortalTargetList(subportal);
    }
    if (IsA(stmt, ExecuteStmt))
    {
        ExecuteStmt *estmt = (ExecuteStmt *) stmt;
        PreparedStatement *entry;

        entry = FetchPreparedStatement(estmt->name, true);
        return FetchPreparedStatementTargetList(entry);
    }
    return NIL;
}

/*
 * PortalStart
 *		Prepare a portal for execution.
 *
 * Caller must already have created the portal, done PortalDefineQuery(),
 * and adjusted portal options if needed.
 *
 * If parameters are needed by the query, they must be passed in "params"
 * (caller is responsible for giving them appropriate lifetime).
 *
 * The caller can also provide an initial set of "eflags" to be passed to
 * ExecutorStart (but note these can be modified internally, and they are
 * currently only honored for PORTAL_ONE_SELECT portals).  Most callers
 * should simply pass zero.
 *
 * The caller can optionally pass a snapshot to be used; pass InvalidSnapshot
 * for the normal behavior of setting a new snapshot.  This parameter is
 * presently ignored for non-PORTAL_ONE_SELECT portals (it's only intended
 * to be used for cursors).
 *
 * On return, portal is ready to accept PortalRun() calls, and the result
 * tupdesc (if any) is known.
 */
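/*
 * A sketch of the typical top-level caller sequence, for orientation only
 * (cf. exec_simple_query in postgres.c; error handling and result-format
 * selection elided):
 *
 *		portal = CreatePortal("", true, true);
 *		PortalDefineQuery(portal, NULL, query_string, commandTag,
 *						  plantree_list, NULL);
 *		PortalStart(portal, NULL, 0, InvalidSnapshot);
 *		(void) PortalRun(portal, FETCH_ALL, true, true,
 *						 receiver, receiver, completionTag);
 *		PortalDrop(portal, false);
 */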
void
PortalStart(Portal portal, ParamListInfo params,
            int eflags, Snapshot snapshot)
{
    Portal      saveActivePortal;
    ResourceOwner saveResourceOwner;
    MemoryContext savePortalContext;
    MemoryContext oldContext;
    QueryDesc  *queryDesc;
    int         myeflags;

    AssertArg(PortalIsValid(portal));
    AssertState(portal->status == PORTAL_DEFINED);

    /*
     * Set up global portal context pointers.
     */
    saveActivePortal = ActivePortal;
    saveResourceOwner = CurrentResourceOwner;
    savePortalContext = PortalContext;
    PG_TRY();
    {
        ActivePortal = portal;
        if (portal->resowner)
            CurrentResourceOwner = portal->resowner;
        PortalContext = portal->portalContext;

        oldContext = MemoryContextSwitchTo(PortalContext);

        /* Must remember portal param list, if any */
        portal->portalParams = params;

        /*
         * Determine the portal execution strategy
         */
        portal->strategy = ChoosePortalStrategy(portal->stmts);

        /*
         * Fire her up according to the strategy
         */
        switch (portal->strategy)
        {
            case PORTAL_ONE_SELECT:

                /* Must set snapshot before starting executor. */
                if (snapshot)
                    PushActiveSnapshot(snapshot);
                else
                    PushActiveSnapshot(GetTransactionSnapshot());

                /*
                 * Create QueryDesc in portal's context; for the moment, set
                 * the destination to DestNone.
                 */
                queryDesc = CreateQueryDesc(linitial_node(PlannedStmt, portal->stmts),
                                            portal->sourceText,
                                            GetActiveSnapshot(),
                                            InvalidSnapshot,
                                            None_Receiver,
                                            params,
                                            portal->queryEnv,
                                            0);

                /*
                 * If it's a scrollable cursor, executor needs to support
                 * REWIND and backwards scan, as well as whatever the caller
                 * might've asked for.
                 */
                if (portal->cursorOptions & CURSOR_OPT_SCROLL)
                    myeflags = eflags | EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD;
                else
                    myeflags = eflags;

                /*
                 * Call ExecutorStart to prepare the plan for execution
                 */
                ExecutorStart(queryDesc, myeflags);

                /*
                 * This tells PortalCleanup to shut down the executor
                 */
                portal->queryDesc = queryDesc;

                /*
                 * Remember tuple descriptor (computed by ExecutorStart)
                 */
                portal->tupDesc = queryDesc->tupDesc;

                /*
                 * Reset cursor position data to "start of query"
                 */
                portal->atStart = true;
                portal->atEnd = false;	/* allow fetches */
                portal->portalPos = 0;

                PopActiveSnapshot();
                break;

            case PORTAL_ONE_RETURNING:
            case PORTAL_ONE_MOD_WITH:

                /*
                 * We don't start the executor until we are told to run the
                 * portal.  We do need to set up the result tupdesc.
                 */
                {
                    PlannedStmt *pstmt;

                    pstmt = PortalGetPrimaryStmt(portal);
                    portal->tupDesc =
                        ExecCleanTypeFromTL(pstmt->planTree->targetlist);
                }

                /*
                 * Reset cursor position data to "start of query"
                 */
                portal->atStart = true;
                portal->atEnd = false;	/* allow fetches */
                portal->portalPos = 0;
                break;

            case PORTAL_UTIL_SELECT:

                /*
                 * We don't set snapshot here, because PortalRunUtility will
                 * take care of it if needed.
                 */
                {
                    PlannedStmt *pstmt = PortalGetPrimaryStmt(portal);

                    Assert(pstmt->commandType == CMD_UTILITY);
                    portal->tupDesc = UtilityTupleDescriptor(pstmt->utilityStmt);
                }

                /*
                 * Reset cursor position data to "start of query"
                 */
                portal->atStart = true;
                portal->atEnd = false;	/* allow fetches */
                portal->portalPos = 0;
                break;

            case PORTAL_MULTI_QUERY:
                /* Need do nothing now */
                portal->tupDesc = NULL;
                break;
        }
    }
    PG_CATCH();
    {
        /* Uncaught error while executing portal: mark it dead */
        MarkPortalFailed(portal);

        /* Restore global vars and propagate error */
        ActivePortal = saveActivePortal;
        CurrentResourceOwner = saveResourceOwner;
        PortalContext = savePortalContext;

        PG_RE_THROW();
    }
    PG_END_TRY();

    MemoryContextSwitchTo(oldContext);

    ActivePortal = saveActivePortal;
    CurrentResourceOwner = saveResourceOwner;
    PortalContext = savePortalContext;

    portal->status = PORTAL_READY;
}

/*
 * PortalSetResultFormat
 *		Select the format codes for a portal's output.
 *
 * This must be run after PortalStart for a portal that will be read by
 * a DestRemote or DestRemoteExecute destination.  It is not presently needed
 * for other destination types.
 *
 * formats[] is the client format request, as per Bind message conventions.
 */
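/*
 * Example, for illustration: per the Bind message conventions, nFormats == 1
 * with formats[0] == 1 requests binary output for every column, while
 * nFormats == 0 leaves every column in the default text format (code 0).
 */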
void
PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
{
    int         natts;
    int         i;

    /* Do nothing if portal won't return tuples */
    if (portal->tupDesc == NULL)
        return;
    natts = portal->tupDesc->natts;
    portal->formats = (int16 *)
        MemoryContextAlloc(portal->portalContext,
                           natts * sizeof(int16));
    if (nFormats > 1)
    {
        /* format specified for each column */
        if (nFormats != natts)
            ereport(ERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("bind message has %d result formats but query has %d columns",
                            nFormats, natts)));
        memcpy(portal->formats, formats, natts * sizeof(int16));
    }
    else if (nFormats > 0)
    {
        /* single format specified, use for all columns */
        int16       format1 = formats[0];

        for (i = 0; i < natts; i++)
            portal->formats[i] = format1;
    }
    else
    {
        /* use default format for all columns */
        for (i = 0; i < natts; i++)
            portal->formats[i] = 0;
    }
}

/*
 * PortalRun
 *		Run a portal's query or queries.
 *
 * count <= 0 is interpreted as a no-op: the destination gets started up
 * and shut down, but nothing else happens.  Also, count == FETCH_ALL is
 * interpreted as "all rows".  Note that count is ignored in multi-query
 * situations, where we always run the portal to completion.
 *
 * isTopLevel: true if query is being executed at backend "top level"
 * (that is, directly from a client command message)
 *
 * dest: where to send output of primary (canSetTag) query
 *
 * altdest: where to send output of non-primary queries
 *
 * completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
 *		in which to store a command completion status string.
 *		May be NULL if caller doesn't want a status string.
 *
 * Returns true if the portal's execution is complete, false if it was
 * suspended due to exhaustion of the count parameter.
 */
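/*
 * (Illustrative note: FETCH_ALL is defined as LONG_MAX in nodes/parsenodes.h,
 * which is why "count == FETCH_ALL" and "count <= 0" are distinct cases in
 * the comment above and in the code below.)
 */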
bool
PortalRun(Portal portal, long count, bool isTopLevel, bool run_once,
          DestReceiver *dest, DestReceiver *altdest,
          char *completionTag)
{
    bool        result;
    uint64      nprocessed;
    ResourceOwner saveTopTransactionResourceOwner;
    MemoryContext saveTopTransactionContext;
    Portal      saveActivePortal;
    ResourceOwner saveResourceOwner;
    MemoryContext savePortalContext;
    MemoryContext saveMemoryContext;

    AssertArg(PortalIsValid(portal));

    TRACE_POSTGRESQL_QUERY_EXECUTE_START();

    /* Initialize completion tag to empty string */
    if (completionTag)
        completionTag[0] = '\0';

    if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
    {
        elog(DEBUG3, "PortalRun");
        /* PORTAL_MULTI_QUERY logs its own stats per query */
        ResetUsage();
    }

    /*
     * Check for improper portal use, and mark portal active.
     */
    MarkPortalActive(portal);

    /* Set run_once flag.  Shouldn't be clear if previously set. */
    Assert(!portal->run_once || run_once);
    portal->run_once = run_once;

    /*
     * Set up global portal context pointers.
     *
     * We have to play a special game here to support utility commands like
     * VACUUM and CLUSTER, which internally start and commit transactions.
     * When we are called to execute such a command, CurrentResourceOwner will
     * be pointing to the TopTransactionResourceOwner --- which will be
     * destroyed and replaced in the course of the internal commit and
     * restart.  So we need to be prepared to restore it as pointing to the
     * exit-time TopTransactionResourceOwner.  (Ain't that ugly?  This idea of
     * internally starting whole new transactions is not good.)
     * CurrentMemoryContext has a similar problem, but the other pointers we
     * save here will be NULL or pointing to longer-lived objects.
     */
    saveTopTransactionResourceOwner = TopTransactionResourceOwner;
    saveTopTransactionContext = TopTransactionContext;
    saveActivePortal = ActivePortal;
    saveResourceOwner = CurrentResourceOwner;
    savePortalContext = PortalContext;
    saveMemoryContext = CurrentMemoryContext;
    PG_TRY();
    {
        ActivePortal = portal;
        if (portal->resowner)
            CurrentResourceOwner = portal->resowner;
        PortalContext = portal->portalContext;

        MemoryContextSwitchTo(PortalContext);

        switch (portal->strategy)
        {
            case PORTAL_ONE_SELECT:
            case PORTAL_ONE_RETURNING:
            case PORTAL_ONE_MOD_WITH:
            case PORTAL_UTIL_SELECT:

                /*
                 * If we have not yet run the command, do so, storing its
                 * results in the portal's tuplestore.  But we don't do that
                 * for the PORTAL_ONE_SELECT case.
                 */
                if (portal->strategy != PORTAL_ONE_SELECT && !portal->holdStore)
                    FillPortalStore(portal, isTopLevel);

                /*
                 * Now fetch desired portion of results.
                 */
                nprocessed = PortalRunSelect(portal, true, count, dest);

                /*
                 * If the portal result contains a command tag and the caller
                 * gave us a pointer to store it, copy it.  Patch the "SELECT"
                 * tag to also provide the rowcount.
                 */
                if (completionTag && portal->commandTag)
                {
                    if (strcmp(portal->commandTag, "SELECT") == 0)
                        snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
                                 "SELECT " UINT64_FORMAT, nprocessed);
                    else
                        strcpy(completionTag, portal->commandTag);
                }

                /* Mark portal not active */
                portal->status = PORTAL_READY;

                /*
                 * Since it's a forward fetch, say DONE iff atEnd is now true.
                 */
                result = portal->atEnd;
                break;

            case PORTAL_MULTI_QUERY:
                PortalRunMulti(portal, isTopLevel, false,
                               dest, altdest, completionTag);

                /* Prevent portal's commands from being re-executed */
                MarkPortalDone(portal);

                /* Always complete at end of RunMulti */
                result = true;
                break;

            default:
                elog(ERROR, "unrecognized portal strategy: %d",
                     (int) portal->strategy);
                result = false; /* keep compiler quiet */
                break;
        }
    }
    PG_CATCH();
    {
        /* Uncaught error while executing portal: mark it dead */
        MarkPortalFailed(portal);

        /* Restore global vars and propagate error */
        if (saveMemoryContext == saveTopTransactionContext)
            MemoryContextSwitchTo(TopTransactionContext);
        else
            MemoryContextSwitchTo(saveMemoryContext);
        ActivePortal = saveActivePortal;
        if (saveResourceOwner == saveTopTransactionResourceOwner)
            CurrentResourceOwner = TopTransactionResourceOwner;
        else
            CurrentResourceOwner = saveResourceOwner;
        PortalContext = savePortalContext;

        PG_RE_THROW();
    }
    PG_END_TRY();

    if (saveMemoryContext == saveTopTransactionContext)
        MemoryContextSwitchTo(TopTransactionContext);
    else
        MemoryContextSwitchTo(saveMemoryContext);
    ActivePortal = saveActivePortal;
    if (saveResourceOwner == saveTopTransactionResourceOwner)
        CurrentResourceOwner = TopTransactionResourceOwner;
    else
        CurrentResourceOwner = saveResourceOwner;
    PortalContext = savePortalContext;

    if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
        ShowUsage("EXECUTOR STATISTICS");

    TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();

    return result;
}

/*
 * PortalRunSelect
 *		Execute a portal's query in PORTAL_ONE_SELECT mode, and also
 *		when fetching from a completed holdStore in PORTAL_ONE_RETURNING,
 *		PORTAL_ONE_MOD_WITH, and PORTAL_UTIL_SELECT cases.
 *
 * This handles simple N-rows-forward-or-backward cases.  For more complex
 * nonsequential access to a portal, see PortalRunFetch.
 *
 * count <= 0 is interpreted as a no-op: the destination gets started up
 * and shut down, but nothing else happens.  Also, count == FETCH_ALL is
 * interpreted as "all rows".  (cf FetchStmt.howMany)
 *
 * Caller must already have validated the Portal and done appropriate
 * setup (cf. PortalRun).
 *
 * Returns number of rows processed (suitable for use in result tag)
 */
static uint64
PortalRunSelect(Portal portal,
                bool forward,
                long count,
                DestReceiver *dest)
{
    QueryDesc  *queryDesc;
    ScanDirection direction;
    uint64      nprocessed;

    /*
     * NB: queryDesc will be NULL if we are fetching from a held cursor or a
     * completed utility query; can't use it in that path.
     */
    queryDesc = portal->queryDesc;

    /* Caller messed up if we have neither a ready query nor held data. */
    Assert(queryDesc || portal->holdStore);

    /*
     * Force the queryDesc destination to the right thing.  This supports
     * MOVE, for example, which will pass in dest = DestNone.  This is okay to
     * change as long as we do it on every fetch.  (The Executor must not
     * assume that dest never changes.)
     */
    if (queryDesc)
        queryDesc->dest = dest;

    /*
     * Determine which direction to go in, and check to see if we're already
     * at the end of the available tuples in that direction.  If so, set the
     * direction to NoMovement to avoid trying to fetch any tuples.  (This
     * check exists because not all plan node types are robust about being
     * called again if they've already returned NULL once.)  Then call the
     * executor (we must not skip this, because the destination needs to see a
     * setup and shutdown even if no tuples are available).  Finally, update
     * the portal position state depending on the number of tuples that were
     * retrieved.
     */
    if (forward)
    {
        if (portal->atEnd || count <= 0)
        {
            direction = NoMovementScanDirection;
            count = 0;			/* don't pass negative count to executor */
        }
        else
            direction = ForwardScanDirection;

        /* In the executor, zero count processes all rows */
        if (count == FETCH_ALL)
            count = 0;

        if (portal->holdStore)
            nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
        else
        {
            PushActiveSnapshot(queryDesc->snapshot);
            ExecutorRun(queryDesc, direction, (uint64) count,
                        portal->run_once);
            nprocessed = queryDesc->estate->es_processed;
            PopActiveSnapshot();
        }

        if (!ScanDirectionIsNoMovement(direction))
        {
            if (nprocessed > 0)
                portal->atStart = false;	/* OK to go backward now */
            if (count == 0 || nprocessed < (uint64) count)
                portal->atEnd = true;	/* we retrieved 'em all */
            portal->portalPos += nprocessed;
        }
    }
    else
    {
        if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     errmsg("cursor can only scan forward"),
                     errhint("Declare it with SCROLL option to enable backward scan.")));

        if (portal->atStart || count <= 0)
        {
            direction = NoMovementScanDirection;
            count = 0;			/* don't pass negative count to executor */
        }
        else
            direction = BackwardScanDirection;

        /* In the executor, zero count processes all rows */
        if (count == FETCH_ALL)
            count = 0;

        if (portal->holdStore)
            nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
        else
        {
            PushActiveSnapshot(queryDesc->snapshot);
            ExecutorRun(queryDesc, direction, (uint64) count,
                        portal->run_once);
            nprocessed = queryDesc->estate->es_processed;
            PopActiveSnapshot();
        }

        if (!ScanDirectionIsNoMovement(direction))
        {
            if (nprocessed > 0 && portal->atEnd)
            {
                portal->atEnd = false;	/* OK to go forward now */
                portal->portalPos++;	/* adjust for endpoint case */
            }
            if (count == 0 || nprocessed < (uint64) count)
            {
                portal->atStart = true; /* we retrieved 'em all */
                portal->portalPos = 0;
            }
            else
            {
                portal->portalPos -= nprocessed;
            }
        }
    }

    return nprocessed;
}

/*
 * FillPortalStore
 *		Run the query and load result tuples into the portal's tuple store.
 *
 * This is used for PORTAL_ONE_RETURNING, PORTAL_ONE_MOD_WITH, and
 * PORTAL_UTIL_SELECT cases only.
 */
static void
FillPortalStore(Portal portal, bool isTopLevel)
{
    DestReceiver *treceiver;
    char        completionTag[COMPLETION_TAG_BUFSIZE];

    PortalCreateHoldStore(portal);
    treceiver = CreateDestReceiver(DestTuplestore);
    SetTuplestoreDestReceiverParams(treceiver,
                                    portal->holdStore,
                                    portal->holdContext,
                                    false);

    completionTag[0] = '\0';

    switch (portal->strategy)
    {
        case PORTAL_ONE_RETURNING:
        case PORTAL_ONE_MOD_WITH:

            /*
             * Run the portal to completion just as for the default
             * MULTI_QUERY case, but send the primary query's output to the
             * tuplestore.  Auxiliary query outputs are discarded.  Set the
             * portal's holdSnapshot to the snapshot used (or a copy of it).
             */
            PortalRunMulti(portal, isTopLevel, true,
                           treceiver, None_Receiver, completionTag);
            break;

        case PORTAL_UTIL_SELECT:
            PortalRunUtility(portal, linitial_node(PlannedStmt, portal->stmts),
                             isTopLevel, true, treceiver, completionTag);
            break;

        default:
            elog(ERROR, "unsupported portal strategy: %d",
                 (int) portal->strategy);
            break;
    }

    /* Override default completion tag with actual command result */
    if (completionTag[0] != '\0')
        portal->commandTag = pstrdup(completionTag);

    treceiver->rDestroy(treceiver);
}

/*
 * RunFromStore
 *		Fetch tuples from the portal's tuple store.
 *
 * Calling conventions are similar to ExecutorRun, except that we
 * do not depend on having a queryDesc or estate.  Therefore we return the
 * number of tuples processed as the result, not in estate->es_processed.
 *
 * One difference from ExecutorRun is that the destination receiver functions
 * are run in the caller's memory context (since we have no estate).  Watch
 * out for memory leaks.
 */
static uint64
RunFromStore(Portal portal, ScanDirection direction, uint64 count,
             DestReceiver *dest)
{
    uint64      current_tuple_count = 0;
    TupleTableSlot *slot;

    slot = MakeSingleTupleTableSlot(portal->tupDesc, &TTSOpsMinimalTuple);

    dest->rStartup(dest, CMD_SELECT, portal->tupDesc);

    if (ScanDirectionIsNoMovement(direction))
    {
        /* do nothing except start/stop the destination */
    }
    else
    {
        bool        forward = ScanDirectionIsForward(direction);

        for (;;)
        {
            MemoryContext oldcontext;
            bool        ok;

            oldcontext = MemoryContextSwitchTo(portal->holdContext);

            ok = tuplestore_gettupleslot(portal->holdStore, forward, false,
                                         slot);

            MemoryContextSwitchTo(oldcontext);

            if (!ok)
                break;

            /*
             * If we are not able to send the tuple, we assume the destination
             * has closed and no more tuples can be sent.  If that's the case,
             * end the loop.
             */
            if (!dest->receiveSlot(slot, dest))
                break;

            ExecClearTuple(slot);

            /*
             * check our tuple count.. if we've processed the proper number
             * then quit, else loop again and process more tuples.  Zero count
             * means no limit.
             */
            current_tuple_count++;
            if (count && count == current_tuple_count)
                break;
        }
    }

    dest->rShutdown(dest);

    ExecDropSingleTupleTableSlot(slot);

    return current_tuple_count;
}

/*
 * PortalRunUtility
 *		Execute a utility statement inside a portal.
 */
static void
PortalRunUtility(Portal portal, PlannedStmt *pstmt,
                 bool isTopLevel, bool setHoldSnapshot,
                 DestReceiver *dest, char *completionTag)
{
    Node       *utilityStmt = pstmt->utilityStmt;
    Snapshot    snapshot;

    /*
     * Set snapshot if utility stmt needs one.  Most reliable way to do this
     * seems to be to enumerate those that do not need one; this is a short
     * list.  Transaction control, LOCK, and SET must *not* set a snapshot
     * since they need to be executable at the start of a transaction-snapshot
     * mode transaction without freezing a snapshot.  By extension we allow
     * SHOW not to set a snapshot.  The other stmts listed are just efficiency
     * hacks.  Beware of listing anything that can modify the database --- if,
     * say, it has to update an index with expressions that invoke
     * user-defined functions, then it had better have a snapshot.
     */
    if (!(IsA(utilityStmt, TransactionStmt) ||
          IsA(utilityStmt, LockStmt) ||
          IsA(utilityStmt, VariableSetStmt) ||
          IsA(utilityStmt, VariableShowStmt) ||
          IsA(utilityStmt, ConstraintsSetStmt) ||
          /* efficiency hacks from here down */
          IsA(utilityStmt, FetchStmt) ||
          IsA(utilityStmt, ListenStmt) ||
          IsA(utilityStmt, NotifyStmt) ||
          IsA(utilityStmt, UnlistenStmt) ||
          IsA(utilityStmt, CheckPointStmt)))
    {
        snapshot = GetTransactionSnapshot();
        /* If told to, register the snapshot we're using and save in portal */
        if (setHoldSnapshot)
        {
            snapshot = RegisterSnapshot(snapshot);
            portal->holdSnapshot = snapshot;
        }
        PushActiveSnapshot(snapshot);
        /* PushActiveSnapshot might have copied the snapshot */
        snapshot = GetActiveSnapshot();
    }
    else
        snapshot = NULL;

    ProcessUtility(pstmt,
                   portal->sourceText,
                   isTopLevel ? PROCESS_UTILITY_TOPLEVEL : PROCESS_UTILITY_QUERY,
                   portal->portalParams,
                   portal->queryEnv,
                   dest,
                   completionTag);

    /* Some utility statements may change context on us */
    MemoryContextSwitchTo(portal->portalContext);

    /*
     * Some utility commands may pop the ActiveSnapshot stack from under us,
     * so be careful to only pop the stack if our snapshot is still at the
     * top.
     */
    if (snapshot != NULL && ActiveSnapshotSet() &&
        snapshot == GetActiveSnapshot())
        PopActiveSnapshot();
}

/*
 * PortalRunMulti
 *		Execute a portal's queries in the general case (multi queries
 *		or non-SELECT-like queries)
 */
static void
PortalRunMulti(Portal portal,
               bool isTopLevel, bool setHoldSnapshot,
               DestReceiver *dest, DestReceiver *altdest,
               char *completionTag)
{
    bool        active_snapshot_set = false;
    ListCell   *stmtlist_item;

    /*
     * If the destination is DestRemoteExecute, change to DestNone.  The
     * reason is that the client won't be expecting any tuples, and indeed has
     * no way to know what they are, since there is no provision for Describe
     * to send a RowDescription message when this portal execution strategy is
     * in effect.  This presently will only affect SELECT commands added to
     * non-SELECT queries by rewrite rules: such commands will be executed,
     * but the results will be discarded unless you use "simple Query"
     * protocol.
     */
    if (dest->mydest == DestRemoteExecute)
        dest = None_Receiver;
    if (altdest->mydest == DestRemoteExecute)
        altdest = None_Receiver;

    /*
     * Loop to handle the individual queries generated from a single parsetree
     * by analysis and rewrite.
     */
    foreach(stmtlist_item, portal->stmts)
    {
        PlannedStmt *pstmt = lfirst_node(PlannedStmt, stmtlist_item);

        /*
         * If we got a cancel signal in prior command, quit
         */
        CHECK_FOR_INTERRUPTS();

        if (pstmt->utilityStmt == NULL)
        {
            /*
             * process a plannable query.
             */
            TRACE_POSTGRESQL_QUERY_EXECUTE_START();

            if (log_executor_stats)
                ResetUsage();

            /*
             * Must always have a snapshot for plannable queries.  First time
             * through, take a new snapshot; for subsequent queries in the
             * same portal, just update the snapshot's copy of the command
             * counter.
             */
            if (!active_snapshot_set)
            {
                Snapshot    snapshot = GetTransactionSnapshot();

                /* If told to, register the snapshot and save in portal */
                if (setHoldSnapshot)
                {
                    snapshot = RegisterSnapshot(snapshot);
                    portal->holdSnapshot = snapshot;
                }

                /*
                 * We can't have the holdSnapshot also be the active one,
                 * because UpdateActiveSnapshotCommandId would complain.  So
                 * force an extra snapshot copy.  Plain PushActiveSnapshot
                 * would have copied the transaction snapshot anyway, so this
                 * only adds a copy step when setHoldSnapshot is true.  (It's
                 * okay for the command ID of the active snapshot to diverge
                 * from what holdSnapshot has.)
                 */
                PushCopiedSnapshot(snapshot);
                active_snapshot_set = true;
            }
            else
                UpdateActiveSnapshotCommandId();

            if (pstmt->canSetTag)
            {
                /* statement can set tag string */
                ProcessQuery(pstmt,
                             portal->sourceText,
                             portal->portalParams,
                             portal->queryEnv,
                             dest, completionTag);
            }
            else
            {
                /* stmt added by rewrite cannot set tag */
                ProcessQuery(pstmt,
                             portal->sourceText,
                             portal->portalParams,
                             portal->queryEnv,
                             altdest, NULL);
            }

            if (log_executor_stats)
                ShowUsage("EXECUTOR STATISTICS");

            TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
        }
        else
        {
            /*
             * process utility functions (create, destroy, etc..)
             *
             * We must not set a snapshot here for utility commands (if one is
             * needed, PortalRunUtility will do it).  If a utility command is
             * alone in a portal then everything's fine.  The only case where
             * a utility command can be part of a longer list is that rules
             * are allowed to include NotifyStmt.  NotifyStmt doesn't care
             * whether it has a snapshot or not, so we just leave the current
             * snapshot alone if we have one.
             */
            if (pstmt->canSetTag)
            {
                Assert(!active_snapshot_set);
                /* statement can set tag string */
                PortalRunUtility(portal, pstmt, isTopLevel, false,
                                 dest, completionTag);
            }
            else
            {
                Assert(IsA(pstmt->utilityStmt, NotifyStmt));
                /* stmt added by rewrite cannot set tag */
                PortalRunUtility(portal, pstmt, isTopLevel, false,
                                 altdest, NULL);
            }
        }

        /*
         * Increment command counter between queries, but not after the last
         * one.
         */
        if (lnext(stmtlist_item) != NULL)
            CommandCounterIncrement();

        /*
         * Clear subsidiary contexts to recover temporary memory.
         */
        Assert(portal->portalContext == CurrentMemoryContext);

        MemoryContextDeleteChildren(portal->portalContext);
    }

    /* Pop the snapshot if we pushed one. */
    if (active_snapshot_set)
        PopActiveSnapshot();

    /*
     * If a command completion tag was supplied, use it.  Otherwise use the
     * portal's commandTag as the default completion tag.
     *
     * Exception: Clients expect INSERT/UPDATE/DELETE tags to have counts, so
     * fake them with zeros.  This can happen with DO INSTEAD rules if there
     * is no replacement query of the same type as the original.  We print "0
     * 0" here because technically there is no query of the matching tag type,
     * and printing a non-zero count for a different query type seems wrong,
     * e.g. an INSERT that does an UPDATE instead should not print "0 1" if
     * one row was updated.  See QueryRewrite(), step 3, for details.
     */
    if (completionTag && completionTag[0] == '\0')
    {
        if (portal->commandTag)
            strcpy(completionTag, portal->commandTag);
        if (strcmp(completionTag, "SELECT") == 0)
            sprintf(completionTag, "SELECT 0 0");
        else if (strcmp(completionTag, "INSERT") == 0)
            strcpy(completionTag, "INSERT 0 0");
        else if (strcmp(completionTag, "UPDATE") == 0)
            strcpy(completionTag, "UPDATE 0");
        else if (strcmp(completionTag, "DELETE") == 0)
            strcpy(completionTag, "DELETE 0");
    }
}

/*
 * PortalRunFetch
 *		Variant form of PortalRun that supports SQL FETCH directions.
 *
 * Note: we presently assume that no callers of this want isTopLevel = true.
 *
 * count <= 0 is interpreted as a no-op: the destination gets started up
 * and shut down, but nothing else happens.  Also, count == FETCH_ALL is
 * interpreted as "all rows".  (cf FetchStmt.howMany)
 *
 * Returns number of rows processed (suitable for use in result tag)
 */
uint64
PortalRunFetch(Portal portal,
               FetchDirection fdirection,
               long count,
               DestReceiver *dest)
{
    uint64      result;
    Portal      saveActivePortal;
    ResourceOwner saveResourceOwner;
    MemoryContext savePortalContext;
    MemoryContext oldContext;

    AssertArg(PortalIsValid(portal));

    /*
     * Check for improper portal use, and mark portal active.
     */
    MarkPortalActive(portal);

    /* If supporting FETCH, portal can't be run-once. */
    Assert(!portal->run_once);

    /*
     * Set up global portal context pointers.
     */
    saveActivePortal = ActivePortal;
    saveResourceOwner = CurrentResourceOwner;
    savePortalContext = PortalContext;
    PG_TRY();
    {
        ActivePortal = portal;
        if (portal->resowner)
            CurrentResourceOwner = portal->resowner;
        PortalContext = portal->portalContext;

        oldContext = MemoryContextSwitchTo(PortalContext);

        switch (portal->strategy)
        {
            case PORTAL_ONE_SELECT:
                result = DoPortalRunFetch(portal, fdirection, count, dest);
                break;

            case PORTAL_ONE_RETURNING:
            case PORTAL_ONE_MOD_WITH:
            case PORTAL_UTIL_SELECT:

                /*
                 * If we have not yet run the command, do so, storing its
                 * results in the portal's tuplestore.
                 */
                if (!portal->holdStore)
                    FillPortalStore(portal, false /* isTopLevel */ );

                /*
                 * Now fetch desired portion of results.
                 */
                result = DoPortalRunFetch(portal, fdirection, count, dest);
                break;

            default:
                elog(ERROR, "unsupported portal strategy");
                result = 0;     /* keep compiler quiet */
                break;
        }
    }
    PG_CATCH();
    {
        /* Uncaught error while executing portal: mark it dead */
        MarkPortalFailed(portal);

        /* Restore global vars and propagate error */
        ActivePortal = saveActivePortal;
        CurrentResourceOwner = saveResourceOwner;
        PortalContext = savePortalContext;

        PG_RE_THROW();
    }
    PG_END_TRY();

    MemoryContextSwitchTo(oldContext);

    /* Mark portal not active */
    portal->status = PORTAL_READY;

    ActivePortal = saveActivePortal;
    CurrentResourceOwner = saveResourceOwner;
    PortalContext = savePortalContext;

    return result;
}

/*
 * DoPortalRunFetch
 *		Guts of PortalRunFetch --- the portal context is already set up
 *
 * count <= 0 is interpreted as a no-op: the destination gets started up
 * and shut down, but nothing else happens.  Also, count == FETCH_ALL is
 * interpreted as "all rows".  (cf FetchStmt.howMany)
 *
 * Returns number of rows processed (suitable for use in result tag)
 */
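/*
 * For illustration only, some SQL-level cursor commands and the
 * (fdirection, count) pairs they arrive here as (cf. FetchStmt):
 *
 *		FETCH FORWARD 3 FROM c		-> (FETCH_FORWARD, 3)
 *		FETCH PRIOR FROM c			-> (FETCH_BACKWARD, 1)
 *		FETCH ABSOLUTE -1 FROM c	-> (FETCH_ABSOLUTE, -1)   i.e. last row
 *		MOVE BACKWARD ALL IN c		-> (FETCH_BACKWARD, FETCH_ALL)
 */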
static uint64
DoPortalRunFetch(Portal portal,
                 FetchDirection fdirection,
                 long count,
                 DestReceiver *dest)
{
    bool        forward;

    Assert(portal->strategy == PORTAL_ONE_SELECT ||
           portal->strategy == PORTAL_ONE_RETURNING ||
           portal->strategy == PORTAL_ONE_MOD_WITH ||
           portal->strategy == PORTAL_UTIL_SELECT);

    switch (fdirection)
    {
        case FETCH_FORWARD:
            if (count < 0)
            {
                fdirection = FETCH_BACKWARD;
                count = -count;
            }
            /* fall out of switch to share code with FETCH_BACKWARD */
            break;
        case FETCH_BACKWARD:
            if (count < 0)
            {
                fdirection = FETCH_FORWARD;
                count = -count;
            }
            /* fall out of switch to share code with FETCH_FORWARD */
            break;
        case FETCH_ABSOLUTE:
            if (count > 0)
            {
                /*
                 * Definition: Rewind to start, advance count-1 rows, return
                 * next row (if any).
                 *
                 * In practice, if the goal is less than halfway back to the
                 * start, it's better to scan from where we are.
                 *
                 * Also, if current portalPos is outside the range of "long",
                 * do it the hard way to avoid possible overflow of the count
                 * argument to PortalRunSelect.  We must exclude exactly
                 * LONG_MAX, as well, lest the count look like FETCH_ALL.
                 *
                 * In any case, we arrange to fetch the target row going
                 * forwards.
                 */
                if ((uint64) (count - 1) <= portal->portalPos / 2 ||
                    portal->portalPos >= (uint64) LONG_MAX)
                {
                    DoPortalRewind(portal);
                    if (count > 1)
                        PortalRunSelect(portal, true, count - 1,
                                        None_Receiver);
                }
                else
                {
                    long        pos = (long) portal->portalPos;

                    if (portal->atEnd)
                        pos++;	/* need one extra fetch if off end */
                    if (count <= pos)
                        PortalRunSelect(portal, false, pos - count + 1,
                                        None_Receiver);
                    else if (count > pos + 1)
                        PortalRunSelect(portal, true, count - pos - 1,
                                        None_Receiver);
                }
                return PortalRunSelect(portal, true, 1L, dest);
            }
            else if (count < 0)
            {
                /*
                 * Definition: Advance to end, back up abs(count)-1 rows,
                 * return prior row (if any).  We could optimize this if we
                 * knew in advance where the end was, but typically we won't.
                 * (Is it worth considering case where count > half of size of
                 * query?  We could rewind once we know the size ...)
                 */
                PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
                if (count < -1)
                    PortalRunSelect(portal, false, -count - 1, None_Receiver);
                return PortalRunSelect(portal, false, 1L, dest);
            }
            else
            {
                /* count == 0 */
                /* Rewind to start, return zero rows */
                DoPortalRewind(portal);
                return PortalRunSelect(portal, true, 0L, dest);
            }
            break;
        case FETCH_RELATIVE:
            if (count > 0)
            {
                /*
                 * Definition: advance count-1 rows, return next row (if any).
                 */
                if (count > 1)
                    PortalRunSelect(portal, true, count - 1, None_Receiver);
                return PortalRunSelect(portal, true, 1L, dest);
            }
            else if (count < 0)
            {
                /*
                 * Definition: back up abs(count)-1 rows, return prior row (if
                 * any).
                 */
                if (count < -1)
                    PortalRunSelect(portal, false, -count - 1, None_Receiver);
                return PortalRunSelect(portal, false, 1L, dest);
            }
            else
            {
                /* count == 0 */
                /* Same as FETCH FORWARD 0, so fall out of switch */
                fdirection = FETCH_FORWARD;
            }
            break;
        default:
            elog(ERROR, "bogus direction");
            break;
    }

    /*
     * Get here with fdirection == FETCH_FORWARD or FETCH_BACKWARD, and count
     * >= 0.
     */
    forward = (fdirection == FETCH_FORWARD);

    /*
     * Zero count means to re-fetch the current row, if any (per SQL)
     */
    if (count == 0)
    {
        bool        on_row;

        /* Are we sitting on a row? */
        on_row = (!portal->atStart && !portal->atEnd);

        if (dest->mydest == DestNone)
        {
            /* MOVE 0 returns 0/1 based on if FETCH 0 would return a row */
            return on_row ? 1 : 0;
        }
        else
        {
            /*
             * If we are sitting on a row, back up one so we can re-fetch it.
             * If we are not sitting on a row, we still have to start up and
             * shut down the executor so that the destination is initialized
             * and shut down correctly; so keep going.  To PortalRunSelect,
             * count == 0 means we will retrieve no row.
             */
            if (on_row)
            {
                PortalRunSelect(portal, false, 1L, None_Receiver);
                /* Set up to fetch one row forward */
                count = 1;
                forward = true;
            }
        }
    }

    /*
     * Optimize MOVE BACKWARD ALL into a Rewind.
     */
    if (!forward && count == FETCH_ALL && dest->mydest == DestNone)
    {
        uint64      result = portal->portalPos;

        if (result > 0 && !portal->atEnd)
            result--;
        DoPortalRewind(portal);
        return result;
    }

    return PortalRunSelect(portal, forward, count, dest);
}

/*
 * DoPortalRewind - rewind a Portal to starting point
 */
static void
DoPortalRewind(Portal portal)
{
    QueryDesc  *queryDesc;

    /* Rewind holdStore, if we have one */
    if (portal->holdStore)
    {
        MemoryContext oldcontext;

        oldcontext = MemoryContextSwitchTo(portal->holdContext);
        tuplestore_rescan(portal->holdStore);
        MemoryContextSwitchTo(oldcontext);
    }

    /* Rewind executor, if active */
    queryDesc = portal->queryDesc;
    if (queryDesc)
    {
        PushActiveSnapshot(queryDesc->snapshot);
        ExecutorRewind(queryDesc);
        PopActiveSnapshot();
    }

    portal->atStart = true;
    portal->atEnd = false;
    portal->portalPos = 0;
}