| 1 | /*------------------------------------------------------------------------- | 
|---|
| 2 | * | 
|---|
| 3 | * pg_receivewal.c - receive streaming WAL data and write it | 
|---|
| 4 | *					  to a local file. | 
|---|
| 5 | * | 
|---|
| 6 | * Author: Magnus Hagander <magnus@hagander.net> | 
|---|
| 7 | * | 
|---|
| 8 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group | 
|---|
| 9 | * | 
|---|
| 10 | * IDENTIFICATION | 
|---|
| 11 | *		  src/bin/pg_basebackup/pg_receivewal.c | 
|---|
| 12 | *------------------------------------------------------------------------- | 
|---|
| 13 | */ | 
|---|
| 14 |  | 
|---|
#include "postgres_fe.h"

#include <dirent.h>
#include <limits.h>
#include <signal.h>
#include <sys/stat.h>
#include <unistd.h>

#include "common/file_perm.h"
#include "common/logging.h"
#include "libpq-fe.h"
#include "access/xlog_internal.h"
#include "getopt_long.h"

#include "receivelog.h"
#include "streamutil.h"
|---|
| 30 |  | 
|---|
| 31 |  | 
|---|
| 32 | /* Time to sleep between reconnection attempts */ | 
|---|
| 33 | #define RECONNECT_SLEEP_TIME 5 | 
|---|
| 34 |  | 
|---|
| 35 | /* Global options */ | 
|---|
| 36 | static char *basedir = NULL; | 
|---|
| 37 | static int	verbose = 0; | 
|---|
| 38 | static int	compresslevel = 0; | 
|---|
| 39 | static int	noloop = 0; | 
|---|
| 40 | static int	standby_message_timeout = 10 * 1000;	/* 10 sec = default */ | 
|---|
| 41 | static volatile bool time_to_stop = false; | 
|---|
| 42 | static bool do_create_slot = false; | 
|---|
| 43 | static bool slot_exists_ok = false; | 
|---|
| 44 | static bool do_drop_slot = false; | 
|---|
| 45 | static bool do_sync = true; | 
|---|
| 46 | static bool synchronous = false; | 
|---|
| 47 | static char *replication_slot = NULL; | 
|---|
| 48 | static XLogRecPtr endpos = InvalidXLogRecPtr; | 
|---|
| 49 |  | 
|---|
| 50 |  | 
|---|
| 51 | static void usage(void); | 
|---|
| 52 | static DIR *get_destination_dir(char *dest_folder); | 
|---|
| 53 | static void close_destination_dir(DIR *dest_dir, char *dest_folder); | 
|---|
| 54 | static XLogRecPtr FindStreamingStart(uint32 *tli); | 
|---|
| 55 | static void StreamLog(void); | 
|---|
| 56 | static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline, | 
|---|
| 57 | bool segment_finished); | 
|---|
| 58 |  | 
|---|
| 59 | static void | 
|---|
| 60 | disconnect_atexit(void) | 
|---|
| 61 | { | 
|---|
| 62 | if (conn != NULL) | 
|---|
| 63 | PQfinish(conn); | 
|---|
| 64 | } | 
|---|
| 65 |  | 
|---|
/*
 * Routines to evaluate segment file format.
 *
 * A name matches when it consists of exactly XLOG_FNAME_LEN uppercase hex
 * digits followed by the given suffix and nothing else.
 */
#define IsXLogFileNameWithSuffix(fname, suffix) \
	(strlen(fname) == XLOG_FNAME_LEN + strlen(suffix) && \
	 strspn((fname), "0123456789ABCDEF") == XLOG_FNAME_LEN && \
	 strcmp((fname) + XLOG_FNAME_LEN, (suffix)) == 0)

/* Completed, gzip-compressed segment: <24 hex digits>.gz */
#define IsCompressXLogFileName(fname) \
	IsXLogFileNameWithSuffix(fname, ".gz")

/* In-progress, gzip-compressed segment: <24 hex digits>.gz.partial */
#define IsPartialCompressXLogFileName(fname) \
	IsXLogFileNameWithSuffix(fname, ".gz.partial")
