1/*-------------------------------------------------------------------------
2 *
3 * twophase.c
4 * Two-phase commit support functions.
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/access/transam/twophase.c
11 *
12 * NOTES
13 * Each global transaction is associated with a global transaction
14 * identifier (GID). The client assigns a GID to a postgres
15 * transaction with the PREPARE TRANSACTION command.
16 *
17 * We keep all active global transactions in a shared memory array.
18 * When the PREPARE TRANSACTION command is issued, the GID is
19 * reserved for the transaction in the array. This is done before
20 * a WAL entry is made, because the reservation checks for duplicate
21 * GIDs and aborts the transaction if there already is a global
22 * transaction in prepared state with the same GID.
23 *
24 * A global transaction (gxact) also has dummy PGXACT and PGPROC; this is
25 * what keeps the XID considered running by TransactionIdIsInProgress.
26 * It is also convenient as a PGPROC to hook the gxact's locks to.
27 *
28 * Information to recover prepared transactions in case of crash is
29 * now stored in WAL for the common case. In some cases there will be
30 * an extended period between preparing a GXACT and commit/abort, in
31 * which case we need to separately record prepared transaction data
32 * in permanent storage. This includes locking information, pending
33 * notifications etc. All that state information is written to the
34 * per-transaction state file in the pg_twophase directory.
35 * All prepared transactions will be written prior to shutdown.
36 *
 * The life cycle of the state data is as follows:
38 *
39 * * On PREPARE TRANSACTION backend writes state data only to the WAL and
40 * stores pointer to the start of the WAL record in
41 * gxact->prepare_start_lsn.
42 * * If COMMIT occurs before checkpoint then backend reads data from WAL
43 * using prepare_start_lsn.
 * * On checkpoint, state data is copied to files in the pg_twophase
 * directory and fsynced.
46 * * If COMMIT happens after checkpoint then backend reads state data from
47 * files
48 *
49 * During replay and replication, TwoPhaseState also holds information
50 * about active prepared transactions that haven't been moved to disk yet.
51 *
52 * Replay of twophase records happens by the following rules:
53 *
54 * * At the beginning of recovery, pg_twophase is scanned once, filling
55 * TwoPhaseState with entries marked with gxact->inredo and
56 * gxact->ondisk. Two-phase file data older than the XID horizon of
57 * the redo position are discarded.
58 * * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59 * gxact->inredo is set to true for such entries.
60 * * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61 * that have gxact->inredo set and are behind the redo_horizon. We
62 * save them to disk and then switch gxact->ondisk to true.
63 * * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64 * If gxact->ondisk is true, the corresponding entry from the disk
65 * is additionally deleted.
66 * * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
67 * and PrescanPreparedTransactions() have been modified to go through
68 * gxact->inredo entries that have not made it to disk.
69 *
70 *-------------------------------------------------------------------------
71 */
72#include "postgres.h"
73
74#include <fcntl.h>
75#include <sys/stat.h>
76#include <time.h>
77#include <unistd.h>
78
79#include "access/commit_ts.h"
80#include "access/htup_details.h"
81#include "access/subtrans.h"
82#include "access/transam.h"
83#include "access/twophase.h"
84#include "access/twophase_rmgr.h"
85#include "access/xact.h"
86#include "access/xlog.h"
87#include "access/xloginsert.h"
88#include "access/xlogutils.h"
89#include "access/xlogreader.h"
90#include "catalog/pg_type.h"
91#include "catalog/storage.h"
92#include "funcapi.h"
93#include "miscadmin.h"
94#include "pg_trace.h"
95#include "pgstat.h"
96#include "replication/origin.h"
97#include "replication/syncrep.h"
98#include "replication/walsender.h"
99#include "storage/fd.h"
100#include "storage/ipc.h"
101#include "storage/md.h"
102#include "storage/predicate.h"
103#include "storage/proc.h"
104#include "storage/procarray.h"
105#include "storage/sinvaladt.h"
106#include "storage/smgr.h"
107#include "utils/builtins.h"
108#include "utils/memutils.h"
109#include "utils/timestamp.h"
110
111
/*
 * Directory where Two-phase commit files reside within PGDATA
 *
 * This is a relative path; TwoPhaseFilePath() appends a per-XID file name
 * to it.
 */
#define TWOPHASE_DIR "pg_twophase"

/*
 * GUC variable, can't be changed after startup
 *
 * It sizes the shared prepXacts array and the pool of GlobalTransactionData
 * structs (see TwoPhaseShmemSize).  A value of zero makes MarkAsPreparing()
 * reject every PREPARE TRANSACTION with "prepared transactions are disabled".
 */
int max_prepared_xacts = 0;
119
120/*
121 * This struct describes one global transaction that is in prepared state
122 * or attempting to become prepared.
123 *
124 * The lifecycle of a global transaction is:
125 *
126 * 1. After checking that the requested GID is not in use, set up an entry in
127 * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
128 * and mark it as locked by my backend.
129 *
130 * 2. After successfully completing prepare, set valid = true and enter the
131 * referenced PGPROC into the global ProcArray.
132 *
133 * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
134 * valid and not locked, then mark the entry as locked by storing my current
135 * backend ID into locking_backend. This prevents concurrent attempts to
136 * commit or rollback the same prepared xact.
137 *
138 * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
139 * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
140 * the freelist.
141 *
142 * Note that if the preparing transaction fails between steps 1 and 2, the
143 * entry must be removed so that the GID and the GlobalTransaction struct
144 * can be reused. See AtAbort_Twophase().
145 *
146 * typedef struct GlobalTransactionData *GlobalTransaction appears in
147 * twophase.h
148 */
149
/*
 * Shared-memory entry for one prepared (or preparing) transaction.  Entries
 * are either linked into TwoPhaseState->freeGXacts through "next" or
 * referenced from TwoPhaseState->prepXacts[].
 */
typedef struct GlobalTransactionData
{
	GlobalTransaction next; /* list link for free list */
	int pgprocno; /* ID of associated dummy PGPROC; used as an index
				   * into ProcGlobal->allProcs and allPgXact */
	BackendId dummyBackendId; /* similar to backend id for backends;
								 * assigned once in TwoPhaseShmemInit */
	TimestampTz prepared_at; /* time of preparation */

	/*
	 * Note that we need to keep track of two LSNs for each GXACT. We keep
	 * track of the start LSN because this is the address we must use to read
	 * state data back from WAL when committing a prepared GXACT. We keep
	 * track of the end LSN because that is the LSN we need to wait for prior
	 * to commit.
	 */
	XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */
	XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */
	TransactionId xid; /* The GXACT id */

	Oid owner; /* ID of user that executed the xact */
	BackendId locking_backend; /* backend currently working on the xact;
								 * InvalidBackendId when nobody has it locked */
	bool valid; /* true if PGPROC entry is in proc array (set by
				 * MarkAsPrepared) */
	bool ondisk; /* true if prepare state file is on disk */
	bool inredo; /* true if entry was added via xlog_redo */
	char gid[GIDSIZE]; /* The GID assigned to the prepared xact
						 * (NUL-terminated, so at most GIDSIZE - 1 chars) */
} GlobalTransactionData;
175
/*
 * Two Phase Commit shared state. Access to this struct is protected
 * by TwoPhaseStateLock.
 *
 * The struct is followed in shared memory by the GlobalTransactionData
 * structs themselves; see TwoPhaseShmemSize() and TwoPhaseShmemInit() for
 * the exact layout.
 */
typedef struct TwoPhaseStateData
{
	/* Head of linked list of free GlobalTransactionData structs */
	GlobalTransaction freeGXacts;

	/*
	 * Number of valid prepXacts entries.  Entries 0 .. numPrepXacts - 1 are
	 * in use; RemoveGXact keeps them dense by moving the last entry into the
	 * vacated slot.
	 */
	int numPrepXacts;

	/* There are max_prepared_xacts items in this array */
	GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
} TwoPhaseStateData;
191
/* Pointer to the shared-memory state; set up by TwoPhaseShmemInit() */
static TwoPhaseStateData *TwoPhaseState;

/*
 * Global transaction entry currently locked by us, if any. Note that any
 * access to the entry pointed to by this variable must be protected by
 * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
 * (since it's just local memory).
 */
static GlobalTransaction MyLockedGxact = NULL;

/*
 * True once this backend has registered AtProcExit_Twophase with
 * before_shmem_exit(); MarkAsPreparing() and LockGXact() do that on their
 * first call.
 */
static bool twophaseExitRegistered = false;

/*
 * Forward declarations of file-local helpers.  RemoveGXact and
 * MarkAsPreparingGuts are defined below; the rest are declared here for use
 * before their definitions.
 */
static void RecordTransactionCommitPrepared(TransactionId xid,
											int nchildren,
											TransactionId *children,
											int nrels,
											RelFileNode *rels,
											int ninvalmsgs,
											SharedInvalidationMessage *invalmsgs,
											bool initfileinval,
											const char *gid);
static void RecordTransactionAbortPrepared(TransactionId xid,
										   int nchildren,
										   TransactionId *children,
										   int nrels,
										   RelFileNode *rels,
										   const char *gid);
static void ProcessRecords(char *bufptr, TransactionId xid,
						   const TwoPhaseCallback callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);

static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
static char *ProcessTwoPhaseBuffer(TransactionId xid,
								   XLogRecPtr prepare_start_lsn,
								   bool fromdisk, bool setParent, bool setNextXid);
static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
								const char *gid, TimestampTz prepared_at, Oid owner,
								Oid databaseid);
