1/*-------------------------------------------------------------------------
2 *
3 * standby.c
4 * Misc functions used in Hot Standby mode.
5 *
6 * All functions for handling RM_STANDBY_ID, which relate to
7 * AccessExclusiveLocks and starting snapshots for Hot Standby mode.
8 * Plus conflict recovery processing.
9 *
10 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
11 * Portions Copyright (c) 1994, Regents of the University of California
12 *
13 * IDENTIFICATION
14 * src/backend/storage/ipc/standby.c
15 *
16 *-------------------------------------------------------------------------
17 */
18#include "postgres.h"
19#include "access/transam.h"
20#include "access/twophase.h"
21#include "access/xact.h"
22#include "access/xlog.h"
23#include "access/xloginsert.h"
24#include "miscadmin.h"
25#include "pgstat.h"
26#include "storage/bufmgr.h"
27#include "storage/lmgr.h"
28#include "storage/proc.h"
29#include "storage/procarray.h"
30#include "storage/sinvaladt.h"
31#include "storage/standby.h"
32#include "utils/hsearch.h"
33#include "utils/memutils.h"
34#include "utils/ps_status.h"
35#include "utils/timeout.h"
36#include "utils/timestamp.h"
37
/* User-settable GUC parameters */

/* Number of xacts by which VACUUM and HOT cleanup on the primary are deferred */
int vacuum_defer_cleanup_age;
/* Max total delay (msec) before canceling conflicting queries when WAL comes
 * from archive; -1 means wait forever (see GetStandbyLimitTime) */
int max_standby_archive_delay = 30 * 1000;
/* Same, but applied when WAL is being received via streaming replication */
int max_standby_streaming_delay = 30 * 1000;

/*
 * Local hash table, keyed by xid, remembering the AccessExclusiveLocks the
 * Startup process has taken on behalf of each transaction being replayed.
 * Entries are RecoveryLockListsEntry; created in
 * InitRecoveryTransactionEnvironment, destroyed at shutdown.
 */
static HTAB *RecoveryLockLists;

/* Forward declarations of local routines */
static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
												   ProcSignalReason reason);
static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);

/*
 * Keep track of all the locks owned by a given transaction.
 */
typedef struct RecoveryLockListsEntry
{
	TransactionId xid;			/* hash key: xid that requested the locks */
	List	   *locks;			/* list of xl_standby_lock structs */
} RecoveryLockListsEntry;
59
60/*
61 * InitRecoveryTransactionEnvironment
62 * Initialize tracking of in-progress transactions in master
63 *
64 * We need to issue shared invalidations and hold locks. Holding locks
65 * means others may want to wait on us, so we need to make a lock table
66 * vxact entry like a real transaction. We could create and delete
67 * lock table entries for each transaction but its simpler just to create
68 * one permanent entry and leave it there all the time. Locks are then
69 * acquired and released as needed. Yes, this means you can see the
70 * Startup process in pg_locks once we have run this.
71 */
72void
73InitRecoveryTransactionEnvironment(void)
74{
75 VirtualTransactionId vxid;
76 HASHCTL hash_ctl;
77
78 /*
79 * Initialize the hash table for tracking the list of locks held by each
80 * transaction.
81 */
82 memset(&hash_ctl, 0, sizeof(hash_ctl));
83 hash_ctl.keysize = sizeof(TransactionId);
84 hash_ctl.entrysize = sizeof(RecoveryLockListsEntry);
85 RecoveryLockLists = hash_create("RecoveryLockLists",
86 64,
87 &hash_ctl,
88 HASH_ELEM | HASH_BLOBS);
89
90 /*
91 * Initialize shared invalidation management for Startup process, being
92 * careful to register ourselves as a sendOnly process so we don't need to
93 * read messages, nor will we get signalled when the queue starts filling
94 * up.
95 */
96 SharedInvalBackendInit(true);
97
98 /*
99 * Lock a virtual transaction id for Startup process.
100 *
101 * We need to do GetNextLocalTransactionId() because
102 * SharedInvalBackendInit() leaves localTransactionid invalid and the lock
103 * manager doesn't like that at all.
104 *
105 * Note that we don't need to run XactLockTableInsert() because nobody
106 * needs to wait on xids. That sounds a little strange, but table locks
107 * are held by vxids and row level locks are held by xids. All queries
108 * hold AccessShareLocks so never block while we write or lock new rows.
109 */
110 vxid.backendId = MyBackendId;
111 vxid.localTransactionId = GetNextLocalTransactionId();
112 VirtualXactLockTableInsert(vxid);
113
114 standbyState = STANDBY_INITIALIZED;
115}
116
117/*
118 * ShutdownRecoveryTransactionEnvironment
119 * Shut down transaction tracking
120 *
121 * Prepare to switch from hot standby mode to normal operation. Shut down
122 * recovery-time transaction tracking.
123 */
124void
125ShutdownRecoveryTransactionEnvironment(void)
126{
127 /* Mark all tracked in-progress transactions as finished. */
128 ExpireAllKnownAssignedTransactionIds();
129
130 /* Release all locks the tracked transactions were holding */
131 StandbyReleaseAllLocks();
132
133 /* Destroy the hash table of locks. */
134 hash_destroy(RecoveryLockLists);
135 RecoveryLockLists = NULL;
136
137 /* Cleanup our VirtualTransaction */
138 VirtualXactLockTableCleanup();
139}
140
141
142/*
143 * -----------------------------------------------------
144 * Standby wait timers and backend cancel logic
145 * -----------------------------------------------------
146 */
147
148/*
149 * Determine the cutoff time at which we want to start canceling conflicting
150 * transactions. Returns zero (a time safely in the past) if we are willing
151 * to wait forever.
152 */
153static TimestampTz
154GetStandbyLimitTime(void)
155{
156 TimestampTz rtime;
157 bool fromStream;
158
159 /*
160 * The cutoff time is the last WAL data receipt time plus the appropriate
161 * delay variable. Delay of -1 means wait forever.
162 */
163 GetXLogReceiptTime(&rtime, &fromStream);
164 if (fromStream)
165 {
166 if (max_standby_streaming_delay < 0)
167 return 0; /* wait forever */
168 return TimestampTzPlusMilliseconds(rtime, max_standby_streaming_delay);
169 }
170 else
171 {
172 if (max_standby_archive_delay < 0)
173 return 0; /* wait forever */
174 return TimestampTzPlusMilliseconds(rtime, max_standby_archive_delay);
175 }
176}
177
/* Initial sleep between wait retries, in microseconds */
#define STANDBY_INITIAL_WAIT_US 1000
/* Current sleep interval; doubled up to 1s and reset per xact waited for */
static int standbyWait_us = STANDBY_INITIAL_WAIT_US;
180
181/*
182 * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs.
183 * We wait here for a while then return. If we decide we can't wait any
184 * more then we return true, if we can wait some more return false.
185 */
186static bool
187WaitExceedsMaxStandbyDelay(void)
188{
189 TimestampTz ltime;
190
191 CHECK_FOR_INTERRUPTS();
192
193 /* Are we past the limit time? */
194 ltime = GetStandbyLimitTime();
195 if (ltime && GetCurrentTimestamp() >= ltime)
196 return true;
197
198 /*
199 * Sleep a bit (this is essential to avoid busy-waiting).
200 */
201 pg_usleep(standbyWait_us);
202
203 /*
204 * Progressively increase the sleep times, but not to more than 1s, since
205 * pg_usleep isn't interruptible on some platforms.
206 */
207 standbyWait_us *= 2;
208 if (standbyWait_us > 1000000)
209 standbyWait_us = 1000000;
210
211 return false;
212}
213
/*
 * This is the main executioner for any query backend that conflicts with
 * recovery processing. Judgement has already been passed on it within
 * a specific rmgr. Here we just issue the orders to the procs. The procs
 * then throw the required error as instructed.
 *
 * waitlist is an array of virtual xids terminated by an invalid entry
 * (as produced by GetConflictingVirtualXIDs / GetLockConflicts); reason is
 * the ProcSignalReason delivered to each backend we end up canceling.
 * Each entry is waited on in turn until it exits or the standby delay
 * limit expires, at which point it is canceled.
 */
