1/*-------------------------------------------------------------------------
2 *
3 * receivelog.c - receive WAL files using the streaming
4 * replication protocol.
5 *
6 * Author: Magnus Hagander <magnus@hagander.net>
7 *
8 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
9 *
10 * IDENTIFICATION
11 * src/bin/pg_basebackup/receivelog.c
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres_fe.h"
16
17#include <sys/stat.h>
18#include <unistd.h>
19#ifdef HAVE_SYS_SELECT_H
20#include <sys/select.h>
21#endif
22
23/* local includes */
24#include "receivelog.h"
25#include "streamutil.h"
26
27#include "libpq-fe.h"
28#include "access/xlog_internal.h"
29#include "common/file_utils.h"
30#include "common/logging.h"
31
32
/* fd and filename for currently open WAL file */
static Walfile *walfile = NULL;
static char current_walfile_name[MAXPGPATH] = "";

/*
 * Whether sendFeedback() reports lastFlushPosition as the flush location;
 * when false, InvalidXLogRecPtr is sent instead.  Decided at the start of
 * ReceiveXlogStream() (true with a replication slot or in synchronous mode).
 */
static bool reportFlushPosition = false;

/*
 * Latest WAL position we consider flushed: initialized to the stream start
 * position, and advanced after a successful sync or close of the current
 * WAL file.
 */
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;

/*
 * Feedback still needs to be sent?  Reset to true at the start of each
 * HandleCopyStream(); while false, no status messages are sent and incoming
 * XLogData messages are ignored.
 */
static bool still_sending = true;

static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
								  XLogRecPtr *stoppos);
static int	CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
static int	CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
							  char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
								int len, XLogRecPtr blockpos, TimestampTz *last_status);
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
							   XLogRecPtr *blockpos);
static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
									   XLogRecPtr blockpos, XLogRecPtr *stoppos);
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
								XLogRecPtr *stoppos);
static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
										 TimestampTz last_status);

static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
									 uint32 *timeline);