static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
232
233/*
234 * Initialization of shared memory
235 */
236Size
237TwoPhaseShmemSize(void)
238{
239 Size size;
240
241 /* Need the fixed struct, the array of pointers, and the GTD structs */
242 size = offsetof(TwoPhaseStateData, prepXacts);
243 size = add_size(size, mul_size(max_prepared_xacts,
244 sizeof(GlobalTransaction)));
245 size = MAXALIGN(size);
246 size = add_size(size, mul_size(max_prepared_xacts,
247 sizeof(GlobalTransactionData)));
248
249 return size;
250}
251
252void
253TwoPhaseShmemInit(void)
254{
255 bool found;
256
257 TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
258 TwoPhaseShmemSize(),
259 &found);
260 if (!IsUnderPostmaster)
261 {
262 GlobalTransaction gxacts;
263 int i;
264
265 Assert(!found);
266 TwoPhaseState->freeGXacts = NULL;
267 TwoPhaseState->numPrepXacts = 0;
268
269 /*
270 * Initialize the linked list of free GlobalTransactionData structs
271 */
272 gxacts = (GlobalTransaction)
273 ((char *) TwoPhaseState +
274 MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
275 sizeof(GlobalTransaction) * max_prepared_xacts));
276 for (i = 0; i < max_prepared_xacts; i++)
277 {
278 /* insert into linked list */
279 gxacts[i].next = TwoPhaseState->freeGXacts;
280 TwoPhaseState->freeGXacts = &gxacts[i];
281
282 /* associate it with a PGPROC assigned by InitProcGlobal */
283 gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
284
285 /*
286 * Assign a unique ID for each dummy proc, so that the range of
287 * dummy backend IDs immediately follows the range of normal
288 * backend IDs. We don't dare to assign a real backend ID to dummy
289 * procs, because prepared transactions don't take part in cache
290 * invalidation like a real backend ID would imply, but having a
291 * unique ID for them is nevertheless handy. This arrangement
292 * allows you to allocate an array of size (MaxBackends +
293 * max_prepared_xacts + 1), and have a slot for every backend and
294 * prepared transaction. Currently multixact.c uses that
295 * technique.
296 */
297 gxacts[i].dummyBackendId = MaxBackends + 1 + i;
298 }
299 }
300 else
301 Assert(found);
302}
303
304/*
305 * Exit hook to unlock the global transaction entry we're working on.
306 */
307static void
308AtProcExit_Twophase(int code, Datum arg)
309{
310 /* same logic as abort */
311 AtAbort_Twophase();
312}
313
314/*
315 * Abort hook to unlock the global transaction entry we're working on.
316 */
317void
318AtAbort_Twophase(void)
319{
320 if (MyLockedGxact == NULL)
321 return;
322
323 /*
324 * What to do with the locked global transaction entry? If we were in the
325 * process of preparing the transaction, but haven't written the WAL
326 * record and state file yet, the transaction must not be considered as
327 * prepared. Likewise, if we are in the process of finishing an
328 * already-prepared transaction, and fail after having already written the
329 * 2nd phase commit or rollback record to the WAL, the transaction should
330 * not be considered as prepared anymore. In those cases, just remove the
331 * entry from shared memory.
332 *
333 * Otherwise, the entry must be left in place so that the transaction can
334 * be finished later, so just unlock it.
335 *
336 * If we abort during prepare, after having written the WAL record, we
337 * might not have transferred all locks and other state to the prepared
338 * transaction yet. Likewise, if we abort during commit or rollback,
339 * after having written the WAL record, we might not have released all the
340 * resources held by the transaction yet. In those cases, the in-memory
341 * state can be wrong, but it's too late to back out.
342 */
343 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
344 if (!MyLockedGxact->valid)
345 RemoveGXact(MyLockedGxact);
346 else
347 MyLockedGxact->locking_backend = InvalidBackendId;
348 LWLockRelease(TwoPhaseStateLock);
349
350 MyLockedGxact = NULL;
351}
352
353/*
354 * This is called after we have finished transferring state to the prepared
355 * PGXACT entry.
356 */
357void
358PostPrepare_Twophase(void)
359{
360 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
361 MyLockedGxact->locking_backend = InvalidBackendId;
362 LWLockRelease(TwoPhaseStateLock);
363
364 MyLockedGxact = NULL;
365}
366
367
368/*
369 * MarkAsPreparing
370 * Reserve the GID for the given transaction.
371 */
372GlobalTransaction
373MarkAsPreparing(TransactionId xid, const char *gid,
374 TimestampTz prepared_at, Oid owner, Oid databaseid)
375{
376 GlobalTransaction gxact;
377 int i;
378
379 if (strlen(gid) >= GIDSIZE)
380 ereport(ERROR,
381 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
382 errmsg("transaction identifier \"%s\" is too long",
383 gid)));
384
385 /* fail immediately if feature is disabled */
386 if (max_prepared_xacts == 0)
387 ereport(ERROR,
388 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
389 errmsg("prepared transactions are disabled"),
390 errhint("Set max_prepared_transactions to a nonzero value.")));
391
392 /* on first call, register the exit hook */
393 if (!twophaseExitRegistered)
394 {
395 before_shmem_exit(AtProcExit_Twophase, 0);
396 twophaseExitRegistered = true;
397 }
398
399 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
400
401 /* Check for conflicting GID */
402 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
403 {
404 gxact = TwoPhaseState->prepXacts[i];
405 if (strcmp(gxact->gid, gid) == 0)
406 {
407 ereport(ERROR,
408 (errcode(ERRCODE_DUPLICATE_OBJECT),
409 errmsg("transaction identifier \"%s\" is already in use",
410 gid)));
411 }
412 }
413
414 /* Get a free gxact from the freelist */
415 if (TwoPhaseState->freeGXacts == NULL)
416 ereport(ERROR,
417 (errcode(ERRCODE_OUT_OF_MEMORY),
418 errmsg("maximum number of prepared transactions reached"),
419 errhint("Increase max_prepared_transactions (currently %d).",
420 max_prepared_xacts)));
421 gxact = TwoPhaseState->freeGXacts;
422 TwoPhaseState->freeGXacts = gxact->next;
423
424 MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
425
426 gxact->ondisk = false;
427
428 /* And insert it into the active array */
429 Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
430 TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
431
432 LWLockRelease(TwoPhaseStateLock);
433
434 return gxact;
435}
436
437/*
438 * MarkAsPreparingGuts
439 *
440 * This uses a gxact struct and puts it into the active array.
441 * NOTE: this is also used when reloading a gxact after a crash; so avoid
442 * assuming that we can use very much backend context.
443 *
444 * Note: This function should be called with appropriate locks held.
445 */
446static void
447MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
448 TimestampTz prepared_at, Oid owner, Oid databaseid)
449{
450 PGPROC *proc;
451 PGXACT *pgxact;
452 int i;
453
454 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
455
456 Assert(gxact != NULL);
457 proc = &ProcGlobal->allProcs[gxact->pgprocno];
458 pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
459
460 /* Initialize the PGPROC entry */
461 MemSet(proc, 0, sizeof(PGPROC));
462 proc->pgprocno = gxact->pgprocno;
463 SHMQueueElemInit(&(proc->links));
464 proc->waitStatus = STATUS_OK;
465 /* We set up the gxact's VXID as InvalidBackendId/XID */
466 proc->lxid = (LocalTransactionId) xid;
467 pgxact->xid = xid;
468 pgxact->xmin = InvalidTransactionId;
469 pgxact->delayChkpt = false;
470 pgxact->vacuumFlags = 0;
471 proc->pid = 0;
472 proc->backendId = InvalidBackendId;
473 proc->databaseId = databaseid;
474 proc->roleId = owner;
475 proc->tempNamespaceId = InvalidOid;
476 proc->isBackgroundWorker = false;
477 proc->lwWaiting = false;
478 proc->lwWaitMode = 0;
479 proc->waitLock = NULL;
480 proc->waitProcLock = NULL;
481 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
482 SHMQueueInit(&(proc->myProcLocks[i]));
483 /* subxid data must be filled later by GXactLoadSubxactData */
484 pgxact->overflowed = false;
485 pgxact->nxids = 0;
486
487 gxact->prepared_at = prepared_at;
488 gxact->xid = xid;
489 gxact->owner = owner;
490 gxact->locking_backend = MyBackendId;
491 gxact->valid = false;
492 gxact->inredo = false;
493 strcpy(gxact->gid, gid);
494
495 /*
496 * Remember that we have this GlobalTransaction entry locked for us. If we
497 * abort after this, we must release it.
498 */
499 MyLockedGxact = gxact;
500}
501
502/*
503 * GXactLoadSubxactData
504 *
505 * If the transaction being persisted had any subtransactions, this must
506 * be called before MarkAsPrepared() to load information into the dummy
507 * PGPROC.
508 */
509static void
510GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
511 TransactionId *children)
512{
513 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
514 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
515
516 /* We need no extra lock since the GXACT isn't valid yet */
517 if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
518 {
519 pgxact->overflowed = true;
520 nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
521 }
522 if (nsubxacts > 0)
523 {
524 memcpy(proc->subxids.xids, children,
525 nsubxacts * sizeof(TransactionId));
526 pgxact->nxids = nsubxacts;
527 }
528}
529
530/*
531 * MarkAsPrepared
532 * Mark the GXACT as fully valid, and enter it into the global ProcArray.
533 *
534 * lock_held indicates whether caller already holds TwoPhaseStateLock.
535 */
536static void
537MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
538{
539 /* Lock here may be overkill, but I'm not convinced of that ... */
540 if (!lock_held)
541 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
542 Assert(!gxact->valid);
543 gxact->valid = true;
544 if (!lock_held)
545 LWLockRelease(TwoPhaseStateLock);
546
547 /*
548 * Put it into the global ProcArray so TransactionIdIsInProgress considers
549 * the XID as still running.
550 */
551 ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
552}
553
554/*
555 * LockGXact
556 * Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
557 */
558static GlobalTransaction
559LockGXact(const char *gid, Oid user)
560{
561 int i;
562
563 /* on first call, register the exit hook */
564 if (!twophaseExitRegistered)
565 {
566 before_shmem_exit(AtProcExit_Twophase, 0);
567 twophaseExitRegistered = true;
568 }
569
570 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
571
572 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
573 {
574 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
575 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
576
577 /* Ignore not-yet-valid GIDs */
578 if (!gxact->valid)
579 continue;
580 if (strcmp(gxact->gid, gid) != 0)
581 continue;
582
583 /* Found it, but has someone else got it locked? */
584 if (gxact->locking_backend != InvalidBackendId)
585 ereport(ERROR,
586 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
587 errmsg("prepared transaction with identifier \"%s\" is busy",
588 gid)));
589
590 if (user != gxact->owner && !superuser_arg(user))
591 ereport(ERROR,
592 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
593 errmsg("permission denied to finish prepared transaction"),
594 errhint("Must be superuser or the user that prepared the transaction.")));
595
596 /*
597 * Note: it probably would be possible to allow committing from
598 * another database; but at the moment NOTIFY is known not to work and
599 * there may be some other issues as well. Hence disallow until
600 * someone gets motivated to make it work.
601 */
602 if (MyDatabaseId != proc->databaseId)
603 ereport(ERROR,
604 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
605 errmsg("prepared transaction belongs to another database"),
606 errhint("Connect to the database where the transaction was prepared to finish it.")));
607
608 /* OK for me to lock it */
609 gxact->locking_backend = MyBackendId;
610 MyLockedGxact = gxact;
611
612 LWLockRelease(TwoPhaseStateLock);
613
614 return gxact;
615 }
616
617 LWLockRelease(TwoPhaseStateLock);
618
619 ereport(ERROR,
620 (errcode(ERRCODE_UNDEFINED_OBJECT),
621 errmsg("prepared transaction with identifier \"%s\" does not exist",
622 gid)));
623
624 /* NOTREACHED */
625 return NULL;
626}
627
628/*
629 * RemoveGXact
630 * Remove the prepared transaction from the shared memory array.
631 *
632 * NB: caller should have already removed it from ProcArray
633 */
634static void
635RemoveGXact(GlobalTransaction gxact)
636{
637 int i;
638
639 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
640
641 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
642 {
643 if (gxact == TwoPhaseState->prepXacts[i])
644 {
645 /* remove from the active array */
646 TwoPhaseState->numPrepXacts--;
647 TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
648
649 /* and put it back in the freelist */
650 gxact->next = TwoPhaseState->freeGXacts;
651 TwoPhaseState->freeGXacts = gxact;
652
653 return;
654 }
655 }
656
657 elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
658}
659
660/*
661 * Returns an array of all prepared transactions for the user-level
662 * function pg_prepared_xact.
663 *
664 * The returned array and all its elements are copies of internal data
665 * structures, to minimize the time we need to hold the TwoPhaseStateLock.
666 *
667 * WARNING -- we return even those transactions that are not fully prepared
668 * yet. The caller should filter them out if he doesn't want them.
669 *
670 * The returned array is palloc'd.
671 */
672static int
673GetPreparedTransactionList(GlobalTransaction *gxacts)
674{
675 GlobalTransaction array;
676 int num;
677 int i;
678
679 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
680
681 if (TwoPhaseState->numPrepXacts == 0)
682 {
683 LWLockRelease(TwoPhaseStateLock);
684
685 *gxacts = NULL;
686 return 0;
687 }
688
689 num = TwoPhaseState->numPrepXacts;
690 array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
691 *gxacts = array;
692 for (i = 0; i < num; i++)
693 memcpy(array + i, TwoPhaseState->prepXacts[i],
694 sizeof(GlobalTransactionData));
695
696 LWLockRelease(TwoPhaseStateLock);
697
698 return num;
699}
700
701
/*
 * Working status for pg_prepared_xact
 *
 * One of these is allocated on the first call of the set-returning function
 * and kept in funcctx->user_fctx for the following calls.
 */