static void
ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
									   ProcSignalReason reason)
{
	TimestampTz waitStart;
	char	   *new_status;

	/* Fast exit, to avoid a kernel call if there's no work to be done. */
	if (!VirtualTransactionIdIsValid(*waitlist))
		return;

	waitStart = GetCurrentTimestamp();
	new_status = NULL;			/* we haven't changed the ps display */

	while (VirtualTransactionIdIsValid(*waitlist))
	{
		/* reset standbyWait_us for each xact we wait for */
		standbyWait_us = STANDBY_INITIAL_WAIT_US;

		/* wait until the virtual xid is gone */
		while (!VirtualXactLock(*waitlist, false))
		{
			/*
			 * Report via ps if we have been waiting for more than 500 msec
			 * (should that be configurable?)
			 */
			if (update_process_title && new_status == NULL &&
				TimestampDifferenceExceeds(waitStart, GetCurrentTimestamp(),
										   500))
			{
				const char *old_status;
				int			len;

				old_status = get_ps_display(&len);
				/* 8 = strlen(" waiting"), plus room for the trailing NUL */
				new_status = (char *) palloc(len + 8 + 1);
				memcpy(new_status, old_status, len);
				strcpy(new_status + len, " waiting");
				set_ps_display(new_status, false);
				/* keep the untruncated copy so we can restore it below */
				new_status[len] = '\0'; /* truncate off " waiting" */
			}

			/* Is it time to kill it? */
			if (WaitExceedsMaxStandbyDelay())
			{
				pid_t		pid;

				/*
				 * Now find out who to throw out of the balloon.
				 */
				Assert(VirtualTransactionIdIsValid(*waitlist));
				pid = CancelVirtualTransaction(*waitlist, reason);

				/*
				 * Wait a little bit for it to die so that we avoid flooding
				 * an unresponsive backend when system is heavily loaded.
				 */
				if (pid != 0)
					pg_usleep(5000L);
			}
		}

		/* The virtual transaction is gone now, wait for the next one */
		waitlist++;
	}

	/* Reset ps display if we changed it */
	if (new_status)
	{
		set_ps_display(new_status, false);
		pfree(new_status);
	}
}
292
293void
294ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
295{
296 VirtualTransactionId *backends;
297
298 /*
299 * If we get passed InvalidTransactionId then we are a little surprised,
300 * but it is theoretically possible in normal running. It also happens
301 * when replaying already applied WAL records after a standby crash or
302 * restart, or when replaying an XLOG_HEAP2_VISIBLE record that marks as
303 * frozen a page which was already all-visible. If latestRemovedXid is
304 * invalid then there is no conflict. That rule applies across all record
305 * types that suffer from this conflict.
306 */
307 if (!TransactionIdIsValid(latestRemovedXid))
308 return;
309
310 backends = GetConflictingVirtualXIDs(latestRemovedXid,
311 node.dbNode);
312
313 ResolveRecoveryConflictWithVirtualXIDs(backends,
314 PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
315}
316
317void
318ResolveRecoveryConflictWithTablespace(Oid tsid)
319{
320 VirtualTransactionId *temp_file_users;
321
322 /*
323 * Standby users may be currently using this tablespace for their
324 * temporary files. We only care about current users because
325 * temp_tablespace parameter will just ignore tablespaces that no longer
326 * exist.
327 *
328 * Ask everybody to cancel their queries immediately so we can ensure no
329 * temp files remain and we can remove the tablespace. Nuke the entire
330 * site from orbit, it's the only way to be sure.
331 *
332 * XXX: We could work out the pids of active backends using this
333 * tablespace by examining the temp filenames in the directory. We would
334 * then convert the pids into VirtualXIDs before attempting to cancel
335 * them.
336 *
337 * We don't wait for commit because drop tablespace is non-transactional.
338 */
339 temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
340 InvalidOid);
341 ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
342 PROCSIG_RECOVERY_CONFLICT_TABLESPACE);
343}
344
345void
346ResolveRecoveryConflictWithDatabase(Oid dbid)
347{
348 /*
349 * We don't do ResolveRecoveryConflictWithVirtualXIDs() here since that
350 * only waits for transactions and completely idle sessions would block
351 * us. This is rare enough that we do this as simply as possible: no wait,
352 * just force them off immediately.
353 *
354 * No locking is required here because we already acquired
355 * AccessExclusiveLock. Anybody trying to connect while we do this will
356 * block during InitPostgres() and then disconnect when they see the
357 * database has been removed.
358 */
359 while (CountDBBackends(dbid) > 0)
360 {
361 CancelDBBackends(dbid, PROCSIG_RECOVERY_CONFLICT_DATABASE, true);
362
363 /*
364 * Wait awhile for them to die so that we avoid flooding an
365 * unresponsive backend when system is heavily loaded.
366 */
367 pg_usleep(10000);
368 }
369}
370
371/*
372 * ResolveRecoveryConflictWithLock is called from ProcSleep()
373 * to resolve conflicts with other backends holding relation locks.
374 *
375 * The WaitLatch sleep normally done in ProcSleep()
376 * (when not InHotStandby) is performed here, for code clarity.
377 *
378 * We either resolve conflicts immediately or set a timeout to wake us at
379 * the limit of our patience.
380 *
381 * Resolve conflicts by canceling to all backends holding a conflicting
382 * lock. As we are already queued to be granted the lock, no new lock
383 * requests conflicting with ours will be granted in the meantime.
384 *
385 * Deadlocks involving the Startup process and an ordinary backend process
386 * will be detected by the deadlock detector within the ordinary backend.
387 */
388void
389ResolveRecoveryConflictWithLock(LOCKTAG locktag)
390{
391 TimestampTz ltime;
392
393 Assert(InHotStandby);
394
395 ltime = GetStandbyLimitTime();
396
397 if (GetCurrentTimestamp() >= ltime)
398 {
399 /*
400 * We're already behind, so clear a path as quickly as possible.
401 */
402 VirtualTransactionId *backends;
403
404 backends = GetLockConflicts(&locktag, AccessExclusiveLock, NULL);
405 ResolveRecoveryConflictWithVirtualXIDs(backends,
406 PROCSIG_RECOVERY_CONFLICT_LOCK);
407 }
408 else
409 {
410 /*
411 * Wait (or wait again) until ltime
412 */
413 EnableTimeoutParams timeouts[1];
414
415 timeouts[0].id = STANDBY_LOCK_TIMEOUT;
416 timeouts[0].type = TMPARAM_AT;
417 timeouts[0].fin_time = ltime;
418 enable_timeouts(timeouts, 1);
419 }
420
421 /* Wait to be signaled by the release of the Relation Lock */
422 ProcWaitForSignal(PG_WAIT_LOCK | locktag.locktag_type);
423
424 /*
425 * Clear any timeout requests established above. We assume here that the
426 * Startup process doesn't have any other outstanding timeouts than those
427 * used by this function. If that stops being true, we could cancel the
428 * timeouts individually, but that'd be slower.
429 */
430 disable_all_timeouts(false);
431}
432
/*
 * ResolveRecoveryConflictWithBufferPin is called from LockBufferForCleanup()
 * to resolve conflicts with other backends holding buffer pins.
 *
 * The ProcWaitForSignal() sleep normally done in LockBufferForCleanup()
 * (when not InHotStandby) is performed here, for code clarity.
 *
 * We either resolve conflicts immediately or set a timeout to wake us at
 * the limit of our patience.
 *
 * Resolve conflicts by sending a PROCSIG signal to all backends to check if
 * they hold one of the buffer pins that is blocking Startup process. If so,
 * those backends will take an appropriate error action, ERROR or FATAL.
 *
 * We also must check for deadlocks.  Deadlocks occur because if queries
 * wait on a lock, that must be behind an AccessExclusiveLock, which can only
 * be cleared if the Startup process replays a transaction completion record.
 * If Startup process is also waiting then that is a deadlock. The deadlock
 * can occur if the query is waiting and then the Startup sleeps, or if
 * Startup is sleeping and the query waits on a lock. We protect against
 * only the former sequence here, the latter sequence is checked prior to
 * the query sleeping, in CheckRecoveryConflictDeadlock().
 *
 * Deadlocks are extremely rare, and relatively expensive to check for,
 * so we don't do a deadlock check right away ... only if we have had to wait
 * at least deadlock_timeout.
 */
