1/*-------------------------------------------------------------------------
2 *
3 * syncrep.c
4 *
5 * Synchronous replication is new as of PostgreSQL 9.1.
6 *
 * If requested, transaction commits wait until their commit LSNs are
8 * acknowledged by the synchronous standbys.
9 *
10 * This module contains the code for waiting and release of backends.
11 * All code in this module executes on the primary. The core streaming
12 * replication transport remains within WALreceiver/WALsender modules.
13 *
14 * The essence of this design is that it isolates all logic about
15 * waiting/releasing onto the primary. The primary defines which standbys
16 * it wishes to wait for. The standbys are completely unaware of the
17 * durability requirements of transactions on the primary, reducing the
18 * complexity of the code and streamlining both standby operations and
19 * network bandwidth because there is no requirement to ship
20 * per-transaction state information.
21 *
22 * Replication is either synchronous or not synchronous (async). If it is
23 * async, we just fastpath out of here. If it is sync, then we wait for
24 * the write, flush or apply location on the standby before releasing
25 * the waiting backend. Further complexity in that interaction is
26 * expected in later releases.
27 *
28 * The best performing way to manage the waiting backends is to have a
29 * single ordered queue of waiting backends, so that we can avoid
 * searching through all waiters each time we receive a reply.
31 *
32 * In 9.5 or before only a single standby could be considered as
33 * synchronous. In 9.6 we support a priority-based multiple synchronous
34 * standbys. In 10.0 a quorum-based multiple synchronous standbys is also
35 * supported. The number of synchronous standbys that transactions
36 * must wait for replies from is specified in synchronous_standby_names.
37 * This parameter also specifies a list of standby names and the method
38 * (FIRST and ANY) to choose synchronous standbys from the listed ones.
39 *
40 * The method FIRST specifies a priority-based synchronous replication
41 * and makes transaction commits wait until their WAL records are
42 * replicated to the requested number of synchronous standbys chosen based
43 * on their priorities. The standbys whose names appear earlier in the list
44 * are given higher priority and will be considered as synchronous.
45 * Other standby servers appearing later in this list represent potential
46 * synchronous standbys. If any of the current synchronous standbys
47 * disconnects for whatever reason, it will be replaced immediately with
48 * the next-highest-priority standby.
49 *
50 * The method ANY specifies a quorum-based synchronous replication
51 * and makes transaction commits wait until their WAL records are
52 * replicated to at least the requested number of synchronous standbys
53 * in the list. All the standbys appearing in the list are considered as
54 * candidates for quorum synchronous standbys.
55 *
56 * If neither FIRST nor ANY is specified, FIRST is used as the method.
57 * This is for backward compatibility with 9.6 or before where only a
58 * priority-based sync replication was supported.
59 *
60 * Before the standbys chosen from synchronous_standby_names can
61 * become the synchronous standbys they must have caught up with
62 * the primary; that may take some time. Once caught up,
63 * the standbys which are considered as synchronous at that moment
64 * will release waiters from the queue.
65 *
66 * Portions Copyright (c) 2010-2019, PostgreSQL Global Development Group
67 *
68 * IDENTIFICATION
69 * src/backend/replication/syncrep.c
70 *
71 *-------------------------------------------------------------------------
72 */
73#include "postgres.h"
74
75#include <unistd.h>
76
77#include "access/xact.h"
78#include "miscadmin.h"
79#include "pgstat.h"
80#include "replication/syncrep.h"
81#include "replication/walsender.h"
82#include "replication/walsender_private.h"
83#include "storage/pmsignal.h"
84#include "storage/proc.h"
85#include "tcop/tcopprot.h"
86#include "utils/builtins.h"
87#include "utils/ps_status.h"
88
89/* User-settable parameters for sync rep */
90char *SyncRepStandbyNames;
91
92#define SyncStandbysDefined() \
93 (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
94
95static bool announce_next_takeover = true;
96
97SyncRepConfigData *SyncRepConfig = NULL;
98static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
99
100static void SyncRepQueueInsert(int mode);
101static void SyncRepCancelWait(void);
102static int SyncRepWakeQueue(bool all, int mode);
103
104static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
105 XLogRecPtr *flushPtr,
106 XLogRecPtr *applyPtr,
107 bool *am_sync);
108static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
109 XLogRecPtr *flushPtr,
110 XLogRecPtr *applyPtr,
111 List *sync_standbys);
112static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
113 XLogRecPtr *flushPtr,
114 XLogRecPtr *applyPtr,
115 List *sync_standbys, uint8 nth);
116static int SyncRepGetStandbyPriority(void);
117static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
118static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
119static int cmp_lsn(const void *a, const void *b);
120
121#ifdef USE_ASSERT_CHECKING
122static bool SyncRepQueueIsOrderedByLSN(int mode);
123#endif
124
125/*
126 * ===========================================================
127 * Synchronous Replication functions for normal user backends
128 * ===========================================================
129 */
130
131/*
132 * Wait for synchronous replication, if requested by user.
133 *
134 * Initially backends start in state SYNC_REP_NOT_WAITING and then
135 * change that state to SYNC_REP_WAITING before adding ourselves
136 * to the wait queue. During SyncRepWakeQueue() a WALSender changes
137 * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
138 * This backend then resets its state to SYNC_REP_NOT_WAITING.
139 *
140 * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
141 * represents a commit record. If it doesn't, then we wait only for the WAL
142 * to be flushed if synchronous_commit is set to the higher level of
143 * remote_apply, because only commit records provide apply feedback.
144 */
145void
146SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
147{
148 char *new_status = NULL;
149 const char *old_status;
150 int mode;
151
152 /* Cap the level for anything other than commit to remote flush only. */
153 if (commit)
154 mode = SyncRepWaitMode;
155 else
156 mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
157
158 /*
159 * Fast exit if user has not requested sync replication.
160 */
161 if (!SyncRepRequested())
162 return;
163
164 Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
165 Assert(WalSndCtl != NULL);
166
167 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
168 Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
169
170 /*
171 * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
172 * set. See SyncRepUpdateSyncStandbysDefined.
173 *
174 * Also check that the standby hasn't already replied. Unlikely race
175 * condition but we'll be fetching that cache line anyway so it's likely
176 * to be a low cost check.
177 */
178 if (!WalSndCtl->sync_standbys_defined ||
179 lsn <= WalSndCtl->lsn[mode])
180 {
181 LWLockRelease(SyncRepLock);
182 return;
183 }
184
185 /*
186 * Set our waitLSN so WALSender will know when to wake us, and add
187 * ourselves to the queue.
188 */
189 MyProc->waitLSN = lsn;
190 MyProc->syncRepState = SYNC_REP_WAITING;
191 SyncRepQueueInsert(mode);
192 Assert(SyncRepQueueIsOrderedByLSN(mode));
193 LWLockRelease(SyncRepLock);
194
195 /* Alter ps display to show waiting for sync rep. */
196 if (update_process_title)
197 {
198 int len;
199
200 old_status = get_ps_display(&len);
201 new_status = (char *) palloc(len + 32 + 1);
202 memcpy(new_status, old_status, len);
203 sprintf(new_status + len, " waiting for %X/%X",
204 (uint32) (lsn >> 32), (uint32) lsn);
205 set_ps_display(new_status, false);
206 new_status[len] = '\0'; /* truncate off " waiting ..." */
207 }
208
209 /*
210 * Wait for specified LSN to be confirmed.
211 *
212 * Each proc has its own wait latch, so we perform a normal latch
213 * check/wait loop here.
214 */
215 for (;;)
216 {
217 int rc;
218
219 /* Must reset the latch before testing state. */
220 ResetLatch(MyLatch);
221
222 /*
223 * Acquiring the lock is not needed, the latch ensures proper
224 * barriers. If it looks like we're done, we must really be done,
225 * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
226 * it will never update it again, so we can't be seeing a stale value
227 * in that case.
228 */
229 if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
230 break;
231
232 /*
233 * If a wait for synchronous replication is pending, we can neither
234 * acknowledge the commit nor raise ERROR or FATAL. The latter would
235 * lead the client to believe that the transaction aborted, which is
236 * not true: it's already committed locally. The former is no good
237 * either: the client has requested synchronous replication, and is
238 * entitled to assume that an acknowledged commit is also replicated,
239 * which might not be true. So in this case we issue a WARNING (which
240 * some clients may be able to interpret) and shut off further output.
241 * We do NOT reset ProcDiePending, so that the process will die after
242 * the commit is cleaned up.
243 */
244 if (ProcDiePending)
245 {
246 ereport(WARNING,
247 (errcode(ERRCODE_ADMIN_SHUTDOWN),
248 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
249 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
250 whereToSendOutput = DestNone;
251 SyncRepCancelWait();
252 break;
253 }
254
255 /*
256 * It's unclear what to do if a query cancel interrupt arrives. We
257 * can't actually abort at this point, but ignoring the interrupt
258 * altogether is not helpful, so we just terminate the wait with a
259 * suitable warning.
260 */
261 if (QueryCancelPending)
262 {
263 QueryCancelPending = false;
264 ereport(WARNING,
265 (errmsg("canceling wait for synchronous replication due to user request"),
266 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
267 SyncRepCancelWait();
268 break;
269 }
270
271 /*
272 * Wait on latch. Any condition that should wake us up will set the
273 * latch, so no need for timeout.
274 */
275 rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
276 WAIT_EVENT_SYNC_REP);
277
278 /*
279 * If the postmaster dies, we'll probably never get an acknowledgment,
280 * because all the wal sender processes will exit. So just bail out.
281 */
282 if (rc & WL_POSTMASTER_DEATH)
283 {
284 ProcDiePending = true;
285 whereToSendOutput = DestNone;
286 SyncRepCancelWait();
287 break;
288 }
289 }
290
291 /*
292 * WalSender has checked our LSN and has removed us from queue. Clean up
293 * state and leave. It's OK to reset these shared memory fields without
294 * holding SyncRepLock, because any walsenders will ignore us anyway when
295 * we're not on the queue. We need a read barrier to make sure we see the
296 * changes to the queue link (this might be unnecessary without
297 * assertions, but better safe than sorry).
298 */
299 pg_read_barrier();
300 Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
301 MyProc->syncRepState = SYNC_REP_NOT_WAITING;
302 MyProc->waitLSN = 0;
303
304 if (new_status)
305 {
306 /* Reset ps display */
307 set_ps_display(new_status, false);
308 pfree(new_status);
309 }
310}
311
312/*
313 * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
314 *
315 * Usually we will go at tail of queue, though it's possible that we arrive
316 * here out of order, so start at tail and work back to insertion point.
317 */
318static void
319SyncRepQueueInsert(int mode)
320{
321 PGPROC *proc;
322
323 Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
324 proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
325 &(WalSndCtl->SyncRepQueue[mode]),
326 offsetof(PGPROC, syncRepLinks));
327
328 while (proc)
329 {
330 /*
331 * Stop at the queue element that we should after to ensure the queue
332 * is ordered by LSN.
333 */
334 if (proc->waitLSN < MyProc->waitLSN)
335 break;
336
337 proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
338 &(proc->syncRepLinks),
339 offsetof(PGPROC, syncRepLinks));
340 }
341
342 if (proc)
343 SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
344 else
345 SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
346}
347
348/*
349 * Acquire SyncRepLock and cancel any wait currently in progress.
350 */
351static void
352SyncRepCancelWait(void)
353{
354 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
355 if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
356 SHMQueueDelete(&(MyProc->syncRepLinks));
357 MyProc->syncRepState = SYNC_REP_NOT_WAITING;
358 LWLockRelease(SyncRepLock);
359}
360
361void
362SyncRepCleanupAtProcExit(void)
363{
364 if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
365 {
366 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
367 SHMQueueDelete(&(MyProc->syncRepLinks));
368 LWLockRelease(SyncRepLock);
369 }
370}
371
372/*
373 * ===========================================================
374 * Synchronous Replication functions for wal sender processes
375 * ===========================================================
376 */
377
378/*
379 * Take any action required to initialise sync rep state from config
380 * data. Called at WALSender startup and after each SIGHUP.
381 */
382void
383SyncRepInitConfig(void)
384{
385 int priority;
386
387 /*
388 * Determine if we are a potential sync standby and remember the result
389 * for handling replies from standby.
390 */
391 priority = SyncRepGetStandbyPriority();
392 if (MyWalSnd->sync_standby_priority != priority)
393 {
394 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
395 MyWalSnd->sync_standby_priority = priority;
396 LWLockRelease(SyncRepLock);
397 ereport(DEBUG1,
398 (errmsg("standby \"%s\" now has synchronous standby priority %u",
399 application_name, priority)));
400 }
401}
402
403/*
404 * Update the LSNs on each queue based upon our latest state. This
405 * implements a simple policy of first-valid-sync-standby-releases-waiter.
406 *
407 * Other policies are possible, which would change what we do here and
408 * perhaps also which information we store as well.
409 */
410void
411SyncRepReleaseWaiters(void)
412{
413 volatile WalSndCtlData *walsndctl = WalSndCtl;
414 XLogRecPtr writePtr;
415 XLogRecPtr flushPtr;
416 XLogRecPtr applyPtr;
417 bool got_recptr;
418 bool am_sync;
419 int numwrite = 0;
420 int numflush = 0;
421 int numapply = 0;
422
423 /*
424 * If this WALSender is serving a standby that is not on the list of
425 * potential sync standbys then we have nothing to do. If we are still
426 * starting up, still running base backup or the current flush position is
427 * still invalid, then leave quickly also. Streaming or stopping WAL
428 * senders are allowed to release waiters.
429 */
430 if (MyWalSnd->sync_standby_priority == 0 ||
431 (MyWalSnd->state != WALSNDSTATE_STREAMING &&
432 MyWalSnd->state != WALSNDSTATE_STOPPING) ||
433 XLogRecPtrIsInvalid(MyWalSnd->flush))
434 {
435 announce_next_takeover = true;
436 return;
437 }
438
439 /*
440 * We're a potential sync standby. Release waiters if there are enough
441 * sync standbys and we are considered as sync.
442 */
443 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
444
445 /*
446 * Check whether we are a sync standby or not, and calculate the synced
447 * positions among all sync standbys.
448 */
449 got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
450
451 /*
452 * If we are managing a sync standby, though we weren't prior to this,
453 * then announce we are now a sync standby.
454 */
455 if (announce_next_takeover && am_sync)
456 {
457 announce_next_takeover = false;
458
459 if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
460 ereport(LOG,
461 (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
462 application_name, MyWalSnd->sync_standby_priority)));
463 else
464 ereport(LOG,
465 (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
466 application_name)));
467 }
468
469 /*
470 * If the number of sync standbys is less than requested or we aren't
471 * managing a sync standby then just leave.
472 */
473 if (!got_recptr || !am_sync)
474 {
475 LWLockRelease(SyncRepLock);
476 announce_next_takeover = !am_sync;
477 return;
478 }
479
480 /*
481 * Set the lsn first so that when we wake backends they will release up to
482 * this location.
483 */
484 if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
485 {
486 walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
487 numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
488 }
489 if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
490 {
491 walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
492 numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
493 }
494 if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
495 {
496 walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
497 numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
498 }
499
500 LWLockRelease(SyncRepLock);
501
502 elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
503 numwrite, (uint32) (writePtr >> 32), (uint32) writePtr,
504 numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr,
505 numapply, (uint32) (applyPtr >> 32), (uint32) applyPtr);
506}
507
508/*
509 * Calculate the synced Write, Flush and Apply positions among sync standbys.
510 *
511 * Return false if the number of sync standbys is less than
512 * synchronous_standby_names specifies. Otherwise return true and
513 * store the positions into *writePtr, *flushPtr and *applyPtr.
514 *
515 * On return, *am_sync is set to true if this walsender is connecting to
516 * sync standby. Otherwise it's set to false.
517 */
518static bool
519SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
520 XLogRecPtr *applyPtr, bool *am_sync)
521{
522 List *sync_standbys;
523
524 *writePtr = InvalidXLogRecPtr;
525 *flushPtr = InvalidXLogRecPtr;
526 *applyPtr = InvalidXLogRecPtr;
527 *am_sync = false;
528
529 /* Get standbys that are considered as synchronous at this moment */
530 sync_standbys = SyncRepGetSyncStandbys(am_sync);
531
532 /*
533 * Quick exit if we are not managing a sync standby or there are not
534 * enough synchronous standbys.
535 */
536 if (!(*am_sync) ||
537 SyncRepConfig == NULL ||
538 list_length(sync_standbys) < SyncRepConfig->num_sync)
539 {
540 list_free(sync_standbys);
541 return false;
542 }
543
544 /*
545 * In a priority-based sync replication, the synced positions are the
546 * oldest ones among sync standbys. In a quorum-based, they are the Nth
547 * latest ones.
548 *
549 * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
550 * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
551 * because it's a bit more efficient.
552 *
553 * XXX If the numbers of current and requested sync standbys are the same,
554 * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
555 * positions even in a quorum-based sync replication.
556 */
557 if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
558 {
559 SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
560 sync_standbys);
561 }
562 else
563 {
564 SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
565 sync_standbys, SyncRepConfig->num_sync);
566 }
567
568 list_free(sync_standbys);
569 return true;
570}
571
572/*
573 * Calculate the oldest Write, Flush and Apply positions among sync standbys.
574 */
575static void
576SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
577 XLogRecPtr *applyPtr, List *sync_standbys)
578{
579 ListCell *cell;
580
581 /*
582 * Scan through all sync standbys and calculate the oldest Write, Flush
583 * and Apply positions.
584 */
585 foreach(cell, sync_standbys)
586 {
587 WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
588 XLogRecPtr write;
589 XLogRecPtr flush;
590 XLogRecPtr apply;
591
592 SpinLockAcquire(&walsnd->mutex);
593 write = walsnd->write;
594 flush = walsnd->flush;
595 apply = walsnd->apply;
596 SpinLockRelease(&walsnd->mutex);
597
598 if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
599 *writePtr = write;
600 if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
601 *flushPtr = flush;
602 if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
603 *applyPtr = apply;
604 }
605}
606
607/*
608 * Calculate the Nth latest Write, Flush and Apply positions among sync
609 * standbys.
610 */
611static void
612SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
613 XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
614{
615 ListCell *cell;
616 XLogRecPtr *write_array;
617 XLogRecPtr *flush_array;
618 XLogRecPtr *apply_array;
619 int len;
620 int i = 0;
621
622 len = list_length(sync_standbys);
623 write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
624 flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
625 apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
626
627 foreach(cell, sync_standbys)
628 {
629 WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
630
631 SpinLockAcquire(&walsnd->mutex);
632 write_array[i] = walsnd->write;
633 flush_array[i] = walsnd->flush;
634 apply_array[i] = walsnd->apply;
635 SpinLockRelease(&walsnd->mutex);
636
637 i++;
638 }
639
640 /* Sort each array in descending order */
641 qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
642 qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
643 qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
644
645 /* Get Nth latest Write, Flush, Apply positions */
646 *writePtr = write_array[nth - 1];
647 *flushPtr = flush_array[nth - 1];
648 *applyPtr = apply_array[nth - 1];
649
650 pfree(write_array);
651 pfree(flush_array);
652 pfree(apply_array);
653}
654
/*
 * qsort comparator for XLogRecPtr values that yields descending order:
 * returns -1 when the first LSN is greater than the second, 1 when it is
 * smaller, and 0 when both are equal.
 */
