1/*-------------------------------------------------------------------------
2 *
3 * walsender.c
4 *
5 * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6 * care of sending XLOG from the primary server to a single recipient.
7 * (Note that there can be more than one walsender process concurrently.)
8 * It is started by the postmaster when the walreceiver of a standby server
9 * connects to the primary server and requests XLOG streaming replication.
10 *
11 * A walsender is similar to a regular backend, ie. there is a one-to-one
12 * relationship between a connection and a walsender process, but instead
13 * of processing SQL queries, it understands a small set of special
14 * replication-mode commands. The START_REPLICATION command begins streaming
15 * WAL to the client. While streaming, the walsender keeps reading XLOG
16 * records from the disk and sends them to the standby server over the
17 * COPY protocol, until either side ends the replication by exiting COPY
18 * mode (or until the connection is closed).
19 *
20 * Normal termination is by SIGTERM, which instructs the walsender to
21 * close the connection and exit(0) at the next convenient moment. Emergency
22 * termination is by SIGQUIT; like any backend, the walsender will simply
23 * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24 * are treated as not a crash but approximately normal termination;
25 * the walsender will exit quickly without sending any more XLOG records.
26 *
27 * If the server is shut down, checkpointer sends us
28 * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
 * the backend is idle or runs an SQL query, this causes the backend to
 * shut down; if logical replication is in progress, all existing WAL records
 * are processed, followed by a shutdown.  Otherwise this causes the walsender
32 * to switch to the "stopping" state. In this state, the walsender will reject
33 * any further replication commands. The checkpointer begins the shutdown
34 * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35 * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36 * walsender to send any outstanding WAL, including the shutdown checkpoint
37 * record, wait for it to be replicated to the standby, and then exit.
38 *
39 *
40 * Portions Copyright (c) 2010-2019, PostgreSQL Global Development Group
41 *
42 * IDENTIFICATION
43 * src/backend/replication/walsender.c
44 *
45 *-------------------------------------------------------------------------
46 */
47#include "postgres.h"
48
49#include <signal.h>
50#include <unistd.h>
51
52#include "access/printtup.h"
53#include "access/timeline.h"
54#include "access/transam.h"
55#include "access/xact.h"
56#include "access/xlog_internal.h"
57#include "access/xlogutils.h"
58
59#include "catalog/pg_authid.h"
60#include "catalog/pg_type.h"
61#include "commands/dbcommands.h"
62#include "commands/defrem.h"
63#include "funcapi.h"
64#include "libpq/libpq.h"
65#include "libpq/pqformat.h"
66#include "miscadmin.h"
67#include "nodes/replnodes.h"
68#include "pgstat.h"
69#include "replication/basebackup.h"
70#include "replication/decode.h"
71#include "replication/logical.h"
72#include "replication/logicalfuncs.h"
73#include "replication/slot.h"
74#include "replication/snapbuild.h"
75#include "replication/syncrep.h"
76#include "replication/walreceiver.h"
77#include "replication/walsender.h"
78#include "replication/walsender_private.h"
79#include "storage/condition_variable.h"
80#include "storage/fd.h"
81#include "storage/ipc.h"
82#include "storage/pmsignal.h"
83#include "storage/proc.h"
84#include "storage/procarray.h"
85#include "tcop/dest.h"
86#include "tcop/tcopprot.h"
87#include "utils/builtins.h"
88#include "utils/guc.h"
89#include "utils/memutils.h"
90#include "utils/pg_lsn.h"
91#include "utils/portal.h"
92#include "utils/ps_status.h"
93#include "utils/timeout.h"
94#include "utils/timestamp.h"
95
/*
 * Maximum data payload in a WAL data message.  Must be >= XLOG_BLCKSZ.
 *
 * We don't have a good idea of what a good value would be; there's some
 * overhead per message in both walsender and walreceiver, but on the other
 * hand sending large batches makes walsender less responsive to signals
 * because signals are checked only between messages.  128kB (with
 * default 8k blocks) seems like a reasonable guess for now.
 *
 * The limit is written as a multiple of XLOG_BLCKSZ, so it scales with the
 * WAL block size the server was built with.
 */
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)

/* Array of WalSnds in shared memory */
WalSndCtlData *WalSndCtl = NULL;

/* My slot in the shared memory array */
WalSnd	   *MyWalSnd = NULL;

/*
 * Global state
 *
 * am_cascading_walsender is set from RecoveryInProgress() in InitWalSender()
 * and refreshed the same way each time IdentifySystem() runs.
 */
bool		am_walsender = false;	/* Am I a walsender process? */
bool		am_cascading_walsender = false; /* Am I cascading WAL to another
											 * standby? */
bool		am_db_walsender = false;	/* Connected to a database? */

/*
 * User-settable parameters for walsender
 *
 * NOTE(review): wal_sender_timeout's default of 60 * 1000 looks like
 * milliseconds (i.e. one minute) -- confirm against the GUC definition.
 * NOTE(review): log_replication_commands presumably controls whether
 * replication commands are logged; its use is not visible in this part of
 * the file -- confirm.
 */
int			max_wal_senders = 0;	/* the maximum number of concurrent
									 * walsenders */
int			wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
											 * data message */
bool		log_replication_commands = false;

/*
 * State for WalSndWakeupRequest
 */
bool		wake_wal_senders = false;
130
/*
 * These variables are used similarly to openLogFile/SegNo/Off,
 * but for walsender to read the XLOG.
 *
 * sendFile is -1 while no file is open; WalSndErrorCleanup() closes the
 * file and resets it to -1 after an error.
 */
static int	sendFile = -1;
static XLogSegNo sendSegNo = 0;
static uint32 sendOff = 0;

/* Timeline ID of the currently open file */
static TimeLineID curFileTimeLine = 0;

/*
 * These variables keep track of the state of the timeline we're currently
 * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
 * the timeline is not the latest timeline on this server, and the server's
 * history forked off from that timeline at sendTimeLineValidUpto.
 *
 * sendTimeLineNextTLI is filled in by tliSwitchPoint() in StartReplication()
 * (or copied from the reader state in logical_read_xlog_page()) and is what
 * StartReplication() reports to the client as "next_tli".
 */
static TimeLineID sendTimeLine = 0;
static TimeLineID sendTimeLineNextTLI = 0;
static bool sendTimeLineIsHistoric = false;
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;

/*
 * How far have we sent WAL already? This is also advertised in
 * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
 *
 * StartReplication() seeds both copies from the client's requested start
 * point, updating the shared one under MyWalSnd->mutex.
 */
static XLogRecPtr sentPtr = 0;

/* Buffers for constructing outgoing messages and processing reply messages. */
static StringInfoData output_message;
static StringInfoData reply_message;
static StringInfoData tmpbuf;

/* Timestamp of last ProcessRepliesIfAny(). */
static TimestampTz last_processing = 0;

/*
 * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
 * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
 * (CreateReplicationSlot() zeroes it before building a logical slot's
 * initial snapshot for exactly that reason.)
 */
static TimestampTz last_reply_timestamp = 0;

/* Have we sent a heartbeat message asking for reply, since last reply? */
static bool waiting_for_ping_response = false;

/*
 * While streaming WAL in Copy mode, streamingDoneSending is set to true
 * after we have sent CopyDone. We should not send any more CopyData messages
 * after that. streamingDoneReceiving is set to true when we receive CopyDone
 * from the other end. When both become true, it's time to exit Copy mode.
 *
 * Neither has an explicit initializer; StartReplication() resets both to
 * false before it starts streaming.
 */
static bool streamingDoneSending;
static bool streamingDoneReceiving;

/*
 * Are we there yet?
 *
 * NOTE(review): presumably true once all currently available WAL has been
 * sent; the code that sets it is outside the part of the file reviewed here
 * -- confirm.
 */
static bool WalSndCaughtUp = false;

/*
 * Flags set by signal handlers for later service in main loop.
 * WalSndErrorCleanup() exits the process if either is set.
 */
static volatile sig_atomic_t got_SIGUSR2 = false;
static volatile sig_atomic_t got_STOPPING = false;

/*
 * This is set while we are streaming. When not set
 * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
 * the main loop is responsible for checking got_STOPPING and terminating when
 * it's set (after streaming any remaining WAL).
 *
 * StartReplication() sets it around its call to WalSndLoop();
 * WalSndErrorCleanup() clears it.
 */
static volatile sig_atomic_t replication_active = false;

/*
 * NOTE(review): these two are not referenced in the part of the file
 * reviewed here; presumably they hold the decoding context and start
 * position for logical replication -- confirm against
 * StartLogicalReplication().
 */
static LogicalDecodingContext *logical_decoding_ctx = NULL;
static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
202
/* A sample associating a WAL location with the time it was written. */
typedef struct
{
	XLogRecPtr	lsn;
	TimestampTz time;
} WalTimeSample;

/* The size of our buffer of time samples (counted in samples, not bytes). */
#define LAG_TRACKER_BUFFER_SIZE 8192

/*
 * A mechanism for tracking replication lag.
 *
 * Holds a fixed array of LAG_TRACKER_BUFFER_SIZE samples, plus one read head
 * and one last-read sample for each of the NUM_SYNC_REP_WAIT_MODE wait modes.
 *
 * NOTE(review): write_head and read_heads[] look like indexes into buffer[]
 * -- confirm in LagTrackerWrite()/LagTrackerRead().
 */
typedef struct
{
	XLogRecPtr	last_lsn;
	WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE];
	int			write_head;
	int			read_heads[NUM_SYNC_REP_WAIT_MODE];
	WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];
} LagTracker;

/* Allocated zero-filled in TopMemoryContext by InitWalSender(). */
static LagTracker *lag_tracker;
224
/* Signal handlers */
static void WalSndLastCycleHandler(SIGNAL_ARGS);

/* Prototypes for private functions */

/*
 * Callback type handed to WalSndLoop() and WalSndDone().  StartReplication()
 * passes XLogSendPhysical for physical streaming.
 */
typedef void (*WalSndSendDataCallback) (void);
static void WalSndLoop(WalSndSendDataCallback send_data);
static void InitWalSenderSlot(void);
static void WalSndKill(int code, Datum arg);
/* Declared noreturn: the function ends in proc_exit(0). */
static void WalSndShutdown(void) pg_attribute_noreturn();
static void XLogSendPhysical(void);
static void XLogSendLogical(void);
static void WalSndDone(WalSndSendDataCallback send_data);
static XLogRecPtr GetStandbyFlushRecPtr(void);
static void IdentifySystem(void);
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
static void StartReplication(StartReplicationCmd *cmd);
static void StartLogicalReplication(StartReplicationCmd *cmd);
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
static void WalSndKeepalive(bool requestReply);
static void WalSndKeepaliveIfNecessary(void);
static void WalSndCheckTimeOut(void);
static long WalSndComputeSleeptime(TimestampTz now);
/*
 * The next three are the output callbacks that CreateReplicationSlot() hands
 * to CreateInitDecodingContext().
 */
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
/* Returns a flush position; logical_read_xlog_page() compares it with loc. */
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);