void
ResolveRecoveryConflictWithBufferPin(void)
{
	TimestampTz ltime;

	Assert(InHotStandby);

	/* ltime == 0 means "no limit": we may wait forever for the pin */
	ltime = GetStandbyLimitTime();

	if (ltime == 0)
	{
		/*
		 * We're willing to wait forever for conflicts, so set timeout for
		 * deadlock check only
		 */
		enable_timeout_after(STANDBY_DEADLOCK_TIMEOUT, DeadlockTimeout);
	}
	else if (GetCurrentTimestamp() >= ltime)
	{
		/*
		 * We're already behind, so clear a path as quickly as possible.
		 * This signals backends to cancel themselves; we still sleep below
		 * until one of them releases the pin and wakes us.
		 */
		SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
	}
	else
	{
		/*
		 * Wake up at ltime, and check for deadlocks as well if we will be
		 * waiting longer than deadlock_timeout
		 */
		EnableTimeoutParams timeouts[2];

		timeouts[0].id = STANDBY_TIMEOUT;
		timeouts[0].type = TMPARAM_AT;
		timeouts[0].fin_time = ltime;
		timeouts[1].id = STANDBY_DEADLOCK_TIMEOUT;
		timeouts[1].type = TMPARAM_AFTER;
		timeouts[1].delay_ms = DeadlockTimeout;
		enable_timeouts(timeouts, 2);
	}

	/* Wait to be signaled by UnpinBuffer() */
	ProcWaitForSignal(PG_WAIT_BUFFER_PIN);

	/*
	 * Clear any timeout requests established above.  We assume here that the
	 * Startup process doesn't have any other timeouts than what this function
	 * uses.  If that stops being true, we could cancel the timeouts
	 * individually, but that'd be slower.
	 */
	disable_all_timeouts(false);
}
512
/*
 * Broadcast a buffer-pin conflict request to all backends.  reason is
 * either PROCSIG_RECOVERY_CONFLICT_BUFFERPIN (release the pin, canceling
 * if need be) or PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK (check for a
 * deadlock with the Startup process first).
 */
