1/*-------------------------------------------------------------------------
2 *
3 * walreceiverfuncs.c
4 *
5 * This file contains functions used by the startup process to communicate
6 * with the walreceiver process. Functions implementing walreceiver itself
7 * are in walreceiver.c.
8 *
9 * Portions Copyright (c) 2010-2019, PostgreSQL Global Development Group
10 *
11 *
12 * IDENTIFICATION
13 * src/backend/replication/walreceiverfuncs.c
14 *
15 *-------------------------------------------------------------------------
16 */
#include "postgres.h"

#include <limits.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
#include <signal.h>

#include "access/xlog_internal.h"
#include "postmaster/startup.h"
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
#include "utils/timestamp.h"
31
/*
 * Pointer to the walreceiver control struct.  It stays NULL until
 * WalRcvShmemInit() points it at the "Wal Receiver Ctl" shared memory
 * structure; the functions below dereference it without a NULL check.
 */
WalRcvData *WalRcv = NULL;

/*
 * How long to wait for walreceiver to start up after requesting
 * postmaster to launch it. In seconds.
 *
 * WalRcvRunning() and WalRcvStreaming() compare this against the time
 * elapsed since the recorded startTime while the state is WALRCV_STARTING.
 */
#define WALRCV_STARTUP_TIMEOUT 10
39
40/* Report shared memory space needed by WalRcvShmemInit */
41Size
42WalRcvShmemSize(void)
43{
44 Size size = 0;
45
46 size = add_size(size, sizeof(WalRcvData));
47
48 return size;
49}
50
51/* Allocate and initialize walreceiver-related shared memory */
52void
53WalRcvShmemInit(void)
54{
55 bool found;
56
57 WalRcv = (WalRcvData *)
58 ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
59
60 if (!found)
61 {
62 /* First time through, so initialize */
63 MemSet(WalRcv, 0, WalRcvShmemSize());
64 WalRcv->walRcvState = WALRCV_STOPPED;
65 SpinLockInit(&WalRcv->mutex);
66 WalRcv->latch = NULL;
67 }
68}
69
70/* Is walreceiver running (or starting up)? */
71bool
72WalRcvRunning(void)
73{
74 WalRcvData *walrcv = WalRcv;
75 WalRcvState state;
76 pg_time_t startTime;
77
78 SpinLockAcquire(&walrcv->mutex);
79
80 state = walrcv->walRcvState;
81 startTime = walrcv->startTime;
82
83 SpinLockRelease(&walrcv->mutex);
84
85 /*
86 * If it has taken too long for walreceiver to start up, give up. Setting
87 * the state to STOPPED ensures that if walreceiver later does start up
88 * after all, it will see that it's not supposed to be running and die
89 * without doing anything.
90 */
91 if (state == WALRCV_STARTING)
92 {
93 pg_time_t now = (pg_time_t) time(NULL);
94
95 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
96 {
97 SpinLockAcquire(&walrcv->mutex);
98
99 if (walrcv->walRcvState == WALRCV_STARTING)
100 state = walrcv->walRcvState = WALRCV_STOPPED;
101
102 SpinLockRelease(&walrcv->mutex);
103 }
104 }
105
106 if (state != WALRCV_STOPPED)
107 return true;
108 else
109 return false;
110}
111
112/*
113 * Is walreceiver running and streaming (or at least attempting to connect,
114 * or starting up)?
115 */
116bool
117WalRcvStreaming(void)
118{
119 WalRcvData *walrcv = WalRcv;
120 WalRcvState state;
121 pg_time_t startTime;
122
123 SpinLockAcquire(&walrcv->mutex);
124
125 state = walrcv->walRcvState;
126 startTime = walrcv->startTime;
127
128 SpinLockRelease(&walrcv->mutex);
129
130 /*
131 * If it has taken too long for walreceiver to start up, give up. Setting
132 * the state to STOPPED ensures that if walreceiver later does start up
133 * after all, it will see that it's not supposed to be running and die
134 * without doing anything.
135 */
136 if (state == WALRCV_STARTING)
137 {
138 pg_time_t now = (pg_time_t) time(NULL);
139
140 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
141 {
142 SpinLockAcquire(&walrcv->mutex);
143
144 if (walrcv->walRcvState == WALRCV_STARTING)
145 state = walrcv->walRcvState = WALRCV_STOPPED;
146
147 SpinLockRelease(&walrcv->mutex);
148 }
149 }
150
151 if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
152 state == WALRCV_RESTARTING)
153 return true;
154 else
155 return false;
156}
157
158/*
159 * Stop walreceiver (if running) and wait for it to die.
160 * Executed by the Startup process.
161 */
162void
163ShutdownWalRcv(void)
164{
165 WalRcvData *walrcv = WalRcv;
166 pid_t walrcvpid = 0;
167
168 /*
169 * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
170 * mode once it's finished, and will also request postmaster to not
171 * restart itself.
172 */
173 SpinLockAcquire(&walrcv->mutex);
174 switch (walrcv->walRcvState)
175 {
176 case WALRCV_STOPPED:
177 break;
178 case WALRCV_STARTING:
179 walrcv->walRcvState = WALRCV_STOPPED;
180 break;
181
182 case WALRCV_STREAMING:
183 case WALRCV_WAITING:
184 case WALRCV_RESTARTING:
185 walrcv->walRcvState = WALRCV_STOPPING;
186 /* fall through */
187 case WALRCV_STOPPING:
188 walrcvpid = walrcv->pid;
189 break;
190 }
191 SpinLockRelease(&walrcv->mutex);
192
193 /*
194 * Signal walreceiver process if it was still running.
195 */
196 if (walrcvpid != 0)
197 kill(walrcvpid, SIGTERM);
198
199 /*
200 * Wait for walreceiver to acknowledge its death by setting state to
201 * WALRCV_STOPPED.
202 */
203 while (WalRcvRunning())
204 {
205 /*
206 * This possibly-long loop needs to handle interrupts of startup
207 * process.
208 */
209 HandleStartupProcInterrupts();
210
211 pg_usleep(100000); /* 100ms */
212 }
213}
214
215/*
216 * Request postmaster to start walreceiver.
217 *
218 * recptr indicates the position where streaming should begin, conninfo
219 * is a libpq connection string to use, and slotname is, optionally, the name
220 * of a replication slot to acquire.
221 */
222void
223RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
224 const char *slotname)
225{
226 WalRcvData *walrcv = WalRcv;
227 bool launch = false;
228 pg_time_t now = (pg_time_t) time(NULL);
229 Latch *latch;
230
231 /*
232 * We always start at the beginning of the segment. That prevents a broken
233 * segment (i.e., with no records in the first half of a segment) from
234 * being created by XLOG streaming, which might cause trouble later on if
235 * the segment is e.g archived.
236 */
237 if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
238 recptr -= XLogSegmentOffset(recptr, wal_segment_size);
239
240 SpinLockAcquire(&walrcv->mutex);
241
242 /* It better be stopped if we try to restart it */
243 Assert(walrcv->walRcvState == WALRCV_STOPPED ||
244 walrcv->walRcvState == WALRCV_WAITING);
245
246 if (conninfo != NULL)
247 strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
248 else
249 walrcv->conninfo[0] = '\0';
250
251 if (slotname != NULL)
252 strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
253 else
254 walrcv->slotname[0] = '\0';
255
256 if (walrcv->walRcvState == WALRCV_STOPPED)
257 {
258 launch = true;
259 walrcv->walRcvState = WALRCV_STARTING;
260 }
261 else
262 walrcv->walRcvState = WALRCV_RESTARTING;
263 walrcv->startTime = now;
264
265 /*
266 * If this is the first startup of walreceiver (on this timeline),
267 * initialize receivedUpto and latestChunkStart to the starting point.
268 */
269 if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
270 {
271 walrcv->receivedUpto = recptr;
272 walrcv->receivedTLI = tli;
273 walrcv->latestChunkStart = recptr;
274 }
275 walrcv->receiveStart = recptr;
276 walrcv->receiveStartTLI = tli;
277
278 latch = walrcv->latch;
279
280 SpinLockRelease(&walrcv->mutex);
281
282 if (launch)
283 SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
284 else if (latch)
285 SetLatch(latch);
286}
287
288/*
289 * Returns the last+1 byte position that walreceiver has written.
290 *
291 * Optionally, returns the previous chunk start, that is the first byte
292 * written in the most recent walreceiver flush cycle. Callers not
293 * interested in that value may pass NULL for latestChunkStart. Same for
294 * receiveTLI.
295 */
296XLogRecPtr
297GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
298{
299 WalRcvData *walrcv = WalRcv;
300 XLogRecPtr recptr;
301
302 SpinLockAcquire(&walrcv->mutex);
303 recptr = walrcv->receivedUpto;
304 if (latestChunkStart)
305 *latestChunkStart = walrcv->latestChunkStart;
306 if (receiveTLI)
307 *receiveTLI = walrcv->receivedTLI;
308 SpinLockRelease(&walrcv->mutex);
309
310 return recptr;
311}
312
313/*
314 * Returns the replication apply delay in ms or -1
315 * if the apply delay info is not available
316 */
317int
318GetReplicationApplyDelay(void)
319{
320 WalRcvData *walrcv = WalRcv;
321 XLogRecPtr receivePtr;
322 XLogRecPtr replayPtr;
323
324 long secs;
325 int usecs;
326
327 TimestampTz chunkReplayStartTime;
328
329 SpinLockAcquire(&walrcv->mutex);
330 receivePtr = walrcv->receivedUpto;
331 SpinLockRelease(&walrcv->mutex);
332
333 replayPtr = GetXLogReplayRecPtr(NULL);
334
335 if (receivePtr == replayPtr)
336 return 0;
337
338 chunkReplayStartTime = GetCurrentChunkReplayStartTime();
339
340 if (chunkReplayStartTime == 0)
341 return -1;
342
343 TimestampDifference(chunkReplayStartTime,
344 GetCurrentTimestamp(),
345 &secs, &usecs);
346
347 return (((int) secs * 1000) + (usecs / 1000));
348}
349
350/*
351 * Returns the network latency in ms, note that this includes any
352 * difference in clock settings between the servers, as well as timezone.
353 */
354int
355GetReplicationTransferLatency(void)
356{
357 WalRcvData *walrcv = WalRcv;
358
359 TimestampTz lastMsgSendTime;
360 TimestampTz lastMsgReceiptTime;
361
362 long secs = 0;
363 int usecs = 0;
364 int ms;
365
366 SpinLockAcquire(&walrcv->mutex);
367 lastMsgSendTime = walrcv->lastMsgSendTime;
368 lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
369 SpinLockRelease(&walrcv->mutex);
370
371 TimestampDifference(lastMsgSendTime,
372 lastMsgReceiptTime,
373 &secs, &usecs);
374
375 ms = ((int) secs * 1000) + (usecs / 1000);
376
377 return ms;
378}
379