/* Reads count bytes of WAL starting at startptr into buf. */
static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
260
261
262/* Initialize walsender process before entering the main command loop */
263void
264InitWalSender(void)
265{
266 am_cascading_walsender = RecoveryInProgress();
267
268 /* Create a per-walsender data structure in shared memory */
269 InitWalSenderSlot();
270
271 /*
272 * We don't currently need any ResourceOwner in a walsender process, but
273 * if we did, we could call CreateAuxProcessResourceOwner here.
274 */
275
276 /*
277 * Let postmaster know that we're a WAL sender. Once we've declared us as
278 * a WAL sender process, postmaster will let us outlive the bgwriter and
279 * kill us last in the shutdown sequence, so we get a chance to stream all
280 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
281 * there's no going back, and we mustn't write any WAL records after this.
282 */
283 MarkPostmasterChildWalSender();
284 SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
285
286 /* Initialize empty timestamp buffer for lag tracking. */
287 lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
288}
289
290/*
291 * Clean up after an error.
292 *
293 * WAL sender processes don't use transactions like regular backends do.
294 * This function does any cleanup required after an error in a WAL sender
295 * process, similar to what transaction abort does in a regular backend.
296 */
297void
298WalSndErrorCleanup(void)
299{
300 LWLockReleaseAll();
301 ConditionVariableCancelSleep();
302 pgstat_report_wait_end();
303
304 if (sendFile >= 0)
305 {
306 close(sendFile);
307 sendFile = -1;
308 }
309
310 if (MyReplicationSlot != NULL)
311 ReplicationSlotRelease();
312
313 ReplicationSlotCleanup();
314
315 replication_active = false;
316
317 if (got_STOPPING || got_SIGUSR2)
318 proc_exit(0);
319
320 /* Revert back to startup state */
321 WalSndSetState(WALSNDSTATE_STARTUP);
322}
323
324/*
325 * Handle a client's connection abort in an orderly manner.
326 */
327static void
328WalSndShutdown(void)
329{
330 /*
331 * Reset whereToSendOutput to prevent ereport from attempting to send any
332 * more messages to the standby.
333 */
334 if (whereToSendOutput == DestRemote)
335 whereToSendOutput = DestNone;
336
337 proc_exit(0);
338 abort(); /* keep the compiler quiet */
339}
340
341/*
342 * Handle the IDENTIFY_SYSTEM command.
343 */
344static void
345IdentifySystem(void)
346{
347 char sysid[32];
348 char xloc[MAXFNAMELEN];
349 XLogRecPtr logptr;
350 char *dbname = NULL;
351 DestReceiver *dest;
352 TupOutputState *tstate;
353 TupleDesc tupdesc;
354 Datum values[4];
355 bool nulls[4];
356
357 /*
358 * Reply with a result set with one row, four columns. First col is system
359 * ID, second is timeline ID, third is current xlog location and the
360 * fourth contains the database name if we are connected to one.
361 */
362
363 snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
364 GetSystemIdentifier());
365
366 am_cascading_walsender = RecoveryInProgress();
367 if (am_cascading_walsender)
368 {
369 /* this also updates ThisTimeLineID */
370 logptr = GetStandbyFlushRecPtr();
371 }
372 else
373 logptr = GetFlushRecPtr();
374
375 snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
376
377 if (MyDatabaseId != InvalidOid)
378 {
379 MemoryContext cur = CurrentMemoryContext;
380
381 /* syscache access needs a transaction env. */
382 StartTransactionCommand();
383 /* make dbname live outside TX context */
384 MemoryContextSwitchTo(cur);
385 dbname = get_database_name(MyDatabaseId);
386 CommitTransactionCommand();
387 /* CommitTransactionCommand switches to TopMemoryContext */
388 MemoryContextSwitchTo(cur);
389 }
390
391 dest = CreateDestReceiver(DestRemoteSimple);
392 MemSet(nulls, false, sizeof(nulls));
393
394 /* need a tuple descriptor representing four columns */
395 tupdesc = CreateTemplateTupleDesc(4);
396 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
397 TEXTOID, -1, 0);
398 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
399 INT4OID, -1, 0);
400 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
401 TEXTOID, -1, 0);
402 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
403 TEXTOID, -1, 0);
404
405 /* prepare for projection of tuples */
406 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
407
408 /* column 1: system identifier */
409 values[0] = CStringGetTextDatum(sysid);
410
411 /* column 2: timeline */
412 values[1] = Int32GetDatum(ThisTimeLineID);
413
414 /* column 3: wal location */
415 values[2] = CStringGetTextDatum(xloc);
416
417 /* column 4: database name, or NULL if none */
418 if (dbname)
419 values[3] = CStringGetTextDatum(dbname);
420 else
421 nulls[3] = true;
422
423 /* send it to dest */
424 do_tup_output(tstate, values, nulls);
425
426 end_tup_output(tstate);
427}
428
429
430/*
431 * Handle TIMELINE_HISTORY command.
432 */
433static void
434SendTimeLineHistory(TimeLineHistoryCmd *cmd)
435{
436 StringInfoData buf;
437 char histfname[MAXFNAMELEN];
438 char path[MAXPGPATH];
439 int fd;
440 off_t histfilelen;
441 off_t bytesleft;
442 Size len;
443
444 /*
445 * Reply with a result set with one row, and two columns. The first col is
446 * the name of the history file, 2nd is the contents.
447 */
448
449 TLHistoryFileName(histfname, cmd->timeline);
450 TLHistoryFilePath(path, cmd->timeline);
451
452 /* Send a RowDescription message */
453 pq_beginmessage(&buf, 'T');
454 pq_sendint16(&buf, 2); /* 2 fields */
455
456 /* first field */
457 pq_sendstring(&buf, "filename"); /* col name */
458 pq_sendint32(&buf, 0); /* table oid */
459 pq_sendint16(&buf, 0); /* attnum */
460 pq_sendint32(&buf, TEXTOID); /* type oid */
461 pq_sendint16(&buf, -1); /* typlen */
462 pq_sendint32(&buf, 0); /* typmod */
463 pq_sendint16(&buf, 0); /* format code */
464
465 /* second field */
466 pq_sendstring(&buf, "content"); /* col name */
467 pq_sendint32(&buf, 0); /* table oid */
468 pq_sendint16(&buf, 0); /* attnum */
469 pq_sendint32(&buf, BYTEAOID); /* type oid */
470 pq_sendint16(&buf, -1); /* typlen */
471 pq_sendint32(&buf, 0); /* typmod */
472 pq_sendint16(&buf, 0); /* format code */
473 pq_endmessage(&buf);
474
475 /* Send a DataRow message */
476 pq_beginmessage(&buf, 'D');
477 pq_sendint16(&buf, 2); /* # of columns */
478 len = strlen(histfname);
479 pq_sendint32(&buf, len); /* col1 len */
480 pq_sendbytes(&buf, histfname, len);
481
482 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
483 if (fd < 0)
484 ereport(ERROR,
485 (errcode_for_file_access(),
486 errmsg("could not open file \"%s\": %m", path)));
487
488 /* Determine file length and send it to client */
489 histfilelen = lseek(fd, 0, SEEK_END);
490 if (histfilelen < 0)
491 ereport(ERROR,
492 (errcode_for_file_access(),
493 errmsg("could not seek to end of file \"%s\": %m", path)));
494 if (lseek(fd, 0, SEEK_SET) != 0)
495 ereport(ERROR,
496 (errcode_for_file_access(),
497 errmsg("could not seek to beginning of file \"%s\": %m", path)));
498
499 pq_sendint32(&buf, histfilelen); /* col2 len */
500
501 bytesleft = histfilelen;
502 while (bytesleft > 0)
503 {
504 PGAlignedBlock rbuf;
505 int nread;
506
507 pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
508 nread = read(fd, rbuf.data, sizeof(rbuf));
509 pgstat_report_wait_end();
510 if (nread < 0)
511 ereport(ERROR,
512 (errcode_for_file_access(),
513 errmsg("could not read file \"%s\": %m",
514 path)));
515 else if (nread == 0)
516 ereport(ERROR,
517 (errcode(ERRCODE_DATA_CORRUPTED),
518 errmsg("could not read file \"%s\": read %d of %zu",
519 path, nread, (Size) bytesleft)));
520
521 pq_sendbytes(&buf, rbuf.data, nread);
522 bytesleft -= nread;
523 }
524
525 if (CloseTransientFile(fd))
526 ereport(ERROR,
527 (errcode_for_file_access(),
528 errmsg("could not close file \"%s\": %m", path)));
529
530 pq_endmessage(&buf);
531}
532
533/*
534 * Handle START_REPLICATION command.
535 *
536 * At the moment, this never returns, but an ereport(ERROR) will take us back
537 * to the main loop.
538 */
539static void
540StartReplication(StartReplicationCmd *cmd)
541{
542 StringInfoData buf;
543 XLogRecPtr FlushPtr;
544
545 if (ThisTimeLineID == 0)
546 ereport(ERROR,
547 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
548 errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
549
550 /*
551 * We assume here that we're logging enough information in the WAL for
552 * log-shipping, since this is checked in PostmasterMain().
553 *
554 * NOTE: wal_level can only change at shutdown, so in most cases it is
555 * difficult for there to be WAL data that we can still see that was
556 * written at wal_level='minimal'.
557 */
558
559 if (cmd->slotname)
560 {
561 ReplicationSlotAcquire(cmd->slotname, true);
562 if (SlotIsLogical(MyReplicationSlot))
563 ereport(ERROR,
564 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
565 (errmsg("cannot use a logical replication slot for physical replication"))));
566 }
567
568 /*
569 * Select the timeline. If it was given explicitly by the client, use
570 * that. Otherwise use the timeline of the last replayed record, which is
571 * kept in ThisTimeLineID.
572 */
573 if (am_cascading_walsender)
574 {
575 /* this also updates ThisTimeLineID */
576 FlushPtr = GetStandbyFlushRecPtr();
577 }
578 else
579 FlushPtr = GetFlushRecPtr();
580
581 if (cmd->timeline != 0)
582 {
583 XLogRecPtr switchpoint;
584
585 sendTimeLine = cmd->timeline;
586 if (sendTimeLine == ThisTimeLineID)
587 {
588 sendTimeLineIsHistoric = false;
589 sendTimeLineValidUpto = InvalidXLogRecPtr;
590 }
591 else
592 {
593 List *timeLineHistory;
594
595 sendTimeLineIsHistoric = true;
596
597 /*
598 * Check that the timeline the client requested exists, and the
599 * requested start location is on that timeline.
600 */
601 timeLineHistory = readTimeLineHistory(ThisTimeLineID);
602 switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
603 &sendTimeLineNextTLI);
604 list_free_deep(timeLineHistory);
605
606 /*
607 * Found the requested timeline in the history. Check that
608 * requested startpoint is on that timeline in our history.
609 *
610 * This is quite loose on purpose. We only check that we didn't
611 * fork off the requested timeline before the switchpoint. We
612 * don't check that we switched *to* it before the requested
613 * starting point. This is because the client can legitimately
614 * request to start replication from the beginning of the WAL
615 * segment that contains switchpoint, but on the new timeline, so
616 * that it doesn't end up with a partial segment. If you ask for
617 * too old a starting point, you'll get an error later when we
618 * fail to find the requested WAL segment in pg_wal.
619 *
620 * XXX: we could be more strict here and only allow a startpoint
621 * that's older than the switchpoint, if it's still in the same
622 * WAL segment.
623 */
624 if (!XLogRecPtrIsInvalid(switchpoint) &&
625 switchpoint < cmd->startpoint)
626 {
627 ereport(ERROR,
628 (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
629 (uint32) (cmd->startpoint >> 32),
630 (uint32) (cmd->startpoint),
631 cmd->timeline),
632 errdetail("This server's history forked from timeline %u at %X/%X.",
633 cmd->timeline,
634 (uint32) (switchpoint >> 32),
635 (uint32) (switchpoint))));
636 }
637 sendTimeLineValidUpto = switchpoint;
638 }
639 }
640 else
641 {
642 sendTimeLine = ThisTimeLineID;
643 sendTimeLineValidUpto = InvalidXLogRecPtr;
644 sendTimeLineIsHistoric = false;
645 }
646
647 streamingDoneSending = streamingDoneReceiving = false;
648
649 /* If there is nothing to stream, don't even enter COPY mode */
650 if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
651 {
652 /*
653 * When we first start replication the standby will be behind the
654 * primary. For some applications, for example synchronous
655 * replication, it is important to have a clear state for this initial
656 * catchup mode, so we can trigger actions when we change streaming
657 * state later. We may stay in this state for a long time, which is
658 * exactly why we want to be able to monitor whether or not we are
659 * still here.
660 */
661 WalSndSetState(WALSNDSTATE_CATCHUP);
662
663 /* Send a CopyBothResponse message, and start streaming */
664 pq_beginmessage(&buf, 'W');
665 pq_sendbyte(&buf, 0);
666 pq_sendint16(&buf, 0);
667 pq_endmessage(&buf);
668 pq_flush();
669
670 /*
671 * Don't allow a request to stream from a future point in WAL that
672 * hasn't been flushed to disk in this server yet.
673 */
674 if (FlushPtr < cmd->startpoint)
675 {
676 ereport(ERROR,
677 (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
678 (uint32) (cmd->startpoint >> 32),
679 (uint32) (cmd->startpoint),
680 (uint32) (FlushPtr >> 32),
681 (uint32) (FlushPtr))));
682 }
683
684 /* Start streaming from the requested point */
685 sentPtr = cmd->startpoint;
686
687 /* Initialize shared memory status, too */
688 SpinLockAcquire(&MyWalSnd->mutex);
689 MyWalSnd->sentPtr = sentPtr;
690 SpinLockRelease(&MyWalSnd->mutex);
691
692 SyncRepInitConfig();
693
694 /* Main loop of walsender */
695 replication_active = true;
696
697 WalSndLoop(XLogSendPhysical);
698
699 replication_active = false;
700 if (got_STOPPING)
701 proc_exit(0);
702 WalSndSetState(WALSNDSTATE_STARTUP);
703
704 Assert(streamingDoneSending && streamingDoneReceiving);
705 }
706
707 if (cmd->slotname)
708 ReplicationSlotRelease();
709
710 /*
711 * Copy is finished now. Send a single-row result set indicating the next
712 * timeline.
713 */
714 if (sendTimeLineIsHistoric)
715 {
716 char startpos_str[8 + 1 + 8 + 1];
717 DestReceiver *dest;
718 TupOutputState *tstate;
719 TupleDesc tupdesc;
720 Datum values[2];
721 bool nulls[2];
722
723 snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
724 (uint32) (sendTimeLineValidUpto >> 32),
725 (uint32) sendTimeLineValidUpto);
726
727 dest = CreateDestReceiver(DestRemoteSimple);
728 MemSet(nulls, false, sizeof(nulls));
729
730 /*
731 * Need a tuple descriptor representing two columns. int8 may seem
732 * like a surprising data type for this, but in theory int4 would not
733 * be wide enough for this, as TimeLineID is unsigned.
734 */
735 tupdesc = CreateTemplateTupleDesc(2);
736 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
737 INT8OID, -1, 0);
738 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
739 TEXTOID, -1, 0);
740
741 /* prepare for projection of tuple */
742 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
743
744 values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
745 values[1] = CStringGetTextDatum(startpos_str);
746
747 /* send it to dest */
748 do_tup_output(tstate, values, nulls);
749
750 end_tup_output(tstate);
751 }
752
753 /* Send CommandComplete message */
754 pq_puttextmessage('C', "START_STREAMING");
755}
756
757/*
758 * read_page callback for logical decoding contexts, as a walsender process.
759 *
760 * Inside the walsender we can do better than logical_read_local_xlog_page,
761 * which has to do a plain sleep/busy loop, because the walsender's latch gets
762 * set every time WAL is flushed.
763 */
764static int
765logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
766 XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
767{
768 XLogRecPtr flushptr;
769 int count;
770
771 XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
772 sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
773 sendTimeLine = state->currTLI;
774 sendTimeLineValidUpto = state->currTLIValidUntil;
775 sendTimeLineNextTLI = state->nextTLI;
776
777 /* make sure we have enough WAL available */
778 flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
779
780 /* fail if not (implies we are going to shut down) */
781 if (flushptr < targetPagePtr + reqLen)
782 return -1;
783
784 if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
785 count = XLOG_BLCKSZ; /* more than one block available */
786 else
787 count = flushptr - targetPagePtr; /* part of the page available */
788
789 /* now actually read the data, we know it's there */
790 XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
791
792 return count;
793}
794
795/*
796 * Process extra options given to CREATE_REPLICATION_SLOT.
797 */
798static void
799parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
800 bool *reserve_wal,
801 CRSSnapshotAction *snapshot_action)
802{
803 ListCell *lc;
804 bool snapshot_action_given = false;
805 bool reserve_wal_given = false;
806
807 /* Parse options */
808 foreach(lc, cmd->options)
809 {
810 DefElem *defel = (DefElem *) lfirst(lc);
811
812 if (strcmp(defel->defname, "export_snapshot") == 0)
813 {
814 if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
815 ereport(ERROR,
816 (errcode(ERRCODE_SYNTAX_ERROR),
817 errmsg("conflicting or redundant options")));
818
819 snapshot_action_given = true;
820 *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
821 CRS_NOEXPORT_SNAPSHOT;
822 }
823 else if (strcmp(defel->defname, "use_snapshot") == 0)
824 {
825 if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
826 ereport(ERROR,
827 (errcode(ERRCODE_SYNTAX_ERROR),
828 errmsg("conflicting or redundant options")));
829
830 snapshot_action_given = true;
831 *snapshot_action = CRS_USE_SNAPSHOT;
832 }
833 else if (strcmp(defel->defname, "reserve_wal") == 0)
834 {
835 if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
836 ereport(ERROR,
837 (errcode(ERRCODE_SYNTAX_ERROR),
838 errmsg("conflicting or redundant options")));
839
840 reserve_wal_given = true;
841 *reserve_wal = true;
842 }
843 else
844 elog(ERROR, "unrecognized option: %s", defel->defname);
845 }
846}
847
848/*
849 * Create a new replication slot.
850 */
851static void
852CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
853{
854 const char *snapshot_name = NULL;
855 char xloc[MAXFNAMELEN];
856 char *slot_name;
857 bool reserve_wal = false;
858 CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
859 DestReceiver *dest;
860 TupOutputState *tstate;
861 TupleDesc tupdesc;
862 Datum values[4];
863 bool nulls[4];
864
865 Assert(!MyReplicationSlot);
866
867 parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
868
869 /* setup state for XLogReadPage */
870 sendTimeLineIsHistoric = false;
871 sendTimeLine = ThisTimeLineID;
872
873 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
874 {
875 ReplicationSlotCreate(cmd->slotname, false,
876 cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT);
877 }
878 else
879 {
880 CheckLogicalDecodingRequirements();
881
882 /*
883 * Initially create persistent slot as ephemeral - that allows us to
884 * nicely handle errors during initialization because it'll get
885 * dropped if this transaction fails. We'll make it persistent at the
886 * end. Temporary slots can be created as temporary from beginning as
887 * they get dropped on error as well.
888 */
889 ReplicationSlotCreate(cmd->slotname, true,
890 cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
891 }
892
893 if (cmd->kind == REPLICATION_KIND_LOGICAL)
894 {
895 LogicalDecodingContext *ctx;
896 bool need_full_snapshot = false;
897
898 /*
899 * Do options check early so that we can bail before calling the
900 * DecodingContextFindStartpoint which can take long time.
901 */
902 if (snapshot_action == CRS_EXPORT_SNAPSHOT)
903 {
904 if (IsTransactionBlock())
905 ereport(ERROR,
906 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
907 (errmsg("%s must not be called inside a transaction",
908 "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
909
910 need_full_snapshot = true;
911 }
912 else if (snapshot_action == CRS_USE_SNAPSHOT)
913 {
914 if (!IsTransactionBlock())
915 ereport(ERROR,
916 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
917 (errmsg("%s must be called inside a transaction",
918 "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
919
920 if (XactIsoLevel != XACT_REPEATABLE_READ)
921 ereport(ERROR,
922 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
923 (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
924 "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
925
926 if (FirstSnapshotSet)
927 ereport(ERROR,
928 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
929 (errmsg("%s must be called before any query",
930 "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
931
932 if (IsSubTransaction())
933 ereport(ERROR,
934 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
935 (errmsg("%s must not be called in a subtransaction",
936 "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
937
938 need_full_snapshot = true;
939 }
940
941 ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
942 InvalidXLogRecPtr,
943 logical_read_xlog_page,
944 WalSndPrepareWrite, WalSndWriteData,
945 WalSndUpdateProgress);
946
947 /*
948 * Signal that we don't need the timeout mechanism. We're just
949 * creating the replication slot and don't yet accept feedback
950 * messages or send keepalives. As we possibly need to wait for
951 * further WAL the walsender would otherwise possibly be killed too
952 * soon.
953 */
954 last_reply_timestamp = 0;
955
956 /* build initial snapshot, might take a while */
957 DecodingContextFindStartpoint(ctx);
958
959 /*
960 * Export or use the snapshot if we've been asked to do so.
961 *
962 * NB. We will convert the snapbuild.c kind of snapshot to normal
963 * snapshot when doing this.
964 */
965 if (snapshot_action == CRS_EXPORT_SNAPSHOT)
966 {
967 snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
968 }
969 else if (snapshot_action == CRS_USE_SNAPSHOT)
970 {
971 Snapshot snap;
972
973 snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
974 RestoreTransactionSnapshot(snap, MyProc);
975 }
976
977 /* don't need the decoding context anymore */
978 FreeDecodingContext(ctx);
979
980 if (!cmd->temporary)
981 ReplicationSlotPersist();
982 }
983 else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
984 {
985 ReplicationSlotReserveWal();
986
987 ReplicationSlotMarkDirty();
988
989 /* Write this slot to disk if it's a permanent one. */
990 if (!cmd->temporary)
991 ReplicationSlotSave();
992 }
993
994 snprintf(xloc, sizeof(xloc), "%X/%X",
995 (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
996 (uint32) MyReplicationSlot->data.confirmed_flush);
997
998 dest = CreateDestReceiver(DestRemoteSimple);
999 MemSet(nulls, false, sizeof(nulls));
1000
1001 /*----------
1002 * Need a tuple descriptor representing four columns:
1003 * - first field: the slot name
1004 * - second field: LSN at which we became consistent
1005 * - third field: exported snapshot's name
1006 * - fourth field: output plugin
1007 *----------
1008 */
1009 tupdesc = CreateTemplateTupleDesc(4);
1010 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1011 TEXTOID, -1, 0);
1012 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1013 TEXTOID, -1, 0);
1014 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1015 TEXTOID, -1, 0);
1016 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1017 TEXTOID, -1, 0);
1018
1019 /* prepare for projection of tuples */
1020 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1021
1022 /* slot_name */
1023 slot_name = NameStr(MyReplicationSlot->data.name);
1024 values[0] = CStringGetTextDatum(slot_name);
1025
1026 /* consistent wal location */
1027 values[1] = CStringGetTextDatum(xloc);
1028
1029 /* snapshot name, or NULL if none */
1030 if (snapshot_name != NULL)
1031 values[2] = CStringGetTextDatum(snapshot_name);
1032 else
1033 nulls[2] = true;
1034
1035 /* plugin, or NULL if none */
1036 if (cmd->plugin != NULL)
1037 values[3] = CStringGetTextDatum(cmd->plugin);
1038 else
1039 nulls[3] = true;
1040
1041 /* send it to dest */
1042 do_tup_output(tstate, values, nulls);
1043 end_tup_output(tstate);
1044
1045 ReplicationSlotRelease();
1046}
1047
1048/*
1049 * Get rid of a replication slot that is no longer wanted.
1050 */
1051static void
1052DropReplicationSlot(DropReplicationSlotCmd *cmd)
1053{
1054 ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1055 EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1056}
1057
/*
 * Load previously initiated logical slot and prepare for sending data (via
 * WalSndLoop).
 *
 * In order, this acquires the slot named in the command, creates the
 * decoding context, sends a CopyBothResponse to the client, publishes the
 * starting positions, and then runs WalSndLoop with XLogSendLogical until
 * streaming ends.  Afterwards the decoding context and the slot are
 * released; the process exits if a stop was requested, otherwise the
 * command is completed with "COPY 0".
 */
static void
StartLogicalReplication(StartReplicationCmd *cmd)
{
	StringInfoData buf;

	/* make sure that our requirements are still fulfilled */
	CheckLogicalDecodingRequirements();

	/* we must not already be holding a slot when this command starts */
	Assert(!MyReplicationSlot);

	ReplicationSlotAcquire(cmd->slotname, true);

	/*
	 * Force a disconnect, so that the decoding code doesn't need to care
	 * about an eventual switch from running in recovery, to running in a
	 * normal environment. Client code is expected to handle reconnects.
	 *
	 * Only the stop flag is set here; the actual exit happens after
	 * WalSndLoop returns (see the got_STOPPING check below).
	 */
	if (am_cascading_walsender && !RecoveryInProgress())
	{
		ereport(LOG,
				(errmsg("terminating walsender process after promotion")));
		got_STOPPING = true;
	}

	/*
	 * Create our decoding context, making it start at the previously ack'ed
	 * position.
	 *
	 * Do this before sending a CopyBothResponse message, so that any errors
	 * are reported early.
	 */
	logical_decoding_ctx =
		CreateDecodingContext(cmd->startpoint, cmd->options, false,
							  logical_read_xlog_page,
							  WalSndPrepareWrite, WalSndWriteData,
							  WalSndUpdateProgress);


	WalSndSetState(WALSNDSTATE_CATCHUP);

	/* Send a CopyBothResponse message, and start streaming */
	pq_beginmessage(&buf, 'W');
	pq_sendbyte(&buf, 0);
	pq_sendint16(&buf, 0);
	pq_endmessage(&buf);
	pq_flush();


	/* Start reading WAL from the oldest required WAL. */
	logical_startptr = MyReplicationSlot->data.restart_lsn;

	/*
	 * Report the location after which we'll send out further commits as the
	 * current sentPtr.
	 */
	sentPtr = MyReplicationSlot->data.confirmed_flush;

	/*
	 * Also update the sent position status in shared memory.  Note that the
	 * shared value is initialized from restart_lsn, whereas the local
	 * sentPtr above is initialized from confirmed_flush.
	 */
	SpinLockAcquire(&MyWalSnd->mutex);
	MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
	SpinLockRelease(&MyWalSnd->mutex);

	replication_active = true;

	SyncRepInitConfig();

	/* Main loop of walsender */
	WalSndLoop(XLogSendLogical);

	/* streaming is over: tear down decoding state and give up the slot */
	FreeDecodingContext(logical_decoding_ctx);
	ReplicationSlotRelease();

	replication_active = false;

	/* a stop request (shutdown, or the promotion case above) ends the process */
	if (got_STOPPING)
		proc_exit(0);
	WalSndSetState(WALSNDSTATE_STARTUP);

	/* Get out of COPY mode (CommandComplete). */
	EndCommand("COPY 0", DestRemote);
}
1142
1143/*
1144 * LogicalDecodingContext 'prepare_write' callback.
1145 *
1146 * Prepare a write into a StringInfo.
1147 *
1148 * Don't do anything lasting in here, it's quite possible that nothing will be done
1149 * with the data.
1150 */
1151static void
1152WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
1153{
1154 /* can't have sync rep confused by sending the same LSN several times */
1155 if (!last_write)
1156 lsn = InvalidXLogRecPtr;
1157
1158 resetStringInfo(ctx->out);
1159
1160 pq_sendbyte(ctx->out, 'w');
1161 pq_sendint64(ctx->out, lsn); /* dataStart */
1162 pq_sendint64(ctx->out, lsn); /* walEnd */
1163
1164 /*
1165 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1166 * reserve space here.
1167 */
1168 pq_sendint64(ctx->out, 0); /* sendtime */
1169}
1170
1171/*
1172 * LogicalDecodingContext 'write' callback.
1173 *
1174 * Actually write out data previously prepared by WalSndPrepareWrite out to
1175 * the network. Take as long as needed, but process replies from the other
1176 * side and check timeouts during that.
1177 */
1178static void
1179WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
1180 bool last_write)
1181{
1182 TimestampTz now;
1183
1184 /* output previously gathered data in a CopyData packet */
1185 pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1186
1187 /*
1188 * Fill the send timestamp last, so that it is taken as late as possible.
1189 * This is somewhat ugly, but the protocol is set as it's already used for
1190 * several releases by streaming physical replication.
1191 */
1192 resetStringInfo(&tmpbuf);
1193 now = GetCurrentTimestamp();
1194 pq_sendint64(&tmpbuf, now);
1195 memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1196 tmpbuf.data, sizeof(int64));
1197
1198 CHECK_FOR_INTERRUPTS();
1199
1200 /* Try to flush pending output to the client */
1201 if (pq_flush_if_writable() != 0)
1202 WalSndShutdown();
1203
1204 /* Try taking fast path unless we get too close to walsender timeout. */
1205 if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
1206 wal_sender_timeout / 2) &&
1207 !pq_is_send_pending())
1208 {
1209 return;
1210 }
1211
1212 /* If we have pending write here, go to slow path */
1213 for (;;)
1214 {
1215 int wakeEvents;
1216 long sleeptime;
1217
1218 /* Check for input from the client */
1219 ProcessRepliesIfAny();
1220
1221 /* die if timeout was reached */
1222 WalSndCheckTimeOut();
1223
1224 /* Send keepalive if the time has come */
1225 WalSndKeepaliveIfNecessary();
1226
1227 if (!pq_is_send_pending())
1228 break;
1229
1230 sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1231
1232 wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1233 WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
1234
1235 /* Sleep until something happens or we time out */
1236 (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1237 MyProcPort->sock, sleeptime,
1238 WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1239
1240 /* Clear any already-pending wakeups */
1241 ResetLatch(MyLatch);
1242
1243 CHECK_FOR_INTERRUPTS();
1244
1245 /* Process any requests or signals received recently */
1246 if (ConfigReloadPending)
1247 {
1248 ConfigReloadPending = false;
1249 ProcessConfigFile(PGC_SIGHUP);
1250 SyncRepInitConfig();
1251 }
1252
1253 /* Try to flush pending output to the client */
1254 if (pq_flush_if_writable() != 0)
1255 WalSndShutdown();
1256 }
1257
1258 /* reactivate latch so WalSndLoop knows to continue */
1259 SetLatch(MyLatch);
1260}
1261
1262/*
1263 * LogicalDecodingContext 'update_progress' callback.
1264 *
1265 * Write the current position to the lag tracker (see XLogSendPhysical).
1266 */
1267static void
1268WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
1269{
1270 static TimestampTz sendTime = 0;
1271 TimestampTz now = GetCurrentTimestamp();
1272
1273 /*
1274 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1275 * avoid flooding the lag tracker when we commit frequently.
1276 */
1277#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1278 if (!TimestampDifferenceExceeds(sendTime, now,
1279 WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1280 return;
1281
1282 LagTrackerWrite(lsn, now);
1283 sendTime = now;
1284}
1285
/*
 * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
 *
 * Returns end LSN of flushed WAL.  Normally this will be >= loc, but
 * if we detect a shutdown request (either from postmaster or client)
 * we will return early, so caller must always check.
 *
 * While waiting, this keeps servicing the connection: it processes client
 * replies, reloads configuration on request, sends keepalives, enforces the
 * walsender timeout and flushes pending output.
 */
static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)
{
	int			wakeEvents;

	/* remembered across calls so that the fast path below can use it */
	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;


	/*
	 * Fast path to avoid acquiring the spinlock in case we already know we
	 * have enough WAL available. This is particularly interesting if we're
	 * far behind.
	 */
	if (RecentFlushPtr != InvalidXLogRecPtr &&
		loc <= RecentFlushPtr)
		return RecentFlushPtr;

	/*
	 * Get a more recent flush pointer.  Outside recovery this is the flush
	 * position; during recovery the replay position is used instead.
	 */
	if (!RecoveryInProgress())
		RecentFlushPtr = GetFlushRecPtr();
	else
		RecentFlushPtr = GetXLogReplayRecPtr(NULL);

	for (;;)
	{
		long		sleeptime;

		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			SyncRepInitConfig();
		}

		/* Check for input from the client */
		ProcessRepliesIfAny();

		/*
		 * If we're shutting down, trigger pending WAL to be written out,
		 * otherwise we'd possibly end up waiting for WAL that never gets
		 * written, because walwriter has shut down already.
		 */
		if (got_STOPPING)
			XLogBackgroundFlush();

		/* Update our idea of the currently flushed position. */
		if (!RecoveryInProgress())
			RecentFlushPtr = GetFlushRecPtr();
		else
			RecentFlushPtr = GetXLogReplayRecPtr(NULL);

		/*
		 * If postmaster asked us to stop, don't wait anymore.
		 *
		 * It's important to do this check after the recomputation of
		 * RecentFlushPtr, so we can send all remaining data before shutting
		 * down.
		 */
		if (got_STOPPING)
			break;

		/*
		 * We only send regular messages to the client for full decoded
		 * transactions, but a synchronous replication and walsender shutdown
		 * possibly are waiting for a later location. So we send pings
		 * containing the flush location every now and then.
		 *
		 * A ping is only sent when both the reported flush and write
		 * positions are behind sentPtr and no ping is outstanding.
		 */
		if (MyWalSnd->flush < sentPtr &&
			MyWalSnd->write < sentPtr &&
			!waiting_for_ping_response)
		{
			WalSndKeepalive(false);
			waiting_for_ping_response = true;
		}

		/* check whether we're done */
		if (loc <= RecentFlushPtr)
			break;

		/* Waiting for new WAL. Since we need to wait, we're now caught up. */
		WalSndCaughtUp = true;

		/*
		 * Try to flush any pending output to the client.
		 */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();

		/*
		 * If we have received CopyDone from the client, sent CopyDone
		 * ourselves, and the output buffer is empty, it's time to exit
		 * streaming, so fail the current WAL fetch request.
		 */
		if (streamingDoneReceiving && streamingDoneSending &&
			!pq_is_send_pending())
			break;

		/* die if timeout was reached */
		WalSndCheckTimeOut();

		/* Send keepalive if the time has come */
		WalSndKeepaliveIfNecessary();

		/*
		 * Sleep until something happens or we time out.  Also wait for the
		 * socket becoming writable, if there's still pending output.
		 * Otherwise we might sit on sendable output data while waiting for
		 * new WAL to be generated.  (But if we have nothing to send, we don't
		 * want to wake on socket-writable.)
		 */
		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

		wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
			WL_SOCKET_READABLE | WL_TIMEOUT;

		if (pq_is_send_pending())
			wakeEvents |= WL_SOCKET_WRITEABLE;

		(void) WaitLatchOrSocket(MyLatch, wakeEvents,
								 MyProcPort->sock, sleeptime,
								 WAIT_EVENT_WAL_SENDER_WAIT_WAL);
	}

	/* reactivate latch so WalSndLoop knows to continue */
	SetLatch(MyLatch);
	return RecentFlushPtr;
}
1425
1426/*
1427 * Execute an incoming replication command.
1428 *
1429 * Returns true if the cmd_string was recognized as WalSender command, false
1430 * if not.
1431 */
1432bool
1433exec_replication_command(const char *cmd_string)
1434{
1435 int parse_rc;
1436 Node *cmd_node;
1437 MemoryContext cmd_context;
1438 MemoryContext old_context;
1439
1440 /*
1441 * If WAL sender has been told that shutdown is getting close, switch its
1442 * status accordingly to handle the next replication commands correctly.
1443 */
1444 if (got_STOPPING)
1445 WalSndSetState(WALSNDSTATE_STOPPING);
1446
1447 /*
1448 * Throw error if in stopping mode. We need prevent commands that could
1449 * generate WAL while the shutdown checkpoint is being written. To be
1450 * safe, we just prohibit all new commands.
1451 */
1452 if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1453 ereport(ERROR,
1454 (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1455
1456 /*
1457 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1458 * command arrives. Clean up the old stuff if there's anything.
1459 */
1460 SnapBuildClearExportedSnapshot();
1461
1462 CHECK_FOR_INTERRUPTS();
1463
1464 cmd_context = AllocSetContextCreate(CurrentMemoryContext,
1465 "Replication command context",
1466 ALLOCSET_DEFAULT_SIZES);
1467 old_context = MemoryContextSwitchTo(cmd_context);
1468
1469 replication_scanner_init(cmd_string);
1470 parse_rc = replication_yyparse();
1471 if (parse_rc != 0)
1472 ereport(ERROR,
1473 (errcode(ERRCODE_SYNTAX_ERROR),
1474 (errmsg_internal("replication command parser returned %d",
1475 parse_rc))));
1476
1477 cmd_node = replication_parse_result;
1478
1479 /*
1480 * Log replication command if log_replication_commands is enabled. Even
1481 * when it's disabled, log the command with DEBUG1 level for backward
1482 * compatibility. Note that SQL commands are not logged here, and will be
1483 * logged later if log_statement is enabled.
1484 */
1485 if (cmd_node->type != T_SQLCmd)
1486 ereport(log_replication_commands ? LOG : DEBUG1,
1487 (errmsg("received replication command: %s", cmd_string)));
1488
1489 /*
1490 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1491 * called outside of transaction the snapshot should be cleared here.
1492 */
1493 if (!IsTransactionBlock())
1494 SnapBuildClearExportedSnapshot();
1495
1496 /*
1497 * For aborted transactions, don't allow anything except pure SQL, the
1498 * exec_simple_query() will handle it correctly.
1499 */
1500 if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1501 ereport(ERROR,
1502 (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1503 errmsg("current transaction is aborted, "
1504 "commands ignored until end of transaction block")));
1505
1506 CHECK_FOR_INTERRUPTS();
1507
1508 /*
1509 * Allocate buffers that will be used for each outgoing and incoming
1510 * message. We do this just once per command to reduce palloc overhead.
1511 */
1512 initStringInfo(&output_message);
1513 initStringInfo(&reply_message);
1514 initStringInfo(&tmpbuf);
1515
1516 /* Report to pgstat that this process is running */
1517 pgstat_report_activity(STATE_RUNNING, NULL);
1518
1519 switch (cmd_node->type)
1520 {
1521 case T_IdentifySystemCmd:
1522 IdentifySystem();
1523 break;
1524
1525 case T_BaseBackupCmd:
1526 PreventInTransactionBlock(true, "BASE_BACKUP");
1527 SendBaseBackup((BaseBackupCmd *) cmd_node);
1528 break;
1529
1530 case T_CreateReplicationSlotCmd:
1531 CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
1532 break;
1533
1534 case T_DropReplicationSlotCmd:
1535 DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
1536 break;
1537
1538 case T_StartReplicationCmd:
1539 {
1540 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1541
1542 PreventInTransactionBlock(true, "START_REPLICATION");
1543
1544 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1545 StartReplication(cmd);
1546 else
1547 StartLogicalReplication(cmd);
1548 break;
1549 }
1550
1551 case T_TimeLineHistoryCmd:
1552 PreventInTransactionBlock(true, "TIMELINE_HISTORY");
1553 SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
1554 break;
1555
1556 case T_VariableShowStmt:
1557 {
1558 DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
1559 VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1560
1561 /* syscache access needs a transaction environment */
1562 StartTransactionCommand();
1563 GetPGVariable(n->name, dest);
1564 CommitTransactionCommand();
1565 }
1566 break;
1567
1568 case T_SQLCmd:
1569 if (MyDatabaseId == InvalidOid)
1570 ereport(ERROR,
1571 (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1572
1573 /* Report to pgstat that this process is now idle */
1574 pgstat_report_activity(STATE_IDLE, NULL);
1575
1576 /* Tell the caller that this wasn't a WalSender command. */
1577 return false;
1578
1579 default:
1580 elog(ERROR, "unrecognized replication command node tag: %u",
1581 cmd_node->type);
1582 }
1583
1584 /* done */
1585 MemoryContextSwitchTo(old_context);
1586 MemoryContextDelete(cmd_context);
1587
1588 /* Send CommandComplete message */
1589 EndCommand("SELECT", DestRemote);
1590
1591 /* Report to pgstat that this process is now idle */
1592 pgstat_report_activity(STATE_IDLE, NULL);
1593
1594 return true;
1595}
1596
1597/*
1598 * Process any incoming messages while streaming. Also checks if the remote
1599 * end has closed the connection.
1600 */
1601static void
1602ProcessRepliesIfAny(void)
1603{
1604 unsigned char firstchar;
1605 int r;
1606 bool received = false;
1607
1608 last_processing = GetCurrentTimestamp();
1609
1610 for (;;)
1611 {
1612 pq_startmsgread();
1613 r = pq_getbyte_if_available(&firstchar);
1614 if (r < 0)
1615 {
1616 /* unexpected error or EOF */
1617 ereport(COMMERROR,
1618 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1619 errmsg("unexpected EOF on standby connection")));
1620 proc_exit(0);
1621 }
1622 if (r == 0)
1623 {
1624 /* no data available without blocking */
1625 pq_endmsgread();
1626 break;
1627 }
1628
1629 /* Read the message contents */
1630 resetStringInfo(&reply_message);
1631 if (pq_getmessage(&reply_message, 0))
1632 {
1633 ereport(COMMERROR,
1634 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1635 errmsg("unexpected EOF on standby connection")));
1636 proc_exit(0);
1637 }
1638
1639 /*
1640 * If we already received a CopyDone from the frontend, the frontend
1641 * should not send us anything until we've closed our end of the COPY.
1642 * XXX: In theory, the frontend could already send the next command
1643 * before receiving the CopyDone, but libpq doesn't currently allow
1644 * that.
1645 */
1646 if (streamingDoneReceiving && firstchar != 'X')
1647 ereport(FATAL,
1648 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1649 errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1650 firstchar)));
1651
1652 /* Handle the very limited subset of commands expected in this phase */
1653 switch (firstchar)
1654 {
1655 /*
1656 * 'd' means a standby reply wrapped in a CopyData packet.
1657 */
1658 case 'd':
1659 ProcessStandbyMessage();
1660 received = true;
1661 break;
1662
1663 /*
1664 * CopyDone means the standby requested to finish streaming.
1665 * Reply with CopyDone, if we had not sent that already.
1666 */
1667 case 'c':
1668 if (!streamingDoneSending)
1669 {
1670 pq_putmessage_noblock('c', NULL, 0);
1671 streamingDoneSending = true;
1672 }
1673
1674 streamingDoneReceiving = true;
1675 received = true;
1676 break;
1677
1678 /*
1679 * 'X' means that the standby is closing down the socket.
1680 */
1681 case 'X':
1682 proc_exit(0);
1683
1684 default:
1685 ereport(FATAL,
1686 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1687 errmsg("invalid standby message type \"%c\"",
1688 firstchar)));
1689 }
1690 }
1691
1692 /*
1693 * Save the last reply timestamp if we've received at least one reply.
1694 */
1695 if (received)
1696 {
1697 last_reply_timestamp = last_processing;
1698 waiting_for_ping_response = false;
1699 }
1700}
1701
1702/*
1703 * Process a status update message received from standby.
1704 */
1705static void
1706ProcessStandbyMessage(void)
1707{
1708 char msgtype;
1709
1710 /*
1711 * Check message type from the first byte.
1712 */
1713 msgtype = pq_getmsgbyte(&reply_message);
1714
1715 switch (msgtype)
1716 {
1717 case 'r':
1718 ProcessStandbyReplyMessage();
1719 break;
1720
1721 case 'h':
1722 ProcessStandbyHSFeedbackMessage();
1723 break;
1724
1725 default:
1726 ereport(COMMERROR,
1727 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1728 errmsg("unexpected message type \"%c\"", msgtype)));
1729 proc_exit(0);
1730 }
1731}
1732
1733/*
1734 * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
1735 */
1736static void
1737PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
1738{
1739 bool changed = false;
1740 ReplicationSlot *slot = MyReplicationSlot;
1741
1742 Assert(lsn != InvalidXLogRecPtr);
1743 SpinLockAcquire(&slot->mutex);
1744 if (slot->data.restart_lsn != lsn)
1745 {
1746 changed = true;
1747 slot->data.restart_lsn = lsn;
1748 }
1749 SpinLockRelease(&slot->mutex);
1750
1751 if (changed)
1752 {
1753 ReplicationSlotMarkDirty();
1754 ReplicationSlotsComputeRequiredLSN();
1755 }
1756
1757 /*
1758 * One could argue that the slot should be saved to disk now, but that'd
1759 * be energy wasted - the worst lost information can do here is give us
1760 * wrong information in a statistics view - we'll just potentially be more
1761 * conservative in removing files.
1762 */
1763}
1764
1765/*
1766 * Regular reply from standby advising of WAL locations on standby server.
1767 */
1768static void
1769ProcessStandbyReplyMessage(void)
1770{
1771 XLogRecPtr writePtr,
1772 flushPtr,
1773 applyPtr;
1774 bool replyRequested;
1775 TimeOffset writeLag,
1776 flushLag,
1777 applyLag;
1778 bool clearLagTimes;
1779 TimestampTz now;
1780 TimestampTz replyTime;
1781
1782 static bool fullyAppliedLastTime = false;
1783
1784 /* the caller already consumed the msgtype byte */
1785 writePtr = pq_getmsgint64(&reply_message);
1786 flushPtr = pq_getmsgint64(&reply_message);
1787 applyPtr = pq_getmsgint64(&reply_message);
1788 replyTime = pq_getmsgint64(&reply_message);
1789 replyRequested = pq_getmsgbyte(&reply_message);
1790
1791 if (log_min_messages <= DEBUG2)
1792 {
1793 char *replyTimeStr;
1794
1795 /* Copy because timestamptz_to_str returns a static buffer */
1796 replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1797
1798 elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1799 (uint32) (writePtr >> 32), (uint32) writePtr,
1800 (uint32) (flushPtr >> 32), (uint32) flushPtr,
1801 (uint32) (applyPtr >> 32), (uint32) applyPtr,
1802 replyRequested ? " (reply requested)" : "",
1803 replyTimeStr);
1804
1805 pfree(replyTimeStr);
1806 }
1807
1808 /* See if we can compute the round-trip lag for these positions. */
1809 now = GetCurrentTimestamp();
1810 writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1811 flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1812 applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1813
1814 /*
1815 * If the standby reports that it has fully replayed the WAL in two
1816 * consecutive reply messages, then the second such message must result
1817 * from wal_receiver_status_interval expiring on the standby. This is a
1818 * convenient time to forget the lag times measured when it last
1819 * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1820 * until more WAL traffic arrives.
1821 */
1822 clearLagTimes = false;
1823 if (applyPtr == sentPtr)
1824 {
1825 if (fullyAppliedLastTime)
1826 clearLagTimes = true;
1827 fullyAppliedLastTime = true;
1828 }
1829 else
1830 fullyAppliedLastTime = false;
1831
1832 /* Send a reply if the standby requested one. */
1833 if (replyRequested)
1834 WalSndKeepalive(false);
1835
1836 /*
1837 * Update shared state for this WalSender process based on reply data from
1838 * standby.
1839 */
1840 {
1841 WalSnd *walsnd = MyWalSnd;
1842
1843 SpinLockAcquire(&walsnd->mutex);
1844 walsnd->write = writePtr;
1845 walsnd->flush = flushPtr;
1846 walsnd->apply = applyPtr;
1847 if (writeLag != -1 || clearLagTimes)
1848 walsnd->writeLag = writeLag;
1849 if (flushLag != -1 || clearLagTimes)
1850 walsnd->flushLag = flushLag;
1851 if (applyLag != -1 || clearLagTimes)
1852 walsnd->applyLag = applyLag;
1853 walsnd->replyTime = replyTime;
1854 SpinLockRelease(&walsnd->mutex);
1855 }
1856
1857 if (!am_cascading_walsender)
1858 SyncRepReleaseWaiters();
1859
1860 /*
1861 * Advance our local xmin horizon when the client confirmed a flush.
1862 */
1863 if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1864 {
1865 if (SlotIsLogical(MyReplicationSlot))
1866 LogicalConfirmReceivedLocation(flushPtr);
1867 else
1868 PhysicalConfirmReceivedLocation(flushPtr);
1869 }
1870}
1871
1872/* compute new replication slot xmin horizon if needed */
1873static void
1874PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
1875{
1876 bool changed = false;
1877 ReplicationSlot *slot = MyReplicationSlot;
1878
1879 SpinLockAcquire(&slot->mutex);
1880 MyPgXact->xmin = InvalidTransactionId;
1881
1882 /*
1883 * For physical replication we don't need the interlock provided by xmin
1884 * and effective_xmin since the consequences of a missed increase are
1885 * limited to query cancellations, so set both at once.
1886 */
1887 if (!TransactionIdIsNormal(slot->data.xmin) ||
1888 !TransactionIdIsNormal(feedbackXmin) ||
1889 TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1890 {
1891 changed = true;
1892 slot->data.xmin = feedbackXmin;
1893 slot->effective_xmin = feedbackXmin;
1894 }
1895 if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1896 !TransactionIdIsNormal(feedbackCatalogXmin) ||
1897 TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1898 {
1899 changed = true;
1900 slot->data.catalog_xmin = feedbackCatalogXmin;
1901 slot->effective_catalog_xmin = feedbackCatalogXmin;
1902 }
1903 SpinLockRelease(&slot->mutex);
1904
1905 if (changed)
1906 {
1907 ReplicationSlotMarkDirty();
1908 ReplicationSlotsComputeRequiredXmin(false);
1909 }
1910}
1911
1912/*
1913 * Check that the provided xmin/epoch are sane, that is, not in the future
1914 * and not so far back as to be already wrapped around.
1915 *
1916 * Epoch of nextXid should be same as standby, or if the counter has
1917 * wrapped, then one greater than standby.
1918 *
1919 * This check doesn't care about whether clog exists for these xids
1920 * at all.
1921 */
1922static bool
1923TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
1924{
1925 FullTransactionId nextFullXid;
1926 TransactionId nextXid;
1927 uint32 nextEpoch;
1928
1929 nextFullXid = ReadNextFullTransactionId();
1930 nextXid = XidFromFullTransactionId(nextFullXid);
1931 nextEpoch = EpochFromFullTransactionId(nextFullXid);
1932
1933 if (xid <= nextXid)
1934 {
1935 if (epoch != nextEpoch)
1936 return false;
1937 }
1938 else
1939 {
1940 if (epoch + 1 != nextEpoch)
1941 return false;
1942 }
1943
1944 if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1945 return false; /* epoch OK, but it's wrapped around */
1946
1947 return true;
1948}
1949
1950/*
1951 * Hot Standby feedback
1952 */
1953static void
1954ProcessStandbyHSFeedbackMessage(void)
1955{
1956 TransactionId feedbackXmin;
1957 uint32 feedbackEpoch;
1958 TransactionId feedbackCatalogXmin;
1959 uint32 feedbackCatalogEpoch;
1960 TimestampTz replyTime;
1961
1962 /*
1963 * Decipher the reply message. The caller already consumed the msgtype
1964 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1965 * of this message.
1966 */
1967 replyTime = pq_getmsgint64(&reply_message);
1968 feedbackXmin = pq_getmsgint(&reply_message, 4);
1969 feedbackEpoch = pq_getmsgint(&reply_message, 4);
1970 feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1971 feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1972
1973 if (log_min_messages <= DEBUG2)
1974 {
1975 char *replyTimeStr;
1976
1977 /* Copy because timestamptz_to_str returns a static buffer */
1978 replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1979
1980 elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
1981 feedbackXmin,
1982 feedbackEpoch,
1983 feedbackCatalogXmin,
1984 feedbackCatalogEpoch,
1985 replyTimeStr);
1986
1987 pfree(replyTimeStr);
1988 }
1989
1990 /*
1991 * Update shared state for this WalSender process based on reply data from
1992 * standby.
1993 */
1994 {
1995 WalSnd *walsnd = MyWalSnd;
1996
1997 SpinLockAcquire(&walsnd->mutex);
1998 walsnd->replyTime = replyTime;
1999 SpinLockRelease(&walsnd->mutex);
2000 }
2001
2002 /*
2003 * Unset WalSender's xmins if the feedback message values are invalid.
2004 * This happens when the downstream turned hot_standby_feedback off.
2005 */
2006 if (!TransactionIdIsNormal(feedbackXmin)
2007 && !TransactionIdIsNormal(feedbackCatalogXmin))
2008 {
2009 MyPgXact->xmin = InvalidTransactionId;
2010 if (MyReplicationSlot != NULL)
2011 PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2012 return;
2013 }
2014
2015 /*
2016 * Check that the provided xmin/epoch are sane, that is, not in the future
2017 * and not so far back as to be already wrapped around. Ignore if not.
2018 */
2019 if (TransactionIdIsNormal(feedbackXmin) &&
2020 !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2021 return;
2022
2023 if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2024 !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2025 return;
2026
2027 /*
2028 * Set the WalSender's xmin equal to the standby's requested xmin, so that
2029 * the xmin will be taken into account by GetOldestXmin. This will hold
2030 * back the removal of dead rows and thereby prevent the generation of
2031 * cleanup conflicts on the standby server.
2032 *
2033 * There is a small window for a race condition here: although we just
2034 * checked that feedbackXmin precedes nextXid, the nextXid could have
2035 * gotten advanced between our fetching it and applying the xmin below,
2036 * perhaps far enough to make feedbackXmin wrap around. In that case the
2037 * xmin we set here would be "in the future" and have no effect. No point
2038 * in worrying about this since it's too late to save the desired data
2039 * anyway. Assuming that the standby sends us an increasing sequence of
2040 * xmins, this could only happen during the first reply cycle, else our
2041 * own xmin would prevent nextXid from advancing so far.
2042 *
2043 * We don't bother taking the ProcArrayLock here. Setting the xmin field
2044 * is assumed atomic, and there's no real need to prevent a concurrent
2045 * GetOldestXmin. (If we're moving our xmin forward, this is obviously
2046 * safe, and if we're moving it backwards, well, the data is at risk
2047 * already since a VACUUM could have just finished calling GetOldestXmin.)
2048 *
2049 * If we're using a replication slot we reserve the xmin via that,
2050 * otherwise via the walsender's PGXACT entry. We can only track the
2051 * catalog xmin separately when using a slot, so we store the least of the
2052 * two provided when not using a slot.
2053 *
2054 * XXX: It might make sense to generalize the ephemeral slot concept and
2055 * always use the slot mechanism to handle the feedback xmin.
2056 */
2057 if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2058 PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2059 else
2060 {
2061 if (TransactionIdIsNormal(feedbackCatalogXmin)
2062 && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2063 MyPgXact->xmin = feedbackCatalogXmin;
2064 else
2065 MyPgXact->xmin = feedbackXmin;
2066 }
2067}
2068
2069/*
2070 * Compute how long send/receive loops should sleep.
2071 *
2072 * If wal_sender_timeout is enabled we want to wake up in time to send
2073 * keepalives and to abort the connection if wal_sender_timeout has been
2074 * reached.
2075 */
2076static long
2077WalSndComputeSleeptime(TimestampTz now)
2078{
2079 long sleeptime = 10000; /* 10 s */
2080
2081 if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
2082 {
2083 TimestampTz wakeup_time;
2084 long sec_to_timeout;
2085 int microsec_to_timeout;
2086
2087 /*
2088 * At the latest stop sleeping once wal_sender_timeout has been
2089 * reached.
2090 */
2091 wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2092 wal_sender_timeout);
2093
2094 /*
2095 * If no ping has been sent yet, wakeup when it's time to do so.
2096 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2097 * the timeout passed without a response.
2098 */
2099 if (!waiting_for_ping_response)
2100 wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2101 wal_sender_timeout / 2);
2102
2103 /* Compute relative time until wakeup. */
2104 TimestampDifference(now, wakeup_time,
2105 &sec_to_timeout, &microsec_to_timeout);
2106
2107 sleeptime = sec_to_timeout * 1000 +
2108 microsec_to_timeout / 1000;
2109 }
2110
2111 return sleeptime;
2112}
2113
2114/*
2115 * Check whether there have been responses by the client within
2116 * wal_sender_timeout and shutdown if not. Using last_processing as the
2117 * reference point avoids counting server-side stalls against the client.
2118 * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2119 * postdate last_processing by more than wal_sender_timeout. If that happens,
2120 * the client must reply almost immediately to avoid a timeout. This rarely
2121 * affects the default configuration, under which clients spontaneously send a
2122 * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We
2123 * could eliminate that problem by recognizing timeout expiration at
2124 * wal_sender_timeout/2 after the keepalive.
2125 */
2126static void
2127WalSndCheckTimeOut(void)
2128{
2129 TimestampTz timeout;
2130
2131 /* don't bail out if we're doing something that doesn't require timeouts */
2132 if (last_reply_timestamp <= 0)
2133 return;
2134
2135 timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
2136 wal_sender_timeout);
2137
2138 if (wal_sender_timeout > 0 && last_processing >= timeout)
2139 {
2140 /*
2141 * Since typically expiration of replication timeout means
2142 * communication problem, we don't send the error message to the
2143 * standby.
2144 */
2145 ereport(COMMERROR,
2146 (errmsg("terminating walsender process due to replication timeout")));
2147
2148 WalSndShutdown();
2149 }
2150}
2151
/*
 * Main loop of walsender process that streams the WAL over Copy messages.
 *
 * 'send_data' is the callback that queues the next chunk of WAL into the
 * output buffer; it is invoked whenever no output is pending.  It reports
 * whether more WAL remains through the WalSndCaughtUp flag.
 *
 * Each iteration resets the latch, services interrupts and config reloads,
 * consumes client replies, sends and flushes data, performs the timeout and
 * keepalive checks, and finally sleeps on the latch/socket if there is
 * nothing to do right away.  The loop is left once CopyDone has been both
 * received and sent and the output buffer is empty.
 */
static void
WalSndLoop(WalSndSendDataCallback send_data)
{
	/*
	 * Initialize the last reply timestamp. That enables timeout processing
	 * from hereon.
	 */
	last_reply_timestamp = GetCurrentTimestamp();
	waiting_for_ping_response = false;

	/*
	 * Loop until we reach the end of this timeline or the client requests to
	 * stop streaming.
	 */
	for (;;)
	{
		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			SyncRepInitConfig();
		}

		/* Check for input from the client */
		ProcessRepliesIfAny();

		/*
		 * If we have received CopyDone from the client, sent CopyDone
		 * ourselves, and the output buffer is empty, it's time to exit
		 * streaming.
		 */
		if (streamingDoneReceiving && streamingDoneSending &&
			!pq_is_send_pending())
			break;

		/*
		 * If we don't have any pending data in the output buffer, try to send
		 * some more.  If there is some, we don't bother to call send_data
		 * again until we've flushed it ... but we'd better assume we are not
		 * caught up.
		 */
		if (!pq_is_send_pending())
			send_data();
		else
			WalSndCaughtUp = false;

		/* Try to flush pending output to the client */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();

		/* If nothing remains to be sent right now ... */
		if (WalSndCaughtUp && !pq_is_send_pending())
		{
			/*
			 * If we're in catchup state, move to streaming.  This is an
			 * important state change for users to know about, since before
			 * this point data loss might occur if the primary dies and we
			 * need to failover to the standby. The state change is also
			 * important for synchronous replication, since commits that
			 * started to wait at that point might wait for some time.
			 */
			if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
			{
				ereport(DEBUG1,
						(errmsg("\"%s\" has now caught up with upstream server",
								application_name)));
				WalSndSetState(WALSNDSTATE_STREAMING);
			}

			/*
			 * When SIGUSR2 arrives, we send any outstanding logs up to the
			 * shutdown checkpoint record (i.e., the latest record), wait for
			 * them to be replicated to the standby, and exit. This may be a
			 * normal termination at shutdown, or a promotion, the walsender
			 * is not sure which.
			 */
			if (got_SIGUSR2)
				WalSndDone(send_data);
		}

		/* Check for replication timeout. */
		WalSndCheckTimeOut();

		/* Send keepalive if the time has come */
		WalSndKeepaliveIfNecessary();

		/*
		 * We don't block if not caught up, unless there is unsent data
		 * pending in which case we'd better block until the socket is
		 * write-ready.  This test is only needed for the case where the
		 * send_data callback handled a subset of the available data but then
		 * pq_flush_if_writable flushed it all --- we should immediately try
		 * to send more.
		 */
		if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
		{
			long		sleeptime;
			int			wakeEvents;

			wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT |
				WL_SOCKET_READABLE;

			/*
			 * Use fresh timestamp, not last_processing, to reduce the chance
			 * of reaching wal_sender_timeout before sending a keepalive.
			 */
			sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

			/* Only wait for write-readiness when there is output to flush */
			if (pq_is_send_pending())
				wakeEvents |= WL_SOCKET_WRITEABLE;

			/* Sleep until something happens or we time out */
			(void) WaitLatchOrSocket(MyLatch, wakeEvents,
									 MyProcPort->sock, sleeptime,
									 WAIT_EVENT_WAL_SENDER_MAIN);
		}
	}
	return;
}
2278
2279/* Initialize a per-walsender data structure for this walsender process */
2280static void
2281InitWalSenderSlot(void)
2282{
2283 int i;
2284
2285 /*
2286 * WalSndCtl should be set up already (we inherit this by fork() or
2287 * EXEC_BACKEND mechanism from the postmaster).
2288 */
2289 Assert(WalSndCtl != NULL);
2290 Assert(MyWalSnd == NULL);
2291
2292 /*
2293 * Find a free walsender slot and reserve it. This must not fail due to
2294 * the prior check for free WAL senders in InitProcess().
2295 */
2296 for (i = 0; i < max_wal_senders; i++)
2297 {
2298 WalSnd *walsnd = &WalSndCtl->walsnds[i];
2299
2300 SpinLockAcquire(&walsnd->mutex);
2301
2302 if (walsnd->pid != 0)
2303 {
2304 SpinLockRelease(&walsnd->mutex);
2305 continue;
2306 }
2307 else
2308 {
2309 /*
2310 * Found a free slot. Reserve it for us.
2311 */
2312 walsnd->pid = MyProcPid;
2313 walsnd->sentPtr = InvalidXLogRecPtr;
2314 walsnd->write = InvalidXLogRecPtr;
2315 walsnd->flush = InvalidXLogRecPtr;
2316 walsnd->apply = InvalidXLogRecPtr;
2317 walsnd->writeLag = -1;
2318 walsnd->flushLag = -1;
2319 walsnd->applyLag = -1;
2320 walsnd->state = WALSNDSTATE_STARTUP;
2321 walsnd->latch = &MyProc->procLatch;
2322 walsnd->replyTime = 0;
2323 SpinLockRelease(&walsnd->mutex);
2324 /* don't need the lock anymore */
2325 MyWalSnd = (WalSnd *) walsnd;
2326
2327 break;
2328 }
2329 }
2330
2331 Assert(MyWalSnd != NULL);
2332
2333 /* Arrange to clean up at walsender exit */
2334 on_shmem_exit(WalSndKill, 0);
2335}
2336
2337/* Destroy the per-walsender data structure for this walsender process */
2338static void
2339WalSndKill(int code, Datum arg)
2340{
2341 WalSnd *walsnd = MyWalSnd;
2342
2343 Assert(walsnd != NULL);
2344
2345 MyWalSnd = NULL;
2346
2347 SpinLockAcquire(&walsnd->mutex);
2348 /* clear latch while holding the spinlock, so it can safely be read */
2349 walsnd->latch = NULL;
2350 /* Mark WalSnd struct as no longer being in use. */
2351 walsnd->pid = 0;
2352 SpinLockRelease(&walsnd->mutex);
2353}
2354
/*
 * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
 *
 *	buf		 - destination buffer; the caller must provide room for 'count'
 *			   bytes
 *	startptr - WAL location of the first byte to read
 *	count	 - number of bytes to read
 *
 * The request may span several WAL segments; the loop below switches files
 * as needed.  Either the whole request is satisfied or an ERROR is raised
 * (file missing or unopenable, seek failure, read failure, or a read that
 * returns zero bytes).
 *
 * XXX probably this should be improved to suck data directly from the
 * WAL buffers when possible.
 *
 * Will open, and keep open, one WAL segment stored in the global file
 * descriptor sendFile. This means if XLogRead is used once, there will
 * always be one descriptor left open until the process ends, but never
 * more than one.
 */
static void
XLogRead(char *buf, XLogRecPtr startptr, Size count)
{
	char	   *p;
	XLogRecPtr	recptr;
	Size		nbytes;
	XLogSegNo	segno;

	/*
	 * Re-entered from the bottom of the function when a cascading walsender
	 * was asked to reload its file: start the whole request over.
	 */
retry:
	p = buf;
	recptr = startptr;
	nbytes = count;

	while (nbytes > 0)
	{
		uint32		startoff;
		int			segbytes;
		int			readbytes;

		/* Byte offset of recptr within its segment */
		startoff = XLogSegmentOffset(recptr, wal_segment_size);

		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
		{
			char		path[MAXPGPATH];

			/* Switch to another logfile segment */
			if (sendFile >= 0)
				close(sendFile);

			XLByteToSeg(recptr, sendSegNo, wal_segment_size);

			/*-------
			 * When reading from a historic timeline, and there is a timeline
			 * switch within this segment, read from the WAL segment belonging
			 * to the new timeline.
			 *
			 * For example, imagine that this server is currently on timeline
			 * 5, and we're streaming timeline 4. The switch from timeline 4
			 * to 5 happened at 0/13002088. In pg_wal, we have these files:
			 *
			 * ...
			 * 000000040000000000000012
			 * 000000040000000000000013
			 * 000000050000000000000013
			 * 000000050000000000000014
			 * ...
			 *
			 * In this situation, when requested to send the WAL from
			 * segment 0x13, on timeline 4, we read the WAL from file
			 * 000000050000000000000013. Archive recovery prefers files from
			 * newer timelines, so if the segment was restored from the
			 * archive on this server, the file belonging to the old timeline,
			 * 000000040000000000000013, might not exist. Their contents are
			 * equal up to the switchpoint, because at a timeline switch, the
			 * used portion of the old segment is copied to the new file.
			 *-------
			 */
			curFileTimeLine = sendTimeLine;
			if (sendTimeLineIsHistoric)
			{
				XLogSegNo	endSegNo;

				XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size);
				if (sendSegNo == endSegNo)
					curFileTimeLine = sendTimeLineNextTLI;
			}

			XLogFilePath(path, curFileTimeLine, sendSegNo, wal_segment_size);

			sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
			if (sendFile < 0)
			{
				/*
				 * If the file is not found, assume it's because the standby
				 * asked for a too old WAL segment that has already been
				 * removed or recycled.
				 */
				if (errno == ENOENT)
					ereport(ERROR,
							(errcode_for_file_access(),
							 errmsg("requested WAL segment %s has already been removed",
									XLogFileNameP(curFileTimeLine, sendSegNo))));
				else
					ereport(ERROR,
							(errcode_for_file_access(),
							 errmsg("could not open file \"%s\": %m",
									path)));
			}

			/* A freshly opened file is positioned at its start */
			sendOff = 0;
		}

		/* Need to seek in the file? */
		if (sendOff != startoff)
		{
			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not seek in log segment %s to offset %u: %m",
								XLogFileNameP(curFileTimeLine, sendSegNo),
								startoff)));
			sendOff = startoff;
		}

		/* How many bytes are within this segment? */
		if (nbytes > (wal_segment_size - startoff))
			segbytes = wal_segment_size - startoff;
		else
			segbytes = nbytes;

		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
		readbytes = read(sendFile, p, segbytes);
		pgstat_report_wait_end();
		if (readbytes < 0)
		{
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from log segment %s, offset %u, length %zu: %m",
							XLogFileNameP(curFileTimeLine, sendSegNo),
							sendOff, (Size) segbytes)));
		}
		else if (readbytes == 0)
		{
			/* Hitting end-of-file here is reported as corrupted data */
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
							XLogFileNameP(curFileTimeLine, sendSegNo),
							sendOff, readbytes, (Size) segbytes)));
		}

		/*
		 * Update state for read.  A short read simply leaves nbytes > 0, so
		 * the loop comes around again for the remainder.
		 */
		recptr += readbytes;

		sendOff += readbytes;
		nbytes -= readbytes;
		p += readbytes;
	}

	/*
	 * After reading into the buffer, check that what we read was valid. We do
	 * this after reading, because even though the segment was present when we
	 * opened it, it might get recycled or removed while we read it. The
	 * read() succeeds in that case, but the data we tried to read might
	 * already have been overwritten with new WAL records.
	 */
	XLByteToSeg(startptr, segno, wal_segment_size);
	CheckXLogRemoved(segno, ThisTimeLineID);

	/*
	 * During recovery, the currently-open WAL file might be replaced with the
	 * file of the same name retrieved from archive. So we always need to
	 * check what we read was valid after reading into the buffer. If it's
	 * invalid, we try to open and read the file again.
	 */
	if (am_cascading_walsender)
	{
		WalSnd	   *walsnd = MyWalSnd;
		bool		reload;

		/* Fetch and clear the reload request in one locked step */
		SpinLockAcquire(&walsnd->mutex);
		reload = walsnd->needreload;
		walsnd->needreload = false;
		SpinLockRelease(&walsnd->mutex);

		if (reload && sendFile >= 0)
		{
			close(sendFile);
			sendFile = -1;

			goto retry;
		}
	}
}
2538
/*
 * Send out the WAL in its normal physical/stored form.
 *
 * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
 * but not yet sent to the client, and buffer it in the libpq output
 * buffer.
 *
 * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
 * otherwise WalSndCaughtUp is set to false.
 *
 * The function proceeds in these steps: determine how far WAL may be sent
 * (SendRqstPtr), record a lag-tracking sample, end the stream with CopyDone
 * if a historic timeline has been fully sent, and otherwise build one 'w'
 * message covering [sentPtr, endptr) and advance sentPtr both locally and in
 * shared memory.
 */