static int
cmp_lsn(const void *a, const void *b)
{
	const XLogRecPtr left = *((const XLogRecPtr *) a);
	const XLogRecPtr right = *((const XLogRecPtr *) b);

	if (left == right)
		return 0;

	return (left > right) ? -1 : 1;
}
671
672/*
673 * Return the list of sync standbys, or NIL if no sync standby is connected.
674 *
675 * The caller must hold SyncRepLock.
676 *
677 * On return, *am_sync is set to true if this walsender is connecting to
678 * sync standby. Otherwise it's set to false.
679 */
680List *
681SyncRepGetSyncStandbys(bool *am_sync)
682{
683 /* Set default result */
684 if (am_sync != NULL)
685 *am_sync = false;
686
687 /* Quick exit if sync replication is not requested */
688 if (SyncRepConfig == NULL)
689 return NIL;
690
691 return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
692 SyncRepGetSyncStandbysPriority(am_sync) :
693 SyncRepGetSyncStandbysQuorum(am_sync);
694}
695
696/*
697 * Return the list of all the candidates for quorum sync standbys,
698 * or NIL if no such standby is connected.
699 *
700 * The caller must hold SyncRepLock. This function must be called only in
701 * a quorum-based sync replication.
702 *
703 * On return, *am_sync is set to true if this walsender is connecting to
704 * sync standby. Otherwise it's set to false.
705 */
706static List *
707SyncRepGetSyncStandbysQuorum(bool *am_sync)
708{
709 List *result = NIL;
710 int i;
711 volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
712 * rearrangement */
713
714 Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
715
716 for (i = 0; i < max_wal_senders; i++)
717 {
718 XLogRecPtr flush;
719 WalSndState state;
720 int pid;
721
722 walsnd = &WalSndCtl->walsnds[i];
723
724 SpinLockAcquire(&walsnd->mutex);
725 pid = walsnd->pid;
726 flush = walsnd->flush;
727 state = walsnd->state;
728 SpinLockRelease(&walsnd->mutex);
729
730 /* Must be active */
731 if (pid == 0)
732 continue;
733
734 /* Must be streaming or stopping */
735 if (state != WALSNDSTATE_STREAMING &&
736 state != WALSNDSTATE_STOPPING)
737 continue;
738
739 /* Must be synchronous */
740 if (walsnd->sync_standby_priority == 0)
741 continue;
742
743 /* Must have a valid flush position */
744 if (XLogRecPtrIsInvalid(flush))
745 continue;
746
747 /*
748 * Consider this standby as a candidate for quorum sync standbys and
749 * append it to the result.
750 */
751 result = lappend_int(result, i);
752 if (am_sync != NULL && walsnd == MyWalSnd)
753 *am_sync = true;
754 }
755
756 return result;
757}
758
759/*
760 * Return the list of sync standbys chosen based on their priorities,
761 * or NIL if no sync standby is connected.
762 *
763 * If there are multiple standbys with the same priority,
764 * the first one found is selected preferentially.
765 *
766 * The caller must hold SyncRepLock. This function must be called only in
767 * a priority-based sync replication.
768 *
769 * On return, *am_sync is set to true if this walsender is connecting to
770 * sync standby. Otherwise it's set to false.
771 */
772static List *
773SyncRepGetSyncStandbysPriority(bool *am_sync)
774{
775 List *result = NIL;
776 List *pending = NIL;
777 int lowest_priority;
778 int next_highest_priority;
779 int this_priority;
780 int priority;
781 int i;
782 bool am_in_pending = false;
783 volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
784 * rearrangement */
785
786 Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
787
788 lowest_priority = SyncRepConfig->nmembers;
789 next_highest_priority = lowest_priority + 1;
790
791 /*
792 * Find the sync standbys which have the highest priority (i.e, 1). Also
793 * store all the other potential sync standbys into the pending list, in
794 * order to scan it later and find other sync standbys from it quickly.
795 */
796 for (i = 0; i < max_wal_senders; i++)
797 {
798 XLogRecPtr flush;
799 WalSndState state;
800 int pid;
801
802 walsnd = &WalSndCtl->walsnds[i];
803
804 SpinLockAcquire(&walsnd->mutex);
805 pid = walsnd->pid;
806 flush = walsnd->flush;
807 state = walsnd->state;
808 SpinLockRelease(&walsnd->mutex);
809
810 /* Must be active */
811 if (pid == 0)
812 continue;
813
814 /* Must be streaming or stopping */
815 if (state != WALSNDSTATE_STREAMING &&
816 state != WALSNDSTATE_STOPPING)
817 continue;
818
819 /* Must be synchronous */
820 this_priority = walsnd->sync_standby_priority;
821 if (this_priority == 0)
822 continue;
823
824 /* Must have a valid flush position */
825 if (XLogRecPtrIsInvalid(flush))
826 continue;
827
828 /*
829 * If the priority is equal to 1, consider this standby as sync and
830 * append it to the result. Otherwise append this standby to the
831 * pending list to check if it's actually sync or not later.
832 */
833 if (this_priority == 1)
834 {
835 result = lappend_int(result, i);
836 if (am_sync != NULL && walsnd == MyWalSnd)
837 *am_sync = true;
838 if (list_length(result) == SyncRepConfig->num_sync)
839 {
840 list_free(pending);
841 return result; /* Exit if got enough sync standbys */
842 }
843 }
844 else
845 {
846 pending = lappend_int(pending, i);
847 if (am_sync != NULL && walsnd == MyWalSnd)
848 am_in_pending = true;
849
850 /*
851 * Track the highest priority among the standbys in the pending
852 * list, in order to use it as the starting priority for later
853 * scan of the list. This is useful to find quickly the sync
854 * standbys from the pending list later because we can skip
855 * unnecessary scans for the unused priorities.
856 */
857 if (this_priority < next_highest_priority)
858 next_highest_priority = this_priority;
859 }
860 }
861
862 /*
863 * Consider all pending standbys as sync if the number of them plus
864 * already-found sync ones is lower than the configuration requests.
865 */
866 if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
867 {
868 bool needfree = (result != NIL && pending != NIL);
869
870 /*
871 * Set *am_sync to true if this walsender is in the pending list
872 * because all pending standbys are considered as sync.
873 */
874 if (am_sync != NULL && !(*am_sync))
875 *am_sync = am_in_pending;
876
877 result = list_concat(result, pending);
878 if (needfree)
879 pfree(pending);
880 return result;
881 }
882
883 /*
884 * Find the sync standbys from the pending list.
885 */
886 priority = next_highest_priority;
887 while (priority <= lowest_priority)
888 {
889 ListCell *cell;
890 ListCell *prev = NULL;
891 ListCell *next;
892
893 next_highest_priority = lowest_priority + 1;
894
895 for (cell = list_head(pending); cell != NULL; cell = next)
896 {
897 i = lfirst_int(cell);
898 walsnd = &WalSndCtl->walsnds[i];
899
900 next = lnext(cell);
901
902 this_priority = walsnd->sync_standby_priority;
903 if (this_priority == priority)
904 {
905 result = lappend_int(result, i);
906 if (am_sync != NULL && walsnd == MyWalSnd)
907 *am_sync = true;
908
909 /*
910 * We should always exit here after the scan of pending list
911 * starts because we know that the list has enough elements to
912 * reach SyncRepConfig->num_sync.
913 */
914 if (list_length(result) == SyncRepConfig->num_sync)
915 {
916 list_free(pending);
917 return result; /* Exit if got enough sync standbys */
918 }
919
920 /*
921 * Remove the entry for this sync standby from the list to
922 * prevent us from looking at the same entry again.
923 */
924 pending = list_delete_cell(pending, cell, prev);
925
926 continue;
927 }
928
929 if (this_priority < next_highest_priority)
930 next_highest_priority = this_priority;
931
932 prev = cell;
933 }
934
935 priority = next_highest_priority;
936 }
937
938 /* never reached, but keep compiler quiet */
939 Assert(false);
940 return result;
941}
942
943/*
944 * Check if we are in the list of sync standbys, and if so, determine
945 * priority sequence. Return priority if set, or zero to indicate that
946 * we are not a potential sync standby.
947 *
948 * Compare the parameter SyncRepStandbyNames against the application_name
949 * for this WALSender, or allow any name if we find a wildcard "*".
950 */
951static int
952SyncRepGetStandbyPriority(void)
953{
954 const char *standby_name;
955 int priority;
956 bool found = false;
957
958 /*
959 * Since synchronous cascade replication is not allowed, we always set the
960 * priority of cascading walsender to zero.
961 */
962 if (am_cascading_walsender)
963 return 0;
964
965 if (!SyncStandbysDefined() || SyncRepConfig == NULL)
966 return 0;
967
968 standby_name = SyncRepConfig->member_names;
969 for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
970 {
971 if (pg_strcasecmp(standby_name, application_name) == 0 ||
972 strcmp(standby_name, "*") == 0)
973 {
974 found = true;
975 break;
976 }
977 standby_name += strlen(standby_name) + 1;
978 }
979
980 if (!found)
981 return 0;
982
983 /*
984 * In quorum-based sync replication, all the standbys in the list have the
985 * same priority, one.
986 */
987 return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
988}
989
990/*
991 * Walk the specified queue from head. Set the state of any backends that
992 * need to be woken, remove them from the queue, and then wake them.
993 * Pass all = true to wake whole queue; otherwise, just wake up to
994 * the walsender's LSN.
995 *
996 * Must hold SyncRepLock.
997 */
998static int
999SyncRepWakeQueue(bool all, int mode)
1000{
1001 volatile WalSndCtlData *walsndctl = WalSndCtl;
1002 PGPROC *proc = NULL;
1003 PGPROC *thisproc = NULL;
1004 int numprocs = 0;
1005
1006 Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
1007 Assert(SyncRepQueueIsOrderedByLSN(mode));
1008
1009 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
1010 &(WalSndCtl->SyncRepQueue[mode]),
1011 offsetof(PGPROC, syncRepLinks));
1012
1013 while (proc)
1014 {
1015 /*
1016 * Assume the queue is ordered by LSN
1017 */
1018 if (!all && walsndctl->lsn[mode] < proc->waitLSN)
1019 return numprocs;
1020
1021 /*
1022 * Move to next proc, so we can delete thisproc from the queue.
1023 * thisproc is valid, proc may be NULL after this.
1024 */
1025 thisproc = proc;
1026 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
1027 &(proc->syncRepLinks),
1028 offsetof(PGPROC, syncRepLinks));
1029
1030 /*
1031 * Remove thisproc from queue.
1032 */
1033 SHMQueueDelete(&(thisproc->syncRepLinks));
1034
1035 /*
1036 * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
1037 * make sure that it sees the queue link being removed before the
1038 * syncRepState change.
1039 */
1040 pg_write_barrier();
1041
1042 /*
1043 * Set state to complete; see SyncRepWaitForLSN() for discussion of
1044 * the various states.
1045 */
1046 thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
1047
1048 /*
1049 * Wake only when we have set state and removed from queue.
1050 */
1051 SetLatch(&(thisproc->procLatch));
1052
1053 numprocs++;
1054 }
1055
1056 return numprocs;
1057}
1058
1059/*
1060 * The checkpointer calls this as needed to update the shared
1061 * sync_standbys_defined flag, so that backends don't remain permanently wedged
1062 * if synchronous_standby_names is unset. It's safe to check the current value
1063 * without the lock, because it's only ever updated by one process. But we
1064 * must take the lock to change it.
1065 */
1066void
1067SyncRepUpdateSyncStandbysDefined(void)
1068{
1069 bool sync_standbys_defined = SyncStandbysDefined();
1070
1071 if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
1072 {
1073 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
1074
1075 /*
1076 * If synchronous_standby_names has been reset to empty, it's futile
1077 * for backends to continue to waiting. Since the user no longer
1078 * wants synchronous replication, we'd better wake them up.
1079 */
1080 if (!sync_standbys_defined)
1081 {
1082 int i;
1083
1084 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
1085 SyncRepWakeQueue(true, i);
1086 }
1087
1088 /*
1089 * Only allow people to join the queue when there are synchronous
1090 * standbys defined. Without this interlock, there's a race
1091 * condition: we might wake up all the current waiters; then, some
1092 * backend that hasn't yet reloaded its config might go to sleep on
1093 * the queue (and never wake up). This prevents that.
1094 */
1095 WalSndCtl->sync_standbys_defined = sync_standbys_defined;
1096
1097 LWLockRelease(SyncRepLock);
1098 }
1099}
1100
#ifdef USE_ASSERT_CHECKING
/*
 * Assertion helper: report whether the wait queue for the given mode is in
 * strictly increasing waitLSN order.
 *
 * Returns false as soon as an entry is found whose waitLSN is not greater
 * than that of the entry before it (the first entry is compared against
 * zero); returns true if the whole queue passes, including when it is empty.
 */