static void
SendRecoveryConflictWithBufferPin(ProcSignalReason reason)
{
	Assert(reason == PROCSIG_RECOVERY_CONFLICT_BUFFERPIN ||
		   reason == PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);

	/*
	 * We send signal to all backends to ask them if they are holding the
	 * buffer pin which is delaying the Startup process. We must not set the
	 * conflict flag yet, since most backends will be innocent. Let the
	 * SIGUSR1 handling in each backend decide their own fate.
	 */
	CancelDBBackends(InvalidOid, reason, false);
}
527
528/*
529 * In Hot Standby perform early deadlock detection. We abort the lock
530 * wait if we are about to sleep while holding the buffer pin that Startup
531 * process is waiting for.
532 *
533 * Note: this code is pessimistic, because there is no way for it to
534 * determine whether an actual deadlock condition is present: the lock we
535 * need to wait for might be unrelated to any held by the Startup process.
536 * Sooner or later, this mechanism should get ripped out in favor of somehow
537 * accounting for buffer locks in DeadLockCheck(). However, errors here
538 * seem to be very low-probability in practice, so for now it's not worth
539 * the trouble.
540 */
541void
542CheckRecoveryConflictDeadlock(void)
543{
544 Assert(!InRecovery); /* do not call in Startup process */
545
546 if (!HoldingBufferPinThatDelaysRecovery())
547 return;
548
549 /*
550 * Error message should match ProcessInterrupts() but we avoid calling
551 * that because we aren't handling an interrupt at this point. Note that
552 * we only cancel the current transaction here, so if we are in a
553 * subtransaction and the pin is held by a parent, then the Startup
554 * process will continue to wait even though we have avoided deadlock.
555 */
556 ereport(ERROR,
557 (errcode(ERRCODE_T_R_DEADLOCK_DETECTED),
558 errmsg("canceling statement due to conflict with recovery"),
559 errdetail("User transaction caused buffer deadlock with recovery.")));
560}
561
562
563/* --------------------------------
564 * timeout handler routines
565 * --------------------------------
566 */
567
/*
 * StandbyDeadLockHandler() will be called if STANDBY_DEADLOCK_TIMEOUT
 * occurs before STANDBY_TIMEOUT.  Send out a request for hot-standby
 * backends to check themselves for deadlocks.
 */