typedef struct
{
	GlobalTransaction array; /* copies made by GetPreparedTransactionList,
							  * or NULL if there were none */
	int ngxacts; /* number of elements in array */
	int currIdx; /* index of the next element to look at */
} Working_State;
709
710/*
711 * pg_prepared_xact
712 * Produce a view with one row per prepared transaction.
713 *
714 * This function is here so we don't have to export the
715 * GlobalTransactionData struct definition.
716 */
717Datum
718pg_prepared_xact(PG_FUNCTION_ARGS)
719{
720 FuncCallContext *funcctx;
721 Working_State *status;
722
723 if (SRF_IS_FIRSTCALL())
724 {
725 TupleDesc tupdesc;
726 MemoryContext oldcontext;
727
728 /* create a function context for cross-call persistence */
729 funcctx = SRF_FIRSTCALL_INIT();
730
731 /*
732 * Switch to memory context appropriate for multiple function calls
733 */
734 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
735
736 /* build tupdesc for result tuples */
737 /* this had better match pg_prepared_xacts view in system_views.sql */
738 tupdesc = CreateTemplateTupleDesc(5);
739 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
740 XIDOID, -1, 0);
741 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
742 TEXTOID, -1, 0);
743 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
744 TIMESTAMPTZOID, -1, 0);
745 TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
746 OIDOID, -1, 0);
747 TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
748 OIDOID, -1, 0);
749
750 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
751
752 /*
753 * Collect all the 2PC status information that we will format and send
754 * out as a result set.
755 */
756 status = (Working_State *) palloc(sizeof(Working_State));
757 funcctx->user_fctx = (void *) status;
758
759 status->ngxacts = GetPreparedTransactionList(&status->array);
760 status->currIdx = 0;
761
762 MemoryContextSwitchTo(oldcontext);
763 }
764
765 funcctx = SRF_PERCALL_SETUP();
766 status = (Working_State *) funcctx->user_fctx;
767
768 while (status->array != NULL && status->currIdx < status->ngxacts)
769 {
770 GlobalTransaction gxact = &status->array[status->currIdx++];
771 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
772 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
773 Datum values[5];
774 bool nulls[5];
775 HeapTuple tuple;
776 Datum result;
777
778 if (!gxact->valid)
779 continue;
780
781 /*
782 * Form tuple with appropriate data.
783 */
784 MemSet(values, 0, sizeof(values));
785 MemSet(nulls, 0, sizeof(nulls));
786
787 values[0] = TransactionIdGetDatum(pgxact->xid);
788 values[1] = CStringGetTextDatum(gxact->gid);
789 values[2] = TimestampTzGetDatum(gxact->prepared_at);
790 values[3] = ObjectIdGetDatum(gxact->owner);
791 values[4] = ObjectIdGetDatum(proc->databaseId);
792
793 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
794 result = HeapTupleGetDatum(tuple);
795 SRF_RETURN_NEXT(funcctx, result);
796 }
797
798 SRF_RETURN_DONE(funcctx);
799}
800
801/*
802 * TwoPhaseGetGXact
803 * Get the GlobalTransaction struct for a prepared transaction
804 * specified by XID
805 *
806 * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
807 * caller had better hold it.
808 */
809static GlobalTransaction
810TwoPhaseGetGXact(TransactionId xid, bool lock_held)
811{
812 GlobalTransaction result = NULL;
813 int i;
814
815 static TransactionId cached_xid = InvalidTransactionId;
816 static GlobalTransaction cached_gxact = NULL;
817
818 Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock));
819
820 /*
821 * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
822 * repeatedly for the same XID. We can save work with a simple cache.
823 */
824 if (xid == cached_xid)
825 return cached_gxact;
826
827 if (!lock_held)
828 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
829
830 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
831 {
832 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
833 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
834
835 if (pgxact->xid == xid)
836 {
837 result = gxact;
838 break;
839 }
840 }
841
842 if (!lock_held)
843 LWLockRelease(TwoPhaseStateLock);
844
845 if (result == NULL) /* should not happen */
846 elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
847
848 cached_xid = xid;
849 cached_gxact = result;
850
851 return result;
852}
853
854/*
855 * TwoPhaseGetDummyBackendId
856 * Get the dummy backend ID for prepared transaction specified by XID
857 *
858 * Dummy backend IDs are similar to real backend IDs of real backends.
859 * They start at MaxBackends + 1, and are unique across all currently active
860 * real backends and prepared transactions. If lock_held is set to true,
861 * TwoPhaseStateLock will not be taken, so the caller had better hold it.
862 */
863BackendId
864TwoPhaseGetDummyBackendId(TransactionId xid, bool lock_held)
865{
866 GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
867
868 return gxact->dummyBackendId;
869}
870
871/*
872 * TwoPhaseGetDummyProc
873 * Get the PGPROC that represents a prepared transaction specified by XID
874 *
875 * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
876 * caller had better hold it.
877 */
878PGPROC *
879TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
880{
881 GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
882
883 return &ProcGlobal->allProcs[gxact->pgprocno];
884}
885
886/************************************************************************/
887/* State file support */
888/************************************************************************/
889
/*
 * Build the name of the state file for "xid" in "path": the XID, printed
 * as upper-case hexadecimal zero-padded to eight digits, inside
 * TWOPHASE_DIR.  At most MAXPGPATH bytes are written, so "path" must be a
 * buffer of at least that size.
 */
#define TwoPhaseFilePath(path, xid) \
	snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
892
893/*
894 * 2PC state file format:
895 *
896 * 1. TwoPhaseFileHeader
897 * 2. TransactionId[] (subtransactions)
898 * 3. RelFileNode[] (files to be deleted at commit)
899 * 4. RelFileNode[] (files to be deleted at abort)
900 * 5. SharedInvalidationMessage[] (inval messages to be sent at commit)
901 * 6. TwoPhaseRecordOnDisk
902 * 7. ...
903 * 8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
904 * 9. checksum (CRC-32C)
905 *
906 * Each segment except the final checksum is MAXALIGN'd.
907 */
908
/*
 * Header for a 2PC state file
 *
 * StartPrepare() fills one of these in and stores it as the first item of
 * the state data, immediately followed by the GID string.
 */
#define TWOPHASE_MAGIC 0x57F94534 /* format identifier */

typedef struct TwoPhaseFileHeader
{
	uint32 magic; /* format identifier */
	uint32 total_len; /* actual file length; StartPrepare stores 0
					   * and leaves the real value to EndPrepare */
	TransactionId xid; /* original transaction XID */
	Oid database; /* OID of database it was in */
	TimestampTz prepared_at; /* time of preparation */
	Oid owner; /* user running the transaction */
	int32 nsubxacts; /* number of following subxact XIDs */
	int32 ncommitrels; /* number of delete-on-commit rels */
	int32 nabortrels; /* number of delete-on-abort rels */
	int32 ninvalmsgs; /* number of cache invalidation messages */
	bool initfileinval; /* does relcache init file need invalidation? */
	uint16 gidlen; /* length of the GID - GID follows the header;
					* StartPrepare counts the trailing '\0' */
	XLogRecPtr origin_lsn; /* lsn of this record at origin node */
	TimestampTz origin_timestamp; /* time of prepare at origin node */
} TwoPhaseFileHeader;
931
/*
 * Header for each record in a state file
 *
 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
 * The rmgr data will be stored starting on a MAXALIGN boundary.
 *
 * The list of records is terminated by a sentinel record whose rmid is
 * TWOPHASE_RM_END_ID (see the file format description above).
 */