59
60static bool
61mark_file_as_archived(StreamCtl *stream, const char *fname)
62{
63 Walfile *f;
64 static char tmppath[MAXPGPATH];
65
66 snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
67 fname);
68
69 f = stream->walmethod->open_for_write(tmppath, NULL, 0);
70 if (f == NULL)
71 {
72 pg_log_error("could not create archive status file \"%s\": %s",
73 tmppath, stream->walmethod->getlasterror());
74 return false;
75 }
76
77 stream->walmethod->close(f, CLOSE_NORMAL);
78
79 return true;
80}
81
82/*
83 * Open a new WAL file in the specified directory.
84 *
85 * Returns true if OK; on failure, returns false after printing an error msg.
86 * On success, 'walfile' is set to the FD for the file, and the base filename
87 * (without partial_suffix) is stored in 'current_walfile_name'.
88 *
89 * The file will be padded to 16Mb with zeroes.
90 */
91static bool
92open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
93{
94 Walfile *f;
95 char fn[MAXPGPATH];
96 ssize_t size;
97 XLogSegNo segno;
98
99 XLByteToSeg(startpoint, segno, WalSegSz);
100 XLogFileName(current_walfile_name, stream->timeline, segno, WalSegSz);
101
102 snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
103 stream->partial_suffix ? stream->partial_suffix : "");
104
105 /*
106 * When streaming to files, if an existing file exists we verify that it's
107 * either empty (just created), or a complete WalSegSz segment (in which
108 * case it has been created and padded). Anything else indicates a corrupt
109 * file.
110 *
111 * When streaming to tar, no file with this name will exist before, so we
112 * never have to verify a size.
113 */
114 if (stream->walmethod->existsfile(fn))
115 {
116 size = stream->walmethod->get_file_size(fn);
117 if (size < 0)
118 {
119 pg_log_error("could not get size of write-ahead log file \"%s\": %s",
120 fn, stream->walmethod->getlasterror());
121 return false;
122 }
123 if (size == WalSegSz)
124 {
125 /* Already padded file. Open it for use */
126 f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
127 if (f == NULL)
128 {
129 pg_log_error("could not open existing write-ahead log file \"%s\": %s",
130 fn, stream->walmethod->getlasterror());
131 return false;
132 }
133
134 /* fsync file in case of a previous crash */
135 if (stream->walmethod->sync(f) != 0)
136 {
137 pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
138 fn, stream->walmethod->getlasterror());
139 stream->walmethod->close(f, CLOSE_UNLINK);
140 return false;
141 }
142
143 walfile = f;
144 return true;
145 }
146 if (size != 0)
147 {
148 /* if write didn't set errno, assume problem is no disk space */
149 if (errno == 0)
150 errno = ENOSPC;
151 pg_log_error(ngettext("write-ahead log file \"%s\" has %d byte, should be 0 or %d",
152 "write-ahead log file \"%s\" has %d bytes, should be 0 or %d",
153 size),
154 fn, (int) size, WalSegSz);
155 return false;
156 }
157 /* File existed and was empty, so fall through and open */
158 }
159
160 /* No file existed, so create one */
161
162 f = stream->walmethod->open_for_write(current_walfile_name,
163 stream->partial_suffix, WalSegSz);
164 if (f == NULL)
165 {
166 pg_log_error("could not open write-ahead log file \"%s\": %s",
167 fn, stream->walmethod->getlasterror());
168 return false;
169 }
170
171 walfile = f;
172 return true;
173}
174
175/*
176 * Close the current WAL file (if open), and rename it to the correct
177 * filename if it's complete. On failure, prints an error message to stderr
178 * and returns false, otherwise returns true.
179 */
180static bool
181close_walfile(StreamCtl *stream, XLogRecPtr pos)
182{
183 off_t currpos;
184 int r;
185
186 if (walfile == NULL)
187 return true;
188
189 currpos = stream->walmethod->get_current_pos(walfile);
190 if (currpos == -1)
191 {
192 pg_log_error("could not determine seek position in file \"%s\": %s",
193 current_walfile_name, stream->walmethod->getlasterror());
194 stream->walmethod->close(walfile, CLOSE_UNLINK);
195 walfile = NULL;
196
197 return false;
198 }
199
200 if (stream->partial_suffix)
201 {
202 if (currpos == WalSegSz)
203 r = stream->walmethod->close(walfile, CLOSE_NORMAL);
204 else
205 {
206 pg_log_info("not renaming \"%s%s\", segment is not complete",
207 current_walfile_name, stream->partial_suffix);
208 r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
209 }
210 }
211 else
212 r = stream->walmethod->close(walfile, CLOSE_NORMAL);
213
214 walfile = NULL;
215
216 if (r != 0)
217 {
218 pg_log_error("could not close file \"%s\": %s",
219 current_walfile_name, stream->walmethod->getlasterror());
220 return false;
221 }
222
223 /*
224 * Mark file as archived if requested by the caller - pg_basebackup needs
225 * to do so as files can otherwise get archived again after promotion of a
226 * new node. This is in line with walreceiver.c always doing a
227 * XLogArchiveForceDone() after a complete segment.
228 */
229 if (currpos == WalSegSz && stream->mark_done)
230 {
231 /* writes error message if failed */
232 if (!mark_file_as_archived(stream, current_walfile_name))
233 return false;
234 }
235
236 lastFlushPosition = pos;
237 return true;
238}
239
240
241/*
242 * Check if a timeline history file exists.
243 */
244static bool
245existsTimeLineHistoryFile(StreamCtl *stream)
246{
247 char histfname[MAXFNAMELEN];
248
249 /*
250 * Timeline 1 never has a history file. We treat that as if it existed,
251 * since we never need to stream it.
252 */
253 if (stream->timeline == 1)
254 return true;
255
256 TLHistoryFileName(histfname, stream->timeline);
257
258 return stream->walmethod->existsfile(histfname);
259}
260
261static bool
262writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
263{
264 int size = strlen(content);
265 char histfname[MAXFNAMELEN];
266 Walfile *f;
267
268 /*
269 * Check that the server's idea of how timeline history files should be
270 * named matches ours.
271 */
272 TLHistoryFileName(histfname, stream->timeline);
273 if (strcmp(histfname, filename) != 0)
274 {
275 pg_log_error("server reported unexpected history file name for timeline %u: %s",
276 stream->timeline, filename);
277 return false;
278 }
279
280 f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
281 if (f == NULL)
282 {
283 pg_log_error("could not create timeline history file \"%s\": %s",
284 histfname, stream->walmethod->getlasterror());
285 return false;
286 }
287
288 if ((int) stream->walmethod->write(f, content, size) != size)
289 {
290 pg_log_error("could not write timeline history file \"%s\": %s",
291 histfname, stream->walmethod->getlasterror());
292
293 /*
294 * If we fail to make the file, delete it to release disk space
295 */
296 stream->walmethod->close(f, CLOSE_UNLINK);
297
298 return false;
299 }
300
301 if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
302 {
303 pg_log_error("could not close file \"%s\": %s",
304 histfname, stream->walmethod->getlasterror());
305 return false;
306 }
307
308 /* Maintain archive_status, check close_walfile() for details. */
309 if (stream->mark_done)
310 {
311 /* writes error message if failed */
312 if (!mark_file_as_archived(stream, histfname))
313 return false;
314 }
315
316 return true;
317}
318
319/*
320 * Send a Standby Status Update message to server.
321 */
322static bool
323sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
324{
325 char replybuf[1 + 8 + 8 + 8 + 8 + 1];
326 int len = 0;
327
328 replybuf[len] = 'r';
329 len += 1;
330 fe_sendint64(blockpos, &replybuf[len]); /* write */
331 len += 8;
332 if (reportFlushPosition)
333 fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
334 else
335 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
336 len += 8;
337 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
338 len += 8;
339 fe_sendint64(now, &replybuf[len]); /* sendTime */
340 len += 8;
341 replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
342 len += 1;
343
344 if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
345 {
346 pg_log_error("could not send feedback packet: %s",
347 PQerrorMessage(conn));
348 return false;
349 }
350
351 return true;
352}
353
354/*
355 * Check that the server version we're connected to is supported by
356 * ReceiveXlogStream().
357 *
358 * If it's not, an error message is printed to stderr, and false is returned.
359 */
360bool
361CheckServerVersionForStreaming(PGconn *conn)
362{
363 int minServerMajor,
364 maxServerMajor;
365 int serverMajor;
366
367 /*
368 * The message format used in streaming replication changed in 9.3, so we
369 * cannot stream from older servers. And we don't support servers newer
370 * than the client; it might work, but we don't know, so err on the safe
371 * side.
372 */
373 minServerMajor = 903;
374 maxServerMajor = PG_VERSION_NUM / 100;
375 serverMajor = PQserverVersion(conn) / 100;
376 if (serverMajor < minServerMajor)
377 {
378 const char *serverver = PQparameterStatus(conn, "server_version");
379
380 pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
381 serverver ? serverver : "'unknown'",
382 "9.3");
383 return false;
384 }
385 else if (serverMajor > maxServerMajor)
386 {
387 const char *serverver = PQparameterStatus(conn, "server_version");
388
389 pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
390 serverver ? serverver : "'unknown'",
391 PG_VERSION);
392 return false;
393 }
394 return true;
395}
396
397/*
398 * Receive a log stream starting at the specified position.
399 *
400 * Individual parameters are passed through the StreamCtl structure.
401 *
402 * If sysidentifier is specified, validate that both the system
403 * identifier and the timeline matches the specified ones
404 * (by sending an extra IDENTIFY_SYSTEM command)
405 *
406 * All received segments will be written to the directory
407 * specified by basedir. This will also fetch any missing timeline history
408 * files.
409 *
410 * The stream_stop callback will be called every time data
411 * is received, and whenever a segment is completed. If it returns
412 * true, the streaming will stop and the function
413 * return. As long as it returns false, streaming will continue
414 * indefinitely.
415 *
416 * If stream_stop() checks for external input, stop_socket should be set to
417 * the FD it checks. This will allow such input to be detected promptly
418 * rather than after standby_message_timeout (which might be indefinite).
419 * Note that signals will interrupt waits for input as well, but that is
420 * race-y since a signal received while busy won't interrupt the wait.
421 *
422 * standby_message_timeout controls how often we send a message
423 * back to the master letting it know our progress, in milliseconds.
424 * Zero means no messages are sent.
425 * This message will only contain the write location, and never
426 * flush or replay.
427 *
428 * If 'partial_suffix' is not NULL, files are initially created with the
429 * given suffix, and the suffix is removed once the file is finished. That
430 * allows you to tell the difference between partial and completed files,
431 * so that you can continue later where you left.
432 *
433 * If 'synchronous' is true, the received WAL is flushed as soon as written,
434 * otherwise only when the WAL file is closed.
435 *
436 * Note: The WAL location *must* be at a log segment start!
437 */
438bool
439ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
440{
441 char query[128];
442 char slotcmd[128];
443 PGresult *res;
444 XLogRecPtr stoppos;
445
446 /*
447 * The caller should've checked the server version already, but doesn't do
448 * any harm to check it here too.
449 */
450 if (!CheckServerVersionForStreaming(conn))
451 return false;
452
453 /*
454 * Decide whether we want to report the flush position. If we report the
455 * flush position, the primary will know what WAL we'll possibly
456 * re-request, and it can then remove older WAL safely. We must always do
457 * that when we are using slots.
458 *
459 * Reporting the flush position makes one eligible as a synchronous
460 * replica. People shouldn't include generic names in
461 * synchronous_standby_names, but we've protected them against it so far,
462 * so let's continue to do so unless specifically requested.
463 */
464 if (stream->replication_slot != NULL)
465 {
466 reportFlushPosition = true;
467 sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
468 }
469 else
470 {
471 if (stream->synchronous)
472 reportFlushPosition = true;
473 else
474 reportFlushPosition = false;
475 slotcmd[0] = 0;
476 }
477
478 if (stream->sysidentifier != NULL)
479 {
480 /* Validate system identifier hasn't changed */
481 res = PQexec(conn, "IDENTIFY_SYSTEM");
482 if (PQresultStatus(res) != PGRES_TUPLES_OK)
483 {
484 pg_log_error("could not send replication command \"%s\": %s",
485 "IDENTIFY_SYSTEM", PQerrorMessage(conn));
486 PQclear(res);
487 return false;
488 }
489 if (PQntuples(res) != 1 || PQnfields(res) < 3)
490 {
491 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
492 PQntuples(res), PQnfields(res), 1, 3);
493 PQclear(res);
494 return false;
495 }
496 if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
497 {
498 pg_log_error("system identifier does not match between base backup and streaming connection");
499 PQclear(res);
500 return false;
501 }
502 if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
503 {
504 pg_log_error("starting timeline %u is not present in the server",
505 stream->timeline);
506 PQclear(res);
507 return false;
508 }
509 PQclear(res);
510 }
511
512 /*
513 * initialize flush position to starting point, it's the caller's
514 * responsibility that that's sane.
515 */
516 lastFlushPosition = stream->startpos;
517
518 while (1)
519 {
520 /*
521 * Fetch the timeline history file for this timeline, if we don't have
522 * it already. When streaming log to tar, this will always return
523 * false, as we are never streaming into an existing file and
524 * therefore there can be no pre-existing timeline history file.
525 */
526 if (!existsTimeLineHistoryFile(stream))
527 {
528 snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
529 res = PQexec(conn, query);
530 if (PQresultStatus(res) != PGRES_TUPLES_OK)
531 {
532 /* FIXME: we might send it ok, but get an error */
533 pg_log_error("could not send replication command \"%s\": %s",
534 "TIMELINE_HISTORY", PQresultErrorMessage(res));
535 PQclear(res);
536 return false;
537 }
538
539 /*
540 * The response to TIMELINE_HISTORY is a single row result set
541 * with two fields: filename and content
542 */
543 if (PQnfields(res) != 2 || PQntuples(res) != 1)
544 {
545 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
546 PQntuples(res), PQnfields(res), 1, 2);
547 }
548
549 /* Write the history file to disk */
550 writeTimeLineHistoryFile(stream,
551 PQgetvalue(res, 0, 0),
552 PQgetvalue(res, 0, 1));
553
554 PQclear(res);
555 }
556
557 /*
558 * Before we start streaming from the requested location, check if the
559 * callback tells us to stop here.
560 */
561 if (stream->stream_stop(stream->startpos, stream->timeline, false))
562 return true;
563
564 /* Initiate the replication stream at specified location */
565 snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
566 slotcmd,
567 (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
568 stream->timeline);
569 res = PQexec(conn, query);
570 if (PQresultStatus(res) != PGRES_COPY_BOTH)
571 {
572 pg_log_error("could not send replication command \"%s\": %s",
573 "START_REPLICATION", PQresultErrorMessage(res));
574 PQclear(res);
575 return false;
576 }
577 PQclear(res);
578
579 /* Stream the WAL */
580 res = HandleCopyStream(conn, stream, &stoppos);
581 if (res == NULL)
582 goto error;
583
584 /*
585 * Streaming finished.
586 *
587 * There are two possible reasons for that: a controlled shutdown, or
588 * we reached the end of the current timeline. In case of
589 * end-of-timeline, the server sends a result set after Copy has
590 * finished, containing information about the next timeline. Read
591 * that, and restart streaming from the next timeline. In case of
592 * controlled shutdown, stop here.
593 */
594 if (PQresultStatus(res) == PGRES_TUPLES_OK)
595 {
596 /*
597 * End-of-timeline. Read the next timeline's ID and starting
598 * position. Usually, the starting position will match the end of
599 * the previous timeline, but there are corner cases like if the
600 * server had sent us half of a WAL record, when it was promoted.
601 * The new timeline will begin at the end of the last complete
602 * record in that case, overlapping the partial WAL record on the
603 * old timeline.
604 */
605 uint32 newtimeline;
606 bool parsed;
607
608 parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
609 PQclear(res);
610 if (!parsed)
611 goto error;
612
613 /* Sanity check the values the server gave us */
614 if (newtimeline <= stream->timeline)
615 {
616 pg_log_error("server reported unexpected next timeline %u, following timeline %u",
617 newtimeline, stream->timeline);
618 goto error;
619 }
620 if (stream->startpos > stoppos)
621 {
622 pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
623 stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
624 newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
625 goto error;
626 }
627
628 /* Read the final result, which should be CommandComplete. */
629 res = PQgetResult(conn);
630 if (PQresultStatus(res) != PGRES_COMMAND_OK)
631 {
632 pg_log_error("unexpected termination of replication stream: %s",
633 PQresultErrorMessage(res));
634 PQclear(res);
635 goto error;
636 }
637 PQclear(res);
638
639 /*
640 * Loop back to start streaming from the new timeline. Always
641 * start streaming at the beginning of a segment.
642 */
643 stream->timeline = newtimeline;
644 stream->startpos = stream->startpos -
645 XLogSegmentOffset(stream->startpos, WalSegSz);
646 continue;
647 }
648 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
649 {
650 PQclear(res);
651
652 /*
653 * End of replication (ie. controlled shut down of the server).
654 *
655 * Check if the callback thinks it's OK to stop here. If not,
656 * complain.
657 */
658 if (stream->stream_stop(stoppos, stream->timeline, false))
659 return true;
660 else
661 {
662 pg_log_error("replication stream was terminated before stop point");
663 goto error;
664 }
665 }
666 else
667 {
668 /* Server returned an error. */
669 pg_log_error("unexpected termination of replication stream: %s",
670 PQresultErrorMessage(res));
671 PQclear(res);
672 goto error;
673 }
674 }
675
676error:
677 if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
678 pg_log_error("could not close file \"%s\": %s",
679 current_walfile_name, stream->walmethod->getlasterror());
680 walfile = NULL;
681 return false;
682}
683
684/*
685 * Helper function to parse the result set returned by server after streaming
686 * has finished. On failure, prints an error to stderr and returns false.
687 */
688static bool
689ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
690{
691 uint32 startpos_xlogid,
692 startpos_xrecoff;
693
694 /*----------
695 * The result set consists of one row and two columns, e.g:
696 *
697 * next_tli | next_tli_startpos
698 * ----------+-------------------
699 * 4 | 0/9949AE0
700 *
701 * next_tli is the timeline ID of the next timeline after the one that
702 * just finished streaming. next_tli_startpos is the WAL location where
703 * the server switched to it.
704 *----------
705 */
706 if (PQnfields(res) < 2 || PQntuples(res) != 1)
707 {
708 pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
709 PQntuples(res), PQnfields(res), 1, 2);
710 return false;
711 }
712
713 *timeline = atoi(PQgetvalue(res, 0, 0));
714 if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
715 &startpos_xrecoff) != 2)
716 {
717 pg_log_error("could not parse next timeline's starting point \"%s\"",
718 PQgetvalue(res, 0, 1));
719 return false;
720 }
721 *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
722
723 return true;
724}
725
/*
 * The main loop of ReceiveXlogStream. Handles the COPY stream after
 * initiating streaming with the START_REPLICATION command.
 *
 * If the COPY ends (not necessarily successfully) due a message from the
 * server, returns a PGresult and sets *stoppos to the last byte written.
 * On any other sort of error, returns NULL.
 *
 * Each outer iteration: checks whether to stop, optionally syncs and sends
 * feedback, then waits (up to the computed sleep time) for a message and
 * drains every further message that is available without blocking.
 */