void
StandbyDeadLockHandler(void)
{
	/* Ask every backend to check whether it deadlocks against Startup */
	SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
}
578
/*
 * StandbyTimeoutHandler() will be called if STANDBY_TIMEOUT is exceeded.
 * Send out a request to release conflicting buffer pins unconditionally,
 * so we can press ahead with applying changes in recovery.
 */
void
StandbyTimeoutHandler(void)
{
	/*
	 * Forget any pending STANDBY_DEADLOCK_TIMEOUT request: once we are
	 * canceling pin holders outright, a later deadlock check is pointless.
	 */
	disable_timeout(STANDBY_DEADLOCK_TIMEOUT, false);

	SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
}
592
/*
 * StandbyLockTimeoutHandler() will be called if STANDBY_LOCK_TIMEOUT is
 * exceeded.  This doesn't need to do anything: simply waking up the
 * ProcWaitForSignal() sleep in ResolveRecoveryConflictWithLock() is enough.
 */
void
StandbyLockTimeoutHandler(void)
{
}
601
602/*
603 * -----------------------------------------------------
604 * Locking in Recovery Mode
605 * -----------------------------------------------------
606 *
607 * All locks are held by the Startup process using a single virtual
608 * transaction. This implementation is both simpler and in some senses,
609 * more correct. The locks held mean "some original transaction held
610 * this lock, so query access is not allowed at this time". So the Startup
611 * process is the proxy by which the original locks are implemented.
612 *
613 * We only keep track of AccessExclusiveLocks, which are only ever held by
614 * one transaction on one relation.
615 *
616 * We keep a hash table of lists of locks in local memory keyed by xid,
617 * RecoveryLockLists, so we can keep track of the various entries made by
618 * the Startup process's virtual xid in the shared lock table.
619 *
620 * List elements use type xl_standby_lock, since the WAL record type exactly
621 * matches the information that we need to keep track of.
622 *
623 * We use session locks rather than normal locks so we don't need
624 * ResourceOwners.
625 */
626
627
628void
629StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
630{
631 RecoveryLockListsEntry *entry;
632 xl_standby_lock *newlock;
633 LOCKTAG locktag;
634 bool found;
635
636 /* Already processed? */
637 if (!TransactionIdIsValid(xid) ||
638 TransactionIdDidCommit(xid) ||
639 TransactionIdDidAbort(xid))
640 return;
641
642 elog(trace_recovery(DEBUG4),
643 "adding recovery lock: db %u rel %u", dbOid, relOid);
644
645 /* dbOid is InvalidOid when we are locking a shared relation. */
646 Assert(OidIsValid(relOid));
647
648 /* Create a new list for this xid, if we don't have one already. */
649 entry = hash_search(RecoveryLockLists, &xid, HASH_ENTER, &found);
650 if (!found)
651 {
652 entry->xid = xid;
653 entry->locks = NIL;
654 }
655
656 newlock = palloc(sizeof(xl_standby_lock));
657 newlock->xid = xid;
658 newlock->dbOid = dbOid;
659 newlock->relOid = relOid;
660 entry->locks = lappend(entry->locks, newlock);
661
662 SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
663
664 (void) LockAcquire(&locktag, AccessExclusiveLock, true, false);
665}
666
/*
 * Release every AccessExclusiveLock recorded in the given list of
 * xl_standby_lock structs, freeing the structs and the list cells as we
 * go.  The list is fully consumed; callers are responsible for removing
 * any RecoveryLockLists hash entry that referenced it.
 */
static void
StandbyReleaseLockList(List *locks)
{
	while (locks)
	{
		xl_standby_lock *lock = (xl_standby_lock *) linitial(locks);
		LOCKTAG		locktag;

		elog(trace_recovery(DEBUG4),
			 "releasing recovery lock: xid %u db %u rel %u",
			 lock->xid, lock->dbOid, lock->relOid);
		/* Release the session lock taken in StandbyAcquireAccessExclusiveLock */
		SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
		if (!LockRelease(&locktag, AccessExclusiveLock, true))
		{
			/*
			 * Shouldn't happen: our bookkeeping and the lock manager have
			 * diverged.  Log (and Assert in debug builds) but keep going so
			 * the remaining locks still get released.
			 */
			elog(LOG,
				 "RecoveryLockLists contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
				 lock->xid, lock->dbOid, lock->relOid);
			Assert(false);
		}
		pfree(lock);
		locks = list_delete_first(locks);
	}
}
690
691static void
692StandbyReleaseLocks(TransactionId xid)
693{
694 RecoveryLockListsEntry *entry;
695
696 if (TransactionIdIsValid(xid))
697 {
698 if ((entry = hash_search(RecoveryLockLists, &xid, HASH_FIND, NULL)))
699 {
700 StandbyReleaseLockList(entry->locks);
701 hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
702 }
703 }
704 else
705 StandbyReleaseAllLocks();
706}
707
/*
 * Release locks for a transaction tree, starting at xid down, from
 * RecoveryLockLists.
 *
 * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
 * to remove any AccessExclusiveLocks requested by a transaction.
 */