static void
XLogSendPhysical(void)
{
	XLogRecPtr	SendRqstPtr;
	XLogRecPtr	startptr;
	XLogRecPtr	endptr;
	Size		nbytes;

	/* If requested switch the WAL sender to the stopping state. */
	if (got_STOPPING)
		WalSndSetState(WALSNDSTATE_STOPPING);

	/* CopyDone already sent: there is nothing more for us to send. */
	if (streamingDoneSending)
	{
		WalSndCaughtUp = true;
		return;
	}

	/* Figure out how far we can safely send the WAL. */
	if (sendTimeLineIsHistoric)
	{
		/*
		 * Streaming an old timeline that's in this server's history, but is
		 * not the one we're currently inserting or replaying. It can be
		 * streamed up to the point where we switched off that timeline.
		 */
		SendRqstPtr = sendTimeLineValidUpto;
	}
	else if (am_cascading_walsender)
	{
		/*
		 * Streaming the latest timeline on a standby.
		 *
		 * Attempt to send all WAL that has already been replayed, so that we
		 * know it's valid. If we're receiving WAL through streaming
		 * replication, it's also OK to send any WAL that has been received
		 * but not replayed.
		 *
		 * The timeline we're recovering from can change, or we can be
		 * promoted. In either case, the current timeline becomes historic. We
		 * need to detect that so that we don't try to stream past the point
		 * where we switched to another timeline. We check for promotion or
		 * timeline switch after calculating FlushPtr, to avoid a race
		 * condition: if the timeline becomes historic just after we checked
		 * that it was still current, it's still OK to stream it up to the
		 * FlushPtr that was calculated before it became historic.
		 */
		bool		becameHistoric = false;

		SendRqstPtr = GetStandbyFlushRecPtr();

		if (!RecoveryInProgress())
		{
			/*
			 * We have been promoted. RecoveryInProgress() updated
			 * ThisTimeLineID to the new current timeline.
			 */
			am_cascading_walsender = false;
			becameHistoric = true;
		}
		else
		{
			/*
			 * Still a cascading standby. But is the timeline we're sending
			 * still the one recovery is recovering from? ThisTimeLineID was
			 * updated by the GetStandbyFlushRecPtr() call above.
			 */
			if (sendTimeLine != ThisTimeLineID)
				becameHistoric = true;
		}

		if (becameHistoric)
		{
			/*
			 * The timeline we were sending has become historic. Read the
			 * timeline history file of the new timeline to see where exactly
			 * we forked off from the timeline we were sending.
			 */
			List	   *history;

			history = readTimeLineHistory(ThisTimeLineID);
			sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);

			Assert(sendTimeLine < sendTimeLineNextTLI);
			list_free_deep(history);

			sendTimeLineIsHistoric = true;

			SendRqstPtr = sendTimeLineValidUpto;
		}
	}
	else
	{
		/*
		 * Streaming the current timeline on a master.
		 *
		 * Attempt to send all data that's already been written out and
		 * fsync'd to disk.  We cannot go further than what's been written out
		 * given the current implementation of XLogRead().  And in any case
		 * it's unsafe to send WAL that is not securely down to disk on the
		 * master: if the master subsequently crashes and restarts, standbys
		 * must not have applied any WAL that got lost on the master.
		 */
		SendRqstPtr = GetFlushRecPtr();
	}

	/*
	 * Record the current system time as an approximation of the time at which
	 * this WAL location was written for the purposes of lag tracking.
	 *
	 * In theory we could make XLogFlush() record a time in shmem whenever WAL
	 * is flushed and we could get that time as well as the LSN when we call
	 * GetFlushRecPtr() above (and likewise for the cascading standby
	 * equivalent), but rather than putting any new code into the hot WAL path
	 * it seems good enough to capture the time here.  We should reach this
	 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
	 * may take some time, we read the WAL flush pointer and take the time
	 * very close together here so that we'll get a later position if it is
	 * still moving.
	 *
	 * Because LagTrackerWrite() ignores samples when the LSN hasn't advanced,
	 * this gives us a cheap approximation for the WAL flush time for this
	 * LSN.
	 *
	 * Note that the LSN is not necessarily the LSN for the data contained in
	 * the present message; it's the end of the WAL, which might be further
	 * ahead.  All the lag tracking machinery cares about is finding out when
	 * that arbitrary LSN is eventually reported as written, flushed and
	 * applied, so that it can measure the elapsed time.
	 */
	LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());

	/*
	 * If this is a historic timeline and we've reached the point where we
	 * forked to the next timeline, stop streaming.
	 *
	 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
	 * startup process will normally replay all WAL that has been received
	 * from the master, before promoting, but if the WAL streaming is
	 * terminated at a WAL page boundary, the valid portion of the timeline
	 * might end in the middle of a WAL record. We might've already sent the
	 * first half of that partial WAL record to the cascading standby, so that
	 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
	 * replay the partial WAL record either, so it can still follow our
	 * timeline switch.
	 */
	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
	{
		/* close the current file. */
		if (sendFile >= 0)
			close(sendFile);
		sendFile = -1;

		/* Send CopyDone */
		pq_putmessage_noblock('c', NULL, 0);
		streamingDoneSending = true;

		WalSndCaughtUp = true;

		elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
			 (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
			 (uint32) (sentPtr >> 32), (uint32) sentPtr);
		return;
	}

	/* Do we have any work to do? */
	Assert(sentPtr <= SendRqstPtr);
	if (SendRqstPtr <= sentPtr)
	{
		WalSndCaughtUp = true;
		return;
	}

	/*
	 * Figure out how much to send in one message. If there's no more than
	 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
	 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
	 *
	 * The rounding is not only for performance reasons. Walreceiver relies on
	 * the fact that we never split a WAL record across two messages. Since a
	 * long WAL record is split at page boundary into continuation records,
	 * page boundary is always a safe cut-off point. We also assume that
	 * SendRqstPtr never points to the middle of a WAL record.
	 */
	startptr = sentPtr;
	endptr = startptr;
	endptr += MAX_SEND_SIZE;

	/* if we went beyond SendRqstPtr, back off */
	if (SendRqstPtr <= endptr)
	{
		endptr = SendRqstPtr;

		/*
		 * On a historic timeline we stay "not caught up" here, so that the
		 * caller invokes us again and the end-of-timeline branch above gets
		 * to send CopyDone.
		 */
		if (sendTimeLineIsHistoric)
			WalSndCaughtUp = false;
		else
			WalSndCaughtUp = true;
	}
	else
	{
		/* round down to page boundary. */
		endptr -= (endptr % XLOG_BLCKSZ);
		WalSndCaughtUp = false;
	}

	nbytes = endptr - startptr;
	Assert(nbytes <= MAX_SEND_SIZE);

	/*
	 * OK to read and send the slice.  The message consists of the type byte
	 * 'w' followed by three 64-bit header fields and then the WAL data.
	 */
	resetStringInfo(&output_message);
	pq_sendbyte(&output_message, 'w');

	pq_sendint64(&output_message, startptr);	/* dataStart */
	pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
	pq_sendint64(&output_message, 0);	/* sendtime, filled in last */

	/*
	 * Read the log directly into the output buffer to avoid extra memcpy
	 * calls.
	 */
	enlargeStringInfo(&output_message, nbytes);
	XLogRead(&output_message.data[output_message.len], startptr, nbytes);
	output_message.len += nbytes;
	output_message.data[output_message.len] = '\0';

	/*
	 * Fill the send timestamp last, so that it is taken as late as possible.
	 * It overwrites the placeholder that follows the type byte and the two
	 * 64-bit fields written before it.
	 */
	resetStringInfo(&tmpbuf);
	pq_sendint64(&tmpbuf, GetCurrentTimestamp());
	memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
		   tmpbuf.data, sizeof(int64));

	pq_putmessage_noblock('d', output_message.data, output_message.len);

	sentPtr = endptr;

	/* Update shared memory status */
	{
		WalSnd	   *walsnd = MyWalSnd;

		SpinLockAcquire(&walsnd->mutex);
		walsnd->sentPtr = sentPtr;
		SpinLockRelease(&walsnd->mutex);
	}

	/* Report progress of XLOG streaming in PS display */
	if (update_process_title)
	{
		char		activitymsg[50];

		snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
				 (uint32) (sentPtr >> 32), (uint32) sentPtr);
		set_ps_display(activitymsg, false);
	}

	return;
}
2808
2809/*
2810 * Stream out logically decoded data.
2811 */
2812static void
2813XLogSendLogical(void)
2814{
2815 XLogRecord *record;
2816 char *errm;
2817
2818 /*
2819 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2820 * true in WalSndWaitForWal, if we're actually waiting. We also set to
2821 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2822 * didn't wait - i.e. when we're shutting down.
2823 */
2824 WalSndCaughtUp = false;
2825
2826 record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2827 logical_startptr = InvalidXLogRecPtr;
2828
2829 /* xlog record was invalid */
2830 if (errm != NULL)
2831 elog(ERROR, "%s", errm);
2832
2833 if (record != NULL)
2834 {
2835 /* XXX: Note that logical decoding cannot be used while in recovery */
2836 XLogRecPtr flushPtr = GetFlushRecPtr();
2837
2838 /*
2839 * Note the lack of any call to LagTrackerWrite() which is handled by
2840 * WalSndUpdateProgress which is called by output plugin through
2841 * logical decoding write api.
2842 */
2843 LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2844
2845 sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2846
2847 /*
2848 * If we have sent a record that is at or beyond the flushed point, we
2849 * have caught up.
2850 */
2851 if (sentPtr >= flushPtr)
2852 WalSndCaughtUp = true;
2853 }
2854 else
2855 {
2856 /*
2857 * If the record we just wanted read is at or beyond the flushed
2858 * point, then we're caught up.
2859 */
2860 if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
2861 {
2862 WalSndCaughtUp = true;
2863
2864 /*
2865 * Have WalSndLoop() terminate the connection in an orderly
2866 * manner, after writing out all the pending data.
2867 */
2868 if (got_STOPPING)
2869 got_SIGUSR2 = true;
2870 }
2871 }
2872
2873 /* Update shared memory status */
2874 {
2875 WalSnd *walsnd = MyWalSnd;
2876
2877 SpinLockAcquire(&walsnd->mutex);
2878 walsnd->sentPtr = sentPtr;
2879 SpinLockRelease(&walsnd->mutex);
2880 }
2881}
2882
2883/*
2884 * Shutdown if the sender is caught up.
2885 *
2886 * NB: This should only be called when the shutdown signal has been received
2887 * from postmaster.
2888 *
2889 * Note that if we determine that there's still more data to send, this
2890 * function will return control to the caller.
2891 */
2892static void
2893WalSndDone(WalSndSendDataCallback send_data)
2894{
2895 XLogRecPtr replicatedPtr;
2896
2897 /* ... let's just be real sure we're caught up ... */
2898 send_data();
2899
2900 /*
2901 * To figure out whether all WAL has successfully been replicated, check
2902 * flush location if valid, write otherwise. Tools like pg_receivewal will
2903 * usually (unless in synchronous mode) return an invalid flush location.
2904 */
2905 replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2906 MyWalSnd->write : MyWalSnd->flush;
2907
2908 if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2909 !pq_is_send_pending())
2910 {
2911 /* Inform the standby that XLOG streaming is done */
2912 EndCommand("COPY 0", DestRemote);
2913 pq_flush();
2914
2915 proc_exit(0);
2916 }
2917 if (!waiting_for_ping_response)
2918 {
2919 WalSndKeepalive(true);
2920 waiting_for_ping_response = true;
2921 }
2922}
2923
2924/*
2925 * Returns the latest point in WAL that has been safely flushed to disk, and
2926 * can be sent to the standby. This should only be called when in recovery,
2927 * ie. we're streaming to a cascaded standby.
2928 *
2929 * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2930 * replayed WAL record.
2931 */
2932static XLogRecPtr
2933GetStandbyFlushRecPtr(void)
2934{
2935 XLogRecPtr replayPtr;
2936 TimeLineID replayTLI;
2937 XLogRecPtr receivePtr;
2938 TimeLineID receiveTLI;
2939 XLogRecPtr result;
2940
2941 /*
2942 * We can safely send what's already been replayed. Also, if walreceiver
2943 * is streaming WAL from the same timeline, we can send anything that it
2944 * has streamed, but hasn't been replayed yet.
2945 */
2946
2947 receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2948 replayPtr = GetXLogReplayRecPtr(&replayTLI);
2949
2950 ThisTimeLineID = replayTLI;
2951
2952 result = replayPtr;
2953 if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2954 result = receivePtr;
2955
2956 return result;
2957}
2958
2959/*
2960 * Request walsenders to reload the currently-open WAL file
2961 */
2962void
2963WalSndRqstFileReload(void)
2964{
2965 int i;
2966
2967 for (i = 0; i < max_wal_senders; i++)
2968 {
2969 WalSnd *walsnd = &WalSndCtl->walsnds[i];
2970
2971 SpinLockAcquire(&walsnd->mutex);
2972 if (walsnd->pid == 0)
2973 {
2974 SpinLockRelease(&walsnd->mutex);
2975 continue;
2976 }
2977 walsnd->needreload = true;
2978 SpinLockRelease(&walsnd->mutex);
2979 }
2980}
2981
2982/*
2983 * Handle PROCSIG_WALSND_INIT_STOPPING signal.
2984 */
2985void
2986HandleWalSndInitStopping(void)
2987{
2988 Assert(am_walsender);
2989
2990 /*
2991 * If replication has not yet started, die like with SIGTERM. If
2992 * replication is active, only set a flag and wake up the main loop. It
2993 * will send any outstanding WAL, wait for it to be replicated to the
2994 * standby, and then exit gracefully.
2995 */
2996 if (!replication_active)
2997 kill(MyProcPid, SIGTERM);
2998 else
2999 got_STOPPING = true;
3000}
3001
3002/*
3003 * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
3004 * sender should already have been switched to WALSNDSTATE_STOPPING at
3005 * this point.
3006 */
3007static void
3008WalSndLastCycleHandler(SIGNAL_ARGS)
3009{
3010 int save_errno = errno;
3011
3012 got_SIGUSR2 = true;
3013 SetLatch(MyLatch);
3014
3015 errno = save_errno;
3016}
3017
3018/* Set up signal handlers */
3019void
3020WalSndSignals(void)
3021{
3022 /* Set up signal handlers */
3023 pqsignal(SIGHUP, PostgresSigHupHandler); /* set flag to read config
3024 * file */
3025 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3026 pqsignal(SIGTERM, die); /* request shutdown */
3027 pqsignal(SIGQUIT, quickdie); /* hard crash time */
3028 InitializeTimeouts(); /* establishes SIGALRM handler */
3029 pqsignal(SIGPIPE, SIG_IGN);
3030 pqsignal(SIGUSR1, procsignal_sigusr1_handler);
3031 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3032 * shutdown */
3033
3034 /* Reset some signals that are accepted by postmaster but not here */
3035 pqsignal(SIGCHLD, SIG_DFL);
3036}
3037
3038/* Report shared-memory space needed by WalSndShmemInit */
3039Size
3040WalSndShmemSize(void)
3041{
3042 Size size = 0;
3043
3044 size = offsetof(WalSndCtlData, walsnds);
3045 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3046
3047 return size;
3048}
3049
3050/* Allocate and initialize walsender-related shared memory */
3051void
3052WalSndShmemInit(void)
3053{
3054 bool found;
3055 int i;
3056
3057 WalSndCtl = (WalSndCtlData *)
3058 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3059
3060 if (!found)
3061 {
3062 /* First time through, so initialize */
3063 MemSet(WalSndCtl, 0, WalSndShmemSize());
3064
3065 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3066 SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3067
3068 for (i = 0; i < max_wal_senders; i++)
3069 {
3070 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3071
3072 SpinLockInit(&walsnd->mutex);
3073 }
3074 }
3075}
3076
3077/*
3078 * Wake up all walsenders
3079 *
3080 * This will be called inside critical sections, so throwing an error is not
3081 * advisable.
3082 */
3083void
3084WalSndWakeup(void)
3085{
3086 int i;
3087
3088 for (i = 0; i < max_wal_senders; i++)
3089 {
3090 Latch *latch;
3091 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3092
3093 /*
3094 * Get latch pointer with spinlock held, for the unlikely case that
3095 * pointer reads aren't atomic (as they're 8 bytes).
3096 */
3097 SpinLockAcquire(&walsnd->mutex);
3098 latch = walsnd->latch;
3099 SpinLockRelease(&walsnd->mutex);
3100
3101 if (latch != NULL)
3102 SetLatch(latch);
3103 }
3104}
3105
3106/*
3107 * Signal all walsenders to move to stopping state.
3108 *
3109 * This will trigger walsenders to move to a state where no further WAL can be
3110 * generated. See this file's header for details.
3111 */
3112void
3113WalSndInitStopping(void)
3114{
3115 int i;
3116
3117 for (i = 0; i < max_wal_senders; i++)
3118 {
3119 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3120 pid_t pid;
3121
3122 SpinLockAcquire(&walsnd->mutex);
3123 pid = walsnd->pid;
3124 SpinLockRelease(&walsnd->mutex);
3125
3126 if (pid == 0)
3127 continue;
3128
3129 SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
3130 }
3131}
3132
3133/*
3134 * Wait that all the WAL senders have quit or reached the stopping state. This
3135 * is used by the checkpointer to control when the shutdown checkpoint can
3136 * safely be performed.
3137 */
3138void
3139WalSndWaitStopping(void)
3140{
3141 for (;;)
3142 {
3143 int i;
3144 bool all_stopped = true;
3145
3146 for (i = 0; i < max_wal_senders; i++)
3147 {
3148 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3149
3150 SpinLockAcquire(&walsnd->mutex);
3151
3152 if (walsnd->pid == 0)
3153 {
3154 SpinLockRelease(&walsnd->mutex);
3155 continue;
3156 }
3157
3158 if (walsnd->state != WALSNDSTATE_STOPPING)
3159 {
3160 all_stopped = false;
3161 SpinLockRelease(&walsnd->mutex);
3162 break;
3163 }
3164 SpinLockRelease(&walsnd->mutex);
3165 }
3166
3167 /* safe to leave if confirmation is done for all WAL senders */
3168 if (all_stopped)
3169 return;
3170
3171 pg_usleep(10000L); /* wait for 10 msec */
3172 }
3173}
3174
3175/* Set state for current walsender (only called in walsender) */
3176void
3177WalSndSetState(WalSndState state)
3178{
3179 WalSnd *walsnd = MyWalSnd;
3180
3181 Assert(am_walsender);
3182
3183 if (walsnd->state == state)
3184 return;
3185
3186 SpinLockAcquire(&walsnd->mutex);
3187 walsnd->state = state;
3188 SpinLockRelease(&walsnd->mutex);
3189}
3190
3191/*
3192 * Return a string constant representing the state. This is used
3193 * in system views, and should *not* be translated.
3194 */
3195static const char *
3196WalSndGetStateString(WalSndState state)
3197{
3198 switch (state)
3199 {
3200 case WALSNDSTATE_STARTUP:
3201 return "startup";
3202 case WALSNDSTATE_BACKUP:
3203 return "backup";
3204 case WALSNDSTATE_CATCHUP:
3205 return "catchup";
3206 case WALSNDSTATE_STREAMING:
3207 return "streaming";
3208 case WALSNDSTATE_STOPPING:
3209 return "stopping";
3210 }
3211 return "UNKNOWN";
3212}
3213
3214static Interval *
3215offset_to_interval(TimeOffset offset)
3216{
3217 Interval *result = palloc(sizeof(Interval));
3218
3219 result->month = 0;
3220 result->day = 0;
3221 result->time = offset;
3222
3223 return result;
3224}
3225
/*
 * Returns activity of walsenders, including pids and xlog locations sent to
 * standby servers.
 *
 * The rows are materialized into a tuplestore that is handed back through
 * fcinfo->resultinfo; one row is produced for every WalSnd slot whose pid is
 * nonzero.  Callers that are not members of DEFAULT_ROLE_READ_ALL_STATS see
 * only the pid column; all remaining columns are NULL for them.
 *
 * The function's own return value is always (Datum) 0.
 */
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
/* Number of columns in each result row; sizes the values[]/nulls[] arrays. */
#define PG_STAT_GET_WAL_SENDERS_COLS 12
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	TupleDesc tupdesc;
	Tuplestorestate *tupstore;
	MemoryContext per_query_ctx;
	MemoryContext oldcontext;
	List *sync_standbys;
	int i;

	/* check to see if caller supports us returning a tuplestore */
	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("set-valued function called in context that cannot accept a set")));
	if (!(rsinfo->allowedModes & SFRM_Materialize))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("materialize mode required, but it is not " \
						"allowed in this context")));

	/* Build a tuple descriptor for our result type */
	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
		elog(ERROR, "return type must be a row type");

	/*
	 * Create the tuplestore while the per-query memory context is current,
	 * then switch back to the caller's context for the rest of the work.
	 */
	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
	oldcontext = MemoryContextSwitchTo(per_query_ctx);

	tupstore = tuplestore_begin_heap(true, false, work_mem);
	rsinfo->returnMode = SFRM_Materialize;
	rsinfo->setResult = tupstore;
	rsinfo->setDesc = tupdesc;

	MemoryContextSwitchTo(oldcontext);

	/*
	 * Get the currently active synchronous standbys.  SyncRepLock is held in
	 * shared mode only for the duration of the call; membership in the
	 * returned list is later tested by slot index.
	 */
	LWLockAcquire(SyncRepLock, LW_SHARED);
	sync_standbys = SyncRepGetSyncStandbys(NULL);
	LWLockRelease(SyncRepLock);

	for (i = 0; i < max_wal_senders; i++)
	{
		WalSnd *walsnd = &WalSndCtl->walsnds[i];
		XLogRecPtr sentPtr;
		XLogRecPtr write;
		XLogRecPtr flush;
		XLogRecPtr apply;
		TimeOffset writeLag;
		TimeOffset flushLag;
		TimeOffset applyLag;
		int priority;
		int pid;
		WalSndState state;
		TimestampTz replyTime;
		Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
		bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];

		/*
		 * Copy everything we need out of the slot while holding its
		 * spinlock; the tuple itself is built only after the lock has been
		 * released.  Slots with pid == 0 are skipped.
		 */
		SpinLockAcquire(&walsnd->mutex);
		if (walsnd->pid == 0)
		{
			SpinLockRelease(&walsnd->mutex);
			continue;
		}
		pid = walsnd->pid;
		sentPtr = walsnd->sentPtr;
		state = walsnd->state;
		write = walsnd->write;
		flush = walsnd->flush;
		apply = walsnd->apply;
		writeLag = walsnd->writeLag;
		flushLag = walsnd->flushLag;
		applyLag = walsnd->applyLag;
		priority = walsnd->sync_standby_priority;
		replyTime = walsnd->replyTime;
		SpinLockRelease(&walsnd->mutex);

		/* Start with no NULL columns; column 0 (the pid) is always set. */
		memset(nulls, 0, sizeof(nulls));
		values[0] = Int32GetDatum(pid);

		if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
		{
			/*
			 * Only superusers and members of pg_read_all_stats can see
			 * details. Other users only get the pid value to know it's a
			 * walsender, but no details.
			 */
			MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
		}
		else
		{
			/* Column 1: state name, as an untranslated string. */
			values[1] = CStringGetTextDatum(WalSndGetStateString(state));

			/*
			 * Columns 2-5: sent, write, flush and apply locations.  An
			 * invalid location is reported as NULL; the datum is stored
			 * regardless of the NULL flag.
			 */
			if (XLogRecPtrIsInvalid(sentPtr))
				nulls[2] = true;
			values[2] = LSNGetDatum(sentPtr);

			if (XLogRecPtrIsInvalid(write))
				nulls[3] = true;
			values[3] = LSNGetDatum(write);

			if (XLogRecPtrIsInvalid(flush))
				nulls[4] = true;
			values[4] = LSNGetDatum(flush);

			if (XLogRecPtrIsInvalid(apply))
				nulls[5] = true;
			values[5] = LSNGetDatum(apply);

			/*
			 * Treat a standby such as a pg_basebackup background process
			 * which always returns an invalid flush location, as an
			 * asynchronous standby.
			 */
			priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;

			/*
			 * Columns 6-8: write, flush and apply lag as intervals.  A
			 * negative lag value is reported as NULL.
			 */
			if (writeLag < 0)
				nulls[6] = true;
			else
				values[6] = IntervalPGetDatum(offset_to_interval(writeLag));

			if (flushLag < 0)
				nulls[7] = true;
			else
				values[7] = IntervalPGetDatum(offset_to_interval(flushLag));

			if (applyLag < 0)
				nulls[8] = true;
			else
				values[8] = IntervalPGetDatum(offset_to_interval(applyLag));

			/* Column 9: priority, possibly forced to 0 above. */
			values[9] = Int32GetDatum(priority);

			/*
			 * More easily understood version of standby state. This is purely
			 * informational.
			 *
			 * In quorum-based sync replication, the role of each standby
			 * listed in synchronous_standby_names can be changing very
			 * frequently. Any standbys considered as "sync" at one moment can
			 * be switched to "potential" ones at the next moment. So, it's
			 * basically useless to report "sync" or "potential" as their sync
			 * states. We report just "quorum" for them.
			 */
			if (priority == 0)
				values[10] = CStringGetTextDatum("async");
			else if (list_member_int(sync_standbys, i))
				values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
					CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
			else
				values[10] = CStringGetTextDatum("potential");

			/* Column 11: reply time; a zero value is reported as NULL. */
			if (replyTime == 0)
				nulls[11] = true;
			else
				values[11] = TimestampTzGetDatum(replyTime);
		}

		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
	}

	/* clean up and return the tuplestore */
	tuplestore_donestoring(tupstore);

	return (Datum) 0;
}
3399
3400/*
3401 * This function is used to send a keepalive message to standby.
3402 * If requestReply is set, sets a flag in the message requesting the standby
3403 * to send a message back to us, for heartbeat purposes.
3404 */
3405static void
3406WalSndKeepalive(bool requestReply)
3407{
3408 elog(DEBUG2, "sending replication keepalive");
3409
3410 /* construct the message... */
3411 resetStringInfo(&output_message);
3412 pq_sendbyte(&output_message, 'k');
3413 pq_sendint64(&output_message, sentPtr);
3414 pq_sendint64(&output_message, GetCurrentTimestamp());
3415 pq_sendbyte(&output_message, requestReply ? 1 : 0);
3416
3417 /* ... and send it wrapped in CopyData */
3418 pq_putmessage_noblock('d', output_message.data, output_message.len);
3419}
3420
3421/*
3422 * Send keepalive message if too much time has elapsed.
3423 */
3424static void
3425WalSndKeepaliveIfNecessary(void)
3426{
3427 TimestampTz ping_time;
3428
3429 /*
3430 * Don't send keepalive messages if timeouts are globally disabled or
3431 * we're doing something not partaking in timeouts.
3432 */
3433 if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3434 return;
3435
3436 if (waiting_for_ping_response)
3437 return;
3438
3439 /*
3440 * If half of wal_sender_timeout has lapsed without receiving any reply
3441 * from the standby, send a keep-alive message to the standby requesting
3442 * an immediate reply.
3443 */
3444 ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3445 wal_sender_timeout / 2);
3446 if (last_processing >= ping_time)
3447 {
3448 WalSndKeepalive(true);
3449 waiting_for_ping_response = true;
3450
3451 /* Try to flush pending output to the client */
3452 if (pq_flush_if_writable() != 0)
3453 WalSndShutdown();
3454 }
3455}
3456
3457/*
3458 * Record the end of the WAL and the time it was flushed locally, so that
3459 * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3460 * eventually reported to have been written, flushed and applied by the
3461 * standby in a reply message.
3462 */
3463static void
3464LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
3465{
3466 bool buffer_full;
3467 int new_write_head;
3468 int i;
3469
3470 if (!am_walsender)
3471 return;
3472
3473 /*
3474 * If the lsn hasn't advanced since last time, then do nothing. This way
3475 * we only record a new sample when new WAL has been written.
3476 */
3477 if (lag_tracker->last_lsn == lsn)
3478 return;
3479 lag_tracker->last_lsn = lsn;
3480
3481 /*
3482 * If advancing the write head of the circular buffer would crash into any
3483 * of the read heads, then the buffer is full. In other words, the
3484 * slowest reader (presumably apply) is the one that controls the release
3485 * of space.
3486 */
3487 new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3488 buffer_full = false;
3489 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3490 {
3491 if (new_write_head == lag_tracker->read_heads[i])
3492 buffer_full = true;
3493 }
3494
3495 /*
3496 * If the buffer is full, for now we just rewind by one slot and overwrite
3497 * the last sample, as a simple (if somewhat uneven) way to lower the
3498 * sampling rate. There may be better adaptive compaction algorithms.
3499 */
3500 if (buffer_full)
3501 {
3502 new_write_head = lag_tracker->write_head;
3503 if (lag_tracker->write_head > 0)
3504 lag_tracker->write_head--;
3505 else
3506 lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3507 }
3508
3509 /* Store a sample at the current write head position. */
3510 lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3511 lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3512 lag_tracker->write_head = new_write_head;
3513}
3514
/*
 * Find out how much time has elapsed between the moment WAL location 'lsn'
 * (or the highest known earlier LSN) was flushed locally and the time 'now'.
 * We have a separate read head for each of the reported LSN locations we
 * receive in replies from standby; 'head' controls which read head is
 * used. Whenever a read head crosses an LSN which was written into the
 * lag buffer with LagTrackerWrite, we can use the associated timestamp to
 * find out the time this LSN (or an earlier one) was flushed locally, and
 * therefore compute the lag.
 *
 * As a side effect, the selected read head is advanced past every sample it
 * consumes, and last_read[head] is updated (or its time cleared once the
 * read head has caught up with the write head).
 *
 * Return -1 if no new sample data is available, and otherwise the elapsed
 * time in microseconds.
 */
