1/*-------------------------------------------------------------------------
2 *
3 * walreceiver.c
4 *
5 * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6 * is the process in the standby server that takes charge of receiving
7 * XLOG records from a primary server during streaming replication.
8 *
9 * When the startup process determines that it's time to start streaming,
10 * it instructs postmaster to start walreceiver. Walreceiver first connects
11 * to the primary server (it will be served by a walsender process
12 * in the primary server), and then keeps receiving XLOG records and
13 * writing them to the disk as long as the connection is alive. As XLOG
14 * records are received and flushed to disk, it updates the
15 * WalRcv->receivedUpto variable in shared memory, to inform the startup
16 * process of how far it can proceed with XLOG replay.
17 *
18 * If the primary server ends streaming, but doesn't disconnect, walreceiver
19 * goes into "waiting" mode, and waits for the startup process to give new
20 * instructions. The startup process will treat that the same as
21 * disconnection, and will rescan the archive/pg_wal directory. But when the
22 * startup process wants to try streaming replication again, it will just
23 * nudge the existing walreceiver process that's waiting, instead of launching
24 * a new one.
25 *
26 * Normal termination is by SIGTERM, which instructs the walreceiver to
27 * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
28 * process, the walreceiver will simply abort and exit on SIGQUIT. A close
29 * of the connection and a FATAL error are treated not as a crash but as
30 * normal operation.
31 *
32 * This file contains the server-facing parts of walreceiver. The libpq-
33 * specific parts are in the libpqwalreceiver module. It's loaded
34 * dynamically to avoid linking the server with libpq.
35 *
36 * Portions Copyright (c) 2010-2019, PostgreSQL Global Development Group
37 *
38 *
39 * IDENTIFICATION
40 * src/backend/replication/walreceiver.c
41 *
42 *-------------------------------------------------------------------------
43 */
44#include "postgres.h"
45
46#include <signal.h>
47#include <unistd.h>
48
49#include "access/htup_details.h"
50#include "access/timeline.h"
51#include "access/transam.h"
52#include "access/xlog_internal.h"
53#include "catalog/pg_authid.h"
54#include "catalog/pg_type.h"
55#include "common/ip.h"
56#include "funcapi.h"
57#include "libpq/pqformat.h"
58#include "libpq/pqsignal.h"
59#include "miscadmin.h"
60#include "pgstat.h"
61#include "replication/walreceiver.h"
62#include "replication/walsender.h"
63#include "storage/ipc.h"
64#include "storage/pmsignal.h"
65#include "storage/procarray.h"
66#include "utils/builtins.h"
67#include "utils/guc.h"
68#include "utils/pg_lsn.h"
69#include "utils/ps_status.h"
70#include "utils/resowner.h"
71#include "utils/timestamp.h"
72
73
74/* GUC variables */
75int wal_receiver_status_interval;
76int wal_receiver_timeout;
77bool hot_standby_feedback;
78
79/* libpqwalreceiver connection */
80static WalReceiverConn *wrconn = NULL;
81WalReceiverFunctionsType *WalReceiverFunctions = NULL;
82
83#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
84
85/*
86 * These variables are used similarly to openLogFile/SegNo/Off,
87 * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
88 * corresponding the filename of recvFile.
89 */
90static int recvFile = -1;
91static TimeLineID recvFileTLI = 0;
92static XLogSegNo recvSegNo = 0;
93static uint32 recvOff = 0;
94
95/*
96 * Flags set by interrupt handlers of walreceiver for later service in the
97 * main loop.
98 */
99static volatile sig_atomic_t got_SIGHUP = false;
100static volatile sig_atomic_t got_SIGTERM = false;
101
102/*
103 * LogstreamResult indicates the byte positions that we have already
104 * written/fsynced.
105 */
106static struct
107{
108 XLogRecPtr Write; /* last byte + 1 written out in the standby */
109 XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
110} LogstreamResult;
111
112static StringInfoData reply_message;
113static StringInfoData incoming_message;
114
115/* Prototypes for private functions */
116static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
117static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
118static void WalRcvDie(int code, Datum arg);
119static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
120static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
121static void XLogWalRcvFlush(bool dying);
122static void XLogWalRcvSendReply(bool force, bool requestReply);
123static void XLogWalRcvSendHSFeedback(bool immed);
124static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
125
126/* Signal handlers */
127static void WalRcvSigHupHandler(SIGNAL_ARGS);
128static void WalRcvSigUsr1Handler(SIGNAL_ARGS);
129static void WalRcvShutdownHandler(SIGNAL_ARGS);
130static void WalRcvQuickDieHandler(SIGNAL_ARGS);
131
132
133/*
134 * Process any interrupts the walreceiver process may have received.
135 * This should be called any time the process's latch has become set.
136 *
137 * Currently, only SIGTERM is of interest. We can't just exit(1) within the
138 * SIGTERM signal handler, because the signal might arrive in the middle of
139 * some critical operation, like while we're holding a spinlock. Instead, the
140 * signal handler sets a flag variable as well as setting the process's latch.
141 * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
142 * latch has become set. Operations that could block for a long time, such as
143 * reading from a remote server, must pay attention to the latch too; see
144 * libpqrcv_PQgetResult for example.
145 */
146void
147ProcessWalRcvInterrupts(void)
148{
149 /*
150 * Although walreceiver interrupt handling doesn't use the same scheme as
151 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
152 * any incoming signals on Win32.
153 */
154 CHECK_FOR_INTERRUPTS();
155
156 if (got_SIGTERM)
157 {
158 ereport(FATAL,
159 (errcode(ERRCODE_ADMIN_SHUTDOWN),
160 errmsg("terminating walreceiver process due to administrator command")));
161 }
162}
163
164
165/* Main entry point for walreceiver process */
166void
167WalReceiverMain(void)
168{
169 char conninfo[MAXCONNINFO];
170 char *tmp_conninfo;
171 char slotname[NAMEDATALEN];
172 XLogRecPtr startpoint;
173 TimeLineID startpointTLI;
174 TimeLineID primaryTLI;
175 bool first_stream;
176 WalRcvData *walrcv = WalRcv;
177 TimestampTz last_recv_timestamp;
178 TimestampTz now;
179 bool ping_sent;
180 char *err;
181 char *sender_host = NULL;
182 int sender_port = 0;
183
184 /*
185 * WalRcv should be set up already (if we are a backend, we inherit this
186 * by fork() or EXEC_BACKEND mechanism from the postmaster).
187 */
188 Assert(walrcv != NULL);
189
190 now = GetCurrentTimestamp();
191
192 /*
193 * Mark walreceiver as running in shared memory.
194 *
195 * Do this as early as possible, so that if we fail later on, we'll set
196 * state to STOPPED. If we die before this, the startup process will keep
197 * waiting for us to start up, until it times out.
198 */
199 SpinLockAcquire(&walrcv->mutex);
200 Assert(walrcv->pid == 0);
201 switch (walrcv->walRcvState)
202 {
203 case WALRCV_STOPPING:
204 /* If we've already been requested to stop, don't start up. */
205 walrcv->walRcvState = WALRCV_STOPPED;
206 /* fall through */
207
208 case WALRCV_STOPPED:
209 SpinLockRelease(&walrcv->mutex);
210 proc_exit(1);
211 break;
212
213 case WALRCV_STARTING:
214 /* The usual case */
215 break;
216
217 case WALRCV_WAITING:
218 case WALRCV_STREAMING:
219 case WALRCV_RESTARTING:
220 default:
221 /* Shouldn't happen */
222 SpinLockRelease(&walrcv->mutex);
223 elog(PANIC, "walreceiver still running according to shared memory state");
224 }
225 /* Advertise our PID so that the startup process can kill us */
226 walrcv->pid = MyProcPid;
227 walrcv->walRcvState = WALRCV_STREAMING;
228
229 /* Fetch information required to start streaming */
230 walrcv->ready_to_display = false;
231 strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
232 strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
233 startpoint = walrcv->receiveStart;
234 startpointTLI = walrcv->receiveStartTLI;
235
236 /* Initialise to a sanish value */
237 walrcv->lastMsgSendTime =
238 walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
239
240 /* Report the latch to use to awaken this process */
241 walrcv->latch = &MyProc->procLatch;
242
243 SpinLockRelease(&walrcv->mutex);
244
245 /* Arrange to clean up at walreceiver exit */
246 on_shmem_exit(WalRcvDie, 0);
247
248 /* Properly accept or ignore signals the postmaster might send us */
249 pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
250 pqsignal(SIGINT, SIG_IGN);
251 pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
252 pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
253 pqsignal(SIGALRM, SIG_IGN);
254 pqsignal(SIGPIPE, SIG_IGN);
255 pqsignal(SIGUSR1, WalRcvSigUsr1Handler);
256 pqsignal(SIGUSR2, SIG_IGN);
257
258 /* Reset some signals that are accepted by postmaster but not here */
259 pqsignal(SIGCHLD, SIG_DFL);
260
261 /* We allow SIGQUIT (quickdie) at all times */
262 sigdelset(&BlockSig, SIGQUIT);
263
264 /* Load the libpq-specific functions */
265 load_file("libpqwalreceiver", false);
266 if (WalReceiverFunctions == NULL)
267 elog(ERROR, "libpqwalreceiver didn't initialize correctly");
268
269 /* Unblock signals (they were blocked when the postmaster forked us) */
270 PG_SETMASK(&UnBlockSig);
271
272 /* Establish the connection to the primary for XLOG streaming */
273 wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err);
274 if (!wrconn)
275 ereport(ERROR,
276 (errmsg("could not connect to the primary server: %s", err)));
277
278 /*
279 * Save user-visible connection string. This clobbers the original
280 * conninfo, for security. Also save host and port of the sender server
281 * this walreceiver is connected to.
282 */
283 tmp_conninfo = walrcv_get_conninfo(wrconn);
284 walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
285 SpinLockAcquire(&walrcv->mutex);
286 memset(walrcv->conninfo, 0, MAXCONNINFO);
287 if (tmp_conninfo)
288 strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
289
290 memset(walrcv->sender_host, 0, NI_MAXHOST);
291 if (sender_host)
292 strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
293
294 walrcv->sender_port = sender_port;
295 walrcv->ready_to_display = true;
296 SpinLockRelease(&walrcv->mutex);
297
298 if (tmp_conninfo)
299 pfree(tmp_conninfo);
300
301 if (sender_host)
302 pfree(sender_host);
303
304 first_stream = true;
305 for (;;)
306 {
307 char *primary_sysid;
308 char standby_sysid[32];
309 WalRcvStreamOptions options;
310
311 /*
312 * Check that we're connected to a valid server using the
313 * IDENTIFY_SYSTEM replication command.
314 */
315 primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
316
317 snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
318 GetSystemIdentifier());
319 if (strcmp(primary_sysid, standby_sysid) != 0)
320 {
321 ereport(ERROR,
322 (errmsg("database system identifier differs between the primary and standby"),
323 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
324 primary_sysid, standby_sysid)));
325 }
326
327 /*
328 * Confirm that the current timeline of the primary is the same or
329 * ahead of ours.
330 */
331 if (primaryTLI < startpointTLI)
332 ereport(ERROR,
333 (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
334 primaryTLI, startpointTLI)));
335
336 /*
337 * Get any missing history files. We do this always, even when we're
338 * not interested in that timeline, so that if we're promoted to
339 * become the master later on, we don't select the same timeline that
340 * was already used in the current master. This isn't bullet-proof -
341 * you'll need some external software to manage your cluster if you
342 * need to ensure that a unique timeline id is chosen in every case,
343 * but let's avoid the confusion of timeline id collisions where we
344 * can.
345 */
346 WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
347
348 /*
349 * Start streaming.
350 *
351 * We'll try to start at the requested starting point and timeline,
352 * even if it's different from the server's latest timeline. In case
353 * we've already reached the end of the old timeline, the server will
354 * finish the streaming immediately, and we will go back to await
355 * orders from the startup process. If recovery_target_timeline is
356 * 'latest', the startup process will scan pg_wal and find the new
357 * history file, bump recovery target timeline, and ask us to restart
358 * on the new timeline.
359 */
360 options.logical = false;
361 options.startpoint = startpoint;
362 options.slotname = slotname[0] != '\0' ? slotname : NULL;
363 options.proto.physical.startpointTLI = startpointTLI;
364 ThisTimeLineID = startpointTLI;
365 if (walrcv_startstreaming(wrconn, &options))
366 {
367 if (first_stream)
368 ereport(LOG,
369 (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
370 (uint32) (startpoint >> 32), (uint32) startpoint,
371 startpointTLI)));
372 else
373 ereport(LOG,
374 (errmsg("restarted WAL streaming at %X/%X on timeline %u",
375 (uint32) (startpoint >> 32), (uint32) startpoint,
376 startpointTLI)));
377 first_stream = false;
378
379 /* Initialize LogstreamResult and buffers for processing messages */
380 LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
381 initStringInfo(&reply_message);
382 initStringInfo(&incoming_message);
383
384 /* Initialize the last recv timestamp */
385 last_recv_timestamp = GetCurrentTimestamp();
386 ping_sent = false;
387
388 /* Loop until end-of-streaming or error */
389 for (;;)
390 {
391 char *buf;
392 int len;
393 bool endofwal = false;
394 pgsocket wait_fd = PGINVALID_SOCKET;
395 int rc;
396
397 /*
398 * Exit walreceiver if we're not in recovery. This should not
399 * happen, but cross-check the status here.
400 */
401 if (!RecoveryInProgress())
402 ereport(FATAL,
403 (errmsg("cannot continue WAL streaming, recovery has already ended")));
404
405 /* Process any requests or signals received recently */
406 ProcessWalRcvInterrupts();
407
408 if (got_SIGHUP)
409 {
410 got_SIGHUP = false;
411 ProcessConfigFile(PGC_SIGHUP);
412 XLogWalRcvSendHSFeedback(true);
413 }
414
415 /* See if we can read data immediately */
416 len = walrcv_receive(wrconn, &buf, &wait_fd);
417 if (len != 0)
418 {
419 /*
420 * Process the received data, and any subsequent data we
421 * can read without blocking.
422 */
423 for (;;)
424 {
425 if (len > 0)
426 {
427 /*
428 * Something was received from master, so reset
429 * timeout
430 */
431 last_recv_timestamp = GetCurrentTimestamp();
432 ping_sent = false;
433 XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
434 }
435 else if (len == 0)
436 break;
437 else if (len < 0)
438 {
439 ereport(LOG,
440 (errmsg("replication terminated by primary server"),
441 errdetail("End of WAL reached on timeline %u at %X/%X.",
442 startpointTLI,
443 (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
444 endofwal = true;
445 break;
446 }
447 len = walrcv_receive(wrconn, &buf, &wait_fd);
448 }
449
450 /* Let the master know that we received some data. */
451 XLogWalRcvSendReply(false, false);
452
453 /*
454 * If we've written some records, flush them to disk and
455 * let the startup process and primary server know about
456 * them.
457 */
458 XLogWalRcvFlush(false);
459 }
460
461 /* Check if we need to exit the streaming loop. */
462 if (endofwal)
463 break;
464
465 /*
466 * Ideally we would reuse a WaitEventSet object repeatedly
467 * here to avoid the overheads of WaitLatchOrSocket on epoll
468 * systems, but we can't be sure that libpq (or any other
469 * walreceiver implementation) has the same socket (even if
470 * the fd is the same number, it may have been closed and
471 * reopened since the last time). In future, if there is a
472 * function for removing sockets from WaitEventSet, then we
473 * could add and remove just the socket each time, potentially
474 * avoiding some system calls.
475 */
476 Assert(wait_fd != PGINVALID_SOCKET);
477 rc = WaitLatchOrSocket(walrcv->latch,
478 WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
479 WL_TIMEOUT | WL_LATCH_SET,
480 wait_fd,
481 NAPTIME_PER_CYCLE,
482 WAIT_EVENT_WAL_RECEIVER_MAIN);
483 if (rc & WL_LATCH_SET)
484 {
485 ResetLatch(walrcv->latch);
486 ProcessWalRcvInterrupts();
487
488 if (walrcv->force_reply)
489 {
490 /*
491 * The recovery process has asked us to send apply
492 * feedback now. Make sure the flag is really set to
493 * false in shared memory before sending the reply, so
494 * we don't miss a new request for a reply.
495 */
496 walrcv->force_reply = false;
497 pg_memory_barrier();
498 XLogWalRcvSendReply(true, false);
499 }
500 }
501 if (rc & WL_TIMEOUT)
502 {
503 /*
504 * We didn't receive anything new. If we haven't heard
505 * anything from the server for more than
506 * wal_receiver_timeout / 2, ping the server. Also, if
507 * it's been longer than wal_receiver_status_interval
508 * since the last update we sent, send a status update to
509 * the master anyway, to report any progress in applying
510 * WAL.
511 */
512 bool requestReply = false;
513
514 /*
515 * Check if time since last receive from standby has
516 * reached the configured limit.
517 */
518 if (wal_receiver_timeout > 0)
519 {
520 TimestampTz now = GetCurrentTimestamp();
521 TimestampTz timeout;
522
523 timeout =
524 TimestampTzPlusMilliseconds(last_recv_timestamp,
525 wal_receiver_timeout);
526
527 if (now >= timeout)
528 ereport(ERROR,
529 (errmsg("terminating walreceiver due to timeout")));
530
531 /*
532 * We didn't receive anything new, for half of
533 * receiver replication timeout. Ping the server.
534 */
535 if (!ping_sent)
536 {
537 timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
538 (wal_receiver_timeout / 2));
539 if (now >= timeout)
540 {
541 requestReply = true;
542 ping_sent = true;
543 }
544 }
545 }
546
547 XLogWalRcvSendReply(requestReply, requestReply);
548 XLogWalRcvSendHSFeedback(false);
549 }
550 }
551
552 /*
553 * The backend finished streaming. Exit streaming COPY-mode from
554 * our side, too.
555 */
556 walrcv_endstreaming(wrconn, &primaryTLI);
557
558 /*
559 * If the server had switched to a new timeline that we didn't
560 * know about when we began streaming, fetch its timeline history
561 * file now.
562 */
563 WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
564 }
565 else
566 ereport(LOG,
567 (errmsg("primary server contains no more WAL on requested timeline %u",
568 startpointTLI)));
569
570 /*
571 * End of WAL reached on the requested timeline. Close the last
572 * segment, and await for new orders from the startup process.
573 */
574 if (recvFile >= 0)
575 {
576 char xlogfname[MAXFNAMELEN];
577
578 XLogWalRcvFlush(false);
579 if (close(recvFile) != 0)
580 ereport(PANIC,
581 (errcode_for_file_access(),
582 errmsg("could not close log segment %s: %m",
583 XLogFileNameP(recvFileTLI, recvSegNo))));
584
585 /*
586 * Create .done file forcibly to prevent the streamed segment from
587 * being archived later.
588 */
589 XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
590 if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
591 XLogArchiveForceDone(xlogfname);
592 else
593 XLogArchiveNotify(xlogfname);
594 }
595 recvFile = -1;
596
597 elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
598 WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
599 }
600 /* not reached */
601}
602
603/*
604 * Wait for startup process to set receiveStart and receiveStartTLI.
605 */
606static void
607WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
608{
609 WalRcvData *walrcv = WalRcv;
610 int state;
611
612 SpinLockAcquire(&walrcv->mutex);
613 state = walrcv->walRcvState;
614 if (state != WALRCV_STREAMING)
615 {
616 SpinLockRelease(&walrcv->mutex);
617 if (state == WALRCV_STOPPING)
618 proc_exit(0);
619 else
620 elog(FATAL, "unexpected walreceiver state");
621 }
622 walrcv->walRcvState = WALRCV_WAITING;
623 walrcv->receiveStart = InvalidXLogRecPtr;
624 walrcv->receiveStartTLI = 0;
625 SpinLockRelease(&walrcv->mutex);
626
627 if (update_process_title)
628 set_ps_display("idle", false);
629
630 /*
631 * nudge startup process to notice that we've stopped streaming and are
632 * now waiting for instructions.
633 */
634 WakeupRecovery();
635 for (;;)
636 {
637 ResetLatch(walrcv->latch);
638
639 ProcessWalRcvInterrupts();
640
641 SpinLockAcquire(&walrcv->mutex);
642 Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
643 walrcv->walRcvState == WALRCV_WAITING ||
644 walrcv->walRcvState == WALRCV_STOPPING);
645 if (walrcv->walRcvState == WALRCV_RESTARTING)
646 {
647 /* we don't expect primary_conninfo to change */
648 *startpoint = walrcv->receiveStart;
649 *startpointTLI = walrcv->receiveStartTLI;
650 walrcv->walRcvState = WALRCV_STREAMING;
651 SpinLockRelease(&walrcv->mutex);
652 break;
653 }
654 if (walrcv->walRcvState == WALRCV_STOPPING)
655 {
656 /*
657 * We should've received SIGTERM if the startup process wants us
658 * to die, but might as well check it here too.
659 */
660 SpinLockRelease(&walrcv->mutex);
661 exit(1);
662 }
663 SpinLockRelease(&walrcv->mutex);
664
665 (void) WaitLatch(walrcv->latch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
666 WAIT_EVENT_WAL_RECEIVER_WAIT_START);
667 }
668
669 if (update_process_title)
670 {
671 char activitymsg[50];
672
673 snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
674 (uint32) (*startpoint >> 32),
675 (uint32) *startpoint);
676 set_ps_display(activitymsg, false);
677 }
678}
679
680/*
681 * Fetch any missing timeline history files between 'first' and 'last'
682 * (inclusive) from the server.
683 */
684static void
685WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
686{
687 TimeLineID tli;
688
689 for (tli = first; tli <= last; tli++)
690 {
691 /* there's no history file for timeline 1 */
692 if (tli != 1 && !existsTimeLineHistory(tli))
693 {
694 char *fname;
695 char *content;
696 int len;
697 char expectedfname[MAXFNAMELEN];
698
699 ereport(LOG,
700 (errmsg("fetching timeline history file for timeline %u from primary server",
701 tli)));
702
703 walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
704
705 /*
706 * Check that the filename on the master matches what we
707 * calculated ourselves. This is just a sanity check, it should
708 * always match.
709 */
710 TLHistoryFileName(expectedfname, tli);
711 if (strcmp(fname, expectedfname) != 0)
712 ereport(ERROR,
713 (errcode(ERRCODE_PROTOCOL_VIOLATION),
714 errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
715 tli)));
716
717 /*
718 * Write the file to pg_wal.
719 */
720 writeTimeLineHistoryFile(tli, content, len);
721
722 pfree(fname);
723 pfree(content);
724 }
725 }
726}
727
728/*
729 * Mark us as STOPPED in shared memory at exit.
730 */
731static void
732WalRcvDie(int code, Datum arg)
733{
734 WalRcvData *walrcv = WalRcv;
735
736 /* Ensure that all WAL records received are flushed to disk */
737 XLogWalRcvFlush(true);
738
739 /* Mark ourselves inactive in shared memory */
740 SpinLockAcquire(&walrcv->mutex);
741 Assert(walrcv->walRcvState == WALRCV_STREAMING ||
742 walrcv->walRcvState == WALRCV_RESTARTING ||
743 walrcv->walRcvState == WALRCV_STARTING ||
744 walrcv->walRcvState == WALRCV_WAITING ||
745 walrcv->walRcvState == WALRCV_STOPPING);
746 Assert(walrcv->pid == MyProcPid);
747 walrcv->walRcvState = WALRCV_STOPPED;
748 walrcv->pid = 0;
749 walrcv->ready_to_display = false;
750 walrcv->latch = NULL;
751 SpinLockRelease(&walrcv->mutex);
752
753 /* Terminate the connection gracefully. */
754 if (wrconn != NULL)
755 walrcv_disconnect(wrconn);
756
757 /* Wake up the startup process to notice promptly that we're gone */
758 WakeupRecovery();
759}
760
761/* SIGHUP: set flag to re-read config file at next convenient time */
762static void
763WalRcvSigHupHandler(SIGNAL_ARGS)
764{
765 got_SIGHUP = true;
766}
767
768
/*
 * SIGUSR1 handler: forwards to the latch mechanism, keeping errno intact
 * for the interrupted code.
 */