typedef struct TwoPhaseRecordOnDisk
{
	uint32 len; /* length of rmgr data */
	TwoPhaseRmgrId rmid; /* resource manager for this record */
	uint16 info; /* flag bits for use by rmgr */
} TwoPhaseRecordOnDisk;
944
/*
 * During prepare, the state file is assembled in memory before writing it
 * to WAL and the actual state file. We use a chain of StateFileChunk blocks
 * for that.
 *
 * StartPrepare() sets up the first chunk; save_state_data() appends data and
 * adds further chunks as needed.
 */
typedef struct StateFileChunk
{
	char *data; /* palloc'd buffer holding this chunk's bytes */
	uint32 len; /* bytes of data used so far, including MAXALIGN
				 * padding */
	struct StateFileChunk *next; /* following chunk, or NULL at the tail */
} StateFileChunk;

/* The chain being assembled by this backend (a single file-level instance) */
static struct xllist
{
	StateFileChunk *head; /* first data block in the chain */
	StateFileChunk *tail; /* last block in chain */
	uint32 num_chunks; /* number of blocks in the chain */
	uint32 bytes_free; /* free bytes left in tail block */
	uint32 total_len; /* total data bytes in chain */
} records;
965
966
967/*
968 * Append a block of data to records data structure.
969 *
970 * NB: each block is padded to a MAXALIGN multiple. This must be
971 * accounted for when the file is later read!
972 *
973 * The data is copied, so the caller is free to modify it afterwards.
974 */
975static void
976save_state_data(const void *data, uint32 len)
977{
978 uint32 padlen = MAXALIGN(len);
979
980 if (padlen > records.bytes_free)
981 {
982 records.tail->next = palloc0(sizeof(StateFileChunk));
983 records.tail = records.tail->next;
984 records.tail->len = 0;
985 records.tail->next = NULL;
986 records.num_chunks++;
987
988 records.bytes_free = Max(padlen, 512);
989 records.tail->data = palloc(records.bytes_free);
990 }
991
992 memcpy(((char *) records.tail->data) + records.tail->len, data, len);
993 records.tail->len += padlen;
994 records.bytes_free -= padlen;
995 records.total_len += padlen;
996}
997
998/*
999 * Start preparing a state file.
1000 *
1001 * Initializes data structure and inserts the 2PC file header record.
1002 */
1003void
1004StartPrepare(GlobalTransaction gxact)
1005{
1006 PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
1007 PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1008 TransactionId xid = pgxact->xid;
1009 TwoPhaseFileHeader hdr;
1010 TransactionId *children;
1011 RelFileNode *commitrels;
1012 RelFileNode *abortrels;
1013 SharedInvalidationMessage *invalmsgs;
1014
1015 /* Initialize linked list */
1016 records.head = palloc0(sizeof(StateFileChunk));
1017 records.head->len = 0;
1018 records.head->next = NULL;
1019
1020 records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
1021 records.head->data = palloc(records.bytes_free);
1022
1023 records.tail = records.head;
1024 records.num_chunks = 1;
1025
1026 records.total_len = 0;
1027
1028 /* Create header */
1029 hdr.magic = TWOPHASE_MAGIC;
1030 hdr.total_len = 0; /* EndPrepare will fill this in */
1031 hdr.xid = xid;
1032 hdr.database = proc->databaseId;
1033 hdr.prepared_at = gxact->prepared_at;
1034 hdr.owner = gxact->owner;
1035 hdr.nsubxacts = xactGetCommittedChildren(&children);
1036 hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
1037 hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
1038 hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
1039 &hdr.initfileinval);
1040 hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */
1041
1042 save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
1043 save_state_data(gxact->gid, hdr.gidlen);
1044
1045 /*
1046 * Add the additional info about subxacts, deletable files and cache
1047 * invalidation messages.
1048 */
1049 if (hdr.nsubxacts > 0)
1050 {
1051 save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
1052 /* While we have the child-xact data, stuff it in the gxact too */
1053 GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
1054 }
1055 if (hdr.ncommitrels > 0)
1056 {
1057 save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
1058 pfree(commitrels);
1059 }
1060 if (hdr.nabortrels > 0)
1061 {
1062 save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
1063 pfree(abortrels);
1064 }
1065 if (hdr.ninvalmsgs > 0)
1066 {
1067 save_state_data(invalmsgs,
1068 hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
1069 pfree(invalmsgs);
1070 }
1071}
1072
/*
 * Finish preparing state data and writing it to WAL.
 *
 * gxact is the global transaction whose state chain was started with
 * StartPrepare().  This appends the end-of-records sentinel, completes the
 * header (total_len and replication origin fields), inserts and flushes the
 * XLOG_XACT_PREPARE record, marks the gxact as prepared, and finally waits
 * for synchronous replication.
 *
 * Raises ERROR, before anything is written to WAL, if the assembled state
 * would exceed MaxAllocSize.
 */