void
StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
{
	int			i;

	/* Release the top-level transaction's locks first ... */
	StandbyReleaseLocks(xid);

	/* ... then those of each of its nsubxids subtransactions. */
	for (i = 0; i < nsubxids; i++)
		StandbyReleaseLocks(subxids[i]);
}
725
/*
 * Called at end of recovery and when we see a shutdown checkpoint.
 *
 * Releases every tracked AccessExclusiveLock and empties the
 * RecoveryLockLists hash table; the table itself remains usable.
 */
void
StandbyReleaseAllLocks(void)
{
	HASH_SEQ_STATUS status;
	RecoveryLockListsEntry *entry;

	elog(trace_recovery(DEBUG2), "release all standby locks");

	hash_seq_init(&status, RecoveryLockLists);
	while ((entry = hash_seq_search(&status)))
	{
		StandbyReleaseLockList(entry->locks);
		/* passing entry works as key because xid is the struct's first field */
		hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
	}
}
744
745/*
746 * StandbyReleaseOldLocks
747 * Release standby locks held by top-level XIDs that aren't running,
748 * as long as they're not prepared transactions.
749 */
750void
751StandbyReleaseOldLocks(TransactionId oldxid)
752{
753 HASH_SEQ_STATUS status;
754 RecoveryLockListsEntry *entry;
755
756 hash_seq_init(&status, RecoveryLockLists);
757 while ((entry = hash_seq_search(&status)))
758 {
759 Assert(TransactionIdIsValid(entry->xid));
760
761 /* Skip if prepared transaction. */
762 if (StandbyTransactionIdIsPrepared(entry->xid))
763 continue;
764
765 /* Skip if >= oldxid. */
766 if (!TransactionIdPrecedes(entry->xid, oldxid))
767 continue;
768
769 /* Remove all locks and hash table entry. */
770 StandbyReleaseLockList(entry->locks);
771 hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
772 }
773}
774
775/*
776 * --------------------------------------------------------------------
777 * Recovery handling for Rmgr RM_STANDBY_ID
778 *
779 * These record types will only be created if XLogStandbyInfoActive()
780 * --------------------------------------------------------------------
781 */
782
/*
 * Redo function for RM_STANDBY_ID records.
 *
 * XLOG_STANDBY_LOCK: re-acquire each logged AccessExclusiveLock.
 * XLOG_RUNNING_XACTS: hand the logged running-xacts snapshot to
 *		ProcArrayApplyRecoveryInfo().
 * XLOG_INVALIDATIONS: replay logged invalidation messages via
 *		ProcessCommittedInvalidationMessages().
 */