static void
WalRcvSigUsr1Handler(SIGNAL_ARGS)
{
	const int	saved_errno = errno;

	latch_sigusr1_handler();

	errno = saved_errno;
}
779
780/* SIGTERM: set flag for ProcessWalRcvInterrupts */
781static void
782WalRcvShutdownHandler(SIGNAL_ARGS)
783{
784 int save_errno = errno;
785
786 got_SIGTERM = true;
787
788 if (WalRcv->latch)
789 SetLatch(WalRcv->latch);
790
791 errno = save_errno;
792}
793
/*
 * SIGQUIT handler, invoked when the postmaster signals that some backend
 * has crashed: stop immediately and exit.
 */
static void
WalRcvQuickDieHandler(SIGNAL_ARGS)
{
	/*
	 * Neither proc_exit() nor atexit() callbacks may run here.  Shared
	 * memory may be corrupted, so no cleanup of our state should be
	 * attempted; besides, such callbacks would not be safe to run from a
	 * signal handler.
	 *
	 * The exit status is deliberately 2 rather than 0.  If someone sends
	 * SIGQUIT by hand to a random backend, this forces the postmaster into
	 * a system reset cycle, which is needed precisely because our shared
	 * memory state is left uncleaned.  (The "dead man switch" mechanism in
	 * pmsignal.c should make the postmaster treat this as a crash as well,
	 * but being doubly sure does no harm.)
	 */
	_exit(2);
}
819
820/*
821 * Accept the message from XLOG stream, and process it.
822 */
823static void
824XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
825{
826 int hdrlen;
827 XLogRecPtr dataStart;
828 XLogRecPtr walEnd;
829 TimestampTz sendTime;
830 bool replyRequested;
831
832 resetStringInfo(&incoming_message);
833
834 switch (type)
835 {
836 case 'w': /* WAL records */
837 {
838 /* copy message to StringInfo */
839 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
840 if (len < hdrlen)
841 ereport(ERROR,
842 (errcode(ERRCODE_PROTOCOL_VIOLATION),
843 errmsg_internal("invalid WAL message received from primary")));
844 appendBinaryStringInfo(&incoming_message, buf, hdrlen);
845
846 /* read the fields */
847 dataStart = pq_getmsgint64(&incoming_message);
848 walEnd = pq_getmsgint64(&incoming_message);
849 sendTime = pq_getmsgint64(&incoming_message);
850 ProcessWalSndrMessage(walEnd, sendTime);
851
852 buf += hdrlen;
853 len -= hdrlen;
854 XLogWalRcvWrite(buf, len, dataStart);
855 break;
856 }
857 case 'k': /* Keepalive */
858 {
859 /* copy message to StringInfo */
860 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
861 if (len != hdrlen)
862 ereport(ERROR,
863 (errcode(ERRCODE_PROTOCOL_VIOLATION),
864 errmsg_internal("invalid keepalive message received from primary")));
865 appendBinaryStringInfo(&incoming_message, buf, hdrlen);
866
867 /* read the fields */
868 walEnd = pq_getmsgint64(&incoming_message);
869 sendTime = pq_getmsgint64(&incoming_message);
870 replyRequested = pq_getmsgbyte(&incoming_message);
871
872 ProcessWalSndrMessage(walEnd, sendTime);
873
874 /* If the primary requested a reply, send one immediately */
875 if (replyRequested)
876 XLogWalRcvSendReply(true, false);
877 break;
878 }
879 default:
880 ereport(ERROR,
881 (errcode(ERRCODE_PROTOCOL_VIOLATION),
882 errmsg_internal("invalid replication message type %d",
883 type)));
884 }
885}
886
887/*
888 * Write XLOG data to disk.
889 */
890static void
891XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
892{
893 int startoff;
894 int byteswritten;
895
896 while (nbytes > 0)
897 {
898 int segbytes;
899
900 if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
901 {
902 bool use_existent;
903
904 /*
905 * fsync() and close current file before we switch to next one. We
906 * would otherwise have to reopen this file to fsync it later
907 */
908 if (recvFile >= 0)
909 {
910 char xlogfname[MAXFNAMELEN];
911
912 XLogWalRcvFlush(false);
913
914 /*
915 * XLOG segment files will be re-read by recovery in startup
916 * process soon, so we don't advise the OS to release cache
917 * pages associated with the file like XLogFileClose() does.
918 */
919 if (close(recvFile) != 0)
920 ereport(PANIC,
921 (errcode_for_file_access(),
922 errmsg("could not close log segment %s: %m",
923 XLogFileNameP(recvFileTLI, recvSegNo))));
924
925 /*
926 * Create .done file forcibly to prevent the streamed segment
927 * from being archived later.
928 */
929 XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
930 if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
931 XLogArchiveForceDone(xlogfname);
932 else
933 XLogArchiveNotify(xlogfname);
934 }
935 recvFile = -1;
936
937 /* Create/use new log file */
938 XLByteToSeg(recptr, recvSegNo, wal_segment_size);
939 use_existent = true;
940 recvFile = XLogFileInit(recvSegNo, &use_existent, true);
941 recvFileTLI = ThisTimeLineID;
942 recvOff = 0;
943 }
944
945 /* Calculate the start offset of the received logs */
946 startoff = XLogSegmentOffset(recptr, wal_segment_size);
947
948 if (startoff + nbytes > wal_segment_size)
949 segbytes = wal_segment_size - startoff;
950 else
951 segbytes = nbytes;
952
953 /* Need to seek in the file? */
954 if (recvOff != startoff)
955 {
956 if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
957 ereport(PANIC,
958 (errcode_for_file_access(),
959 errmsg("could not seek in log segment %s to offset %u: %m",
960 XLogFileNameP(recvFileTLI, recvSegNo),
961 startoff)));
962 recvOff = startoff;
963 }
964
965 /* OK to write the logs */
966 errno = 0;
967
968 byteswritten = write(recvFile, buf, segbytes);
969 if (byteswritten <= 0)
970 {
971 /* if write didn't set errno, assume no disk space */
972 if (errno == 0)
973 errno = ENOSPC;
974 ereport(PANIC,
975 (errcode_for_file_access(),
976 errmsg("could not write to log segment %s "
977 "at offset %u, length %lu: %m",
978 XLogFileNameP(recvFileTLI, recvSegNo),
979 recvOff, (unsigned long) segbytes)));
980 }
981
982 /* Update state for write */
983 recptr += byteswritten;
984
985 recvOff += byteswritten;
986 nbytes -= byteswritten;
987 buf += byteswritten;
988
989 LogstreamResult.Write = recptr;
990 }
991}
992
993/*
994 * Flush the log to disk.
995 *
996 * If we're in the midst of dying, it's unwise to do anything that might throw
997 * an error, so we skip sending a reply in that case.
998 */
999static void
1000XLogWalRcvFlush(bool dying)
1001{
1002 if (LogstreamResult.Flush < LogstreamResult.Write)
1003 {
1004 WalRcvData *walrcv = WalRcv;
1005
1006 issue_xlog_fsync(recvFile, recvSegNo);
1007
1008 LogstreamResult.Flush = LogstreamResult.Write;
1009
1010 /* Update shared-memory status */
1011 SpinLockAcquire(&walrcv->mutex);
1012 if (walrcv->receivedUpto < LogstreamResult.Flush)
1013 {
1014 walrcv->latestChunkStart = walrcv->receivedUpto;
1015 walrcv->receivedUpto = LogstreamResult.Flush;
1016 walrcv->receivedTLI = ThisTimeLineID;
1017 }
1018 SpinLockRelease(&walrcv->mutex);
1019
1020 /* Signal the startup process and walsender that new WAL has arrived */
1021 WakeupRecovery();
1022 if (AllowCascadeReplication())
1023 WalSndWakeup();
1024
1025 /* Report XLOG streaming progress in PS display */
1026 if (update_process_title)
1027 {
1028 char activitymsg[50];
1029
1030 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1031 (uint32) (LogstreamResult.Write >> 32),
1032 (uint32) LogstreamResult.Write);
1033 set_ps_display(activitymsg, false);
1034 }
1035
1036 /* Also let the master know that we made some progress */
1037 if (!dying)
1038 {
1039 XLogWalRcvSendReply(false, false);
1040 XLogWalRcvSendHSFeedback(false);
1041 }
1042 }
1043}
1044
1045/*
1046 * Send reply message to primary, indicating our current WAL locations, oldest
1047 * xmin and the current time.
1048 *
1049 * If 'force' is not set, the message is only sent if enough time has
1050 * passed since last status update to reach wal_receiver_status_interval.
1051 * If wal_receiver_status_interval is disabled altogether and 'force' is
1052 * false, this is a no-op.
1053 *
1054 * If 'requestReply' is true, requests the server to reply immediately upon
1055 * receiving this message. This is used for heartbearts, when approaching
1056 * wal_receiver_timeout.
1057 */
1058static void
1059XLogWalRcvSendReply(bool force, bool requestReply)
1060{
1061 static XLogRecPtr writePtr = 0;
1062 static XLogRecPtr flushPtr = 0;
1063 XLogRecPtr applyPtr;
1064 static TimestampTz sendTime = 0;
1065 TimestampTz now;
1066
1067 /*
1068 * If the user doesn't want status to be reported to the master, be sure
1069 * to exit before doing anything at all.
1070 */
1071 if (!force && wal_receiver_status_interval <= 0)
1072 return;
1073
1074 /* Get current timestamp. */
1075 now = GetCurrentTimestamp();
1076
1077 /*
1078 * We can compare the write and flush positions to the last message we
1079 * sent without taking any lock, but the apply position requires a spin
1080 * lock, so we don't check that unless something else has changed or 10
1081 * seconds have passed. This means that the apply WAL location will
1082 * appear, from the master's point of view, to lag slightly, but since
1083 * this is only for reporting purposes and only on idle systems, that's
1084 * probably OK.
1085 */
1086 if (!force
1087 && writePtr == LogstreamResult.Write
1088 && flushPtr == LogstreamResult.Flush
1089 && !TimestampDifferenceExceeds(sendTime, now,
1090 wal_receiver_status_interval * 1000))
1091 return;
1092 sendTime = now;
1093
1094 /* Construct a new message */
1095 writePtr = LogstreamResult.Write;
1096 flushPtr = LogstreamResult.Flush;
1097 applyPtr = GetXLogReplayRecPtr(NULL);
1098
1099 resetStringInfo(&reply_message);
1100 pq_sendbyte(&reply_message, 'r');
1101 pq_sendint64(&reply_message, writePtr);
1102 pq_sendint64(&reply_message, flushPtr);
1103 pq_sendint64(&reply_message, applyPtr);
1104 pq_sendint64(&reply_message, GetCurrentTimestamp());
1105 pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1106
1107 /* Send it */
1108 elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1109 (uint32) (writePtr >> 32), (uint32) writePtr,
1110 (uint32) (flushPtr >> 32), (uint32) flushPtr,
1111 (uint32) (applyPtr >> 32), (uint32) applyPtr,
1112 requestReply ? " (reply requested)" : "");
1113
1114 walrcv_send(wrconn, reply_message.data, reply_message.len);
1115}
1116
1117/*
1118 * Send hot standby feedback message to primary, plus the current time,
1119 * in case they don't have a watch.
1120 *
1121 * If the user disables feedback, send one final message to tell sender
1122 * to forget about the xmin on this standby. We also send this message
1123 * on first connect because a previous connection might have set xmin
1124 * on a replication slot. (If we're not using a slot it's harmless to
1125 * send a feedback message explicitly setting InvalidTransactionId).
1126 */
1127static void
1128XLogWalRcvSendHSFeedback(bool immed)
1129{
1130 TimestampTz now;
1131 FullTransactionId nextFullXid;
1132 TransactionId nextXid;
1133 uint32 xmin_epoch,
1134 catalog_xmin_epoch;
1135 TransactionId xmin,
1136 catalog_xmin;
1137 static TimestampTz sendTime = 0;
1138
1139 /* initially true so we always send at least one feedback message */
1140 static bool master_has_standby_xmin = true;
1141
1142 /*
1143 * If the user doesn't want status to be reported to the master, be sure
1144 * to exit before doing anything at all.
1145 */
1146 if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
1147 !master_has_standby_xmin)
1148 return;
1149
1150 /* Get current timestamp. */
1151 now = GetCurrentTimestamp();
1152
1153 if (!immed)
1154 {
1155 /*
1156 * Send feedback at most once per wal_receiver_status_interval.
1157 */
1158 if (!TimestampDifferenceExceeds(sendTime, now,
1159 wal_receiver_status_interval * 1000))
1160 return;
1161 sendTime = now;
1162 }
1163
1164 /*
1165 * If Hot Standby is not yet accepting connections there is nothing to
1166 * send. Check this after the interval has expired to reduce number of
1167 * calls.
1168 *
1169 * Bailing out here also ensures that we don't send feedback until we've
1170 * read our own replication slot state, so we don't tell the master to
1171 * discard needed xmin or catalog_xmin from any slots that may exist on
1172 * this replica.
1173 */
1174 if (!HotStandbyActive())
1175 return;
1176
1177 /*
1178 * Make the expensive call to get the oldest xmin once we are certain
1179 * everything else has been checked.
1180 */
1181 if (hot_standby_feedback)
1182 {
1183 TransactionId slot_xmin;
1184
1185 /*
1186 * Usually GetOldestXmin() would include both global replication slot
1187 * xmin and catalog_xmin in its calculations, but we want to derive
1188 * separate values for each of those. So we ask for an xmin that
1189 * excludes the catalog_xmin.
1190 */
1191 xmin = GetOldestXmin(NULL,
1192 PROCARRAY_FLAGS_DEFAULT | PROCARRAY_SLOTS_XMIN);
1193
1194 ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
1195
1196 if (TransactionIdIsValid(slot_xmin) &&
1197 TransactionIdPrecedes(slot_xmin, xmin))
1198 xmin = slot_xmin;
1199 }
1200 else
1201 {
1202 xmin = InvalidTransactionId;
1203 catalog_xmin = InvalidTransactionId;
1204 }
1205
1206 /*
1207 * Get epoch and adjust if nextXid and oldestXmin are different sides of
1208 * the epoch boundary.
1209 */
1210 nextFullXid = ReadNextFullTransactionId();
1211 nextXid = XidFromFullTransactionId(nextFullXid);
1212 xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1213 catalog_xmin_epoch = xmin_epoch;
1214 if (nextXid < xmin)
1215 xmin_epoch--;
1216 if (nextXid < catalog_xmin)
1217 catalog_xmin_epoch--;
1218
1219 elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1220 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1221
1222 /* Construct the message and send it. */
1223 resetStringInfo(&reply_message);
1224 pq_sendbyte(&reply_message, 'h');
1225 pq_sendint64(&reply_message, GetCurrentTimestamp());
1226 pq_sendint32(&reply_message, xmin);
1227 pq_sendint32(&reply_message, xmin_epoch);
1228 pq_sendint32(&reply_message, catalog_xmin);
1229 pq_sendint32(&reply_message, catalog_xmin_epoch);
1230 walrcv_send(wrconn, reply_message.data, reply_message.len);
1231 if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1232 master_has_standby_xmin = true;
1233 else
1234 master_has_standby_xmin = false;
1235}
1236
1237/*
1238 * Update shared memory status upon receiving a message from primary.
1239 *
1240 * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
1241 * message, reported by primary.
1242 */
1243static void
1244ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
1245{
1246 WalRcvData *walrcv = WalRcv;
1247
1248 TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1249
1250 /* Update shared-memory status */
1251 SpinLockAcquire(&walrcv->mutex);
1252 if (walrcv->latestWalEnd < walEnd)
1253 walrcv->latestWalEndTime = sendTime;
1254 walrcv->latestWalEnd = walEnd;
1255 walrcv->lastMsgSendTime = sendTime;
1256 walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1257 SpinLockRelease(&walrcv->mutex);
1258
1259 if (log_min_messages <= DEBUG2)
1260 {
1261 char *sendtime;
1262 char *receipttime;
1263 int applyDelay;
1264
1265 /* Copy because timestamptz_to_str returns a static buffer */
1266 sendtime = pstrdup(timestamptz_to_str(sendTime));
1267 receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1268 applyDelay = GetReplicationApplyDelay();
1269
1270 /* apply delay is not available */
1271 if (applyDelay == -1)
1272 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1273 sendtime,
1274 receipttime,
1275 GetReplicationTransferLatency());
1276 else
1277 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1278 sendtime,
1279 receipttime,
1280 applyDelay,
1281 GetReplicationTransferLatency());
1282
1283 pfree(sendtime);
1284 pfree(receipttime);
1285 }
1286}
1287
1288/*
1289 * Wake up the walreceiver main loop.
1290 *
1291 * This is called by the startup process whenever interesting xlog records
1292 * are applied, so that walreceiver can check if it needs to send an apply
1293 * notification back to the master which may be waiting in a COMMIT with
1294 * synchronous_commit = remote_apply.
1295 */
1296void
1297WalRcvForceReply(void)
1298{
1299 Latch *latch;
1300
1301 WalRcv->force_reply = true;
1302 /* fetching the latch pointer might not be atomic, so use spinlock */
1303 SpinLockAcquire(&WalRcv->mutex);
1304 latch = WalRcv->latch;
1305 SpinLockRelease(&WalRcv->mutex);
1306 if (latch)
1307 SetLatch(latch);
1308}
1309
1310/*
1311 * Return a string constant representing the state. This is used
1312 * in system functions and views, and should *not* be translated.
1313 */
1314static const char *
1315WalRcvGetStateString(WalRcvState state)
1316{
1317 switch (state)
1318 {
1319 case WALRCV_STOPPED:
1320 return "stopped";
1321 case WALRCV_STARTING:
1322 return "starting";
1323 case WALRCV_STREAMING:
1324 return "streaming";
1325 case WALRCV_WAITING:
1326 return "waiting";
1327 case WALRCV_RESTARTING:
1328 return "restarting";
1329 case WALRCV_STOPPING:
1330 return "stopping";
1331 }
1332 return "UNKNOWN";
1333}
1334
1335/*
1336 * Returns activity of WAL receiver, including pid, state and xlog locations
1337 * received from the WAL sender of another server.
1338 */
1339Datum
1340pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
1341{
1342 TupleDesc tupdesc;
1343 Datum *values;
1344 bool *nulls;
1345 int pid;
1346 bool ready_to_display;
1347 WalRcvState state;
1348 XLogRecPtr receive_start_lsn;
1349 TimeLineID receive_start_tli;
1350 XLogRecPtr received_lsn;
1351 TimeLineID received_tli;
1352 TimestampTz last_send_time;
1353 TimestampTz last_receipt_time;
1354 XLogRecPtr latest_end_lsn;
1355 TimestampTz latest_end_time;
1356 char sender_host[NI_MAXHOST];
1357 int sender_port = 0;
1358 char slotname[NAMEDATALEN];
1359 char conninfo[MAXCONNINFO];
1360
1361 /* Take a lock to ensure value consistency */
1362 SpinLockAcquire(&WalRcv->mutex);
1363 pid = (int) WalRcv->pid;
1364 ready_to_display = WalRcv->ready_to_display;
1365 state = WalRcv->walRcvState;
1366 receive_start_lsn = WalRcv->receiveStart;
1367 receive_start_tli = WalRcv->receiveStartTLI;
1368 received_lsn = WalRcv->receivedUpto;
1369 received_tli = WalRcv->receivedTLI;
1370 last_send_time = WalRcv->lastMsgSendTime;
1371 last_receipt_time = WalRcv->lastMsgReceiptTime;
1372 latest_end_lsn = WalRcv->latestWalEnd;
1373 latest_end_time = WalRcv->latestWalEndTime;
1374 strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1375 strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1376 sender_port = WalRcv->sender_port;
1377 strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1378 SpinLockRelease(&WalRcv->mutex);
1379
1380 /*
1381 * No WAL receiver (or not ready yet), just return a tuple with NULL
1382 * values
1383 */
1384 if (pid == 0 || !ready_to_display)
1385 PG_RETURN_NULL();
1386
1387 /* determine result type */
1388 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1389 elog(ERROR, "return type must be a row type");
1390
1391 values = palloc0(sizeof(Datum) * tupdesc->natts);
1392 nulls = palloc0(sizeof(bool) * tupdesc->natts);
1393
1394 /* Fetch values */
1395 values[0] = Int32GetDatum(pid);
1396
1397 if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
1398 {
1399 /*
1400 * Only superusers and members of pg_read_all_stats can see details.
1401 * Other users only get the pid value to know whether it is a WAL
1402 * receiver, but no details.
1403 */
1404 MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1405 }
1406 else
1407 {
1408 values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1409
1410 if (XLogRecPtrIsInvalid(receive_start_lsn))
1411 nulls[2] = true;
1412 else
1413 values[2] = LSNGetDatum(receive_start_lsn);
1414 values[3] = Int32GetDatum(receive_start_tli);
1415 if (XLogRecPtrIsInvalid(received_lsn))
1416 nulls[4] = true;
1417 else
1418 values[4] = LSNGetDatum(received_lsn);
1419 values[5] = Int32GetDatum(received_tli);
1420 if (last_send_time == 0)
1421 nulls[6] = true;
1422 else
1423 values[6] = TimestampTzGetDatum(last_send_time);
1424 if (last_receipt_time == 0)
1425 nulls[7] = true;
1426 else
1427 values[7] = TimestampTzGetDatum(last_receipt_time);
1428 if (XLogRecPtrIsInvalid(latest_end_lsn))
1429 nulls[8] = true;
1430 else
1431 values[8] = LSNGetDatum(latest_end_lsn);
1432 if (latest_end_time == 0)
1433 nulls[9] = true;
1434 else
1435 values[9] = TimestampTzGetDatum(latest_end_time);
1436 if (*slotname == '\0')
1437 nulls[10] = true;
1438 else
1439 values[10] = CStringGetTextDatum(slotname);
1440 if (*sender_host == '\0')
1441 nulls[11] = true;
1442 else
1443 values[11] = CStringGetTextDatum(sender_host);
1444 if (sender_port == 0)
1445 nulls[12] = true;
1446 else
1447 values[12] = Int32GetDatum(sender_port);
1448 if (*conninfo == '\0')
1449 nulls[13] = true;
1450 else
1451 values[13] = CStringGetTextDatum(conninfo);
1452 }
1453
1454 /* Returns the record as Datum */
1455 PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1456}
1457