| 1 | /*------------------------------------------------------------------------- |
| 2 | * |
| 3 | * walreceiverfuncs.c |
| 4 | * |
| 5 | * This file contains functions used by the startup process to communicate |
| 6 | * with the walreceiver process. Functions implementing walreceiver itself |
| 7 | * are in walreceiver.c. |
| 8 | * |
| 9 | * Portions Copyright (c) 2010-2019, PostgreSQL Global Development Group |
| 10 | * |
| 11 | * |
| 12 | * IDENTIFICATION |
| 13 | * src/backend/replication/walreceiverfuncs.c |
| 14 | * |
| 15 | *------------------------------------------------------------------------- |
| 16 | */ |
#include "postgres.h"

#include <limits.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

#include "access/xlog_internal.h"
#include "postmaster/startup.h"
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
#include "utils/timestamp.h"
| 31 | |
| 32 | WalRcvData *WalRcv = NULL; |
| 33 | |
| 34 | /* |
| 35 | * How long to wait for walreceiver to start up after requesting |
| 36 | * postmaster to launch it. In seconds. |
| 37 | */ |
| 38 | #define WALRCV_STARTUP_TIMEOUT 10 |
| 39 | |
| 40 | /* Report shared memory space needed by WalRcvShmemInit */ |
| 41 | Size |
| 42 | WalRcvShmemSize(void) |
| 43 | { |
| 44 | Size size = 0; |
| 45 | |
| 46 | size = add_size(size, sizeof(WalRcvData)); |
| 47 | |
| 48 | return size; |
| 49 | } |
| 50 | |
| 51 | /* Allocate and initialize walreceiver-related shared memory */ |
| 52 | void |
| 53 | WalRcvShmemInit(void) |
| 54 | { |
| 55 | bool found; |
| 56 | |
| 57 | WalRcv = (WalRcvData *) |
| 58 | ShmemInitStruct("Wal Receiver Ctl" , WalRcvShmemSize(), &found); |
| 59 | |
| 60 | if (!found) |
| 61 | { |
| 62 | /* First time through, so initialize */ |
| 63 | MemSet(WalRcv, 0, WalRcvShmemSize()); |
| 64 | WalRcv->walRcvState = WALRCV_STOPPED; |
| 65 | SpinLockInit(&WalRcv->mutex); |
| 66 | WalRcv->latch = NULL; |
| 67 | } |
| 68 | } |
| 69 | |
| 70 | /* Is walreceiver running (or starting up)? */ |
| 71 | bool |
| 72 | WalRcvRunning(void) |
| 73 | { |
| 74 | WalRcvData *walrcv = WalRcv; |
| 75 | WalRcvState state; |
| 76 | pg_time_t startTime; |
| 77 | |
| 78 | SpinLockAcquire(&walrcv->mutex); |
| 79 | |
| 80 | state = walrcv->walRcvState; |
| 81 | startTime = walrcv->startTime; |
| 82 | |
| 83 | SpinLockRelease(&walrcv->mutex); |
| 84 | |
| 85 | /* |
| 86 | * If it has taken too long for walreceiver to start up, give up. Setting |
| 87 | * the state to STOPPED ensures that if walreceiver later does start up |
| 88 | * after all, it will see that it's not supposed to be running and die |
| 89 | * without doing anything. |
| 90 | */ |
| 91 | if (state == WALRCV_STARTING) |
| 92 | { |
| 93 | pg_time_t now = (pg_time_t) time(NULL); |
| 94 | |
| 95 | if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) |
| 96 | { |
| 97 | SpinLockAcquire(&walrcv->mutex); |
| 98 | |
| 99 | if (walrcv->walRcvState == WALRCV_STARTING) |
| 100 | state = walrcv->walRcvState = WALRCV_STOPPED; |
| 101 | |
| 102 | SpinLockRelease(&walrcv->mutex); |
| 103 | } |
| 104 | } |
| 105 | |
| 106 | if (state != WALRCV_STOPPED) |
| 107 | return true; |
| 108 | else |
| 109 | return false; |
| 110 | } |
| 111 | |
| 112 | /* |
| 113 | * Is walreceiver running and streaming (or at least attempting to connect, |
| 114 | * or starting up)? |
| 115 | */ |
| 116 | bool |
| 117 | WalRcvStreaming(void) |
| 118 | { |
| 119 | WalRcvData *walrcv = WalRcv; |
| 120 | WalRcvState state; |
| 121 | pg_time_t startTime; |
| 122 | |
| 123 | SpinLockAcquire(&walrcv->mutex); |
| 124 | |
| 125 | state = walrcv->walRcvState; |
| 126 | startTime = walrcv->startTime; |
| 127 | |
| 128 | SpinLockRelease(&walrcv->mutex); |
| 129 | |
| 130 | /* |
| 131 | * If it has taken too long for walreceiver to start up, give up. Setting |
| 132 | * the state to STOPPED ensures that if walreceiver later does start up |
| 133 | * after all, it will see that it's not supposed to be running and die |
| 134 | * without doing anything. |
| 135 | */ |
| 136 | if (state == WALRCV_STARTING) |
| 137 | { |
| 138 | pg_time_t now = (pg_time_t) time(NULL); |
| 139 | |
| 140 | if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) |
| 141 | { |
| 142 | SpinLockAcquire(&walrcv->mutex); |
| 143 | |
| 144 | if (walrcv->walRcvState == WALRCV_STARTING) |
| 145 | state = walrcv->walRcvState = WALRCV_STOPPED; |
| 146 | |
| 147 | SpinLockRelease(&walrcv->mutex); |
| 148 | } |
| 149 | } |
| 150 | |
| 151 | if (state == WALRCV_STREAMING || state == WALRCV_STARTING || |
| 152 | state == WALRCV_RESTARTING) |
| 153 | return true; |
| 154 | else |
| 155 | return false; |
| 156 | } |
| 157 | |
| 158 | /* |
| 159 | * Stop walreceiver (if running) and wait for it to die. |
| 160 | * Executed by the Startup process. |
| 161 | */ |
| 162 | void |
| 163 | ShutdownWalRcv(void) |
| 164 | { |
| 165 | WalRcvData *walrcv = WalRcv; |
| 166 | pid_t walrcvpid = 0; |
| 167 | |
| 168 | /* |
| 169 | * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED |
| 170 | * mode once it's finished, and will also request postmaster to not |
| 171 | * restart itself. |
| 172 | */ |
| 173 | SpinLockAcquire(&walrcv->mutex); |
| 174 | switch (walrcv->walRcvState) |
| 175 | { |
| 176 | case WALRCV_STOPPED: |
| 177 | break; |
| 178 | case WALRCV_STARTING: |
| 179 | walrcv->walRcvState = WALRCV_STOPPED; |
| 180 | break; |
| 181 | |
| 182 | case WALRCV_STREAMING: |
| 183 | case WALRCV_WAITING: |
| 184 | case WALRCV_RESTARTING: |
| 185 | walrcv->walRcvState = WALRCV_STOPPING; |
| 186 | /* fall through */ |
| 187 | case WALRCV_STOPPING: |
| 188 | walrcvpid = walrcv->pid; |
| 189 | break; |
| 190 | } |
| 191 | SpinLockRelease(&walrcv->mutex); |
| 192 | |
| 193 | /* |
| 194 | * Signal walreceiver process if it was still running. |
| 195 | */ |
| 196 | if (walrcvpid != 0) |
| 197 | kill(walrcvpid, SIGTERM); |
| 198 | |
| 199 | /* |
| 200 | * Wait for walreceiver to acknowledge its death by setting state to |
| 201 | * WALRCV_STOPPED. |
| 202 | */ |
| 203 | while (WalRcvRunning()) |
| 204 | { |
| 205 | /* |
| 206 | * This possibly-long loop needs to handle interrupts of startup |
| 207 | * process. |
| 208 | */ |
| 209 | HandleStartupProcInterrupts(); |
| 210 | |
| 211 | pg_usleep(100000); /* 100ms */ |
| 212 | } |
| 213 | } |
| 214 | |
| 215 | /* |
| 216 | * Request postmaster to start walreceiver. |
| 217 | * |
| 218 | * recptr indicates the position where streaming should begin, conninfo |
| 219 | * is a libpq connection string to use, and slotname is, optionally, the name |
| 220 | * of a replication slot to acquire. |
| 221 | */ |
| 222 | void |
| 223 | RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, |
| 224 | const char *slotname) |
| 225 | { |
| 226 | WalRcvData *walrcv = WalRcv; |
| 227 | bool launch = false; |
| 228 | pg_time_t now = (pg_time_t) time(NULL); |
| 229 | Latch *latch; |
| 230 | |
| 231 | /* |
| 232 | * We always start at the beginning of the segment. That prevents a broken |
| 233 | * segment (i.e., with no records in the first half of a segment) from |
| 234 | * being created by XLOG streaming, which might cause trouble later on if |
| 235 | * the segment is e.g archived. |
| 236 | */ |
| 237 | if (XLogSegmentOffset(recptr, wal_segment_size) != 0) |
| 238 | recptr -= XLogSegmentOffset(recptr, wal_segment_size); |
| 239 | |
| 240 | SpinLockAcquire(&walrcv->mutex); |
| 241 | |
| 242 | /* It better be stopped if we try to restart it */ |
| 243 | Assert(walrcv->walRcvState == WALRCV_STOPPED || |
| 244 | walrcv->walRcvState == WALRCV_WAITING); |
| 245 | |
| 246 | if (conninfo != NULL) |
| 247 | strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO); |
| 248 | else |
| 249 | walrcv->conninfo[0] = '\0'; |
| 250 | |
| 251 | if (slotname != NULL) |
| 252 | strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN); |
| 253 | else |
| 254 | walrcv->slotname[0] = '\0'; |
| 255 | |
| 256 | if (walrcv->walRcvState == WALRCV_STOPPED) |
| 257 | { |
| 258 | launch = true; |
| 259 | walrcv->walRcvState = WALRCV_STARTING; |
| 260 | } |
| 261 | else |
| 262 | walrcv->walRcvState = WALRCV_RESTARTING; |
| 263 | walrcv->startTime = now; |
| 264 | |
| 265 | /* |
| 266 | * If this is the first startup of walreceiver (on this timeline), |
| 267 | * initialize receivedUpto and latestChunkStart to the starting point. |
| 268 | */ |
| 269 | if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli) |
| 270 | { |
| 271 | walrcv->receivedUpto = recptr; |
| 272 | walrcv->receivedTLI = tli; |
| 273 | walrcv->latestChunkStart = recptr; |
| 274 | } |
| 275 | walrcv->receiveStart = recptr; |
| 276 | walrcv->receiveStartTLI = tli; |
| 277 | |
| 278 | latch = walrcv->latch; |
| 279 | |
| 280 | SpinLockRelease(&walrcv->mutex); |
| 281 | |
| 282 | if (launch) |
| 283 | SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); |
| 284 | else if (latch) |
| 285 | SetLatch(latch); |
| 286 | } |
| 287 | |
| 288 | /* |
| 289 | * Returns the last+1 byte position that walreceiver has written. |
| 290 | * |
| 291 | * Optionally, returns the previous chunk start, that is the first byte |
| 292 | * written in the most recent walreceiver flush cycle. Callers not |
| 293 | * interested in that value may pass NULL for latestChunkStart. Same for |
| 294 | * receiveTLI. |
| 295 | */ |
| 296 | XLogRecPtr |
| 297 | GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) |
| 298 | { |
| 299 | WalRcvData *walrcv = WalRcv; |
| 300 | XLogRecPtr recptr; |
| 301 | |
| 302 | SpinLockAcquire(&walrcv->mutex); |
| 303 | recptr = walrcv->receivedUpto; |
| 304 | if (latestChunkStart) |
| 305 | *latestChunkStart = walrcv->latestChunkStart; |
| 306 | if (receiveTLI) |
| 307 | *receiveTLI = walrcv->receivedTLI; |
| 308 | SpinLockRelease(&walrcv->mutex); |
| 309 | |
| 310 | return recptr; |
| 311 | } |
| 312 | |
| 313 | /* |
| 314 | * Returns the replication apply delay in ms or -1 |
| 315 | * if the apply delay info is not available |
| 316 | */ |
| 317 | int |
| 318 | GetReplicationApplyDelay(void) |
| 319 | { |
| 320 | WalRcvData *walrcv = WalRcv; |
| 321 | XLogRecPtr receivePtr; |
| 322 | XLogRecPtr replayPtr; |
| 323 | |
| 324 | long secs; |
| 325 | int usecs; |
| 326 | |
| 327 | TimestampTz chunkReplayStartTime; |
| 328 | |
| 329 | SpinLockAcquire(&walrcv->mutex); |
| 330 | receivePtr = walrcv->receivedUpto; |
| 331 | SpinLockRelease(&walrcv->mutex); |
| 332 | |
| 333 | replayPtr = GetXLogReplayRecPtr(NULL); |
| 334 | |
| 335 | if (receivePtr == replayPtr) |
| 336 | return 0; |
| 337 | |
| 338 | chunkReplayStartTime = GetCurrentChunkReplayStartTime(); |
| 339 | |
| 340 | if (chunkReplayStartTime == 0) |
| 341 | return -1; |
| 342 | |
| 343 | TimestampDifference(chunkReplayStartTime, |
| 344 | GetCurrentTimestamp(), |
| 345 | &secs, &usecs); |
| 346 | |
| 347 | return (((int) secs * 1000) + (usecs / 1000)); |
| 348 | } |
| 349 | |
| 350 | /* |
| 351 | * Returns the network latency in ms, note that this includes any |
| 352 | * difference in clock settings between the servers, as well as timezone. |
| 353 | */ |
| 354 | int |
| 355 | GetReplicationTransferLatency(void) |
| 356 | { |
| 357 | WalRcvData *walrcv = WalRcv; |
| 358 | |
| 359 | TimestampTz lastMsgSendTime; |
| 360 | TimestampTz lastMsgReceiptTime; |
| 361 | |
| 362 | long secs = 0; |
| 363 | int usecs = 0; |
| 364 | int ms; |
| 365 | |
| 366 | SpinLockAcquire(&walrcv->mutex); |
| 367 | lastMsgSendTime = walrcv->lastMsgSendTime; |
| 368 | lastMsgReceiptTime = walrcv->lastMsgReceiptTime; |
| 369 | SpinLockRelease(&walrcv->mutex); |
| 370 | |
| 371 | TimestampDifference(lastMsgSendTime, |
| 372 | lastMsgReceiptTime, |
| 373 | &secs, &usecs); |
| 374 | |
| 375 | ms = ((int) secs * 1000) + (usecs / 1000); |
| 376 | |
| 377 | return ms; |
| 378 | } |
| 379 | |