void
standby_redo(XLogReaderState *record)
{
	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

	/* Backup blocks are not used in standby records */
	Assert(!XLogRecHasAnyBlockRefs(record));

	/* Do nothing if we're not in hot standby mode */
	if (standbyState == STANDBY_DISABLED)
		return;

	if (info == XLOG_STANDBY_LOCK)
	{
		xl_standby_locks *xlrec = (xl_standby_locks *) XLogRecGetData(record);
		int			i;

		for (i = 0; i < xlrec->nlocks; i++)
			StandbyAcquireAccessExclusiveLock(xlrec->locks[i].xid,
											  xlrec->locks[i].dbOid,
											  xlrec->locks[i].relOid);
	}
	else if (info == XLOG_RUNNING_XACTS)
	{
		xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
		RunningTransactionsData running;

		/* Unpack the WAL record into a RunningTransactionsData struct */
		running.xcnt = xlrec->xcnt;
		running.subxcnt = xlrec->subxcnt;
		running.subxid_overflow = xlrec->subxid_overflow;
		running.nextXid = xlrec->nextXid;
		running.latestCompletedXid = xlrec->latestCompletedXid;
		running.oldestRunningXid = xlrec->oldestRunningXid;
		running.xids = xlrec->xids;

		ProcArrayApplyRecoveryInfo(&running);
	}
	else if (info == XLOG_INVALIDATIONS)
	{
		xl_invalidations *xlrec = (xl_invalidations *) XLogRecGetData(record);

		ProcessCommittedInvalidationMessages(xlrec->msgs,
											 xlrec->nmsgs,
											 xlrec->relcacheInitFileInval,
											 xlrec->dbId,
											 xlrec->tsId);
	}
	else
		elog(PANIC, "standby_redo: unknown op code %u", info);
}
833
834/*
835 * Log details of the current snapshot to WAL. This allows the snapshot state
836 * to be reconstructed on the standby and for logical decoding.
837 *
838 * This is used for Hot Standby as follows:
839 *
840 * We can move directly to STANDBY_SNAPSHOT_READY at startup if we
841 * start from a shutdown checkpoint because we know nothing was running
842 * at that time and our recovery snapshot is known empty. In the more
843 * typical case of an online checkpoint we need to jump through a few
844 * hoops to get a correct recovery snapshot and this requires a two or
845 * sometimes a three stage process.
846 *
847 * The initial snapshot must contain all running xids and all current
848 * AccessExclusiveLocks at a point in time on the standby. Assembling
849 * that information while the server is running requires many and
850 * various LWLocks, so we choose to derive that information piece by
851 * piece and then re-assemble that info on the standby. When that
852 * information is fully assembled we move to STANDBY_SNAPSHOT_READY.
853 *
854 * Since locking on the primary when we derive the information is not
855 * strict, we note that there is a time window between the derivation and
856 * writing to WAL of the derived information. That allows race conditions
857 * that we must resolve, since xids and locks may enter or leave the
858 * snapshot during that window. This creates the issue that an xid or
859 * lock may start *after* the snapshot has been derived yet *before* the
860 * snapshot is logged in the running xacts WAL record. We resolve this by
861 * starting to accumulate changes at a point just prior to when we derive
862 * the snapshot on the primary, then ignore duplicates when we later apply
863 * the snapshot from the running xacts record. This is implemented during
864 * CreateCheckpoint() where we use the logical checkpoint location as
865 * our starting point and then write the running xacts record immediately
866 * before writing the main checkpoint WAL record. Since we always start
867 * up from a checkpoint and are immediately at our starting point, we
868 * unconditionally move to STANDBY_INITIALIZED. After this point we
869 * must do 4 things:
870 * * move shared nextFullXid forwards as we see new xids
871 * * extend the clog and subtrans with each new xid
872 * * keep track of uncommitted known assigned xids
873 * * keep track of uncommitted AccessExclusiveLocks
874 *
875 * When we see a commit/abort we must remove known assigned xids and locks
876 * from the completing transaction. Attempted removals that cannot locate
877 * an entry are expected and must not cause an error when we are in state
878 * STANDBY_INITIALIZED. This is implemented in StandbyReleaseLocks() and
879 * KnownAssignedXidsRemove().
880 *
881 * Later, when we apply the running xact data we must be careful to ignore
882 * transactions already committed, since those commits raced ahead when
883 * making WAL entries.
884 *
885 * The loose timing also means that locks may be recorded that have a
886 * zero xid, since xids are removed from procs before locks are removed.
887 * So we must prune the lock list down to ensure we hold locks only for
888 * currently running xids, performed by StandbyReleaseOldLocks().
889 * Zero xids should no longer be possible, but we may be replaying WAL
890 * from a time when they were possible.
891 *
892 * For logical decoding only the running xacts information is needed;
893 * there's no need to look at the locking information, but it's logged anyway,
894 * as there's no independent knob to just enable logical decoding. For
895 * details of how this is used, check snapbuild.c's introductory comment.
896 *
897 *
898 * Returns the RecPtr of the last inserted record.
899 */
/* See the large comment above for the full protocol description. */
XLogRecPtr
LogStandbySnapshot(void)
{
	XLogRecPtr	recptr;
	RunningTransactions running;
	xl_standby_lock *locks;
	int			nlocks;

	/* Only meaningful when standby info is being WAL-logged at all */
	Assert(XLogStandbyInfoActive());

	/*
	 * Get details of any AccessExclusiveLocks being held at the moment.
	 */
	locks = GetRunningTransactionLocks(&nlocks);
	if (nlocks > 0)
		LogAccessExclusiveLocks(nlocks, locks);
	pfree(locks);

	/*
	 * Log details of all in-progress transactions. This should be the last
	 * record we write, because standby will open up when it sees this.
	 */
	running = GetRunningTransactionData();

	/*
	 * GetRunningTransactionData() acquired ProcArrayLock, we must release it.
	 * For Hot Standby this can be done before inserting the WAL record
	 * because ProcArrayApplyRecoveryInfo() rechecks the commit status using
	 * the clog. For logical decoding, though, the lock can't be released
	 * early because the clog might be "in the future" from the POV of the
	 * historic snapshot. This would allow for situations where we're waiting
	 * for the end of a transaction listed in the xl_running_xacts record
	 * which, according to the WAL, has committed before the xl_running_xacts
	 * record. Fortunately this routine isn't executed frequently, and it's
	 * only a shared lock.
	 */
	if (wal_level < WAL_LEVEL_LOGICAL)
		LWLockRelease(ProcArrayLock);

	recptr = LogCurrentRunningXacts(running);

	/* Release lock if we kept it longer ... (the logical-decoding case) */
	if (wal_level >= WAL_LEVEL_LOGICAL)
		LWLockRelease(ProcArrayLock);

	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
	LWLockRelease(XidGenLock);

	/* LSN of the xl_running_xacts record, the last one inserted above */
	return recptr;
}
950
/*
 * Record an enhanced snapshot of running transactions into WAL.
 *
 * The definitions of RunningTransactionsData and xl_xact_running_xacts are
 * similar. We keep them separate because xl_xact_running_xacts is a
 * contiguous chunk of memory and never exists fully until it is assembled in
 * WAL. The inserted records are marked as not being important for durability,
 * to avoid triggering superfluous checkpoint / archiving activity.
 *
 * Caller must already hold the locks protecting CurrRunningXacts; see
 * LogStandbySnapshot(). Returns the LSN of the inserted record.
 */