static bool
SyncRepQueueIsOrderedByLSN(int mode)
{
	XLogRecPtr	prevLSN = 0;
	PGPROC	   *cur = NULL;

	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);

	for (cur = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
									   &(WalSndCtl->SyncRepQueue[mode]),
									   offsetof(PGPROC, syncRepLinks));
		 cur != NULL;
		 cur = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
									   &(cur->syncRepLinks),
									   offsetof(PGPROC, syncRepLinks)))
	{
		/*
		 * Entries must be ordered by LSN, and no two entries may carry the
		 * same LSN.
		 */
		if (cur->waitLSN <= prevLSN)
			return false;

		prevLSN = cur->waitLSN;
	}

	return true;
}
#endif
1135
1136/*
1137 * ===========================================================
1138 * Synchronous Replication functions executed by any process
1139 * ===========================================================
1140 */
1141
1142bool
1143check_synchronous_standby_names(char **newval, void **extra, GucSource source)
1144{
1145 if (*newval != NULL && (*newval)[0] != '\0')
1146 {
1147 int parse_rc;
1148 SyncRepConfigData *pconf;
1149
1150 /* Reset communication variables to ensure a fresh start */
1151 syncrep_parse_result = NULL;
1152 syncrep_parse_error_msg = NULL;
1153
1154 /* Parse the synchronous_standby_names string */
1155 syncrep_scanner_init(*newval);
1156 parse_rc = syncrep_yyparse();
1157 syncrep_scanner_finish();
1158
1159 if (parse_rc != 0 || syncrep_parse_result == NULL)
1160 {
1161 GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
1162 if (syncrep_parse_error_msg)
1163 GUC_check_errdetail("%s", syncrep_parse_error_msg);
1164 else
1165 GUC_check_errdetail("synchronous_standby_names parser failed");
1166 return false;
1167 }
1168
1169 if (syncrep_parse_result->num_sync <= 0)
1170 {
1171 GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
1172 syncrep_parse_result->num_sync);
1173 return false;
1174 }
1175
1176 /* GUC extra value must be malloc'd, not palloc'd */
1177 pconf = (SyncRepConfigData *)
1178 malloc(syncrep_parse_result->config_size);
1179 if (pconf == NULL)
1180 return false;
1181 memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
1182
1183 *extra = (void *) pconf;
1184
1185 /*
1186 * We need not explicitly clean up syncrep_parse_result. It, and any
1187 * other cruft generated during parsing, will be freed when the
1188 * current memory context is deleted. (This code is generally run in
1189 * a short-lived context used for config file processing, so that will
1190 * not be very long.)
1191 */
1192 }
1193 else
1194 *extra = NULL;
1195
1196 return true;
1197}
1198
1199void
1200assign_synchronous_standby_names(const char *newval, void *extra)
1201{
1202 SyncRepConfig = (SyncRepConfigData *) extra;
1203}
1204
1205void
1206assign_synchronous_commit(int newval, void *extra)
1207{
1208 switch (newval)
1209 {
1210 case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
1211 SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
1212 break;
1213 case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
1214 SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
1215 break;
1216 case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
1217 SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
1218 break;
1219 default:
1220 SyncRepWaitMode = SYNC_REP_NO_WAIT;
1221 break;
1222 }
1223}
1224