1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * syncrep.c |
4 | * |
5 | * Synchronous replication is new as of PostgreSQL 9.1. |
6 | * |
 * If requested, transaction commits wait until their commit LSN is
8 | * acknowledged by the synchronous standbys. |
9 | * |
10 | * This module contains the code for waiting and release of backends. |
11 | * All code in this module executes on the primary. The core streaming |
12 | * replication transport remains within WALreceiver/WALsender modules. |
13 | * |
14 | * The essence of this design is that it isolates all logic about |
15 | * waiting/releasing onto the primary. The primary defines which standbys |
16 | * it wishes to wait for. The standbys are completely unaware of the |
17 | * durability requirements of transactions on the primary, reducing the |
18 | * complexity of the code and streamlining both standby operations and |
19 | * network bandwidth because there is no requirement to ship |
20 | * per-transaction state information. |
21 | * |
22 | * Replication is either synchronous or not synchronous (async). If it is |
23 | * async, we just fastpath out of here. If it is sync, then we wait for |
24 | * the write, flush or apply location on the standby before releasing |
25 | * the waiting backend. Further complexity in that interaction is |
26 | * expected in later releases. |
27 | * |
28 | * The best performing way to manage the waiting backends is to have a |
29 | * single ordered queue of waiting backends, so that we can avoid |
 * searching through all waiters each time we receive a reply.
31 | * |
 * In 9.5 or before, only a single standby could be considered as
 * synchronous. In 9.6 we added support for priority-based selection of
 * multiple synchronous standbys. In 10.0 quorum-based selection of
 * multiple synchronous standbys is also supported. The number of
 * synchronous standbys that transactions must wait for replies from is
 * specified in synchronous_standby_names. This parameter also specifies
 * a list of standby names and the method (FIRST or ANY) used to choose
 * synchronous standbys from the listed ones.
39 | * |
40 | * The method FIRST specifies a priority-based synchronous replication |
41 | * and makes transaction commits wait until their WAL records are |
42 | * replicated to the requested number of synchronous standbys chosen based |
43 | * on their priorities. The standbys whose names appear earlier in the list |
44 | * are given higher priority and will be considered as synchronous. |
45 | * Other standby servers appearing later in this list represent potential |
46 | * synchronous standbys. If any of the current synchronous standbys |
47 | * disconnects for whatever reason, it will be replaced immediately with |
48 | * the next-highest-priority standby. |
49 | * |
50 | * The method ANY specifies a quorum-based synchronous replication |
51 | * and makes transaction commits wait until their WAL records are |
52 | * replicated to at least the requested number of synchronous standbys |
53 | * in the list. All the standbys appearing in the list are considered as |
54 | * candidates for quorum synchronous standbys. |
55 | * |
56 | * If neither FIRST nor ANY is specified, FIRST is used as the method. |
57 | * This is for backward compatibility with 9.6 or before where only a |
58 | * priority-based sync replication was supported. |
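 *
 * For illustration only (the standby names s1, s2 and s3 are arbitrary),
 * both methods can be requested like this:
 *
 *     synchronous_standby_names = 'FIRST 2 (s1, s2, s3)'
 *         -- wait for the two highest-priority connected standbys
 *     synchronous_standby_names = 'ANY 2 (s1, s2, s3)'
 *         -- wait for any two of the listed standbys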
59 | * |
60 | * Before the standbys chosen from synchronous_standby_names can |
61 | * become the synchronous standbys they must have caught up with |
62 | * the primary; that may take some time. Once caught up, |
63 | * the standbys which are considered as synchronous at that moment |
64 | * will release waiters from the queue. |
65 | * |
66 | * Portions Copyright (c) 2010-2019, PostgreSQL Global Development Group |
67 | * |
68 | * IDENTIFICATION |
69 | * src/backend/replication/syncrep.c |
70 | * |
71 | *------------------------------------------------------------------------- |
72 | */ |
73 | #include "postgres.h" |
74 | |
75 | #include <unistd.h> |
76 | |
77 | #include "access/xact.h" |
78 | #include "miscadmin.h" |
79 | #include "pgstat.h" |
80 | #include "replication/syncrep.h" |
81 | #include "replication/walsender.h" |
82 | #include "replication/walsender_private.h" |
83 | #include "storage/pmsignal.h" |
84 | #include "storage/proc.h" |
85 | #include "tcop/tcopprot.h" |
86 | #include "utils/builtins.h" |
87 | #include "utils/ps_status.h" |
88 | |
89 | /* User-settable parameters for sync rep */ |
90 | char *SyncRepStandbyNames; |
91 | |
92 | #define SyncStandbysDefined() \ |
93 | (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0') |
94 | |
95 | static bool announce_next_takeover = true; |
96 | |
97 | SyncRepConfigData *SyncRepConfig = NULL; |
98 | static int SyncRepWaitMode = SYNC_REP_NO_WAIT; |
99 | |
100 | static void SyncRepQueueInsert(int mode); |
101 | static void SyncRepCancelWait(void); |
102 | static int SyncRepWakeQueue(bool all, int mode); |
103 | |
104 | static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, |
105 | XLogRecPtr *flushPtr, |
106 | XLogRecPtr *applyPtr, |
107 | bool *am_sync); |
108 | static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, |
109 | XLogRecPtr *flushPtr, |
110 | XLogRecPtr *applyPtr, |
111 | List *sync_standbys); |
112 | static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, |
113 | XLogRecPtr *flushPtr, |
114 | XLogRecPtr *applyPtr, |
115 | List *sync_standbys, uint8 nth); |
116 | static int SyncRepGetStandbyPriority(void); |
117 | static List *SyncRepGetSyncStandbysPriority(bool *am_sync); |
118 | static List *SyncRepGetSyncStandbysQuorum(bool *am_sync); |
119 | static int cmp_lsn(const void *a, const void *b); |
120 | |
121 | #ifdef USE_ASSERT_CHECKING |
122 | static bool SyncRepQueueIsOrderedByLSN(int mode); |
123 | #endif |
124 | |
125 | /* |
126 | * =========================================================== |
127 | * Synchronous Replication functions for normal user backends |
128 | * =========================================================== |
129 | */ |
130 | |
131 | /* |
132 | * Wait for synchronous replication, if requested by user. |
133 | * |
134 | * Initially backends start in state SYNC_REP_NOT_WAITING and then |
 * change that state to SYNC_REP_WAITING before adding themselves
136 | * to the wait queue. During SyncRepWakeQueue() a WALSender changes |
137 | * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed. |
138 | * This backend then resets its state to SYNC_REP_NOT_WAITING. |
139 | * |
140 | * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN |
 * represents a commit record. If it doesn't, then we wait only for the WAL
 * to be flushed, even when synchronous_commit is set to the higher level of
 * remote_apply, because only commit records provide apply feedback.
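 *
 * As a sketch of the typical call pattern on the primary (the real call
 * sites live in the transaction-commit code, e.g. xact.c; "commit_lsn" is
 * just a placeholder for the commit record's end LSN):
 *
 *     XLogFlush(commit_lsn);
 *     SyncRepWaitForLSN(commit_lsn, true);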
144 | */ |
145 | void |
146 | SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) |
147 | { |
148 | char *new_status = NULL; |
149 | const char *old_status; |
150 | int mode; |
151 | |
152 | /* Cap the level for anything other than commit to remote flush only. */ |
153 | if (commit) |
154 | mode = SyncRepWaitMode; |
155 | else |
156 | mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH); |
157 | |
158 | /* |
159 | * Fast exit if user has not requested sync replication. |
160 | */ |
161 | if (!SyncRepRequested()) |
162 | return; |
163 | |
164 | Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); |
165 | Assert(WalSndCtl != NULL); |
166 | |
167 | LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); |
168 | Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING); |
169 | |
170 | /* |
171 | * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not |
172 | * set. See SyncRepUpdateSyncStandbysDefined. |
173 | * |
174 | * Also check that the standby hasn't already replied. Unlikely race |
175 | * condition but we'll be fetching that cache line anyway so it's likely |
176 | * to be a low cost check. |
177 | */ |
178 | if (!WalSndCtl->sync_standbys_defined || |
179 | lsn <= WalSndCtl->lsn[mode]) |
180 | { |
181 | LWLockRelease(SyncRepLock); |
182 | return; |
183 | } |
184 | |
185 | /* |
186 | * Set our waitLSN so WALSender will know when to wake us, and add |
187 | * ourselves to the queue. |
188 | */ |
189 | MyProc->waitLSN = lsn; |
190 | MyProc->syncRepState = SYNC_REP_WAITING; |
191 | SyncRepQueueInsert(mode); |
192 | Assert(SyncRepQueueIsOrderedByLSN(mode)); |
193 | LWLockRelease(SyncRepLock); |
194 | |
195 | /* Alter ps display to show waiting for sync rep. */ |
196 | if (update_process_title) |
197 | { |
198 | int len; |
199 | |
200 | old_status = get_ps_display(&len); |
201 | new_status = (char *) palloc(len + 32 + 1); |
202 | memcpy(new_status, old_status, len); |
		sprintf(new_status + len, " waiting for %X/%X",
204 | (uint32) (lsn >> 32), (uint32) lsn); |
205 | set_ps_display(new_status, false); |
206 | new_status[len] = '\0'; /* truncate off " waiting ..." */ |
207 | } |
208 | |
209 | /* |
210 | * Wait for specified LSN to be confirmed. |
211 | * |
212 | * Each proc has its own wait latch, so we perform a normal latch |
213 | * check/wait loop here. |
214 | */ |
215 | for (;;) |
216 | { |
217 | int rc; |
218 | |
219 | /* Must reset the latch before testing state. */ |
220 | ResetLatch(MyLatch); |
221 | |
222 | /* |
223 | * Acquiring the lock is not needed, the latch ensures proper |
224 | * barriers. If it looks like we're done, we must really be done, |
225 | * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE, |
226 | * it will never update it again, so we can't be seeing a stale value |
227 | * in that case. |
228 | */ |
229 | if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE) |
230 | break; |
231 | |
232 | /* |
233 | * If a wait for synchronous replication is pending, we can neither |
234 | * acknowledge the commit nor raise ERROR or FATAL. The latter would |
235 | * lead the client to believe that the transaction aborted, which is |
236 | * not true: it's already committed locally. The former is no good |
237 | * either: the client has requested synchronous replication, and is |
238 | * entitled to assume that an acknowledged commit is also replicated, |
239 | * which might not be true. So in this case we issue a WARNING (which |
240 | * some clients may be able to interpret) and shut off further output. |
241 | * We do NOT reset ProcDiePending, so that the process will die after |
242 | * the commit is cleaned up. |
243 | */ |
244 | if (ProcDiePending) |
245 | { |
246 | ereport(WARNING, |
247 | (errcode(ERRCODE_ADMIN_SHUTDOWN), |
					 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
250 | whereToSendOutput = DestNone; |
251 | SyncRepCancelWait(); |
252 | break; |
253 | } |
254 | |
255 | /* |
256 | * It's unclear what to do if a query cancel interrupt arrives. We |
257 | * can't actually abort at this point, but ignoring the interrupt |
258 | * altogether is not helpful, so we just terminate the wait with a |
259 | * suitable warning. |
260 | */ |
261 | if (QueryCancelPending) |
262 | { |
263 | QueryCancelPending = false; |
264 | ereport(WARNING, |
					(errmsg("canceling wait for synchronous replication due to user request"),
					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
267 | SyncRepCancelWait(); |
268 | break; |
269 | } |
270 | |
271 | /* |
272 | * Wait on latch. Any condition that should wake us up will set the |
273 | * latch, so no need for timeout. |
274 | */ |
275 | rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1, |
276 | WAIT_EVENT_SYNC_REP); |
277 | |
278 | /* |
279 | * If the postmaster dies, we'll probably never get an acknowledgment, |
280 | * because all the wal sender processes will exit. So just bail out. |
281 | */ |
282 | if (rc & WL_POSTMASTER_DEATH) |
283 | { |
284 | ProcDiePending = true; |
285 | whereToSendOutput = DestNone; |
286 | SyncRepCancelWait(); |
287 | break; |
288 | } |
289 | } |
290 | |
291 | /* |
292 | * WalSender has checked our LSN and has removed us from queue. Clean up |
293 | * state and leave. It's OK to reset these shared memory fields without |
294 | * holding SyncRepLock, because any walsenders will ignore us anyway when |
295 | * we're not on the queue. We need a read barrier to make sure we see the |
296 | * changes to the queue link (this might be unnecessary without |
297 | * assertions, but better safe than sorry). |
298 | */ |
299 | pg_read_barrier(); |
300 | Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); |
301 | MyProc->syncRepState = SYNC_REP_NOT_WAITING; |
302 | MyProc->waitLSN = 0; |
303 | |
304 | if (new_status) |
305 | { |
306 | /* Reset ps display */ |
307 | set_ps_display(new_status, false); |
308 | pfree(new_status); |
309 | } |
310 | } |
311 | |
312 | /* |
313 | * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant. |
314 | * |
315 | * Usually we will go at tail of queue, though it's possible that we arrive |
316 | * here out of order, so start at tail and work back to insertion point. |
317 | */ |
318 | static void |
319 | SyncRepQueueInsert(int mode) |
320 | { |
321 | PGPROC *proc; |
322 | |
323 | Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); |
324 | proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]), |
325 | &(WalSndCtl->SyncRepQueue[mode]), |
326 | offsetof(PGPROC, syncRepLinks)); |
327 | |
328 | while (proc) |
329 | { |
330 | /* |
		 * Stop at the queue element that we should insert after to ensure
		 * the queue is ordered by LSN.
333 | */ |
334 | if (proc->waitLSN < MyProc->waitLSN) |
335 | break; |
336 | |
337 | proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]), |
338 | &(proc->syncRepLinks), |
339 | offsetof(PGPROC, syncRepLinks)); |
340 | } |
341 | |
342 | if (proc) |
343 | SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks)); |
344 | else |
345 | SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks)); |
346 | } |
347 | |
348 | /* |
349 | * Acquire SyncRepLock and cancel any wait currently in progress. |
350 | */ |
351 | static void |
352 | SyncRepCancelWait(void) |
353 | { |
354 | LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); |
355 | if (!SHMQueueIsDetached(&(MyProc->syncRepLinks))) |
356 | SHMQueueDelete(&(MyProc->syncRepLinks)); |
357 | MyProc->syncRepState = SYNC_REP_NOT_WAITING; |
358 | LWLockRelease(SyncRepLock); |
359 | } |
360 | |
361 | void |
362 | SyncRepCleanupAtProcExit(void) |
363 | { |
364 | if (!SHMQueueIsDetached(&(MyProc->syncRepLinks))) |
365 | { |
366 | LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); |
367 | SHMQueueDelete(&(MyProc->syncRepLinks)); |
368 | LWLockRelease(SyncRepLock); |
369 | } |
370 | } |
371 | |
372 | /* |
373 | * =========================================================== |
374 | * Synchronous Replication functions for wal sender processes |
375 | * =========================================================== |
376 | */ |
377 | |
378 | /* |
379 | * Take any action required to initialise sync rep state from config |
380 | * data. Called at WALSender startup and after each SIGHUP. |
381 | */ |
382 | void |
383 | SyncRepInitConfig(void) |
384 | { |
385 | int priority; |
386 | |
387 | /* |
388 | * Determine if we are a potential sync standby and remember the result |
389 | * for handling replies from standby. |
390 | */ |
391 | priority = SyncRepGetStandbyPriority(); |
392 | if (MyWalSnd->sync_standby_priority != priority) |
393 | { |
394 | LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); |
395 | MyWalSnd->sync_standby_priority = priority; |
396 | LWLockRelease(SyncRepLock); |
397 | ereport(DEBUG1, |
				(errmsg("standby \"%s\" now has synchronous standby priority %u",
399 | application_name, priority))); |
400 | } |
401 | } |
402 | |
403 | /* |
404 | * Update the LSNs on each queue based upon our latest state. This |
405 | * implements a simple policy of first-valid-sync-standby-releases-waiter. |
406 | * |
407 | * Other policies are possible, which would change what we do here and |
408 | * perhaps also which information we store as well. |
409 | */ |
410 | void |
411 | SyncRepReleaseWaiters(void) |
412 | { |
413 | volatile WalSndCtlData *walsndctl = WalSndCtl; |
414 | XLogRecPtr writePtr; |
415 | XLogRecPtr flushPtr; |
416 | XLogRecPtr applyPtr; |
417 | bool got_recptr; |
418 | bool am_sync; |
419 | int numwrite = 0; |
420 | int numflush = 0; |
421 | int numapply = 0; |
422 | |
423 | /* |
424 | * If this WALSender is serving a standby that is not on the list of |
425 | * potential sync standbys then we have nothing to do. If we are still |
426 | * starting up, still running base backup or the current flush position is |
427 | * still invalid, then leave quickly also. Streaming or stopping WAL |
428 | * senders are allowed to release waiters. |
429 | */ |
430 | if (MyWalSnd->sync_standby_priority == 0 || |
431 | (MyWalSnd->state != WALSNDSTATE_STREAMING && |
432 | MyWalSnd->state != WALSNDSTATE_STOPPING) || |
433 | XLogRecPtrIsInvalid(MyWalSnd->flush)) |
434 | { |
435 | announce_next_takeover = true; |
436 | return; |
437 | } |
438 | |
439 | /* |
440 | * We're a potential sync standby. Release waiters if there are enough |
441 | * sync standbys and we are considered as sync. |
442 | */ |
443 | LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); |
444 | |
445 | /* |
446 | * Check whether we are a sync standby or not, and calculate the synced |
447 | * positions among all sync standbys. |
448 | */ |
449 | got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync); |
450 | |
451 | /* |
452 | * If we are managing a sync standby, though we weren't prior to this, |
453 | * then announce we are now a sync standby. |
454 | */ |
455 | if (announce_next_takeover && am_sync) |
456 | { |
457 | announce_next_takeover = false; |
458 | |
459 | if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) |
460 | ereport(LOG, |
					(errmsg("standby \"%s\" is now a synchronous standby with priority %u",
462 | application_name, MyWalSnd->sync_standby_priority))); |
463 | else |
464 | ereport(LOG, |
					(errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
466 | application_name))); |
467 | } |
468 | |
469 | /* |
470 | * If the number of sync standbys is less than requested or we aren't |
471 | * managing a sync standby then just leave. |
472 | */ |
473 | if (!got_recptr || !am_sync) |
474 | { |
475 | LWLockRelease(SyncRepLock); |
476 | announce_next_takeover = !am_sync; |
477 | return; |
478 | } |
479 | |
480 | /* |
481 | * Set the lsn first so that when we wake backends they will release up to |
482 | * this location. |
483 | */ |
484 | if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr) |
485 | { |
486 | walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr; |
487 | numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); |
488 | } |
489 | if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr) |
490 | { |
491 | walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr; |
492 | numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); |
493 | } |
494 | if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr) |
495 | { |
496 | walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr; |
497 | numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY); |
498 | } |
499 | |
500 | LWLockRelease(SyncRepLock); |
501 | |
	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
503 | numwrite, (uint32) (writePtr >> 32), (uint32) writePtr, |
504 | numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr, |
505 | numapply, (uint32) (applyPtr >> 32), (uint32) applyPtr); |
506 | } |
507 | |
508 | /* |
509 | * Calculate the synced Write, Flush and Apply positions among sync standbys. |
510 | * |
511 | * Return false if the number of sync standbys is less than |
512 | * synchronous_standby_names specifies. Otherwise return true and |
513 | * store the positions into *writePtr, *flushPtr and *applyPtr. |
514 | * |
 * On return, *am_sync is set to true if this walsender is connecting to a
 * sync standby. Otherwise it's set to false.
517 | */ |
518 | static bool |
519 | SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, |
520 | XLogRecPtr *applyPtr, bool *am_sync) |
521 | { |
522 | List *sync_standbys; |
523 | |
524 | *writePtr = InvalidXLogRecPtr; |
525 | *flushPtr = InvalidXLogRecPtr; |
526 | *applyPtr = InvalidXLogRecPtr; |
527 | *am_sync = false; |
528 | |
529 | /* Get standbys that are considered as synchronous at this moment */ |
530 | sync_standbys = SyncRepGetSyncStandbys(am_sync); |
531 | |
532 | /* |
533 | * Quick exit if we are not managing a sync standby or there are not |
534 | * enough synchronous standbys. |
535 | */ |
536 | if (!(*am_sync) || |
537 | SyncRepConfig == NULL || |
538 | list_length(sync_standbys) < SyncRepConfig->num_sync) |
539 | { |
540 | list_free(sync_standbys); |
541 | return false; |
542 | } |
543 | |
544 | /* |
	 * In priority-based sync replication, the synced positions are the
	 * oldest ones among sync standbys. In quorum-based sync replication,
	 * they are the Nth latest ones.
	 *
	 * SyncRepGetNthLatestSyncRecPtr() can also calculate the oldest
	 * positions. But we use SyncRepGetOldestSyncRecPtr() for that
	 * calculation because it's a bit more efficient.
552 | * |
553 | * XXX If the numbers of current and requested sync standbys are the same, |
554 | * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced |
555 | * positions even in a quorum-based sync replication. |
556 | */ |
557 | if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) |
558 | { |
559 | SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, |
560 | sync_standbys); |
561 | } |
562 | else |
563 | { |
564 | SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, |
565 | sync_standbys, SyncRepConfig->num_sync); |
566 | } |
567 | |
568 | list_free(sync_standbys); |
569 | return true; |
570 | } |
571 | |
572 | /* |
573 | * Calculate the oldest Write, Flush and Apply positions among sync standbys. |
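 *
 * For example (illustrative values): if the sync standbys have flushed up to
 * 0/50 and 0/40 respectively, the synced flush position is 0/40, because a
 * priority-based commit must be durable on every sync standby.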
574 | */ |
575 | static void |
576 | SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, |
577 | XLogRecPtr *applyPtr, List *sync_standbys) |
578 | { |
579 | ListCell *cell; |
580 | |
581 | /* |
582 | * Scan through all sync standbys and calculate the oldest Write, Flush |
583 | * and Apply positions. |
584 | */ |
585 | foreach(cell, sync_standbys) |
586 | { |
587 | WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; |
588 | XLogRecPtr write; |
589 | XLogRecPtr flush; |
590 | XLogRecPtr apply; |
591 | |
592 | SpinLockAcquire(&walsnd->mutex); |
593 | write = walsnd->write; |
594 | flush = walsnd->flush; |
595 | apply = walsnd->apply; |
596 | SpinLockRelease(&walsnd->mutex); |
597 | |
598 | if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write) |
599 | *writePtr = write; |
600 | if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush) |
601 | *flushPtr = flush; |
602 | if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply) |
603 | *applyPtr = apply; |
604 | } |
605 | } |
606 | |
607 | /* |
608 | * Calculate the Nth latest Write, Flush and Apply positions among sync |
609 | * standbys. |
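 *
 * For example (illustrative values): with ANY 2 (s1, s2, s3) and standby
 * flush positions 0/50, 0/40 and 0/30, the 2nd latest flush position is
 * 0/40, so commits up to 0/40 have been confirmed by at least two standbys.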
610 | */ |
611 | static void |
612 | SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, |
613 | XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth) |
614 | { |
615 | ListCell *cell; |
616 | XLogRecPtr *write_array; |
617 | XLogRecPtr *flush_array; |
618 | XLogRecPtr *apply_array; |
619 | int len; |
620 | int i = 0; |
621 | |
622 | len = list_length(sync_standbys); |
623 | write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); |
624 | flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); |
625 | apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); |
626 | |
627 | foreach(cell, sync_standbys) |
628 | { |
629 | WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; |
630 | |
631 | SpinLockAcquire(&walsnd->mutex); |
632 | write_array[i] = walsnd->write; |
633 | flush_array[i] = walsnd->flush; |
634 | apply_array[i] = walsnd->apply; |
635 | SpinLockRelease(&walsnd->mutex); |
636 | |
637 | i++; |
638 | } |
639 | |
640 | /* Sort each array in descending order */ |
641 | qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn); |
642 | qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn); |
643 | qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn); |
644 | |
645 | /* Get Nth latest Write, Flush, Apply positions */ |
646 | *writePtr = write_array[nth - 1]; |
647 | *flushPtr = flush_array[nth - 1]; |
648 | *applyPtr = apply_array[nth - 1]; |
649 | |
650 | pfree(write_array); |
651 | pfree(flush_array); |
652 | pfree(apply_array); |
653 | } |
654 | |
655 | /* |
 * Compare LSNs in order to sort an array of them in descending order.
657 | */ |
658 | static int |
659 | cmp_lsn(const void *a, const void *b) |
660 | { |
661 | XLogRecPtr lsn1 = *((const XLogRecPtr *) a); |
662 | XLogRecPtr lsn2 = *((const XLogRecPtr *) b); |
663 | |
664 | if (lsn1 > lsn2) |
665 | return -1; |
666 | else if (lsn1 == lsn2) |
667 | return 0; |
668 | else |
669 | return 1; |
670 | } |
671 | |
672 | /* |
673 | * Return the list of sync standbys, or NIL if no sync standby is connected. |
674 | * |
675 | * The caller must hold SyncRepLock. |
676 | * |
 * On return, *am_sync is set to true if this walsender is connecting to a
 * sync standby. Otherwise it's set to false.
679 | */ |
680 | List * |
681 | SyncRepGetSyncStandbys(bool *am_sync) |
682 | { |
683 | /* Set default result */ |
684 | if (am_sync != NULL) |
685 | *am_sync = false; |
686 | |
687 | /* Quick exit if sync replication is not requested */ |
688 | if (SyncRepConfig == NULL) |
689 | return NIL; |
690 | |
691 | return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? |
692 | SyncRepGetSyncStandbysPriority(am_sync) : |
693 | SyncRepGetSyncStandbysQuorum(am_sync); |
694 | } |
695 | |
696 | /* |
697 | * Return the list of all the candidates for quorum sync standbys, |
698 | * or NIL if no such standby is connected. |
699 | * |
700 | * The caller must hold SyncRepLock. This function must be called only in |
701 | * a quorum-based sync replication. |
702 | * |
 * On return, *am_sync is set to true if this walsender is connecting to a
 * sync standby. Otherwise it's set to false.
705 | */ |
706 | static List * |
707 | SyncRepGetSyncStandbysQuorum(bool *am_sync) |
708 | { |
709 | List *result = NIL; |
710 | int i; |
711 | volatile WalSnd *walsnd; /* Use volatile pointer to prevent code |
712 | * rearrangement */ |
713 | |
714 | Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM); |
715 | |
716 | for (i = 0; i < max_wal_senders; i++) |
717 | { |
718 | XLogRecPtr flush; |
719 | WalSndState state; |
720 | int pid; |
721 | |
722 | walsnd = &WalSndCtl->walsnds[i]; |
723 | |
724 | SpinLockAcquire(&walsnd->mutex); |
725 | pid = walsnd->pid; |
726 | flush = walsnd->flush; |
727 | state = walsnd->state; |
728 | SpinLockRelease(&walsnd->mutex); |
729 | |
730 | /* Must be active */ |
731 | if (pid == 0) |
732 | continue; |
733 | |
734 | /* Must be streaming or stopping */ |
735 | if (state != WALSNDSTATE_STREAMING && |
736 | state != WALSNDSTATE_STOPPING) |
737 | continue; |
738 | |
739 | /* Must be synchronous */ |
740 | if (walsnd->sync_standby_priority == 0) |
741 | continue; |
742 | |
743 | /* Must have a valid flush position */ |
744 | if (XLogRecPtrIsInvalid(flush)) |
745 | continue; |
746 | |
747 | /* |
748 | * Consider this standby as a candidate for quorum sync standbys and |
749 | * append it to the result. |
750 | */ |
751 | result = lappend_int(result, i); |
752 | if (am_sync != NULL && walsnd == MyWalSnd) |
753 | *am_sync = true; |
754 | } |
755 | |
756 | return result; |
757 | } |
758 | |
759 | /* |
760 | * Return the list of sync standbys chosen based on their priorities, |
761 | * or NIL if no sync standby is connected. |
762 | * |
763 | * If there are multiple standbys with the same priority, |
764 | * the first one found is selected preferentially. |
765 | * |
766 | * The caller must hold SyncRepLock. This function must be called only in |
767 | * a priority-based sync replication. |
768 | * |
 * On return, *am_sync is set to true if this walsender is connecting to a
 * sync standby. Otherwise it's set to false.
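 *
 * For example (hypothetical names): with 'FIRST 2 (s1, s2, s3)' configured
 * and only s2 and s3 connected and caught up, the result is s2 and s3,
 * with priorities 2 and 3 respectively.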
771 | */ |
772 | static List * |
773 | SyncRepGetSyncStandbysPriority(bool *am_sync) |
774 | { |
775 | List *result = NIL; |
776 | List *pending = NIL; |
777 | int lowest_priority; |
778 | int next_highest_priority; |
779 | int this_priority; |
780 | int priority; |
781 | int i; |
782 | bool am_in_pending = false; |
783 | volatile WalSnd *walsnd; /* Use volatile pointer to prevent code |
784 | * rearrangement */ |
785 | |
786 | Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY); |
787 | |
788 | lowest_priority = SyncRepConfig->nmembers; |
789 | next_highest_priority = lowest_priority + 1; |
790 | |
791 | /* |
	 * Find the sync standbys which have the highest priority (i.e., 1). Also
793 | * store all the other potential sync standbys into the pending list, in |
794 | * order to scan it later and find other sync standbys from it quickly. |
795 | */ |
796 | for (i = 0; i < max_wal_senders; i++) |
797 | { |
798 | XLogRecPtr flush; |
799 | WalSndState state; |
800 | int pid; |
801 | |
802 | walsnd = &WalSndCtl->walsnds[i]; |
803 | |
804 | SpinLockAcquire(&walsnd->mutex); |
805 | pid = walsnd->pid; |
806 | flush = walsnd->flush; |
807 | state = walsnd->state; |
808 | SpinLockRelease(&walsnd->mutex); |
809 | |
810 | /* Must be active */ |
811 | if (pid == 0) |
812 | continue; |
813 | |
814 | /* Must be streaming or stopping */ |
815 | if (state != WALSNDSTATE_STREAMING && |
816 | state != WALSNDSTATE_STOPPING) |
817 | continue; |
818 | |
819 | /* Must be synchronous */ |
820 | this_priority = walsnd->sync_standby_priority; |
821 | if (this_priority == 0) |
822 | continue; |
823 | |
824 | /* Must have a valid flush position */ |
825 | if (XLogRecPtrIsInvalid(flush)) |
826 | continue; |
827 | |
828 | /* |
829 | * If the priority is equal to 1, consider this standby as sync and |
830 | * append it to the result. Otherwise append this standby to the |
831 | * pending list to check if it's actually sync or not later. |
832 | */ |
833 | if (this_priority == 1) |
834 | { |
835 | result = lappend_int(result, i); |
836 | if (am_sync != NULL && walsnd == MyWalSnd) |
837 | *am_sync = true; |
838 | if (list_length(result) == SyncRepConfig->num_sync) |
839 | { |
840 | list_free(pending); |
841 | return result; /* Exit if got enough sync standbys */ |
842 | } |
843 | } |
844 | else |
845 | { |
846 | pending = lappend_int(pending, i); |
847 | if (am_sync != NULL && walsnd == MyWalSnd) |
848 | am_in_pending = true; |
849 | |
850 | /* |
851 | * Track the highest priority among the standbys in the pending |
852 | * list, in order to use it as the starting priority for later |
853 | * scan of the list. This is useful to find quickly the sync |
854 | * standbys from the pending list later because we can skip |
855 | * unnecessary scans for the unused priorities. |
856 | */ |
857 | if (this_priority < next_highest_priority) |
858 | next_highest_priority = this_priority; |
859 | } |
860 | } |
861 | |
862 | /* |
	 * Consider all pending standbys as sync if the number of them plus
	 * already-found sync ones does not exceed what the configuration requests.
865 | */ |
866 | if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync) |
867 | { |
868 | bool needfree = (result != NIL && pending != NIL); |
869 | |
870 | /* |
871 | * Set *am_sync to true if this walsender is in the pending list |
872 | * because all pending standbys are considered as sync. |
873 | */ |
874 | if (am_sync != NULL && !(*am_sync)) |
875 | *am_sync = am_in_pending; |
876 | |
877 | result = list_concat(result, pending); |
878 | if (needfree) |
879 | pfree(pending); |
880 | return result; |
881 | } |
882 | |
883 | /* |
884 | * Find the sync standbys from the pending list. |
885 | */ |
886 | priority = next_highest_priority; |
887 | while (priority <= lowest_priority) |
888 | { |
889 | ListCell *cell; |
890 | ListCell *prev = NULL; |
891 | ListCell *next; |
892 | |
893 | next_highest_priority = lowest_priority + 1; |
894 | |
895 | for (cell = list_head(pending); cell != NULL; cell = next) |
896 | { |
897 | i = lfirst_int(cell); |
898 | walsnd = &WalSndCtl->walsnds[i]; |
899 | |
900 | next = lnext(cell); |
901 | |
902 | this_priority = walsnd->sync_standby_priority; |
903 | if (this_priority == priority) |
904 | { |
905 | result = lappend_int(result, i); |
906 | if (am_sync != NULL && walsnd == MyWalSnd) |
907 | *am_sync = true; |
908 | |
909 | /* |
				 * We should always exit here after the scan of the pending list
911 | * starts because we know that the list has enough elements to |
912 | * reach SyncRepConfig->num_sync. |
913 | */ |
914 | if (list_length(result) == SyncRepConfig->num_sync) |
915 | { |
916 | list_free(pending); |
917 | return result; /* Exit if got enough sync standbys */ |
918 | } |
919 | |
920 | /* |
921 | * Remove the entry for this sync standby from the list to |
922 | * prevent us from looking at the same entry again. |
923 | */ |
924 | pending = list_delete_cell(pending, cell, prev); |
925 | |
926 | continue; |
927 | } |
928 | |
929 | if (this_priority < next_highest_priority) |
930 | next_highest_priority = this_priority; |
931 | |
932 | prev = cell; |
933 | } |
934 | |
935 | priority = next_highest_priority; |
936 | } |
937 | |
938 | /* never reached, but keep compiler quiet */ |
939 | Assert(false); |
940 | return result; |
941 | } |
942 | |
943 | /* |
944 | * Check if we are in the list of sync standbys, and if so, determine |
945 | * priority sequence. Return priority if set, or zero to indicate that |
946 | * we are not a potential sync standby. |
947 | * |
948 | * Compare the parameter SyncRepStandbyNames against the application_name |
949 | * for this WALSender, or allow any name if we find a wildcard "*". |
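 *
 * For example (hypothetical names): with synchronous_standby_names set to
 * 'FIRST 2 (s1, s2, s3)', a walsender whose standby reports application_name
 * s2 returns priority 2, while an unlisted standby returns 0.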
950 | */ |
951 | static int |
952 | SyncRepGetStandbyPriority(void) |
953 | { |
954 | const char *standby_name; |
955 | int priority; |
956 | bool found = false; |
957 | |
958 | /* |
	 * Since synchronous cascade replication is not allowed, we always set the
	 * priority of a cascading walsender to zero.
961 | */ |
962 | if (am_cascading_walsender) |
963 | return 0; |
964 | |
965 | if (!SyncStandbysDefined() || SyncRepConfig == NULL) |
966 | return 0; |
967 | |
968 | standby_name = SyncRepConfig->member_names; |
969 | for (priority = 1; priority <= SyncRepConfig->nmembers; priority++) |
970 | { |
971 | if (pg_strcasecmp(standby_name, application_name) == 0 || |
			strcmp(standby_name, "*") == 0)
973 | { |
974 | found = true; |
975 | break; |
976 | } |
977 | standby_name += strlen(standby_name) + 1; |
978 | } |
979 | |
980 | if (!found) |
981 | return 0; |
982 | |
983 | /* |
984 | * In quorum-based sync replication, all the standbys in the list have the |
985 | * same priority, one. |
986 | */ |
987 | return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1; |
988 | } |
989 | |
990 | /* |
991 | * Walk the specified queue from head. Set the state of any backends that |
992 | * need to be woken, remove them from the queue, and then wake them. |
 * Pass all = true to wake the whole queue; otherwise, just wake up to
994 | * the walsender's LSN. |
995 | * |
996 | * Must hold SyncRepLock. |
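 *
 * For example (illustrative values): if lsn[SYNC_REP_WAIT_FLUSH] has just
 * been advanced to 0/40 and all = false, backends waiting for 0/30 and 0/40
 * are woken, while one waiting for 0/50 stays queued.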
997 | */ |
998 | static int |
999 | SyncRepWakeQueue(bool all, int mode) |
1000 | { |
1001 | volatile WalSndCtlData *walsndctl = WalSndCtl; |
1002 | PGPROC *proc = NULL; |
1003 | PGPROC *thisproc = NULL; |
1004 | int numprocs = 0; |
1005 | |
1006 | Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); |
1007 | Assert(SyncRepQueueIsOrderedByLSN(mode)); |
1008 | |
1009 | proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), |
1010 | &(WalSndCtl->SyncRepQueue[mode]), |
1011 | offsetof(PGPROC, syncRepLinks)); |
1012 | |
1013 | while (proc) |
1014 | { |
1015 | /* |
1016 | * Assume the queue is ordered by LSN |
1017 | */ |
1018 | if (!all && walsndctl->lsn[mode] < proc->waitLSN) |
1019 | return numprocs; |
1020 | |
1021 | /* |
1022 | * Move to next proc, so we can delete thisproc from the queue. |
1023 | * thisproc is valid, proc may be NULL after this. |
1024 | */ |
1025 | thisproc = proc; |
1026 | proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), |
1027 | &(proc->syncRepLinks), |
1028 | offsetof(PGPROC, syncRepLinks)); |
1029 | |
1030 | /* |
1031 | * Remove thisproc from queue. |
1032 | */ |
1033 | SHMQueueDelete(&(thisproc->syncRepLinks)); |
1034 | |
1035 | /* |
1036 | * SyncRepWaitForLSN() reads syncRepState without holding the lock, so |
1037 | * make sure that it sees the queue link being removed before the |
1038 | * syncRepState change. |
1039 | */ |
1040 | pg_write_barrier(); |
1041 | |
1042 | /* |
1043 | * Set state to complete; see SyncRepWaitForLSN() for discussion of |
1044 | * the various states. |
1045 | */ |
1046 | thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE; |
1047 | |
1048 | /* |
1049 | * Wake only when we have set state and removed from queue. |
1050 | */ |
1051 | SetLatch(&(thisproc->procLatch)); |
1052 | |
1053 | numprocs++; |
1054 | } |
1055 | |
1056 | return numprocs; |
1057 | } |
1058 | |
1059 | /* |
1060 | * The checkpointer calls this as needed to update the shared |
1061 | * sync_standbys_defined flag, so that backends don't remain permanently wedged |
1062 | * if synchronous_standby_names is unset. It's safe to check the current value |
1063 | * without the lock, because it's only ever updated by one process. But we |
1064 | * must take the lock to change it. |
1065 | */ |
1066 | void |
1067 | SyncRepUpdateSyncStandbysDefined(void) |
1068 | { |
1069 | bool sync_standbys_defined = SyncStandbysDefined(); |
1070 | |
1071 | if (sync_standbys_defined != WalSndCtl->sync_standbys_defined) |
1072 | { |
1073 | LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); |
1074 | |
1075 | /* |
1076 | * If synchronous_standby_names has been reset to empty, it's futile |
		 * for backends to continue waiting. Since the user no longer
1078 | * wants synchronous replication, we'd better wake them up. |
1079 | */ |
1080 | if (!sync_standbys_defined) |
1081 | { |
1082 | int i; |
1083 | |
1084 | for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) |
1085 | SyncRepWakeQueue(true, i); |
1086 | } |
1087 | |
1088 | /* |
1089 | * Only allow people to join the queue when there are synchronous |
1090 | * standbys defined. Without this interlock, there's a race |
1091 | * condition: we might wake up all the current waiters; then, some |
1092 | * backend that hasn't yet reloaded its config might go to sleep on |
1093 | * the queue (and never wake up). This prevents that. |
1094 | */ |
1095 | WalSndCtl->sync_standbys_defined = sync_standbys_defined; |
1096 | |
1097 | LWLockRelease(SyncRepLock); |
1098 | } |
1099 | } |
1100 | |
1101 | #ifdef USE_ASSERT_CHECKING |
1102 | static bool |
1103 | SyncRepQueueIsOrderedByLSN(int mode) |
1104 | { |
1105 | PGPROC *proc = NULL; |
1106 | XLogRecPtr lastLSN; |
1107 | |
1108 | Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); |
1109 | |
1110 | lastLSN = 0; |
1111 | |
1112 | proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), |
1113 | &(WalSndCtl->SyncRepQueue[mode]), |
1114 | offsetof(PGPROC, syncRepLinks)); |
1115 | |
1116 | while (proc) |
1117 | { |
1118 | /* |
1119 | * Check the queue is ordered by LSN and that multiple procs don't |
1120 | * have matching LSNs |
1121 | */ |
1122 | if (proc->waitLSN <= lastLSN) |
1123 | return false; |
1124 | |
1125 | lastLSN = proc->waitLSN; |
1126 | |
1127 | proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), |
1128 | &(proc->syncRepLinks), |
1129 | offsetof(PGPROC, syncRepLinks)); |
1130 | } |
1131 | |
1132 | return true; |
1133 | } |
1134 | #endif |
1135 | |
1136 | /* |
1137 | * =========================================================== |
1138 | * Synchronous Replication functions executed by any process |
1139 | * =========================================================== |
1140 | */ |
1141 | |
1142 | bool |
check_synchronous_standby_names(char **newval, void **extra, GucSource source)
1144 | { |
1145 | if (*newval != NULL && (*newval)[0] != '\0') |
1146 | { |
1147 | int parse_rc; |
1148 | SyncRepConfigData *pconf; |
1149 | |
1150 | /* Reset communication variables to ensure a fresh start */ |
1151 | syncrep_parse_result = NULL; |
1152 | syncrep_parse_error_msg = NULL; |
1153 | |
1154 | /* Parse the synchronous_standby_names string */ |
1155 | syncrep_scanner_init(*newval); |
1156 | parse_rc = syncrep_yyparse(); |
1157 | syncrep_scanner_finish(); |
1158 | |
1159 | if (parse_rc != 0 || syncrep_parse_result == NULL) |
1160 | { |
1161 | GUC_check_errcode(ERRCODE_SYNTAX_ERROR); |
1162 | if (syncrep_parse_error_msg) |
				GUC_check_errdetail("%s", syncrep_parse_error_msg);
1164 | else |
				GUC_check_errdetail("synchronous_standby_names parser failed");
1166 | return false; |
1167 | } |
1168 | |
1169 | if (syncrep_parse_result->num_sync <= 0) |
1170 | { |
			GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
1172 | syncrep_parse_result->num_sync); |
1173 | return false; |
1174 | } |
1175 | |
1176 | /* GUC extra value must be malloc'd, not palloc'd */ |
1177 | pconf = (SyncRepConfigData *) |
1178 | malloc(syncrep_parse_result->config_size); |
1179 | if (pconf == NULL) |
1180 | return false; |
1181 | memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size); |
1182 | |
1183 | *extra = (void *) pconf; |
1184 | |
1185 | /* |
1186 | * We need not explicitly clean up syncrep_parse_result. It, and any |
1187 | * other cruft generated during parsing, will be freed when the |
1188 | * current memory context is deleted. (This code is generally run in |
1189 | * a short-lived context used for config file processing, so that will |
1190 | * not be very long.) |
1191 | */ |
1192 | } |
1193 | else |
1194 | *extra = NULL; |
1195 | |
1196 | return true; |
1197 | } |
1198 | |
1199 | void |
assign_synchronous_standby_names(const char *newval, void *extra)
1201 | { |
1202 | SyncRepConfig = (SyncRepConfigData *) extra; |
1203 | } |
1204 | |
1205 | void |
assign_synchronous_commit(int newval, void *extra)
1207 | { |
1208 | switch (newval) |
1209 | { |
1210 | case SYNCHRONOUS_COMMIT_REMOTE_WRITE: |
1211 | SyncRepWaitMode = SYNC_REP_WAIT_WRITE; |
1212 | break; |
1213 | case SYNCHRONOUS_COMMIT_REMOTE_FLUSH: |
1214 | SyncRepWaitMode = SYNC_REP_WAIT_FLUSH; |
1215 | break; |
1216 | case SYNCHRONOUS_COMMIT_REMOTE_APPLY: |
1217 | SyncRepWaitMode = SYNC_REP_WAIT_APPLY; |
1218 | break; |
1219 | default: |
1220 | SyncRepWaitMode = SYNC_REP_NO_WAIT; |
1221 | break; |
1222 | } |
1223 | } |
1224 | |