static PGresult *
HandleCopyStream(PGconn *conn, StreamCtl *stream,
				 XLogRecPtr *stoppos)
{
	/*
	 * Last message received.  CopyStreamReceive() frees the previous buffer
	 * on each call, so only the final one needs freeing on the error path.
	 */
	char	   *copybuf = NULL;
	/* -1: no status message has been sent yet on this stream */
	TimestampTz last_status = -1;
	XLogRecPtr	blockpos = stream->startpos;

	still_sending = true;

	while (1)
	{
		int			r;
		TimestampTz now;
		long		sleeptime;

		/*
		 * Check if we should continue streaming, or abort at this point.
		 */
		if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
			goto error;

		now = feGetCurrentTimestamp();

		/*
		 * If synchronous option is true, issue sync command as soon as there
		 * are WAL data which has not been flushed yet.
		 */
		if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
		{
			if (stream->walmethod->sync(walfile) != 0)
			{
				pg_log_error("could not fsync file \"%s\": %s",
							 current_walfile_name, stream->walmethod->getlasterror());
				goto error;
			}
			lastFlushPosition = blockpos;

			/*
			 * Send feedback so that the server sees the latest WAL locations
			 * immediately.
			 */
			if (!sendFeedback(conn, blockpos, now, false))
				goto error;
			last_status = now;
		}

		/*
		 * Potentially send a status message to the master
		 */
		if (still_sending && stream->standby_message_timeout > 0 &&
			feTimestampDifferenceExceeds(last_status, now,
										 stream->standby_message_timeout))
		{
			/* Time to send feedback! */
			if (!sendFeedback(conn, blockpos, now, false))
				goto error;
			last_status = now;
		}

		/*
		 * Calculate how long send/receive loops should sleep
		 */
		sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
												 last_status);

		/*
		 * r > 0: message length; 0: nothing arrived (timeout, signal, or
		 * stop_socket input); -1: error; -2: server ended the COPY.
		 */
		r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
		while (r != 0)
		{
			if (r == -1)
				goto error;
			if (r == -2)
			{
				PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);

				if (res == NULL)
					goto error;
				else
					return res;
			}

			/* Check the message type. */
			if (copybuf[0] == 'k')
			{
				if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
										 &last_status))
					goto error;
			}
			else if (copybuf[0] == 'w')
			{
				/* blockpos is advanced past the data just written */
				if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
					goto error;

				/*
				 * Check if we should continue streaming, or abort at this
				 * point.
				 */
				if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
					goto error;
			}
			else
			{
				pg_log_error("unrecognized streaming header: \"%c\"",
							 copybuf[0]);
				goto error;
			}

			/*
			 * Process the received data, and any subsequent data we can read
			 * without blocking.
			 */
			r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
		}
	}

