1/*-------------------------------------------------------------------------
2 *
3 * portalcmds.c
4 * Utility commands affecting portals (that is, SQL cursor commands)
5 *
6 * Note: see also tcop/pquery.c, which implements portal operations for
7 * the FE/BE protocol. This module uses pquery.c for some operations.
8 * And both modules depend on utils/mmgr/portalmem.c, which controls
9 * storage management for portals (but doesn't run any queries in them).
10 *
11 *
12 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
13 * Portions Copyright (c) 1994, Regents of the University of California
14 *
15 *
16 * IDENTIFICATION
17 * src/backend/commands/portalcmds.c
18 *
19 *-------------------------------------------------------------------------
20 */
21
22#include "postgres.h"
23
24#include <limits.h>
25
26#include "access/xact.h"
27#include "commands/portalcmds.h"
28#include "executor/executor.h"
29#include "executor/tstoreReceiver.h"
30#include "rewrite/rewriteHandler.h"
31#include "tcop/pquery.h"
32#include "tcop/tcopprot.h"
33#include "utils/memutils.h"
34#include "utils/snapmgr.h"
35
36
37/*
38 * PerformCursorOpen
39 * Execute SQL DECLARE CURSOR command.
40 */
41void
42PerformCursorOpen(DeclareCursorStmt *cstmt, ParamListInfo params,
43 const char *queryString, bool isTopLevel)
44{
45 Query *query = castNode(Query, cstmt->query);
46 List *rewritten;
47 PlannedStmt *plan;
48 Portal portal;
49 MemoryContext oldContext;
50
51 /*
52 * Disallow empty-string cursor name (conflicts with protocol-level
53 * unnamed portal).
54 */
55 if (!cstmt->portalname || cstmt->portalname[0] == '\0')
56 ereport(ERROR,
57 (errcode(ERRCODE_INVALID_CURSOR_NAME),
58 errmsg("invalid cursor name: must not be empty")));
59
60 /*
61 * If this is a non-holdable cursor, we require that this statement has
62 * been executed inside a transaction block (or else, it would have no
63 * user-visible effect).
64 */
65 if (!(cstmt->options & CURSOR_OPT_HOLD))
66 RequireTransactionBlock(isTopLevel, "DECLARE CURSOR");
67
68 /*
69 * Parse analysis was done already, but we still have to run the rule
70 * rewriter. We do not do AcquireRewriteLocks: we assume the query either
71 * came straight from the parser, or suitable locks were acquired by
72 * plancache.c.
73 *
74 * Because the rewriter and planner tend to scribble on the input, we make
75 * a preliminary copy of the source querytree. This prevents problems in
76 * the case that the DECLARE CURSOR is in a portal or plpgsql function and
77 * is executed repeatedly. (See also the same hack in EXPLAIN and
78 * PREPARE.) XXX FIXME someday.
79 */
80 rewritten = QueryRewrite((Query *) copyObject(query));
81
82 /* SELECT should never rewrite to more or less than one query */
83 if (list_length(rewritten) != 1)
84 elog(ERROR, "non-SELECT statement in DECLARE CURSOR");
85
86 query = linitial_node(Query, rewritten);
87
88 if (query->commandType != CMD_SELECT)
89 elog(ERROR, "non-SELECT statement in DECLARE CURSOR");
90
91 /* Plan the query, applying the specified options */
92 plan = pg_plan_query(query, cstmt->options, params);
93
94 /*
95 * Create a portal and copy the plan and queryString into its memory.
96 */
97 portal = CreatePortal(cstmt->portalname, false, false);
98
99 oldContext = MemoryContextSwitchTo(portal->portalContext);
100
101 plan = copyObject(plan);
102
103 queryString = pstrdup(queryString);
104
105 PortalDefineQuery(portal,
106 NULL,
107 queryString,
108 "SELECT", /* cursor's query is always a SELECT */
109 list_make1(plan),
110 NULL);
111
112 /*----------
113 * Also copy the outer portal's parameter list into the inner portal's
114 * memory context. We want to pass down the parameter values in case we
115 * had a command like
116 * DECLARE c CURSOR FOR SELECT ... WHERE foo = $1
117 * This will have been parsed using the outer parameter set and the
118 * parameter value needs to be preserved for use when the cursor is
119 * executed.
120 *----------
121 */
122 params = copyParamList(params);
123
124 MemoryContextSwitchTo(oldContext);
125
126 /*
127 * Set up options for portal.
128 *
129 * If the user didn't specify a SCROLL type, allow or disallow scrolling
130 * based on whether it would require any additional runtime overhead to do
131 * so. Also, we disallow scrolling for FOR UPDATE cursors.
132 */
133 portal->cursorOptions = cstmt->options;
134 if (!(portal->cursorOptions & (CURSOR_OPT_SCROLL | CURSOR_OPT_NO_SCROLL)))
135 {
136 if (plan->rowMarks == NIL &&
137 ExecSupportsBackwardScan(plan->planTree))
138 portal->cursorOptions |= CURSOR_OPT_SCROLL;
139 else
140 portal->cursorOptions |= CURSOR_OPT_NO_SCROLL;
141 }
142
143 /*
144 * Start execution, inserting parameters if any.
145 */
146 PortalStart(portal, params, 0, GetActiveSnapshot());
147
148 Assert(portal->strategy == PORTAL_ONE_SELECT);
149
150 /*
151 * We're done; the query won't actually be run until PerformPortalFetch is
152 * called.
153 */
154}
155
156/*
157 * PerformPortalFetch
158 * Execute SQL FETCH or MOVE command.
159 *
160 * stmt: parsetree node for command
161 * dest: where to send results
162 * completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
163 * in which to store a command completion status string.
164 *
165 * completionTag may be NULL if caller doesn't want a status string.
166 */
167void
168PerformPortalFetch(FetchStmt *stmt,
169 DestReceiver *dest,
170 char *completionTag)
171{
172 Portal portal;
173 uint64 nprocessed;
174
175 /*
176 * Disallow empty-string cursor name (conflicts with protocol-level
177 * unnamed portal).
178 */
179 if (!stmt->portalname || stmt->portalname[0] == '\0')
180 ereport(ERROR,
181 (errcode(ERRCODE_INVALID_CURSOR_NAME),
182 errmsg("invalid cursor name: must not be empty")));
183
184 /* get the portal from the portal name */
185 portal = GetPortalByName(stmt->portalname);
186 if (!PortalIsValid(portal))
187 {
188 ereport(ERROR,
189 (errcode(ERRCODE_UNDEFINED_CURSOR),
190 errmsg("cursor \"%s\" does not exist", stmt->portalname)));
191 return; /* keep compiler happy */
192 }
193
194 /* Adjust dest if needed. MOVE wants destination DestNone */
195 if (stmt->ismove)
196 dest = None_Receiver;
197
198 /* Do it */
199 nprocessed = PortalRunFetch(portal,
200 stmt->direction,
201 stmt->howMany,
202 dest);
203
204 /* Return command status if wanted */
205 if (completionTag)
206 snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "%s " UINT64_FORMAT,
207 stmt->ismove ? "MOVE" : "FETCH",
208 nprocessed);
209}
210
211/*
212 * PerformPortalClose
213 * Close a cursor.
214 */
215void
216PerformPortalClose(const char *name)
217{
218 Portal portal;
219
220 /* NULL means CLOSE ALL */
221 if (name == NULL)
222 {
223 PortalHashTableDeleteAll();
224 return;
225 }
226
227 /*
228 * Disallow empty-string cursor name (conflicts with protocol-level
229 * unnamed portal).
230 */
231 if (name[0] == '\0')
232 ereport(ERROR,
233 (errcode(ERRCODE_INVALID_CURSOR_NAME),
234 errmsg("invalid cursor name: must not be empty")));
235
236 /*
237 * get the portal from the portal name
238 */
239 portal = GetPortalByName(name);
240 if (!PortalIsValid(portal))
241 {
242 ereport(ERROR,
243 (errcode(ERRCODE_UNDEFINED_CURSOR),
244 errmsg("cursor \"%s\" does not exist", name)));
245 return; /* keep compiler happy */
246 }
247
248 /*
249 * Note: PortalCleanup is called as a side-effect, if not already done.
250 */
251 PortalDrop(portal, false);
252}
253
254/*
255 * PortalCleanup
256 *
257 * Clean up a portal when it's dropped. This is the standard cleanup hook
258 * for portals.
259 *
260 * Note: if portal->status is PORTAL_FAILED, we are probably being called
261 * during error abort, and must be careful to avoid doing anything that
262 * is likely to fail again.
263 */
264void
265PortalCleanup(Portal portal)
266{
267 QueryDesc *queryDesc;
268
269 /*
270 * sanity checks
271 */
272 AssertArg(PortalIsValid(portal));
273 AssertArg(portal->cleanup == PortalCleanup);
274
275 /*
276 * Shut down executor, if still running. We skip this during error abort,
277 * since other mechanisms will take care of releasing executor resources,
278 * and we can't be sure that ExecutorEnd itself wouldn't fail.
279 */
280 queryDesc = portal->queryDesc;
281 if (queryDesc)
282 {
283 /*
284 * Reset the queryDesc before anything else. This prevents us from
285 * trying to shut down the executor twice, in case of an error below.
286 * The transaction abort mechanisms will take care of resource cleanup
287 * in such a case.
288 */
289 portal->queryDesc = NULL;
290
291 if (portal->status != PORTAL_FAILED)
292 {
293 ResourceOwner saveResourceOwner;
294
295 /* We must make the portal's resource owner current */
296 saveResourceOwner = CurrentResourceOwner;
297 if (portal->resowner)
298 CurrentResourceOwner = portal->resowner;
299
300 ExecutorFinish(queryDesc);
301 ExecutorEnd(queryDesc);
302 FreeQueryDesc(queryDesc);
303
304 CurrentResourceOwner = saveResourceOwner;
305 }
306 }
307}
308
309/*
310 * PersistHoldablePortal
311 *
312 * Prepare the specified Portal for access outside of the current
313 * transaction. When this function returns, all future accesses to the
314 * portal must be done via the Tuplestore (not by invoking the
315 * executor).
316 */
317void
318PersistHoldablePortal(Portal portal)
319{
320 QueryDesc *queryDesc = portal->queryDesc;
321 Portal saveActivePortal;
322 ResourceOwner saveResourceOwner;
323 MemoryContext savePortalContext;
324 MemoryContext oldcxt;
325
326 /*
327 * If we're preserving a holdable portal, we had better be inside the
328 * transaction that originally created it.
329 */
330 Assert(portal->createSubid != InvalidSubTransactionId);
331 Assert(queryDesc != NULL);
332
333 /*
334 * Caller must have created the tuplestore already ... but not a snapshot.
335 */
336 Assert(portal->holdContext != NULL);
337 Assert(portal->holdStore != NULL);
338 Assert(portal->holdSnapshot == NULL);
339
340 /*
341 * Before closing down the executor, we must copy the tupdesc into
342 * long-term memory, since it was created in executor memory.
343 */
344 oldcxt = MemoryContextSwitchTo(portal->holdContext);
345
346 portal->tupDesc = CreateTupleDescCopy(portal->tupDesc);
347
348 MemoryContextSwitchTo(oldcxt);
349
350 /*
351 * Check for improper portal use, and mark portal active.
352 */
353 MarkPortalActive(portal);
354
355 /*
356 * Set up global portal context pointers.
357 */
358 saveActivePortal = ActivePortal;
359 saveResourceOwner = CurrentResourceOwner;
360 savePortalContext = PortalContext;
361 PG_TRY();
362 {
363 ActivePortal = portal;
364 if (portal->resowner)
365 CurrentResourceOwner = portal->resowner;
366 PortalContext = portal->portalContext;
367
368 MemoryContextSwitchTo(PortalContext);
369
370 PushActiveSnapshot(queryDesc->snapshot);
371
372 /*
373 * Rewind the executor: we need to store the entire result set in the
374 * tuplestore, so that subsequent backward FETCHs can be processed.
375 */
376 ExecutorRewind(queryDesc);
377
378 /*
379 * Change the destination to output to the tuplestore. Note we tell
380 * the tuplestore receiver to detoast all data passed through it; this
381 * makes it safe to not keep a snapshot associated with the data.
382 */
383 queryDesc->dest = CreateDestReceiver(DestTuplestore);
384 SetTuplestoreDestReceiverParams(queryDesc->dest,
385 portal->holdStore,
386 portal->holdContext,
387 true);
388
389 /* Fetch the result set into the tuplestore */
390 ExecutorRun(queryDesc, ForwardScanDirection, 0L, false);
391
392 queryDesc->dest->rDestroy(queryDesc->dest);
393 queryDesc->dest = NULL;
394
395 /*
396 * Now shut down the inner executor.
397 */
398 portal->queryDesc = NULL; /* prevent double shutdown */
399 ExecutorFinish(queryDesc);
400 ExecutorEnd(queryDesc);
401 FreeQueryDesc(queryDesc);
402
403 /*
404 * Set the position in the result set.
405 */
406 MemoryContextSwitchTo(portal->holdContext);
407
408 if (portal->atEnd)
409 {
410 /*
411 * Just force the tuplestore forward to its end. The size of the
412 * skip request here is arbitrary.
413 */
414 while (tuplestore_skiptuples(portal->holdStore, 1000000, true))
415 /* continue */ ;
416 }
417 else
418 {
419 tuplestore_rescan(portal->holdStore);
420
421 if (!tuplestore_skiptuples(portal->holdStore,
422 portal->portalPos,
423 true))
424 elog(ERROR, "unexpected end of tuple stream");
425 }
426 }
427 PG_CATCH();
428 {
429 /* Uncaught error while executing portal: mark it dead */
430 MarkPortalFailed(portal);
431
432 /* Restore global vars and propagate error */
433 ActivePortal = saveActivePortal;
434 CurrentResourceOwner = saveResourceOwner;
435 PortalContext = savePortalContext;
436
437 PG_RE_THROW();
438 }
439 PG_END_TRY();
440
441 MemoryContextSwitchTo(oldcxt);
442
443 /* Mark portal not active */
444 portal->status = PORTAL_READY;
445
446 ActivePortal = saveActivePortal;
447 CurrentResourceOwner = saveResourceOwner;
448 PortalContext = savePortalContext;
449
450 PopActiveSnapshot();
451
452 /*
453 * We can now release any subsidiary memory of the portal's context; we'll
454 * never use it again. The executor already dropped its context, but this
455 * will clean up anything that glommed onto the portal's context via
456 * PortalContext.
457 */
458 MemoryContextDeleteChildren(portal->portalContext);
459}
460