|---|
| 75 |  | 
|---|
| 76 | static void | 
|---|
| 77 | usage(void) | 
|---|
| 78 | { | 
|---|
| 79 | printf(_( "%s receives PostgreSQL streaming write-ahead logs.\n\n"), | 
|---|
| 80 | progname); | 
|---|
| 81 | printf(_( "Usage:\n")); | 
|---|
| 82 | printf(_( "  %s [OPTION]...\n"), progname); | 
|---|
| 83 | printf(_( "\nOptions:\n")); | 
|---|
| 84 | printf(_( "  -D, --directory=DIR    receive write-ahead log files into this directory\n")); | 
|---|
| 85 | printf(_( "  -E, --endpos=LSN       exit after receiving the specified LSN\n")); | 
|---|
| 86 | printf(_( "      --if-not-exists    do not error if slot already exists when creating a slot\n")); | 
|---|
| 87 | printf(_( "  -n, --no-loop          do not loop on connection lost\n")); | 
|---|
| 88 | printf(_( "      --no-sync          do not wait for changes to be written safely to disk\n")); | 
|---|
| 89 | printf(_( "  -s, --status-interval=SECS\n" | 
|---|
| 90 | "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000)); | 
|---|
| 91 | printf(_( "  -S, --slot=SLOTNAME    replication slot to use\n")); | 
|---|
| 92 | printf(_( "      --synchronous      flush write-ahead log immediately after writing\n")); | 
|---|
| 93 | printf(_( "  -v, --verbose          output verbose messages\n")); | 
|---|
| 94 | printf(_( "  -V, --version          output version information, then exit\n")); | 
|---|
| 95 | printf(_( "  -Z, --compress=0-9     compress logs with given compression level\n")); | 
|---|
| 96 | printf(_( "  -?, --help             show this help, then exit\n")); | 
|---|
| 97 | printf(_( "\nConnection options:\n")); | 
|---|
| 98 | printf(_( "  -d, --dbname=CONNSTR   connection string\n")); | 
|---|
| 99 | printf(_( "  -h, --host=HOSTNAME    database server host or socket directory\n")); | 
|---|
| 100 | printf(_( "  -p, --port=PORT        database server port number\n")); | 
|---|
| 101 | printf(_( "  -U, --username=NAME    connect as specified database user\n")); | 
|---|
| 102 | printf(_( "  -w, --no-password      never prompt for password\n")); | 
|---|
| 103 | printf(_( "  -W, --password         force password prompt (should happen automatically)\n")); | 
|---|
| 104 | printf(_( "\nOptional actions:\n")); | 
|---|
| 105 | printf(_( "      --create-slot      create a new replication slot (for the slot's name see --slot)\n")); | 
|---|
| 106 | printf(_( "      --drop-slot        drop the replication slot (for the slot's name see --slot)\n")); | 
|---|
| 107 | printf(_( "\nReport bugs to <pgsql-bugs@lists.postgresql.org>.\n")); | 
|---|
| 108 | } | 
|---|
| 109 |  | 
|---|
| 110 | static bool | 
|---|
| 111 | stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished) | 
|---|
| 112 | { | 
|---|
| 113 | static uint32 prevtimeline = 0; | 
|---|
| 114 | static XLogRecPtr prevpos = InvalidXLogRecPtr; | 
|---|
| 115 |  | 
|---|
| 116 | /* we assume that we get called once at the end of each segment */ | 
|---|
| 117 | if (verbose && segment_finished) | 
|---|
| 118 | pg_log_info( "finished segment at %X/%X (timeline %u)", | 
|---|
| 119 | (uint32) (xlogpos >> 32), (uint32) xlogpos, | 
|---|
| 120 | timeline); | 
|---|
| 121 |  | 
|---|
| 122 | if (!XLogRecPtrIsInvalid(endpos) && endpos < xlogpos) | 
|---|
| 123 | { | 
|---|
| 124 | if (verbose) | 
|---|
| 125 | pg_log_info( "stopped log streaming at %X/%X (timeline %u)", | 
|---|
| 126 | (uint32) (xlogpos >> 32), (uint32) xlogpos, | 
|---|
| 127 | timeline); | 
|---|
| 128 | time_to_stop = true; | 
|---|
| 129 | return true; | 
|---|
| 130 | } | 
|---|
| 131 |  | 
|---|
| 132 | /* | 
|---|
| 133 | * Note that we report the previous, not current, position here. After a | 
|---|
| 134 | * timeline switch, xlogpos points to the beginning of the segment because | 
|---|
| 135 | * that's where we always begin streaming. Reporting the end of previous | 
|---|
| 136 | * timeline isn't totally accurate, because the next timeline can begin | 
|---|
| 137 | * slightly before the end of the WAL that we received on the previous | 
|---|
| 138 | * timeline, but it's close enough for reporting purposes. | 
|---|
| 139 | */ | 
|---|
| 140 | if (verbose && prevtimeline != 0 && prevtimeline != timeline) | 
|---|
| 141 | pg_log_info( "switched to timeline %u at %X/%X", | 
|---|
| 142 | timeline, | 
|---|
| 143 | (uint32) (prevpos >> 32), (uint32) prevpos); | 
|---|
| 144 |  | 
|---|
| 145 | prevtimeline = timeline; | 
|---|
| 146 | prevpos = xlogpos; | 
|---|
| 147 |  | 
|---|
| 148 | if (time_to_stop) | 
|---|
| 149 | { | 
|---|
| 150 | if (verbose) | 
|---|
| 151 | pg_log_info( "received interrupt signal, exiting"); | 
|---|
| 152 | return true; | 
|---|
| 153 | } | 
|---|
| 154 | return false; | 
|---|
| 155 | } | 
|---|
| 156 |  | 
|---|
| 157 |  | 
|---|
/*
 * Get destination directory.
 *
 * Opens dest_folder and returns the directory handle.  If the directory
 * cannot be opened, an error naming dest_folder is logged and the program
 * exits with status 1, so the return value is never NULL.
 */
static DIR *
get_destination_dir(char *dest_folder)
{
	DIR		   *dir;

	Assert(dest_folder != NULL);
	dir = opendir(dest_folder);
	if (dir == NULL)
	{
		/* Report the directory we actually tried to open, not the global. */
		pg_log_error("could not open directory \"%s\": %m", dest_folder);
		exit(1);
	}

	return dir;
}
|---|
| 176 |  | 
|---|
| 177 |  | 
|---|
/*
 * Close existing directory.
 *
 * dest_folder is only used to name the directory in the error message.  A
 * failure to close is logged and ends the program with status 1.
 */
static void
close_destination_dir(DIR *dest_dir, char *dest_folder)
{
	Assert(dest_dir != NULL && dest_folder != NULL);

	if (closedir(dest_dir) == 0)
		return;

	pg_log_error("could not close directory \"%s\": %m", dest_folder);
	exit(1);
}
|---|
| 191 |  | 
|---|
| 192 |  | 
|---|
| 193 | /* | 
|---|
| 194 | * Determine starting location for streaming, based on any existing xlog | 
|---|
| 195 | * segments in the directory. We start at the end of the last one that is | 
|---|
| 196 | * complete (size matches wal segment size), on the timeline with highest ID. | 
|---|
| 197 | * | 
|---|
| 198 | * If there are no WAL files in the directory, returns InvalidXLogRecPtr. | 
|---|
| 199 | */ | 
|---|
| 200 | static XLogRecPtr | 
|---|
| 201 | FindStreamingStart(uint32 *tli) | 
|---|
| 202 | { | 
|---|
| 203 | DIR		   *dir; | 
|---|
| 204 | struct dirent *dirent; | 
|---|
| 205 | XLogSegNo	high_segno = 0; | 
|---|
| 206 | uint32		high_tli = 0; | 
|---|
| 207 | bool		high_ispartial = false; | 
|---|
| 208 |  | 
|---|
| 209 | dir = get_destination_dir(basedir); | 
|---|
| 210 |  | 
|---|
| 211 | while (errno = 0, (dirent = readdir(dir)) != NULL) | 
|---|
| 212 | { | 
|---|
| 213 | uint32		tli; | 
|---|
| 214 | XLogSegNo	segno; | 
|---|
| 215 | bool		ispartial; | 
|---|
| 216 | bool		iscompress; | 
|---|
| 217 |  | 
|---|
| 218 | /* | 
|---|
| 219 | * Check if the filename looks like an xlog file, or a .partial file. | 
|---|
| 220 | */ | 
|---|
| 221 | if (IsXLogFileName(dirent->d_name)) | 
|---|
| 222 | { | 
|---|
| 223 | ispartial = false; | 
|---|
| 224 | iscompress = false; | 
|---|
| 225 | } | 
|---|
| 226 | else if (IsPartialXLogFileName(dirent->d_name)) | 
|---|
| 227 | { | 
|---|
| 228 | ispartial = true; | 
|---|
| 229 | iscompress = false; | 
|---|
| 230 | } | 
|---|
| 231 | else if (IsCompressXLogFileName(dirent->d_name)) | 
|---|
| 232 | { | 
|---|
| 233 | ispartial = false; | 
|---|
| 234 | iscompress = true; | 
|---|
| 235 | } | 
|---|
| 236 | else if (IsPartialCompressXLogFileName(dirent->d_name)) | 
|---|
| 237 | { | 
|---|
| 238 | ispartial = true; | 
|---|
| 239 | iscompress = true; | 
|---|
| 240 | } | 
|---|
| 241 | else | 
|---|
| 242 | continue; | 
|---|
| 243 |  | 
|---|
| 244 | /* | 
|---|
| 245 | * Looks like an xlog file. Parse its position. | 
|---|
| 246 | */ | 
|---|
| 247 | XLogFromFileName(dirent->d_name, &tli, &segno, WalSegSz); | 
|---|
| 248 |  | 
|---|
| 249 | /* | 
|---|
| 250 | * Check that the segment has the right size, if it's supposed to be | 
|---|
| 251 | * completed.  For non-compressed segments just check the on-disk size | 
|---|
| 252 | * and see if it matches a completed segment. For compressed segments, | 
|---|
| 253 | * look at the last 4 bytes of the compressed file, which is where the | 
|---|
| 254 | * uncompressed size is located for gz files with a size lower than | 
|---|
| 255 | * 4GB, and then compare it to the size of a completed segment. The 4 | 
|---|
| 256 | * last bytes correspond to the ISIZE member according to | 
|---|
| 257 | * http://www.zlib.org/rfc-gzip.html. | 
|---|
| 258 | */ | 
|---|
| 259 | if (!ispartial && !iscompress) | 
|---|
| 260 | { | 
|---|
| 261 | struct stat statbuf; | 
|---|
| 262 | char		fullpath[MAXPGPATH * 2]; | 
|---|
| 263 |  | 
|---|
| 264 | snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name); | 
|---|
| 265 | if (stat(fullpath, &statbuf) != 0) | 
|---|
| 266 | { | 
|---|
| 267 | pg_log_error( "could not stat file \"%s\": %m", fullpath); | 
|---|
| 268 | exit(1); | 
|---|
| 269 | } | 
|---|
| 270 |  | 
|---|
| 271 | if (statbuf.st_size != WalSegSz) | 
|---|
| 272 | { | 
|---|
| 273 | pg_log_warning( "segment file \"%s\" has incorrect size %d, skipping", | 
|---|
| 274 | dirent->d_name, (int) statbuf.st_size); | 
|---|
| 275 | continue; | 
|---|
| 276 | } | 
|---|
| 277 | } | 
|---|
| 278 | else if (!ispartial && iscompress) | 
|---|
| 279 | { | 
|---|
| 280 | int			fd; | 
|---|
| 281 | char		buf[4]; | 
|---|
| 282 | int			bytes_out; | 
|---|
| 283 | char		fullpath[MAXPGPATH * 2]; | 
|---|
| 284 | int			r; | 
|---|
| 285 |  | 
|---|
| 286 | snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name); | 
|---|
| 287 |  | 
|---|
| 288 | fd = open(fullpath, O_RDONLY | PG_BINARY, 0); | 
|---|
| 289 | if (fd < 0) | 
|---|
| 290 | { | 
|---|
| 291 | pg_log_error( "could not open compressed file \"%s\": %m", | 
|---|
| 292 | fullpath); | 
|---|
| 293 | exit(1); | 
|---|
| 294 | } | 
|---|
| 295 | if (lseek(fd, (off_t) (-4), SEEK_END) < 0) | 
|---|
| 296 | { | 
|---|
| 297 | pg_log_error( "could not seek in compressed file \"%s\": %m", | 
|---|
| 298 | fullpath); | 
|---|
| 299 | exit(1); | 
|---|
| 300 | } | 
|---|
| 301 | r = read(fd, (char *) buf, sizeof(buf)); | 
|---|
| 302 | if (r != sizeof(buf)) | 
|---|
| 303 | { | 
|---|
| 304 | if (r < 0) | 
|---|
| 305 | pg_log_error( "could not read compressed file \"%s\": %m", | 
|---|
| 306 | fullpath); | 
|---|
| 307 | else | 
|---|
| 308 | pg_log_error( "could not read compressed file \"%s\": read %d of %zu", | 
|---|
| 309 | fullpath, r, sizeof(buf)); | 
|---|
| 310 | exit(1); | 
|---|
| 311 | } | 
|---|
| 312 |  | 
|---|
| 313 | close(fd); | 
|---|
| 314 | bytes_out = (buf[3] << 24) | (buf[2] << 16) | | 
|---|
| 315 | (buf[1] << 8) | buf[0]; | 
|---|
| 316 |  | 
|---|
| 317 | if (bytes_out != WalSegSz) | 
|---|
| 318 | { | 
|---|
| 319 | pg_log_warning( "compressed segment file \"%s\" has incorrect uncompressed size %d, skipping", | 
|---|
| 320 | dirent->d_name, bytes_out); | 
|---|
| 321 | continue; | 
|---|
| 322 | } | 
|---|
| 323 | } | 
|---|
| 324 |  | 
|---|
| 325 | /* Looks like a valid segment. Remember that we saw it. */ | 
|---|
| 326 | if ((segno > high_segno) || | 
|---|
| 327 | (segno == high_segno && tli > high_tli) || | 
|---|
| 328 | (segno == high_segno && tli == high_tli && high_ispartial && !ispartial)) | 
|---|
| 329 | { | 
|---|
| 330 | high_segno = segno; | 
|---|
| 331 | high_tli = tli; | 
|---|
| 332 | high_ispartial = ispartial; | 
|---|
| 333 | } | 
|---|
| 334 | } | 
|---|
| 335 |  | 
|---|
| 336 | if (errno) | 
|---|
| 337 | { | 
|---|
| 338 | pg_log_error( "could not read directory \"%s\": %m", basedir); | 
|---|
| 339 | exit(1); | 
|---|
| 340 | } | 
|---|
| 341 |  | 
|---|
| 342 | close_destination_dir(dir, basedir); | 
|---|
| 343 |  | 
|---|
| 344 | if (high_segno > 0) | 
|---|
| 345 | { | 
|---|
| 346 | XLogRecPtr	high_ptr; | 
|---|
| 347 |  | 
|---|
| 348 | /* | 
|---|
| 349 | * Move the starting pointer to the start of the next segment, if the | 
|---|
| 350 | * highest one we saw was completed. Otherwise start streaming from | 
|---|
| 351 | * the beginning of the .partial segment. | 
|---|
| 352 | */ | 
|---|
| 353 | if (!high_ispartial) | 
|---|
| 354 | high_segno++; | 
|---|
| 355 |  | 
|---|
| 356 | XLogSegNoOffsetToRecPtr(high_segno, 0, WalSegSz, high_ptr); | 
|---|
| 357 |  | 
|---|
| 358 | *tli = high_tli; | 
|---|
| 359 | return high_ptr; | 
|---|
| 360 | } | 
|---|
| 361 | else | 
|---|
| 362 | return InvalidXLogRecPtr; | 
|---|
| 363 | } | 
|---|
| 364 |  | 
|---|
| 365 | /* | 
|---|
| 366 | * Start the log streaming | 
|---|
| 367 | */ | 
|---|
| 368 | static void | 
|---|
| 369 | StreamLog(void) | 
|---|
| 370 | { | 
|---|
| 371 | XLogRecPtr	serverpos; | 
|---|
| 372 | TimeLineID	servertli; | 
|---|
| 373 | StreamCtl	stream; | 
|---|
| 374 |  | 
|---|
| 375 | MemSet(&stream, 0, sizeof(stream)); | 
|---|
| 376 |  | 
|---|
| 377 | /* | 
|---|
| 378 | * Connect in replication mode to the server | 
|---|
| 379 | */ | 
|---|
| 380 | if (conn == NULL) | 
|---|
| 381 | conn = GetConnection(); | 
|---|
| 382 | if (!conn) | 
|---|
| 383 | /* Error message already written in GetConnection() */ | 
|---|
| 384 | return; | 
|---|
| 385 |  | 
|---|
| 386 | if (!CheckServerVersionForStreaming(conn)) | 
|---|
| 387 | { | 
|---|
| 388 | /* | 
|---|
| 389 | * Error message already written in CheckServerVersionForStreaming(). | 
|---|
| 390 | * There's no hope of recovering from a version mismatch, so don't | 
|---|
| 391 | * retry. | 
|---|
| 392 | */ | 
|---|
| 393 | exit(1); | 
|---|
| 394 | } | 
|---|
| 395 |  | 
|---|
| 396 | /* | 
|---|
| 397 | * Identify server, obtaining start LSN position and current timeline ID | 
|---|
| 398 | * at the same time, necessary if not valid data can be found in the | 
|---|
| 399 | * existing output directory. | 
|---|
| 400 | */ | 
|---|
| 401 | if (!RunIdentifySystem(conn, NULL, &servertli, &serverpos, NULL)) | 
|---|
| 402 | exit(1); | 
|---|
| 403 |  | 
|---|
| 404 | /* | 
|---|
| 405 | * Figure out where to start streaming. | 
|---|
| 406 | */ | 
|---|
| 407 | stream.startpos = FindStreamingStart(&stream.timeline); | 
|---|
| 408 | if (stream.startpos == InvalidXLogRecPtr) | 
|---|
| 409 | { | 
|---|
| 410 | stream.startpos = serverpos; | 
|---|
| 411 | stream.timeline = servertli; | 
|---|
| 412 | } | 
|---|
| 413 |  | 
|---|
| 414 | /* | 
|---|
| 415 | * Always start streaming at the beginning of a segment | 
|---|
| 416 | */ | 
|---|
| 417 | stream.startpos -= XLogSegmentOffset(stream.startpos, WalSegSz); | 
|---|
| 418 |  | 
|---|
| 419 | /* | 
|---|
| 420 | * Start the replication | 
|---|
| 421 | */ | 
|---|
| 422 | if (verbose) | 
|---|
| 423 | pg_log_info( "starting log streaming at %X/%X (timeline %u)", | 
|---|
| 424 | (uint32) (stream.startpos >> 32), (uint32) stream.startpos, | 
|---|
| 425 | stream.timeline); | 
|---|
| 426 |  | 
|---|
| 427 | stream.stream_stop = stop_streaming; | 
|---|
| 428 | stream.stop_socket = PGINVALID_SOCKET; | 
|---|
| 429 | stream.standby_message_timeout = standby_message_timeout; | 
|---|
| 430 | stream.synchronous = synchronous; | 
|---|
| 431 | stream.do_sync = do_sync; | 
|---|
| 432 | stream.mark_done = false; | 
|---|
| 433 | stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel, | 
|---|
| 434 | stream.do_sync); | 
|---|
| 435 | stream.partial_suffix = ".partial"; | 
|---|
| 436 | stream.replication_slot = replication_slot; | 
|---|
| 437 |  | 
|---|
| 438 | ReceiveXlogStream(conn, &stream); | 
|---|
| 439 |  | 
|---|
| 440 | if (!stream.walmethod->finish()) | 
|---|
| 441 | { | 
|---|
| 442 | pg_log_info( "could not finish writing WAL files: %m"); | 
|---|
| 443 | return; | 
|---|
| 444 | } | 
|---|
| 445 |  | 
|---|
| 446 | PQfinish(conn); | 
|---|
| 447 | conn = NULL; | 
|---|
| 448 |  | 
|---|
| 449 | FreeWalDirectoryMethod(); | 
|---|
| 450 | pg_free(stream.walmethod); | 
|---|
| 451 |  | 
|---|
| 452 | conn = NULL; | 
|---|
| 453 | } | 
|---|
| 454 |  | 
|---|
| 455 | /* | 
|---|
| 456 | * When sigint is called, just tell the system to exit at the next possible | 
|---|
| 457 | * moment. | 
|---|
| 458 | */ | 
|---|
| 459 | #ifndef WIN32 | 
|---|
| 460 |  | 
|---|
| 461 | static void | 
|---|
| 462 | sigint_handler(int signum) | 
|---|
| 463 | { | 
|---|
| 464 | time_to_stop = true; | 
|---|
| 465 | } | 
|---|
| 466 | #endif | 
|---|
| 467 |  | 
|---|
| 468 | int | 
|---|
| 469 | main(int argc, char **argv) | 
|---|
| 470 | { | 
|---|
| 471 | static struct option long_options[] = { | 
|---|
| 472 | { "help", no_argument, NULL, '?'}, | 
|---|
| 473 | { "version", no_argument, NULL, 'V'}, | 
|---|
| 474 | { "directory", required_argument, NULL, 'D'}, | 
|---|
| 475 | { "dbname", required_argument, NULL, 'd'}, | 
|---|
| 476 | { "endpos", required_argument, NULL, 'E'}, | 
|---|
| 477 | { "host", required_argument, NULL, 'h'}, | 
|---|
| 478 | { "port", required_argument, NULL, 'p'}, | 
|---|
| 479 | { "username", required_argument, NULL, 'U'}, | 
|---|
| 480 | { "no-loop", no_argument, NULL, 'n'}, | 
|---|
| 481 | { "no-password", no_argument, NULL, 'w'}, | 
|---|
| 482 | { "password", no_argument, NULL, 'W'}, | 
|---|
| 483 | { "status-interval", required_argument, NULL, 's'}, | 
|---|
| 484 | { "slot", required_argument, NULL, 'S'}, | 
|---|
| 485 | { "verbose", no_argument, NULL, 'v'}, | 
|---|
| 486 | { "compress", required_argument, NULL, 'Z'}, | 
|---|
| 487 | /* action */ | 
|---|
| 488 | { "create-slot", no_argument, NULL, 1}, | 
|---|
| 489 | { "drop-slot", no_argument, NULL, 2}, | 
|---|
| 490 | { "if-not-exists", no_argument, NULL, 3}, | 
|---|
| 491 | { "synchronous", no_argument, NULL, 4}, | 
|---|
| 492 | { "no-sync", no_argument, NULL, 5}, | 
|---|
| 493 | {NULL, 0, NULL, 0} | 
|---|
| 494 | }; | 
|---|
| 495 |  | 
|---|
| 496 | int			c; | 
|---|
| 497 | int			option_index; | 
|---|
| 498 | char	   *db_name; | 
|---|
| 499 | uint32		hi, | 
|---|
| 500 | lo; | 
|---|
| 501 |  | 
|---|
| 502 | pg_logging_init(argv[0]); | 
|---|
| 503 | progname = get_progname(argv[0]); | 
|---|
| 504 | set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN( "pg_basebackup")); | 
|---|
| 505 |  | 
|---|
| 506 | if (argc > 1) | 
|---|
| 507 | { | 
|---|
| 508 | if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0) | 
|---|
| 509 | { | 
|---|
| 510 | usage(); | 
|---|
| 511 | exit(0); | 
|---|
| 512 | } | 
|---|
| 513 | else if (strcmp(argv[1], "-V") == 0 || | 
|---|
| 514 | strcmp(argv[1], "--version") == 0) | 
|---|
| 515 | { | 
|---|
| 516 | puts( "pg_receivewal (PostgreSQL) "PG_VERSION); | 
|---|
| 517 | exit(0); | 
|---|
| 518 | } | 
|---|
| 519 | } | 
|---|
| 520 |  | 
|---|
| 521 | while ((c = getopt_long(argc, argv, "D:d:E:h:p:U:s:S:nwWvZ:", | 
|---|
| 522 | long_options, &option_index)) != -1) | 
|---|
| 523 | { | 
|---|
| 524 | switch (c) | 
|---|
| 525 | { | 
|---|
| 526 | case 'D': | 
|---|
| 527 | basedir = pg_strdup(optarg); | 
|---|
| 528 | break; | 
|---|
| 529 | case 'd': | 
|---|
| 530 | connection_string = pg_strdup(optarg); | 
|---|
| 531 | break; | 
|---|
| 532 | case 'h': | 
|---|
| 533 | dbhost = pg_strdup(optarg); | 
|---|
| 534 | break; | 
|---|
| 535 | case 'p': | 
|---|
| 536 | if (atoi(optarg) <= 0) | 
|---|
| 537 | { | 
|---|
| 538 | pg_log_error( "invalid port number \"%s\"", optarg); | 
|---|
| 539 | exit(1); | 
|---|
| 540 | } | 
|---|
| 541 | dbport = pg_strdup(optarg); | 
|---|
| 542 | break; | 
|---|
| 543 | case 'U': | 
|---|
| 544 | dbuser = pg_strdup(optarg); | 
|---|
| 545 | break; | 
|---|
| 546 | case 'w': | 
|---|
| 547 | dbgetpassword = -1; | 
|---|
| 548 | break; | 
|---|
| 549 | case 'W': | 
|---|
| 550 | dbgetpassword = 1; | 
|---|
| 551 | break; | 
|---|
| 552 | case 's': | 
|---|
| 553 | standby_message_timeout = atoi(optarg) * 1000; | 
|---|
| 554 | if (standby_message_timeout < 0) | 
|---|
| 555 | { | 
|---|
| 556 | pg_log_error( "invalid status interval \"%s\"", optarg); | 
|---|
| 557 | exit(1); | 
|---|
| 558 | } | 
|---|
| 559 | break; | 
|---|
| 560 | case 'S': | 
|---|
| 561 | replication_slot = pg_strdup(optarg); | 
|---|
| 562 | break; | 
|---|
| 563 | case 'E': | 
|---|
| 564 | if (sscanf(optarg, "%X/%X", &hi, &lo) != 2) | 
|---|
| 565 | { | 
|---|
| 566 | pg_log_error( "could not parse end position \"%s\"", optarg); | 
|---|
| 567 | exit(1); | 
|---|
| 568 | } | 
|---|
| 569 | endpos = ((uint64) hi) << 32 | lo; | 
|---|
| 570 | break; | 
|---|
| 571 | case 'n': | 
|---|
| 572 | noloop = 1; | 
|---|
| 573 | break; | 
|---|
| 574 | case 'v': | 
|---|
| 575 | verbose++; | 
|---|
| 576 | break; | 
|---|
| 577 | case 'Z': | 
|---|
| 578 | compresslevel = atoi(optarg); | 
|---|
| 579 | if (compresslevel < 0 || compresslevel > 9) | 
|---|
| 580 | { | 
|---|
| 581 | pg_log_error( "invalid compression level \"%s\"", optarg); | 
|---|
| 582 | exit(1); | 
|---|
| 583 | } | 
|---|
| 584 | break; | 
|---|
| 585 | /* action */ | 
|---|
| 586 | case 1: | 
|---|
| 587 | do_create_slot = true; | 
|---|
| 588 | break; | 
|---|
| 589 | case 2: | 
|---|
| 590 | do_drop_slot = true; | 
|---|
| 591 | break; | 
|---|
| 592 | case 3: | 
|---|
| 593 | slot_exists_ok = true; | 
|---|
| 594 | break; | 
|---|
| 595 | case 4: | 
|---|
| 596 | synchronous = true; | 
|---|
| 597 | break; | 
|---|
| 598 | case 5: | 
|---|
| 599 | do_sync = false; | 
|---|
| 600 | break; | 
|---|
| 601 | default: | 
|---|
| 602 |  | 
|---|
| 603 | /* | 
|---|
| 604 | * getopt_long already emitted a complaint | 
|---|
| 605 | */ | 
|---|
| 606 | fprintf(stderr, _( "Try \"%s --help\" for more information.\n"), | 
|---|
| 607 | progname); | 
|---|
| 608 | exit(1); | 
|---|
| 609 | } | 
|---|
| 610 | } | 
|---|
| 611 |  | 
|---|
| 612 | /* | 
|---|
| 613 | * Any non-option arguments? | 
|---|
| 614 | */ | 
|---|
| 615 | if (optind < argc) | 
|---|
| 616 | { | 
|---|
| 617 | pg_log_error( "too many command-line arguments (first is \"%s\")", | 
|---|
| 618 | argv[optind]); | 
|---|
| 619 | fprintf(stderr, _( "Try \"%s --help\" for more information.\n"), | 
|---|
| 620 | progname); | 
|---|
| 621 | exit(1); | 
|---|
| 622 | } | 
|---|
| 623 |  | 
|---|
| 624 | if (do_drop_slot && do_create_slot) | 
|---|
| 625 | { | 
|---|
| 626 | pg_log_error( "cannot use --create-slot together with --drop-slot"); | 
|---|
| 627 | fprintf(stderr, _( "Try \"%s --help\" for more information.\n"), | 
|---|
| 628 | progname); | 
|---|
| 629 | exit(1); | 
|---|
| 630 | } | 
|---|
| 631 |  | 
|---|
| 632 | if (replication_slot == NULL && (do_drop_slot || do_create_slot)) | 
|---|
| 633 | { | 
|---|
| 634 | /* translator: second %s is an option name */ | 
|---|
| 635 | pg_log_error( "%s needs a slot to be specified using --slot", | 
|---|
| 636 | do_drop_slot ? "--drop-slot": "--create-slot"); | 
|---|
| 637 | fprintf(stderr, _( "Try \"%s --help\" for more information.\n"), | 
|---|
| 638 | progname); | 
|---|
| 639 | exit(1); | 
|---|
| 640 | } | 
|---|
| 641 |  | 
|---|
| 642 | if (synchronous && !do_sync) | 
|---|
| 643 | { | 
|---|
| 644 | pg_log_error( "cannot use --synchronous together with --no-sync"); | 
|---|
| 645 | fprintf(stderr, _( "Try \"%s --help\" for more information.\n"), | 
|---|
| 646 | progname); | 
|---|
| 647 | exit(1); | 
|---|
| 648 | } | 
|---|
| 649 |  | 
|---|
| 650 | /* | 
|---|
| 651 | * Required arguments | 
|---|
| 652 | */ | 
|---|
| 653 | if (basedir == NULL && !do_drop_slot && !do_create_slot) | 
|---|
| 654 | { | 
|---|
| 655 | pg_log_error( "no target directory specified"); | 
|---|
| 656 | fprintf(stderr, _( "Try \"%s --help\" for more information.\n"), | 
|---|
| 657 | progname); | 
|---|
| 658 | exit(1); | 
|---|
| 659 | } | 
|---|
| 660 |  | 
|---|
| 661 | #ifndef HAVE_LIBZ | 
|---|
| 662 | if (compresslevel != 0) | 
|---|
| 663 | { | 
|---|
| 664 | pg_log_error( "this build does not support compression"); | 
|---|
| 665 | exit(1); | 
|---|
| 666 | } | 
|---|
| 667 | #endif | 
|---|
| 668 |  | 
|---|
| 669 | /* | 
|---|
| 670 | * Check existence of destination folder. | 
|---|
| 671 | */ | 
|---|
| 672 | if (!do_drop_slot && !do_create_slot) | 
|---|
| 673 | { | 
|---|
| 674 | DIR		   *dir = get_destination_dir(basedir); | 
|---|
| 675 |  | 
|---|
| 676 | close_destination_dir(dir, basedir); | 
|---|
| 677 | } | 
|---|
| 678 |  | 
|---|
| 679 | #ifndef WIN32 | 
|---|
| 680 | pqsignal(SIGINT, sigint_handler); | 
|---|
| 681 | #endif | 
|---|
| 682 |  | 
|---|
| 683 | /* | 
|---|
| 684 | * Obtain a connection before doing anything. | 
|---|
| 685 | */ | 
|---|
| 686 | conn = GetConnection(); | 
|---|
| 687 | if (!conn) | 
|---|
| 688 | /* error message already written in GetConnection() */ | 
|---|
| 689 | exit(1); | 
|---|
| 690 | atexit(disconnect_atexit); | 
|---|
| 691 |  | 
|---|
| 692 | /* | 
|---|
| 693 | * Run IDENTIFY_SYSTEM to make sure we've successfully have established a | 
|---|
| 694 | * replication connection and haven't connected using a database specific | 
|---|
| 695 | * connection. | 
|---|
| 696 | */ | 
|---|
| 697 | if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name)) | 
|---|
| 698 | exit(1); | 
|---|
| 699 |  | 
|---|
| 700 | /* | 
|---|
| 701 | * Set umask so that directories/files are created with the same | 
|---|
| 702 | * permissions as directories/files in the source data directory. | 
|---|
| 703 | * | 
|---|
| 704 | * pg_mode_mask is set to owner-only by default and then updated in | 
|---|
| 705 | * GetConnection() where we get the mode from the server-side with | 
|---|
| 706 | * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm(). | 
|---|
| 707 | */ | 
|---|
| 708 | umask(pg_mode_mask); | 
|---|
| 709 |  | 
|---|
| 710 | /* determine remote server's xlog segment size */ | 
|---|
| 711 | if (!RetrieveWalSegSize(conn)) | 
|---|
| 712 | exit(1); | 
|---|
| 713 |  | 
|---|
| 714 | /* | 
|---|
| 715 | * Check that there is a database associated with connection, none should | 
|---|
| 716 | * be defined in this context. | 
|---|
| 717 | */ | 
|---|
| 718 | if (db_name) | 
|---|
| 719 | { | 
|---|
| 720 | pg_log_error( "replication connection using slot \"%s\" is unexpectedly database specific", | 
|---|
| 721 | replication_slot); | 
|---|
| 722 | exit(1); | 
|---|
| 723 | } | 
|---|
| 724 |  | 
|---|
| 725 | /* | 
|---|
| 726 | * Drop a replication slot. | 
|---|
| 727 | */ | 
|---|
| 728 | if (do_drop_slot) | 
|---|
| 729 | { | 
|---|
| 730 | if (verbose) | 
|---|
| 731 | pg_log_info( "dropping replication slot \"%s\"", replication_slot); | 
|---|
| 732 |  | 
|---|
| 733 | if (!DropReplicationSlot(conn, replication_slot)) | 
|---|
| 734 | exit(1); | 
|---|
| 735 | exit(0); | 
|---|
| 736 | } | 
|---|
| 737 |  | 
|---|
| 738 | /* Create a replication slot */ | 
|---|
| 739 | if (do_create_slot) | 
|---|
| 740 | { | 
|---|
| 741 | if (verbose) | 
|---|
| 742 | pg_log_info( "creating replication slot \"%s\"", replication_slot); | 
|---|
| 743 |  | 
|---|
| 744 | if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false, | 
|---|
| 745 | slot_exists_ok)) | 
|---|
| 746 | exit(1); | 
|---|
| 747 | exit(0); | 
|---|
| 748 | } | 
|---|
| 749 |  | 
|---|
| 750 | /* | 
|---|
| 751 | * Don't close the connection here so that subsequent StreamLog() can | 
|---|
| 752 | * reuse it. | 
|---|
| 753 | */ | 
|---|
| 754 |  | 
|---|
| 755 | while (true) | 
|---|
| 756 | { | 
|---|
| 757 | StreamLog(); | 
|---|
| 758 | if (time_to_stop) | 
|---|
| 759 | { | 
|---|
| 760 | /* | 
|---|
| 761 | * We've been Ctrl-C'ed or end of streaming position has been | 
|---|
| 762 | * willingly reached, so exit without an error code. | 
|---|
| 763 | */ | 
|---|
| 764 | exit(0); | 
|---|
| 765 | } | 
|---|
| 766 | else if (noloop) | 
|---|
| 767 | { | 
|---|
| 768 | pg_log_error( "disconnected"); | 
|---|
| 769 | exit(1); | 
|---|
| 770 | } | 
|---|
| 771 | else | 
|---|
| 772 | { | 
|---|
| 773 | /* translator: check source for value for %d */ | 
|---|
| 774 | pg_log_info( "disconnected; waiting %d seconds to try again", | 
|---|
| 775 | RECONNECT_SLEEP_TIME); | 
|---|
| 776 | pg_usleep(RECONNECT_SLEEP_TIME * 1000000); | 
|---|
| 777 | } | 
|---|
| 778 | } | 
|---|
| 779 | } | 
|---|
| 780 |  | 
|---|