error:
	if (copybuf != NULL)
		PQfreemem(copybuf);
	return NULL;
}
854
855/*
856 * Wait until we can read a CopyData message,
857 * or timeout, or occurrence of a signal or input on the stop_socket.
858 * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
859 *
860 * Returns 1 if data has become available for reading, 0 if timed out
861 * or interrupted by signal or stop_socket input, and -1 on an error.
862 */
863static int
864CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
865{
866 int ret;
867 fd_set input_mask;
868 int connsocket;
869 int maxfd;
870 struct timeval timeout;
871 struct timeval *timeoutptr;
872
873 connsocket = PQsocket(conn);
874 if (connsocket < 0)
875 {
876 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
877 return -1;
878 }
879
880 FD_ZERO(&input_mask);
881 FD_SET(connsocket, &input_mask);
882 maxfd = connsocket;
883 if (stop_socket != PGINVALID_SOCKET)
884 {
885 FD_SET(stop_socket, &input_mask);
886 maxfd = Max(maxfd, stop_socket);
887 }
888
889 if (timeout_ms < 0)
890 timeoutptr = NULL;
891 else
892 {
893 timeout.tv_sec = timeout_ms / 1000L;
894 timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
895 timeoutptr = &timeout;
896 }
897
898 ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
899
900 if (ret < 0)
901 {
902 if (errno == EINTR)
903 return 0; /* Got a signal, so not an error */
904 pg_log_error("select() failed: %m");
905 return -1;
906 }
907 if (ret > 0 && FD_ISSET(connsocket, &input_mask))
908 return 1; /* Got input on connection socket */
909
910 return 0; /* Got timeout or input on stop_socket */
911}
912
913/*
914 * Receive CopyData message available from XLOG stream, blocking for
915 * maximum of 'timeout' ms.
916 *
917 * If data was received, returns the length of the data. *buffer is set to
918 * point to a buffer holding the received message. The buffer is only valid
919 * until the next CopyStreamReceive call.
920 *
921 * Returns 0 if no data was available within timeout, or if wait was
922 * interrupted by signal or stop_socket input.
923 * -1 on error. -2 if the server ended the COPY.
924 */
925static int
926CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
927 char **buffer)
928{
929 char *copybuf = NULL;
930 int rawlen;
931
932 if (*buffer != NULL)
933 PQfreemem(*buffer);
934 *buffer = NULL;
935
936 /* Try to receive a CopyData message */
937 rawlen = PQgetCopyData(conn, &copybuf, 1);
938 if (rawlen == 0)
939 {
940 int ret;
941
942 /*
943 * No data available. Wait for some to appear, but not longer than
944 * the specified timeout, so that we can ping the server. Also stop
945 * waiting if input appears on stop_socket.
946 */
947 ret = CopyStreamPoll(conn, timeout, stop_socket);
948 if (ret <= 0)
949 return ret;
950
951 /* Now there is actually data on the socket */
952 if (PQconsumeInput(conn) == 0)
953 {
954 pg_log_error("could not receive data from WAL stream: %s",
955 PQerrorMessage(conn));
956 return -1;
957 }
958
959 /* Now that we've consumed some input, try again */
960 rawlen = PQgetCopyData(conn, &copybuf, 1);
961 if (rawlen == 0)
962 return 0;
963 }
964 if (rawlen == -1) /* end-of-streaming or error */
965 return -2;
966 if (rawlen == -2)
967 {
968 pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
969 return -1;
970 }
971
972 /* Return received messages to caller */
973 *buffer = copybuf;
974 return rawlen;
975}
976
977/*
978 * Process the keepalive message.
979 */
980static bool
981ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
982 XLogRecPtr blockpos, TimestampTz *last_status)
983{
984 int pos;
985 bool replyRequested;
986 TimestampTz now;
987
988 /*
989 * Parse the keepalive message, enclosed in the CopyData message. We just
990 * check if the server requested a reply, and ignore the rest.
991 */
992 pos = 1; /* skip msgtype 'k' */
993 pos += 8; /* skip walEnd */
994 pos += 8; /* skip sendTime */
995
996 if (len < pos + 1)
997 {
998 pg_log_error("streaming header too small: %d", len);
999 return false;
1000 }
1001 replyRequested = copybuf[pos];
1002
1003 /* If the server requested an immediate reply, send one. */
1004 if (replyRequested && still_sending)
1005 {
1006 if (reportFlushPosition && lastFlushPosition < blockpos &&
1007 walfile != NULL)
1008 {
1009 /*
1010 * If a valid flush location needs to be reported, flush the
1011 * current WAL file so that the latest flush location is sent back
1012 * to the server. This is necessary to see whether the last WAL
1013 * data has been successfully replicated or not, at the normal
1014 * shutdown of the server.
1015 */
1016 if (stream->walmethod->sync(walfile) != 0)
1017 {
1018 pg_log_error("could not fsync file \"%s\": %s",
1019 current_walfile_name, stream->walmethod->getlasterror());
1020 return false;
1021 }
1022 lastFlushPosition = blockpos;
1023 }
1024
1025 now = feGetCurrentTimestamp();
1026 if (!sendFeedback(conn, blockpos, now, false))
1027 return false;
1028 *last_status = now;
1029 }
1030
1031 return true;
1032}
1033
1034/*
1035 * Process XLogData message.
1036 */
1037static bool
1038ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
1039 XLogRecPtr *blockpos)
1040{
1041 int xlogoff;
1042 int bytes_left;
1043 int bytes_written;
1044 int hdr_len;
1045
1046 /*
1047 * Once we've decided we don't want to receive any more, just ignore any
1048 * subsequent XLogData messages.
1049 */
1050 if (!(still_sending))
1051 return true;
1052
1053 /*
1054 * Read the header of the XLogData message, enclosed in the CopyData
1055 * message. We only need the WAL location field (dataStart), the rest of
1056 * the header is ignored.
1057 */
1058 hdr_len = 1; /* msgtype 'w' */
1059 hdr_len += 8; /* dataStart */
1060 hdr_len += 8; /* walEnd */
1061 hdr_len += 8; /* sendTime */
1062 if (len < hdr_len)
1063 {
1064 pg_log_error("streaming header too small: %d", len);
1065 return false;
1066 }
1067 *blockpos = fe_recvint64(&copybuf[1]);
1068
1069 /* Extract WAL location for this block */
1070 xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
1071
1072 /*
1073 * Verify that the initial location in the stream matches where we think
1074 * we are.
1075 */
1076 if (walfile == NULL)
1077 {
1078 /* No file open yet */
1079 if (xlogoff != 0)
1080 {
1081 pg_log_error("received write-ahead log record for offset %u with no file open",
1082 xlogoff);
1083 return false;
1084 }
1085 }
1086 else
1087 {
1088 /* More data in existing segment */
1089 if (stream->walmethod->get_current_pos(walfile) != xlogoff)
1090 {
1091 pg_log_error("got WAL data offset %08x, expected %08x",
1092 xlogoff, (int) stream->walmethod->get_current_pos(walfile));
1093 return false;
1094 }
1095 }
1096
1097 bytes_left = len - hdr_len;
1098 bytes_written = 0;
1099
1100 while (bytes_left)
1101 {
1102 int bytes_to_write;
1103
1104 /*
1105 * If crossing a WAL boundary, only write up until we reach wal
1106 * segment size.
1107 */
1108 if (xlogoff + bytes_left > WalSegSz)
1109 bytes_to_write = WalSegSz - xlogoff;
1110 else
1111 bytes_to_write = bytes_left;
1112
1113 if (walfile == NULL)
1114 {
1115 if (!open_walfile(stream, *blockpos))
1116 {
1117 /* Error logged by open_walfile */
1118 return false;
1119 }
1120 }
1121
1122 if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
1123 bytes_to_write) != bytes_to_write)
1124 {
1125 pg_log_error("could not write %u bytes to WAL file \"%s\": %s",
1126 bytes_to_write, current_walfile_name,
1127 stream->walmethod->getlasterror());
1128 return false;
1129 }
1130
1131 /* Write was successful, advance our position */
1132 bytes_written += bytes_to_write;
1133 bytes_left -= bytes_to_write;
1134 *blockpos += bytes_to_write;
1135 xlogoff += bytes_to_write;
1136
1137 /* Did we reach the end of a WAL segment? */
1138 if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
1139 {
1140 if (!close_walfile(stream, *blockpos))
1141 /* Error message written in close_walfile() */
1142 return false;
1143
1144 xlogoff = 0;
1145
1146 if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1147 {
1148 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1149 {
1150 pg_log_error("could not send copy-end packet: %s",
1151 PQerrorMessage(conn));
1152 return false;
1153 }
1154 still_sending = false;
1155 return true; /* ignore the rest of this XLogData packet */
1156 }
1157 }
1158 }
1159 /* No more data left to write, receive next copy packet */
1160
1161 return true;
1162}
1163
1164/*
1165 * Handle end of the copy stream.
1166 */
1167static PGresult *
1168HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
1169 XLogRecPtr blockpos, XLogRecPtr *stoppos)
1170{
1171 PGresult *res = PQgetResult(conn);
1172
1173 /*
1174 * The server closed its end of the copy stream. If we haven't closed
1175 * ours already, we need to do so now, unless the server threw an error,
1176 * in which case we don't.
1177 */
1178 if (still_sending)
1179 {
1180 if (!close_walfile(stream, blockpos))
1181 {
1182 /* Error message written in close_walfile() */
1183 PQclear(res);
1184 return NULL;
1185 }
1186 if (PQresultStatus(res) == PGRES_COPY_IN)
1187 {
1188 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1189 {
1190 pg_log_error("could not send copy-end packet: %s",
1191 PQerrorMessage(conn));
1192 PQclear(res);
1193 return NULL;
1194 }
1195 res = PQgetResult(conn);
1196 }
1197 still_sending = false;
1198 }
1199 if (copybuf != NULL)
1200 PQfreemem(copybuf);
1201 *stoppos = blockpos;
1202 return res;
1203}
1204
1205/*
1206 * Check if we should continue streaming, or abort at this point.
1207 */
1208static bool
1209CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
1210 XLogRecPtr *stoppos)
1211{
1212 if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1213 {
1214 if (!close_walfile(stream, blockpos))
1215 {
1216 /* Potential error message is written by close_walfile */
1217 return false;
1218 }
1219 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1220 {
1221 pg_log_error("could not send copy-end packet: %s",
1222 PQerrorMessage(conn));
1223 return false;
1224 }
1225 still_sending = false;
1226 }
1227
1228 return true;
1229}
1230
1231/*
1232 * Calculate how long send/receive loops should sleep
1233 */
1234static long
1235CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
1236 TimestampTz last_status)
1237{
1238 TimestampTz status_targettime = 0;
1239 long sleeptime;
1240
1241 if (standby_message_timeout && still_sending)
1242 status_targettime = last_status +
1243 (standby_message_timeout - 1) * ((int64) 1000);
1244
1245 if (status_targettime > 0)
1246 {
1247 long secs;
1248 int usecs;
1249
1250 feTimestampDifference(now,
1251 status_targettime,
1252 &secs,
1253 &usecs);
1254 /* Always sleep at least 1 sec */
1255 if (secs <= 0)
1256 {
1257 secs = 1;
1258 usecs = 0;
1259 }
1260
1261 sleeptime = secs * 1000 + usecs / 1000;
1262 }
1263 else
1264 sleeptime = -1;
1265
1266 return sleeptime;
1267}
1268