static XLogRecPtr
LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
{
	xl_running_xacts xlrec;
	XLogRecPtr	recptr;

	/* Copy fixed-size fields into the on-disk header struct */
	xlrec.xcnt = CurrRunningXacts->xcnt;
	xlrec.subxcnt = CurrRunningXacts->subxcnt;
	xlrec.subxid_overflow = CurrRunningXacts->subxid_overflow;
	xlrec.nextXid = CurrRunningXacts->nextXid;
	xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
	xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;

	/* Header */
	XLogBeginInsert();
	XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
	XLogRegisterData((char *) (&xlrec), MinSizeOfXactRunningXacts);

	/*
	 * array of TransactionIds: the xids buffer holds xcnt top-level xids
	 * immediately followed by subxcnt subxids, hence the combined length.
	 * NOTE(review): the guard tests only xcnt > 0 — presumably subxcnt can
	 * be nonzero only when xcnt is too; confirm against
	 * GetRunningTransactionData().
	 */
	if (xlrec.xcnt > 0)
		XLogRegisterData((char *) CurrRunningXacts->xids,
						 (xlrec.xcnt + xlrec.subxcnt) * sizeof(TransactionId));

	recptr = XLogInsert(RM_STANDBY_ID, XLOG_RUNNING_XACTS);

	/* Debug trace; two variants since the overflowed case lacks subxcnt */
	if (CurrRunningXacts->subxid_overflow)
		elog(trace_recovery(DEBUG2),
			 "snapshot of %u running transactions overflowed (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
			 CurrRunningXacts->xcnt,
			 (uint32) (recptr >> 32), (uint32) recptr,
			 CurrRunningXacts->oldestRunningXid,
			 CurrRunningXacts->latestCompletedXid,
			 CurrRunningXacts->nextXid);
	else
		elog(trace_recovery(DEBUG2),
			 "snapshot of %u+%u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
			 CurrRunningXacts->xcnt, CurrRunningXacts->subxcnt,
			 (uint32) (recptr >> 32), (uint32) recptr,
			 CurrRunningXacts->oldestRunningXid,
			 CurrRunningXacts->latestCompletedXid,
			 CurrRunningXacts->nextXid);

	/*
	 * Ensure running_xacts information is synced to disk not too far in the
	 * future. We don't want to stall anything though (i.e. use XLogFlush()),
	 * so we let the wal writer do it during normal operation.
	 * XLogSetAsyncXactLSN() conveniently will mark the LSN as to-be-synced
	 * and nudge the WALWriter into action if sleeping. Check
	 * XLogBackgroundFlush() for details why a record might not be flushed
	 * without it.
	 */
	XLogSetAsyncXactLSN(recptr);

	return recptr;
}
1015
1016/*
1017 * Wholesale logging of AccessExclusiveLocks. Other lock types need not be
1018 * logged, as described in backend/storage/lmgr/README.
1019 */
1020static void
1021LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks)
1022{
1023 xl_standby_locks xlrec;
1024
1025 xlrec.nlocks = nlocks;
1026
1027 XLogBeginInsert();
1028 XLogRegisterData((char *) &xlrec, offsetof(xl_standby_locks, locks));
1029 XLogRegisterData((char *) locks, nlocks * sizeof(xl_standby_lock));
1030 XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
1031
1032 (void) XLogInsert(RM_STANDBY_ID, XLOG_STANDBY_LOCK);
1033}
1034
1035/*
1036 * Individual logging of AccessExclusiveLocks for use during LockAcquire()
1037 */
1038void
1039LogAccessExclusiveLock(Oid dbOid, Oid relOid)
1040{
1041 xl_standby_lock xlrec;
1042
1043 xlrec.xid = GetCurrentTransactionId();
1044
1045 xlrec.dbOid = dbOid;
1046 xlrec.relOid = relOid;
1047
1048 LogAccessExclusiveLocks(1, &xlrec);
1049 MyXactFlags |= XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK;
1050}
1051
/*
 * Prepare to log an AccessExclusiveLock, for use during LockAcquire().
 * Must run before the lock is recorded in shared memory; see below.
 */
void
LogAccessExclusiveLockPrepare(void)
{
	/*
	 * Ensure that a TransactionId has been assigned to this transaction, for
	 * two reasons, both related to lock release on the standby. First, we
	 * must assign an xid so that RecordTransactionCommit() and
	 * RecordTransactionAbort() do not optimise away the transaction
	 * completion record which recovery relies upon to release locks. It's a
	 * hack, but for a corner case not worth adding code for into the main
	 * commit path. Second, we must assign an xid before the lock is recorded
	 * in shared memory, otherwise a concurrently executing
	 * GetRunningTransactionLocks() might see a lock associated with an
	 * InvalidTransactionId which we later assert cannot happen.
	 */
	(void) GetCurrentTransactionId();
}
1072
1073/*
1074 * Emit WAL for invalidations. This currently is only used for commits without
1075 * an xid but which contain invalidations.
1076 */
1077void
1078LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
1079 bool relcacheInitFileInval)
1080{
1081 xl_invalidations xlrec;
1082
1083 /* prepare record */
1084 memset(&xlrec, 0, sizeof(xlrec));
1085 xlrec.dbId = MyDatabaseId;
1086 xlrec.tsId = MyDatabaseTableSpace;
1087 xlrec.relcacheInitFileInval = relcacheInitFileInval;
1088 xlrec.nmsgs = nmsgs;
1089
1090 /* perform insertion */
1091 XLogBeginInsert();
1092 XLogRegisterData((char *) (&xlrec), MinSizeOfInvalidations);
1093 XLogRegisterData((char *) msgs,
1094 nmsgs * sizeof(SharedInvalidationMessage));
1095 XLogInsert(RM_STANDBY_ID, XLOG_INVALIDATIONS);
1096}
1097