static TimeOffset
LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
{
	/* Local flush time matched to 'lsn'; zero means "nothing crossed yet". */
	TimestampTz time = 0;

	/* Read all unread samples up to this LSN or end of buffer. */
	while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
		   lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
	{
		/* Keep the newest consumed sample, both its time and as last_read. */
		time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
		lag_tracker->last_read[head] =
			lag_tracker->buffer[lag_tracker->read_heads[head]];
		lag_tracker->read_heads[head] =
			(lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
	}

	/*
	 * If the lag tracker is empty, that means the standby has processed
	 * everything we've ever sent so we should now clear 'last_read'. If we
	 * didn't do that, we'd risk using a stale and irrelevant sample for
	 * interpolation at the beginning of the next burst of WAL after a period
	 * of idleness.
	 */
	if (lag_tracker->read_heads[head] == lag_tracker->write_head)
		lag_tracker->last_read[head].time = 0;

	if (time > now)
	{
		/* If the clock somehow went backwards, treat as not found. */
		return -1;
	}
	else if (time == 0)
	{
		/*
		 * We didn't cross a time. If there is a future sample that we
		 * haven't reached yet, and we've already reached at least one sample,
		 * let's interpolate the local flushed time. This is mainly useful
		 * for reporting a completely stuck apply position as having
		 * increasing lag, since otherwise we'd have to wait for it to
		 * eventually start moving again and cross one of our samples before
		 * we can show the lag increasing.
		 */
		if (lag_tracker->read_heads[head] == lag_tracker->write_head)
		{
			/* There are no future samples, so we can't interpolate. */
			return -1;
		}
		else if (lag_tracker->last_read[head].time != 0)
		{
			/* We can interpolate between last_read and the next sample. */
			double fraction;
			WalTimeSample prev = lag_tracker->last_read[head];
			WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];

			if (lsn < prev.lsn)
			{
				/*
				 * Reported LSNs shouldn't normally go backwards, but it's
				 * possible when there is a timeline change. Treat as not
				 * found.
				 */
				return -1;
			}

			/* A distinct LSN range also keeps the division below well-defined. */
			Assert(prev.lsn < next.lsn);

			if (prev.time > next.time)
			{
				/* If the clock somehow went backwards, treat as not found. */
				return -1;
			}

			/* See how far we are between the previous and next samples. */
			fraction =
				(double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);

			/* Scale the local flush time proportionally. */
			time = (TimestampTz)
				((double) prev.time + (next.time - prev.time) * fraction);
		}
		else
		{
			/*
			 * We have only a future sample, implying that we were entirely
			 * caught up but now there is a new burst of WAL and the
			 * standby hasn't processed the first sample yet. Until the
			 * standby reaches the future sample the best we can do is report
			 * the hypothetical lag if that sample were to be replayed now.
			 */
			time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
		}
	}

	/* Return the elapsed time since local flush time in microseconds. */
	Assert(time != 0);
	return now - time;
}
3625