void
EndPrepare(GlobalTransaction gxact)
{
	TwoPhaseFileHeader *hdr;
	StateFileChunk *record;
	bool		replorigin;

	/* Add the end sentinel to the list of 2PC records */
	RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
						   NULL, 0);

	/*
	 * Go back and fill in total_len in the file header record.  The header
	 * was the first thing StartPrepare() saved, so it sits at the start of
	 * the head chunk.  The stored length also counts the CRC that follows
	 * the data in a state file.
	 */
	hdr = (TwoPhaseFileHeader *) records.head->data;
	Assert(hdr->magic == TWOPHASE_MAGIC);
	hdr->total_len = records.total_len + sizeof(pg_crc32c);

	/* Is a real replication origin set up for this session? */
	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
				  replorigin_session_origin != DoNotReplicateId);

	if (replorigin)
	{
		Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
		hdr->origin_lsn = replorigin_session_origin_lsn;
		hdr->origin_timestamp = replorigin_session_origin_timestamp;
	}
	else
	{
		hdr->origin_lsn = InvalidXLogRecPtr;
		hdr->origin_timestamp = 0;
	}

	/*
	 * If the data size exceeds MaxAllocSize, we won't be able to read it in
	 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
	 * where we write data to file and then re-read at commit time.
	 */
	if (hdr->total_len > MaxAllocSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("two-phase state file maximum length exceeded")));

	/*
	 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
	 * cover us, so no need to calculate a separate CRC.
	 *
	 * We have to set delayChkpt here, too; otherwise a checkpoint starting
	 * immediately after the WAL record is inserted could complete without
	 * fsync'ing our state file.  (This is essentially the same kind of race
	 * condition as the COMMIT-to-clog-write case that RecordTransactionCommit
	 * uses delayChkpt for; see notes there.)
	 *
	 * We save the PREPARE record's location in the gxact for later use by
	 * CheckPointTwoPhase.
	 */
	/* one XLogRegisterData() call is made per chunk below */
	XLogEnsureRecordSpace(0, records.num_chunks);

	START_CRIT_SECTION();

	MyPgXact->delayChkpt = true;

	XLogBeginInsert();
	for (record = records.head; record != NULL; record = record->next)
		XLogRegisterData(record->data, record->len);

	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);

	gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);

	if (replorigin)
	{
		/* Move LSNs forward for this replication origin */
		replorigin_session_advance(replorigin_session_origin_lsn,
								   gxact->prepare_end_lsn);
	}

	XLogFlush(gxact->prepare_end_lsn);

	/* If we crash now, we have prepared: WAL replay will fix things */

	/* Store record's start location to read that later on Commit */
	gxact->prepare_start_lsn = ProcLastRecPtr;

	/*
	 * Mark the prepared transaction as valid.  As soon as xact.c marks
	 * MyPgXact as not running our XID (which it will do immediately after
	 * this function returns), others can commit/rollback the xact.
	 *
	 * NB: a side effect of this is to make a dummy ProcArray entry for the
	 * prepared XID.  This must happen before we clear the XID from MyPgXact,
	 * else there is a window where the XID is not running according to
	 * TransactionIdIsInProgress, and onlookers would be entitled to assume
	 * the xact crashed.  Instead we have a window where the same XID appears
	 * twice in ProcArray, which is OK.
	 */
	MarkAsPrepared(gxact, false);

	/*
	 * Now we can mark ourselves as out of the commit critical section: a
	 * checkpoint starting after this will certainly see the gxact as a
	 * candidate for fsyncing.
	 */
	MyPgXact->delayChkpt = false;

	/*
	 * Remember that we have this GlobalTransaction entry locked for us.  If
	 * we crash after this point, it's too late to abort, but we must unlock
	 * it so that the prepared transaction can be committed or rolled back.
	 */
	MyLockedGxact = gxact;

	END_CRIT_SECTION();

	/*
	 * Wait for synchronous replication, if required.
	 *
	 * Note that at this stage we have marked the prepare, but still show as
	 * running in the procarray (twice!) and continue to hold locks.
	 */
	SyncRepWaitForLSN(gxact->prepare_end_lsn, false);

	/*
	 * Forget the chain.  NOTE(review): the chunks are not pfree'd here;
	 * presumably they are reclaimed with their memory context -- confirm.
	 */
	records.tail = records.head = NULL;
	records.num_chunks = 0;
}
1199
1200/*
1201 * Register a 2PC record to be written to state file.
1202 */
1203void
1204RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1205 const void *data, uint32 len)
1206{
1207 TwoPhaseRecordOnDisk record;
1208
1209 record.rmid = rmid;
1210 record.info = info;
1211 record.len = len;
1212 save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1213 if (len > 0)
1214 save_state_data(data, len);
1215}
1216
1217
1218/*
1219 * Read and validate the state file for xid.
1220 *
1221 * If it looks OK (has a valid magic number and CRC), return the palloc'd
1222 * contents of the file, issuing an error when finding corrupted data. If
1223 * missing_ok is true, which indicates that missing files can be safely
1224 * ignored, then return NULL. This state can be reached when doing recovery.
1225 */
1226static char *
1227ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
1228{
1229 char path[MAXPGPATH];
1230 char *buf;
1231 TwoPhaseFileHeader *hdr;
1232 int fd;
1233 struct stat stat;
1234 uint32 crc_offset;
1235 pg_crc32c calc_crc,
1236 file_crc;
1237 int r;
1238
1239 TwoPhaseFilePath(path, xid);
1240
1241 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1242 if (fd < 0)
1243 {
1244 if (missing_ok && errno == ENOENT)
1245 return NULL;
1246
1247 ereport(ERROR,
1248 (errcode_for_file_access(),
1249 errmsg("could not open file \"%s\": %m", path)));
1250 }
1251
1252 /*
1253 * Check file length. We can determine a lower bound pretty easily. We
1254 * set an upper bound to avoid palloc() failure on a corrupt file, though
1255 * we can't guarantee that we won't get an out of memory error anyway,
1256 * even on a valid file.
1257 */
1258 if (fstat(fd, &stat))
1259 ereport(ERROR,
1260 (errcode_for_file_access(),
1261 errmsg("could not stat file \"%s\": %m", path)));
1262
1263 if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1264 MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1265 sizeof(pg_crc32c)) ||
1266 stat.st_size > MaxAllocSize)
1267 ereport(ERROR,
1268 (errcode(ERRCODE_DATA_CORRUPTED),
1269 errmsg_plural("incorrect size of file \"%s\": %zu byte",
1270 "incorrect size of file \"%s\": %zu bytes",
1271 (Size) stat.st_size, path,
1272 (Size) stat.st_size)));
1273
1274 crc_offset = stat.st_size - sizeof(pg_crc32c);
1275 if (crc_offset != MAXALIGN(crc_offset))
1276 ereport(ERROR,
1277 (errcode(ERRCODE_DATA_CORRUPTED),
1278 errmsg("incorrect alignment of CRC offset for file \"%s\"",
1279 path)));
1280
1281 /*
1282 * OK, slurp in the file.
1283 */
1284 buf = (char *) palloc(stat.st_size);
1285
1286 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
1287 r = read(fd, buf, stat.st_size);
1288 if (r != stat.st_size)
1289 {
1290 if (r < 0)
1291 ereport(ERROR,
1292 (errcode_for_file_access(),
1293 errmsg("could not read file \"%s\": %m", path)));
1294 else
1295 ereport(ERROR,
1296 (errmsg("could not read file \"%s\": read %d of %zu",
1297 path, r, (Size) stat.st_size)));
1298 }
1299
1300 pgstat_report_wait_end();
1301
1302 if (CloseTransientFile(fd))
1303 ereport(ERROR,
1304 (errcode_for_file_access(),
1305 errmsg("could not close file \"%s\": %m", path)));
1306
1307 hdr = (TwoPhaseFileHeader *) buf;
1308 if (hdr->magic != TWOPHASE_MAGIC)
1309 ereport(ERROR,
1310 (errcode(ERRCODE_DATA_CORRUPTED),
1311 errmsg("invalid magic number stored in file \"%s\"",
1312 path)));
1313
1314 if (hdr->total_len != stat.st_size)
1315 ereport(ERROR,
1316 (errcode(ERRCODE_DATA_CORRUPTED),
1317 errmsg("invalid size stored in file \"%s\"",
1318 path)));
1319
1320 INIT_CRC32C(calc_crc);
1321 COMP_CRC32C(calc_crc, buf, crc_offset);
1322 FIN_CRC32C(calc_crc);
1323
1324 file_crc = *((pg_crc32c *) (buf + crc_offset));
1325
1326 if (!EQ_CRC32C(calc_crc, file_crc))
1327 ereport(ERROR,
1328 (errcode(ERRCODE_DATA_CORRUPTED),
1329 errmsg("calculated CRC checksum does not match value stored in file \"%s\"",
1330 path)));
1331
1332 return buf;
1333}
1334
1335/*
1336 * ParsePrepareRecord
1337 */
1338void
1339ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
1340{
1341 TwoPhaseFileHeader *hdr;
1342 char *bufptr;
1343
1344 hdr = (TwoPhaseFileHeader *) xlrec;
1345 bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
1346
1347 parsed->origin_lsn = hdr->origin_lsn;
1348 parsed->origin_timestamp = hdr->origin_timestamp;
1349 parsed->twophase_xid = hdr->xid;
1350 parsed->dbId = hdr->database;
1351 parsed->nsubxacts = hdr->nsubxacts;
1352 parsed->nrels = hdr->ncommitrels;
1353 parsed->nabortrels = hdr->nabortrels;
1354 parsed->nmsgs = hdr->ninvalmsgs;
1355
1356 strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
1357 bufptr += MAXALIGN(hdr->gidlen);
1358
1359 parsed->subxacts = (TransactionId *) bufptr;
1360 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1361
1362 parsed->xnodes = (RelFileNode *) bufptr;
1363 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1364
1365 parsed->abortnodes = (RelFileNode *) bufptr;
1366 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1367
1368 parsed->msgs = (SharedInvalidationMessage *) bufptr;
1369 bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1370}
1371
1372
1373
1374/*
1375 * Reads 2PC data from xlog. During checkpoint this data will be moved to
1376 * twophase files and ReadTwoPhaseFile should be used instead.
1377 *
1378 * Note clearly that this function can access WAL during normal operation,
1379 * similarly to the way WALSender or Logical Decoding would do.
1380 *
1381 */
1382static void
1383XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1384{
1385 XLogRecord *record;
1386 XLogReaderState *xlogreader;
1387 char *errormsg;
1388
1389 xlogreader = XLogReaderAllocate(wal_segment_size, &read_local_xlog_page,
1390 NULL);
1391 if (!xlogreader)
1392 ereport(ERROR,
1393 (errcode(ERRCODE_OUT_OF_MEMORY),
1394 errmsg("out of memory"),
1395 errdetail("Failed while allocating a WAL reading processor.")));
1396
1397 record = XLogReadRecord(xlogreader, lsn, &errormsg);
1398 if (record == NULL)
1399 ereport(ERROR,
1400 (errcode_for_file_access(),
1401 errmsg("could not read two-phase state from WAL at %X/%X",
1402 (uint32) (lsn >> 32),
1403 (uint32) lsn)));
1404
1405 if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
1406 (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
1407 ereport(ERROR,
1408 (errcode_for_file_access(),
1409 errmsg("expected two-phase state data is not present in WAL at %X/%X",
1410 (uint32) (lsn >> 32),
1411 (uint32) lsn)));
1412
1413 if (len != NULL)
1414 *len = XLogRecGetDataLen(xlogreader);
1415
1416 *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
1417 memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
1418
1419 XLogReaderFree(xlogreader);
1420}
1421
1422
1423/*
1424 * Confirms an xid is prepared, during recovery
1425 */
1426bool
1427StandbyTransactionIdIsPrepared(TransactionId xid)
1428{
1429 char *buf;
1430 TwoPhaseFileHeader *hdr;
1431 bool result;
1432
1433 Assert(TransactionIdIsValid(xid));
1434
1435 if (max_prepared_xacts <= 0)
1436 return false; /* nothing to do */
1437
1438 /* Read and validate file */
1439 buf = ReadTwoPhaseFile(xid, true);
1440 if (buf == NULL)
1441 return false;
1442
1443 /* Check header also */
1444 hdr = (TwoPhaseFileHeader *) buf;
1445 result = TransactionIdEquals(hdr->xid, xid);
1446 pfree(buf);
1447
1448 return result;
1449}
1450
1451/*
1452 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1453 */
1454void
1455FinishPreparedTransaction(const char *gid, bool isCommit)
1456{
1457 GlobalTransaction gxact;
1458 PGPROC *proc;
1459 PGXACT *pgxact;
1460 TransactionId xid;
1461 char *buf;
1462 char *bufptr;
1463 TwoPhaseFileHeader *hdr;
1464 TransactionId latestXid;
1465 TransactionId *children;
1466 RelFileNode *commitrels;
1467 RelFileNode *abortrels;
1468 RelFileNode *delrels;
1469 int ndelrels;
1470 SharedInvalidationMessage *invalmsgs;
1471
1472 /*
1473 * Validate the GID, and lock the GXACT to ensure that two backends do not
1474 * try to commit the same GID at once.
1475 */
1476 gxact = LockGXact(gid, GetUserId());
1477 proc = &ProcGlobal->allProcs[gxact->pgprocno];
1478 pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1479 xid = pgxact->xid;
1480
1481 /*
1482 * Read and validate 2PC state data. State data will typically be stored
1483 * in WAL files if the LSN is after the last checkpoint record, or moved
1484 * to disk if for some reason they have lived for a long time.
1485 */
1486 if (gxact->ondisk)
1487 buf = ReadTwoPhaseFile(xid, false);
1488 else
1489 XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
1490
1491
1492 /*
1493 * Disassemble the header area
1494 */
1495 hdr = (TwoPhaseFileHeader *) buf;
1496 Assert(TransactionIdEquals(hdr->xid, xid));
1497 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1498 bufptr += MAXALIGN(hdr->gidlen);
1499 children = (TransactionId *) bufptr;
1500 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1501 commitrels = (RelFileNode *) bufptr;
1502 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
1503 abortrels = (RelFileNode *) bufptr;
1504 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
1505 invalmsgs = (SharedInvalidationMessage *) bufptr;
1506 bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1507
1508 /* compute latestXid among all children */
1509 latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
1510
1511 /* Prevent cancel/die interrupt while cleaning up */
1512 HOLD_INTERRUPTS();
1513
1514 /*
1515 * The order of operations here is critical: make the XLOG entry for
1516 * commit or abort, then mark the transaction committed or aborted in
1517 * pg_xact, then remove its PGPROC from the global ProcArray (which means
1518 * TransactionIdIsInProgress will stop saying the prepared xact is in
1519 * progress), then run the post-commit or post-abort callbacks. The
1520 * callbacks will release the locks the transaction held.
1521 */
1522 if (isCommit)
1523 RecordTransactionCommitPrepared(xid,
1524 hdr->nsubxacts, children,
1525 hdr->ncommitrels, commitrels,
1526 hdr->ninvalmsgs, invalmsgs,
1527 hdr->initfileinval, gid);
1528 else
1529 RecordTransactionAbortPrepared(xid,
1530 hdr->nsubxacts, children,
1531 hdr->nabortrels, abortrels,
1532 gid);
1533
1534 ProcArrayRemove(proc, latestXid);
1535
1536 /*
1537 * In case we fail while running the callbacks, mark the gxact invalid so
1538 * no one else will try to commit/rollback, and so it will be recycled if
1539 * we fail after this point. It is still locked by our backend so it
1540 * won't go away yet.
1541 *
1542 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1543 */
1544 gxact->valid = false;
1545
1546 /*
1547 * We have to remove any files that were supposed to be dropped. For
1548 * consistency with the regular xact.c code paths, must do this before
1549 * releasing locks, so do it before running the callbacks.
1550 *
1551 * NB: this code knows that we couldn't be dropping any temp rels ...
1552 */
1553 if (isCommit)
1554 {
1555 delrels = commitrels;
1556 ndelrels = hdr->ncommitrels;
1557 }
1558 else
1559 {
1560 delrels = abortrels;
1561 ndelrels = hdr->nabortrels;
1562 }
1563
1564 /* Make sure files supposed to be dropped are dropped */
1565 DropRelationFiles(delrels, ndelrels, false);
1566
1567 /*
1568 * Handle cache invalidation messages.
1569 *
1570 * Relcache init file invalidation requires processing both before and
1571 * after we send the SI messages. See AtEOXact_Inval()
1572 */
1573 if (hdr->initfileinval)
1574 RelationCacheInitFilePreInvalidate();
1575 SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
1576 if (hdr->initfileinval)
1577 RelationCacheInitFilePostInvalidate();
1578
1579 /*
1580 * Acquire the two-phase lock. We want to work on the two-phase callbacks
1581 * while holding it to avoid potential conflicts with other transactions
1582 * attempting to use the same GID, so the lock is released once the shared
1583 * memory state is cleared.
1584 */
1585 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1586
1587 /* And now do the callbacks */
1588 if (isCommit)
1589 ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1590 else
1591 ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1592
1593 PredicateLockTwoPhaseFinish(xid, isCommit);
1594
1595 /* Clear shared memory state */
1596 RemoveGXact(gxact);
1597
1598 /*
1599 * Release the lock as all callbacks are called and shared memory cleanup
1600 * is done.
1601 */
1602 LWLockRelease(TwoPhaseStateLock);
1603
1604 /* Count the prepared xact as committed or aborted */
1605 AtEOXact_PgStat(isCommit, false);
1606
1607 /*
1608 * And now we can clean up any files we may have left.
1609 */
1610 if (gxact->ondisk)
1611 RemoveTwoPhaseFile(xid, true);
1612
1613 MyLockedGxact = NULL;
1614
1615 RESUME_INTERRUPTS();
1616
1617 pfree(buf);
1618}
1619
1620/*
1621 * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1622 */
1623static void
1624ProcessRecords(char *bufptr, TransactionId xid,
1625 const TwoPhaseCallback callbacks[])
1626{
1627 for (;;)
1628 {
1629 TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1630
1631 Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1632 if (record->rmid == TWOPHASE_RM_END_ID)
1633 break;
1634
1635 bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1636
1637 if (callbacks[record->rmid] != NULL)
1638 callbacks[record->rmid] (xid, record->info,
1639 (void *) bufptr, record->len);
1640
1641 bufptr += MAXALIGN(record->len);
1642 }
1643}
1644
1645/*
1646 * Remove the 2PC file for the specified XID.
1647 *
1648 * If giveWarning is false, do not complain about file-not-present;
1649 * this is an expected case during WAL replay.
1650 */
1651static void
1652RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1653{
1654 char path[MAXPGPATH];
1655
1656 TwoPhaseFilePath(path, xid);
1657 if (unlink(path))
1658 if (errno != ENOENT || giveWarning)
1659 ereport(WARNING,
1660 (errcode_for_file_access(),
1661 errmsg("could not remove file \"%s\": %m", path)));
1662}
1663
1664/*
1665 * Recreates a state file. This is used in WAL replay and during
1666 * checkpoint creation.
1667 *
1668 * Note: content and len don't include CRC.
1669 */
1670static void
1671RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1672{
1673 char path[MAXPGPATH];
1674 pg_crc32c statefile_crc;
1675 int fd;
1676
1677 /* Recompute CRC */
1678 INIT_CRC32C(statefile_crc);
1679 COMP_CRC32C(statefile_crc, content, len);
1680 FIN_CRC32C(statefile_crc);
1681
1682 TwoPhaseFilePath(path, xid);
1683
1684 fd = OpenTransientFile(path,
1685 O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
1686 if (fd < 0)
1687 ereport(ERROR,
1688 (errcode_for_file_access(),
1689 errmsg("could not recreate file \"%s\": %m", path)));
1690
1691 /* Write content and CRC */
1692 errno = 0;
1693 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
1694 if (write(fd, content, len) != len)
1695 {
1696 /* if write didn't set errno, assume problem is no disk space */
1697 if (errno == 0)
1698 errno = ENOSPC;
1699 ereport(ERROR,
1700 (errcode_for_file_access(),
1701 errmsg("could not write file \"%s\": %m", path)));
1702 }
1703 if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
1704 {
1705 /* if write didn't set errno, assume problem is no disk space */
1706 if (errno == 0)
1707 errno = ENOSPC;
1708 ereport(ERROR,
1709 (errcode_for_file_access(),
1710 errmsg("could not write file \"%s\": %m", path)));
1711 }
1712 pgstat_report_wait_end();
1713
1714 /*
1715 * We must fsync the file because the end-of-replay checkpoint will not do
1716 * so, there being no GXACT in shared memory yet to tell it to.
1717 */
1718 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
1719 if (pg_fsync(fd) != 0)
1720 ereport(ERROR,
1721 (errcode_for_file_access(),
1722 errmsg("could not fsync file \"%s\": %m", path)));
1723 pgstat_report_wait_end();
1724
1725 if (CloseTransientFile(fd) != 0)
1726 ereport(ERROR,
1727 (errcode_for_file_access(),
1728 errmsg("could not close file \"%s\": %m", path)));
1729}
1730
1731/*
1732 * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1733 *
1734 * We must fsync the state file of any GXACT that is valid or has been
1735 * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1736 * horizon. (If the gxact isn't valid yet, has not been generated in
1737 * redo, or has a later LSN, this checkpoint is not responsible for
1738 * fsyncing it.)
1739 *
1740 * This is deliberately run as late as possible in the checkpoint sequence,
1741 * because GXACTs ordinarily have short lifespans, and so it is quite
1742 * possible that GXACTs that were valid at checkpoint start will no longer
1743 * exist if we wait a little bit. With typical checkpoint settings this
1744 * will be about 3 minutes for an online checkpoint, so as a result we
1745 * expect that there will be no GXACTs that need to be copied to disk.
1746 *
1747 * If a GXACT remains valid across multiple checkpoints, it will already
1748 * be on disk so we don't bother to repeat that write.
1749 */
1750void
1751CheckPointTwoPhase(XLogRecPtr redo_horizon)
1752{
1753 int i;
1754 int serialized_xacts = 0;
1755
1756 if (max_prepared_xacts <= 0)
1757 return; /* nothing to do */
1758
1759 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1760
1761 /*
1762 * We are expecting there to be zero GXACTs that need to be copied to
1763 * disk, so we perform all I/O while holding TwoPhaseStateLock for
1764 * simplicity. This prevents any new xacts from preparing while this
1765 * occurs, which shouldn't be a problem since the presence of long-lived
1766 * prepared xacts indicates the transaction manager isn't active.
1767 *
1768 * It's also possible to move I/O out of the lock, but on every error we
1769 * should check whether somebody committed our transaction in different
1770 * backend. Let's leave this optimization for future, if somebody will
1771 * spot that this place cause bottleneck.
1772 *
1773 * Note that it isn't possible for there to be a GXACT with a
1774 * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1775 * because of the efforts with delayChkpt.
1776 */
1777 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1778 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1779 {
1780 /*
1781 * Note that we are using gxact not pgxact so this works in recovery
1782 * also
1783 */
1784 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1785
1786 if ((gxact->valid || gxact->inredo) &&
1787 !gxact->ondisk &&
1788 gxact->prepare_end_lsn <= redo_horizon)
1789 {
1790 char *buf;
1791 int len;
1792
1793 XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
1794 RecreateTwoPhaseFile(gxact->xid, buf, len);
1795 gxact->ondisk = true;
1796 gxact->prepare_start_lsn = InvalidXLogRecPtr;
1797 gxact->prepare_end_lsn = InvalidXLogRecPtr;
1798 pfree(buf);
1799 serialized_xacts++;
1800 }
1801 }
1802 LWLockRelease(TwoPhaseStateLock);
1803
1804 /*
1805 * Flush unconditionally the parent directory to make any information
1806 * durable on disk. Two-phase files could have been removed and those
1807 * removals need to be made persistent as well as any files newly created
1808 * previously since the last checkpoint.
1809 */
1810 fsync_fname(TWOPHASE_DIR, true);
1811
1812 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1813
1814 if (log_checkpoints && serialized_xacts > 0)
1815 ereport(LOG,
1816 (errmsg_plural("%u two-phase state file was written "
1817 "for a long-running prepared transaction",
1818 "%u two-phase state files were written "
1819 "for long-running prepared transactions",
1820 serialized_xacts,
1821 serialized_xacts)));
1822}
1823
1824/*
1825 * restoreTwoPhaseData
1826 *
1827 * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1828 * This is called once at the beginning of recovery, saving any extra
1829 * lookups in the future. Two-phase files that are newer than the
1830 * minimum XID horizon are discarded on the way.
1831 */
1832void
1833restoreTwoPhaseData(void)
1834{
1835 DIR *cldir;
1836 struct dirent *clde;
1837
1838 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1839 cldir = AllocateDir(TWOPHASE_DIR);
1840 while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1841 {
1842 if (strlen(clde->d_name) == 8 &&
1843 strspn(clde->d_name, "0123456789ABCDEF") == 8)
1844 {
1845 TransactionId xid;
1846 char *buf;
1847
1848 xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1849
1850 buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
1851 true, false, false);
1852 if (buf == NULL)
1853 continue;
1854
1855 PrepareRedoAdd(buf, InvalidXLogRecPtr,
1856 InvalidXLogRecPtr, InvalidRepOriginId);
1857 }
1858 }
1859 LWLockRelease(TwoPhaseStateLock);
1860 FreeDir(cldir);
1861}
1862
1863/*
1864 * PrescanPreparedTransactions
1865 *
1866 * Scan the shared memory entries of TwoPhaseState and determine the range
1867 * of valid XIDs present. This is run during database startup, after we
1868 * have completed reading WAL. ShmemVariableCache->nextFullXid has been set to
1869 * one more than the highest XID for which evidence exists in WAL.
1870 *
1871 * We throw away any prepared xacts with main XID beyond nextFullXid --- if any
1872 * are present, it suggests that the DBA has done a PITR recovery to an
1873 * earlier point in time without cleaning out pg_twophase. We dare not
1874 * try to recover such prepared xacts since they likely depend on database
1875 * state that doesn't exist now.
1876 *
1877 * However, we will advance nextFullXid beyond any subxact XIDs belonging to
1878 * valid prepared xacts. We need to do this since subxact commit doesn't
1879 * write a WAL entry, and so there might be no evidence in WAL of those
1880 * subxact XIDs.
1881 *
1882 * On corrupted two-phase files, fail immediately. Keeping around broken
1883 * entries and let replay continue causes harm on the system, and a new
1884 * backup should be rolled in.
1885 *
1886 * Our other responsibility is to determine and return the oldest valid XID
1887 * among the prepared xacts (if none, return ShmemVariableCache->nextFullXid).
1888 * This is needed to synchronize pg_subtrans startup properly.
1889 *
1890 * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1891 * top-level xids is stored in *xids_p. The number of entries in the array
1892 * is returned in *nxids_p.
1893 */
1894TransactionId
1895PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1896{
1897 FullTransactionId nextFullXid = ShmemVariableCache->nextFullXid;
1898 TransactionId origNextXid = XidFromFullTransactionId(nextFullXid);
1899 TransactionId result = origNextXid;
1900 TransactionId *xids = NULL;
1901 int nxids = 0;
1902 int allocsize = 0;
1903 int i;
1904
1905 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1906 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1907 {
1908 TransactionId xid;
1909 char *buf;
1910 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1911
1912 Assert(gxact->inredo);
1913
1914 xid = gxact->xid;
1915
1916 buf = ProcessTwoPhaseBuffer(xid,
1917 gxact->prepare_start_lsn,
1918 gxact->ondisk, false, true);
1919
1920 if (buf == NULL)
1921 continue;
1922
1923 /*
1924 * OK, we think this file is valid. Incorporate xid into the
1925 * running-minimum result.
1926 */
1927 if (TransactionIdPrecedes(xid, result))
1928 result = xid;
1929
1930 if (xids_p)
1931 {
1932 if (nxids == allocsize)
1933 {
1934 if (nxids == 0)
1935 {
1936 allocsize = 10;
1937 xids = palloc(allocsize * sizeof(TransactionId));
1938 }
1939 else
1940 {
1941 allocsize = allocsize * 2;
1942 xids = repalloc(xids, allocsize * sizeof(TransactionId));
1943 }
1944 }
1945 xids[nxids++] = xid;
1946 }
1947
1948 pfree(buf);
1949 }
1950 LWLockRelease(TwoPhaseStateLock);
1951
1952 if (xids_p)
1953 {
1954 *xids_p = xids;
1955 *nxids_p = nxids;
1956 }
1957
1958 return result;
1959}
1960
1961/*
1962 * StandbyRecoverPreparedTransactions
1963 *
1964 * Scan the shared memory entries of TwoPhaseState and setup all the required
1965 * information to allow standby queries to treat prepared transactions as still
1966 * active.
1967 *
1968 * This is never called at the end of recovery - we use
1969 * RecoverPreparedTransactions() at that point.
1970 *
1971 * The lack of calls to SubTransSetParent() calls here is by design;
1972 * those calls are made by RecoverPreparedTransactions() at the end of recovery
1973 * for those xacts that need this.
1974 */
1975void
1976StandbyRecoverPreparedTransactions(void)
1977{
1978 int i;
1979
1980 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1981 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1982 {
1983 TransactionId xid;
1984 char *buf;
1985 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1986
1987 Assert(gxact->inredo);
1988
1989 xid = gxact->xid;
1990
1991 buf = ProcessTwoPhaseBuffer(xid,
1992 gxact->prepare_start_lsn,
1993 gxact->ondisk, false, false);
1994 if (buf != NULL)
1995 pfree(buf);
1996 }
1997 LWLockRelease(TwoPhaseStateLock);
1998}
1999
2000/*
2001 * RecoverPreparedTransactions
2002 *
2003 * Scan the shared memory entries of TwoPhaseState and reload the state for
2004 * each prepared transaction (reacquire locks, etc).
2005 *
2006 * This is run at the end of recovery, but before we allow backends to write
2007 * WAL.
2008 *
2009 * At the end of recovery the way we take snapshots will change. We now need
2010 * to mark all running transactions with their full SubTransSetParent() info
2011 * to allow normal snapshots to work correctly if snapshots overflow.
2012 * We do this here because by definition prepared transactions are the only
2013 * type of write transaction still running, so this is necessary and
2014 * complete.
2015 */
2016void
2017RecoverPreparedTransactions(void)
2018{
2019 int i;
2020
2021 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2022 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2023 {
2024 TransactionId xid;
2025 char *buf;
2026 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2027 char *bufptr;
2028 TwoPhaseFileHeader *hdr;
2029 TransactionId *subxids;
2030 const char *gid;
2031
2032 xid = gxact->xid;
2033
2034 /*
2035 * Reconstruct subtrans state for the transaction --- needed because
2036 * pg_subtrans is not preserved over a restart. Note that we are
2037 * linking all the subtransactions directly to the top-level XID;
2038 * there may originally have been a more complex hierarchy, but
2039 * there's no need to restore that exactly. It's possible that
2040 * SubTransSetParent has been set before, if the prepared transaction
2041 * generated xid assignment records.
2042 */
2043 buf = ProcessTwoPhaseBuffer(xid,
2044 gxact->prepare_start_lsn,
2045 gxact->ondisk, true, false);
2046 if (buf == NULL)
2047 continue;
2048
2049 ereport(LOG,
2050 (errmsg("recovering prepared transaction %u from shared memory", xid)));
2051
2052 hdr = (TwoPhaseFileHeader *) buf;
2053 Assert(TransactionIdEquals(hdr->xid, xid));
2054 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2055 gid = (const char *) bufptr;
2056 bufptr += MAXALIGN(hdr->gidlen);
2057 subxids = (TransactionId *) bufptr;
2058 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
2059 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
2060 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
2061 bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2062
2063 /*
2064 * Recreate its GXACT and dummy PGPROC. But, check whether it was
2065 * added in redo and already has a shmem entry for it.
2066 */
2067 MarkAsPreparingGuts(gxact, xid, gid,
2068 hdr->prepared_at,
2069 hdr->owner, hdr->database);
2070
2071 /* recovered, so reset the flag for entries generated by redo */
2072 gxact->inredo = false;
2073
2074 GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2075 MarkAsPrepared(gxact, true);
2076
2077 LWLockRelease(TwoPhaseStateLock);
2078
2079 /*
2080 * Recover other state (notably locks) using resource managers.
2081 */
2082 ProcessRecords(bufptr, xid, twophase_recover_callbacks);
2083
2084 /*
2085 * Release locks held by the standby process after we process each
2086 * prepared transaction. As a result, we don't need too many
2087 * additional locks at any one time.
2088 */
2089 if (InHotStandby)
2090 StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
2091
2092 /*
2093 * We're done with recovering this transaction. Clear MyLockedGxact,
2094 * like we do in PrepareTransaction() during normal operation.
2095 */
2096 PostPrepare_Twophase();
2097
2098 pfree(buf);
2099
2100 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2101 }
2102
2103 LWLockRelease(TwoPhaseStateLock);
2104}
2105
2106/*
2107 * ProcessTwoPhaseBuffer
2108 *
2109 * Given a transaction id, read it either from disk or read it directly
2110 * via shmem xlog record pointer using the provided "prepare_start_lsn".
2111 *
2112 * If setParent is true, set up subtransaction parent linkages.
2113 *
2114 * If setNextXid is true, set ShmemVariableCache->nextFullXid to the newest
2115 * value scanned.
2116 */
2117static char *
2118ProcessTwoPhaseBuffer(TransactionId xid,
2119 XLogRecPtr prepare_start_lsn,
2120 bool fromdisk,
2121 bool setParent, bool setNextXid)
2122{
2123 FullTransactionId nextFullXid = ShmemVariableCache->nextFullXid;
2124 TransactionId origNextXid = XidFromFullTransactionId(nextFullXid);
2125 TransactionId *subxids;
2126 char *buf;
2127 TwoPhaseFileHeader *hdr;
2128 int i;
2129
2130 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2131
2132 if (!fromdisk)
2133 Assert(prepare_start_lsn != InvalidXLogRecPtr);
2134
2135 /* Already processed? */
2136 if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
2137 {
2138 if (fromdisk)
2139 {
2140 ereport(WARNING,
2141 (errmsg("removing stale two-phase state file for transaction %u",
2142 xid)));
2143 RemoveTwoPhaseFile(xid, true);
2144 }
2145 else
2146 {
2147 ereport(WARNING,
2148 (errmsg("removing stale two-phase state from memory for transaction %u",
2149 xid)));
2150 PrepareRedoRemove(xid, true);
2151 }
2152 return NULL;
2153 }
2154
2155 /* Reject XID if too new */
2156 if (TransactionIdFollowsOrEquals(xid, origNextXid))
2157 {
2158 if (fromdisk)
2159 {
2160 ereport(WARNING,
2161 (errmsg("removing future two-phase state file for transaction %u",
2162 xid)));
2163 RemoveTwoPhaseFile(xid, true);
2164 }
2165 else
2166 {
2167 ereport(WARNING,
2168 (errmsg("removing future two-phase state from memory for transaction %u",
2169 xid)));
2170 PrepareRedoRemove(xid, true);
2171 }
2172 return NULL;
2173 }
2174
2175 if (fromdisk)
2176 {
2177 /* Read and validate file */
2178 buf = ReadTwoPhaseFile(xid, false);
2179 }
2180 else
2181 {
2182 /* Read xlog data */
2183 XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
2184 }
2185
2186 /* Deconstruct header */
2187 hdr = (TwoPhaseFileHeader *) buf;
2188 if (!TransactionIdEquals(hdr->xid, xid))
2189 {
2190 if (fromdisk)
2191 ereport(ERROR,
2192 (errcode(ERRCODE_DATA_CORRUPTED),
2193 errmsg("corrupted two-phase state file for transaction %u",
2194 xid)));
2195 else
2196 ereport(ERROR,
2197 (errcode(ERRCODE_DATA_CORRUPTED),
2198 errmsg("corrupted two-phase state in memory for transaction %u",
2199 xid)));
2200 }
2201
2202 /*
2203 * Examine subtransaction XIDs ... they should all follow main XID, and
2204 * they may force us to advance nextFullXid.
2205 */
2206 subxids = (TransactionId *) (buf +
2207 MAXALIGN(sizeof(TwoPhaseFileHeader)) +
2208 MAXALIGN(hdr->gidlen));
2209 for (i = 0; i < hdr->nsubxacts; i++)
2210 {
2211 TransactionId subxid = subxids[i];
2212
2213 Assert(TransactionIdFollows(subxid, xid));
2214
2215 /* update nextFullXid if needed */
2216 if (setNextXid)
2217 AdvanceNextFullTransactionIdPastXid(subxid);
2218
2219 if (setParent)
2220 SubTransSetParent(subxid, xid);
2221 }
2222
2223 return buf;
2224}
2225
2226
2227/*
2228 * RecordTransactionCommitPrepared
2229 *
2230 * This is basically the same as RecordTransactionCommit (q.v. if you change
2231 * this function): in particular, we must set the delayChkpt flag to avoid a
2232 * race condition.
2233 *
2234 * We know the transaction made at least one XLOG entry (its PREPARE),
2235 * so it is never possible to optimize out the commit record.
2236 */
2237static void
2238RecordTransactionCommitPrepared(TransactionId xid,
2239 int nchildren,
2240 TransactionId *children,
2241 int nrels,
2242 RelFileNode *rels,
2243 int ninvalmsgs,
2244 SharedInvalidationMessage *invalmsgs,
2245 bool initfileinval,
2246 const char *gid)
2247{
2248 XLogRecPtr recptr;
2249 TimestampTz committs = GetCurrentTimestamp();
2250 bool replorigin;
2251
2252 /*
2253 * Are we using the replication origins feature? Or, in other words, are
2254 * we replaying remote actions?
2255 */
2256 replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2257 replorigin_session_origin != DoNotReplicateId);
2258
2259 START_CRIT_SECTION();
2260
2261 /* See notes in RecordTransactionCommit */
2262 MyPgXact->delayChkpt = true;
2263
2264 /*
2265 * Emit the XLOG commit record. Note that we mark 2PC commits as
2266 * potentially having AccessExclusiveLocks since we don't know whether or
2267 * not they do.
2268 */
2269 recptr = XactLogCommitRecord(committs,
2270 nchildren, children, nrels, rels,
2271 ninvalmsgs, invalmsgs,
2272 initfileinval, false,
2273 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2274 xid, gid);
2275
2276
2277 if (replorigin)
2278 /* Move LSNs forward for this replication origin */
2279 replorigin_session_advance(replorigin_session_origin_lsn,
2280 XactLastRecEnd);
2281
2282 /*
2283 * Record commit timestamp. The value comes from plain commit timestamp
2284 * if replorigin is not enabled, or replorigin already set a value for us
2285 * in replorigin_session_origin_timestamp otherwise.
2286 *
2287 * We don't need to WAL-log anything here, as the commit record written
2288 * above already contains the data.
2289 */
2290 if (!replorigin || replorigin_session_origin_timestamp == 0)
2291 replorigin_session_origin_timestamp = committs;
2292
2293 TransactionTreeSetCommitTsData(xid, nchildren, children,
2294 replorigin_session_origin_timestamp,
2295 replorigin_session_origin, false);
2296
2297 /*
2298 * We don't currently try to sleep before flush here ... nor is there any
2299 * support for async commit of a prepared xact (the very idea is probably
2300 * a contradiction)
2301 */
2302
2303 /* Flush XLOG to disk */
2304 XLogFlush(recptr);
2305
2306 /* Mark the transaction committed in pg_xact */
2307 TransactionIdCommitTree(xid, nchildren, children);
2308
2309 /* Checkpoint can proceed now */
2310 MyPgXact->delayChkpt = false;
2311
2312 END_CRIT_SECTION();
2313
2314 /*
2315 * Wait for synchronous replication, if required.
2316 *
2317 * Note that at this stage we have marked clog, but still show as running
2318 * in the procarray and continue to hold locks.
2319 */
2320 SyncRepWaitForLSN(recptr, true);
2321}
2322
2323/*
2324 * RecordTransactionAbortPrepared
2325 *
2326 * This is basically the same as RecordTransactionAbort.
2327 *
2328 * We know the transaction made at least one XLOG entry (its PREPARE),
2329 * so it is never possible to optimize out the abort record.
2330 */
2331static void
2332RecordTransactionAbortPrepared(TransactionId xid,
2333 int nchildren,
2334 TransactionId *children,
2335 int nrels,
2336 RelFileNode *rels,
2337 const char *gid)
2338{
2339 XLogRecPtr recptr;
2340
2341 /*
2342 * Catch the scenario where we aborted partway through
2343 * RecordTransactionCommitPrepared ...
2344 */
2345 if (TransactionIdDidCommit(xid))
2346 elog(PANIC, "cannot abort transaction %u, it was already committed",
2347 xid);
2348
2349 START_CRIT_SECTION();
2350
2351 /*
2352 * Emit the XLOG commit record. Note that we mark 2PC aborts as
2353 * potentially having AccessExclusiveLocks since we don't know whether or
2354 * not they do.
2355 */
2356 recptr = XactLogAbortRecord(GetCurrentTimestamp(),
2357 nchildren, children,
2358 nrels, rels,
2359 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2360 xid, gid);
2361
2362 /* Always flush, since we're about to remove the 2PC state file */
2363 XLogFlush(recptr);
2364
2365 /*
2366 * Mark the transaction aborted in clog. This is not absolutely necessary
2367 * but we may as well do it while we are here.
2368 */
2369 TransactionIdAbortTree(xid, nchildren, children);
2370
2371 END_CRIT_SECTION();
2372
2373 /*
2374 * Wait for synchronous replication, if required.
2375 *
2376 * Note that at this stage we have marked clog, but still show as running
2377 * in the procarray and continue to hold locks.
2378 */
2379 SyncRepWaitForLSN(recptr, false);
2380}
2381
2382/*
2383 * PrepareRedoAdd
2384 *
2385 * Store pointers to the start/end of the WAL record along with the xid in
2386 * a gxact entry in shared memory TwoPhaseState structure. If caller
2387 * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2388 * data, the entry is marked as located on disk.
2389 */
2390void
2391PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
2392 XLogRecPtr end_lsn, RepOriginId origin_id)
2393{
2394 TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
2395 char *bufptr;
2396 const char *gid;
2397 GlobalTransaction gxact;
2398
2399 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2400 Assert(RecoveryInProgress());
2401
2402 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2403 gid = (const char *) bufptr;
2404
2405 /*
2406 * Reserve the GID for the given transaction in the redo code path.
2407 *
2408 * This creates a gxact struct and puts it into the active array.
2409 *
2410 * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2411 * shared memory. Hence, we only fill up the bare minimum contents here.
2412 * The gxact also gets marked with gxact->inredo set to true to indicate
2413 * that it got added in the redo phase
2414 */
2415
2416 /* Get a free gxact from the freelist */
2417 if (TwoPhaseState->freeGXacts == NULL)
2418 ereport(ERROR,
2419 (errcode(ERRCODE_OUT_OF_MEMORY),
2420 errmsg("maximum number of prepared transactions reached"),
2421 errhint("Increase max_prepared_transactions (currently %d).",
2422 max_prepared_xacts)));
2423 gxact = TwoPhaseState->freeGXacts;
2424 TwoPhaseState->freeGXacts = gxact->next;
2425
2426 gxact->prepared_at = hdr->prepared_at;
2427 gxact->prepare_start_lsn = start_lsn;
2428 gxact->prepare_end_lsn = end_lsn;
2429 gxact->xid = hdr->xid;
2430 gxact->owner = hdr->owner;
2431 gxact->locking_backend = InvalidBackendId;
2432 gxact->valid = false;
2433 gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
2434 gxact->inredo = true; /* yes, added in redo */
2435 strcpy(gxact->gid, gid);
2436
2437 /* And insert it into the active array */
2438 Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
2439 TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
2440
2441 if (origin_id != InvalidRepOriginId)
2442 {
2443 /* recover apply progress */
2444 replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
2445 false /* backward */ , false /* WAL */ );
2446 }
2447
2448 elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
2449}
2450
2451/*
2452 * PrepareRedoRemove
2453 *
2454 * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2455 * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2456 *
2457 * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2458 * is updated.
2459 */
2460void
2461PrepareRedoRemove(TransactionId xid, bool giveWarning)
2462{
2463 GlobalTransaction gxact = NULL;
2464 int i;
2465 bool found = false;
2466
2467 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2468 Assert(RecoveryInProgress());
2469
2470 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2471 {
2472 gxact = TwoPhaseState->prepXacts[i];
2473
2474 if (gxact->xid == xid)
2475 {
2476 Assert(gxact->inredo);
2477 found = true;
2478 break;
2479 }
2480 }
2481
2482 /*
2483 * Just leave if there is nothing, this is expected during WAL replay.
2484 */
2485 if (!found)
2486 return;
2487
2488 /*
2489 * And now we can clean up any files we may have left.
2490 */
2491 elog(DEBUG2, "removing 2PC data for transaction %u", xid);
2492 if (gxact->ondisk)
2493 RemoveTwoPhaseFile(xid, giveWarning);
2494 RemoveGXact(gxact);
2495
2496 return;
2497}
2498