1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * pg_recvlogical.c - receive data from a logical decoding slot in a streaming |
4 | * fashion and write it to a local file. |
5 | * |
6 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
7 | * |
8 | * IDENTIFICATION |
9 | * src/bin/pg_basebackup/pg_recvlogical.c |
10 | *------------------------------------------------------------------------- |
11 | */ |
12 | |
#include "postgres_fe.h"

#include <dirent.h>
#include <limits.h>
#include <sys/stat.h>
#include <unistd.h>
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

/* local includes */
#include "streamutil.h"

#include "access/xlog_internal.h"
#include "common/file_perm.h"
#include "common/fe_memutils.h"
#include "common/logging.h"
#include "getopt_long.h"
#include "libpq-fe.h"
#include "libpq/pqsignal.h"
#include "pqexpbuffer.h"
33 | |
34 | |
35 | /* Time to sleep between reconnection attempts */ |
36 | #define RECONNECT_SLEEP_TIME 5 |
37 | |
38 | /* Global Options */ |
39 | static char *outfile = NULL; |
40 | static int verbose = 0; |
41 | static int noloop = 0; |
42 | static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ |
43 | static int fsync_interval = 10 * 1000; /* 10 sec = default */ |
44 | static XLogRecPtr startpos = InvalidXLogRecPtr; |
45 | static XLogRecPtr endpos = InvalidXLogRecPtr; |
46 | static bool do_create_slot = false; |
47 | static bool slot_exists_ok = false; |
48 | static bool do_start_slot = false; |
49 | static bool do_drop_slot = false; |
50 | static char *replication_slot = NULL; |
51 | |
52 | /* filled pairwise with option, value. value may be NULL */ |
53 | static char **options; |
54 | static size_t noptions = 0; |
55 | static const char *plugin = "test_decoding" ; |
56 | |
57 | /* Global State */ |
58 | static int outfd = -1; |
59 | static volatile sig_atomic_t time_to_abort = false; |
60 | static volatile sig_atomic_t output_reopen = false; |
61 | static bool output_isfile; |
62 | static TimestampTz output_last_fsync = -1; |
63 | static bool output_needs_fsync = false; |
64 | static XLogRecPtr output_written_lsn = InvalidXLogRecPtr; |
65 | static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr; |
66 | |
67 | static void usage(void); |
68 | static void StreamLogicalLog(void); |
69 | static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now); |
70 | static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos, |
71 | bool keepalive, XLogRecPtr lsn); |
72 | |
73 | static void |
74 | usage(void) |
75 | { |
76 | printf(_("%s controls PostgreSQL logical decoding streams.\n\n" ), |
77 | progname); |
78 | printf(_("Usage:\n" )); |
79 | printf(_(" %s [OPTION]...\n" ), progname); |
80 | printf(_("\nAction to be performed:\n" )); |
81 | printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n" )); |
82 | printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n" )); |
83 | printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n" )); |
84 | printf(_("\nOptions:\n" )); |
85 | printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n" )); |
86 | printf(_(" -f, --file=FILE receive log into this file, - for stdout\n" )); |
87 | printf(_(" -F --fsync-interval=SECS\n" |
88 | " time between fsyncs to the output file (default: %d)\n" ), (fsync_interval / 1000)); |
89 | printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n" )); |
90 | printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n" )); |
91 | printf(_(" -n, --no-loop do not loop on connection lost\n" )); |
92 | printf(_(" -o, --option=NAME[=VALUE]\n" |
93 | " pass option NAME with optional value VALUE to the\n" |
94 | " output plugin\n" )); |
95 | printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n" ), plugin); |
96 | printf(_(" -s, --status-interval=SECS\n" |
97 | " time between status packets sent to server (default: %d)\n" ), (standby_message_timeout / 1000)); |
98 | printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n" )); |
99 | printf(_(" -v, --verbose output verbose messages\n" )); |
100 | printf(_(" -V, --version output version information, then exit\n" )); |
101 | printf(_(" -?, --help show this help, then exit\n" )); |
102 | printf(_("\nConnection options:\n" )); |
103 | printf(_(" -d, --dbname=DBNAME database to connect to\n" )); |
104 | printf(_(" -h, --host=HOSTNAME database server host or socket directory\n" )); |
105 | printf(_(" -p, --port=PORT database server port number\n" )); |
106 | printf(_(" -U, --username=NAME connect as specified database user\n" )); |
107 | printf(_(" -w, --no-password never prompt for password\n" )); |
108 | printf(_(" -W, --password force password prompt (should happen automatically)\n" )); |
109 | printf(_("\nReport bugs to <pgsql-bugs@lists.postgresql.org>.\n" )); |
110 | } |
111 | |
112 | /* |
113 | * Send a Standby Status Update message to server. |
114 | */ |
115 | static bool |
116 | sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested) |
117 | { |
118 | static XLogRecPtr last_written_lsn = InvalidXLogRecPtr; |
119 | static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr; |
120 | |
121 | char replybuf[1 + 8 + 8 + 8 + 8 + 1]; |
122 | int len = 0; |
123 | |
124 | /* |
125 | * we normally don't want to send superfluous feedback, but if it's |
126 | * because of a timeout we need to, otherwise wal_sender_timeout will kill |
127 | * us. |
128 | */ |
129 | if (!force && |
130 | last_written_lsn == output_written_lsn && |
131 | last_fsync_lsn != output_fsync_lsn) |
132 | return true; |
133 | |
134 | if (verbose) |
135 | pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)" , |
136 | (uint32) (output_written_lsn >> 32), (uint32) output_written_lsn, |
137 | (uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn, |
138 | replication_slot); |
139 | |
140 | replybuf[len] = 'r'; |
141 | len += 1; |
142 | fe_sendint64(output_written_lsn, &replybuf[len]); /* write */ |
143 | len += 8; |
144 | fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */ |
145 | len += 8; |
146 | fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */ |
147 | len += 8; |
148 | fe_sendint64(now, &replybuf[len]); /* sendTime */ |
149 | len += 8; |
150 | replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */ |
151 | len += 1; |
152 | |
153 | startpos = output_written_lsn; |
154 | last_written_lsn = output_written_lsn; |
155 | last_fsync_lsn = output_fsync_lsn; |
156 | |
157 | if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) |
158 | { |
159 | pg_log_error("could not send feedback packet: %s" , |
160 | PQerrorMessage(conn)); |
161 | return false; |
162 | } |
163 | |
164 | return true; |
165 | } |
166 | |
167 | static void |
168 | disconnect_atexit(void) |
169 | { |
170 | if (conn != NULL) |
171 | PQfinish(conn); |
172 | } |
173 | |
174 | static bool |
175 | OutputFsync(TimestampTz now) |
176 | { |
177 | output_last_fsync = now; |
178 | |
179 | output_fsync_lsn = output_written_lsn; |
180 | |
181 | if (fsync_interval <= 0) |
182 | return true; |
183 | |
184 | if (!output_needs_fsync) |
185 | return true; |
186 | |
187 | output_needs_fsync = false; |
188 | |
189 | /* can only fsync if it's a regular file */ |
190 | if (!output_isfile) |
191 | return true; |
192 | |
193 | if (fsync(outfd) != 0) |
194 | { |
195 | pg_log_error("could not fsync file \"%s\": %m" , outfile); |
196 | return false; |
197 | } |
198 | |
199 | return true; |
200 | } |
201 | |
202 | /* |
203 | * Start the log streaming |
204 | */ |
205 | static void |
206 | StreamLogicalLog(void) |
207 | { |
208 | PGresult *res; |
209 | char *copybuf = NULL; |
210 | TimestampTz last_status = -1; |
211 | int i; |
212 | PQExpBuffer query; |
213 | |
214 | output_written_lsn = InvalidXLogRecPtr; |
215 | output_fsync_lsn = InvalidXLogRecPtr; |
216 | |
217 | query = createPQExpBuffer(); |
218 | |
219 | /* |
220 | * Connect in replication mode to the server |
221 | */ |
222 | if (!conn) |
223 | conn = GetConnection(); |
224 | if (!conn) |
225 | /* Error message already written in GetConnection() */ |
226 | return; |
227 | |
228 | /* |
229 | * Start the replication |
230 | */ |
231 | if (verbose) |
232 | pg_log_info("starting log streaming at %X/%X (slot %s)" , |
233 | (uint32) (startpos >> 32), (uint32) startpos, |
234 | replication_slot); |
235 | |
236 | /* Initiate the replication stream at specified location */ |
237 | appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X" , |
238 | replication_slot, (uint32) (startpos >> 32), (uint32) startpos); |
239 | |
240 | /* print options if there are any */ |
241 | if (noptions) |
242 | appendPQExpBufferStr(query, " (" ); |
243 | |
244 | for (i = 0; i < noptions; i++) |
245 | { |
246 | /* separator */ |
247 | if (i > 0) |
248 | appendPQExpBufferStr(query, ", " ); |
249 | |
250 | /* write option name */ |
251 | appendPQExpBuffer(query, "\"%s\"" , options[(i * 2)]); |
252 | |
253 | /* write option value if specified */ |
254 | if (options[(i * 2) + 1] != NULL) |
255 | appendPQExpBuffer(query, " '%s'" , options[(i * 2) + 1]); |
256 | } |
257 | |
258 | if (noptions) |
259 | appendPQExpBufferChar(query, ')'); |
260 | |
261 | res = PQexec(conn, query->data); |
262 | if (PQresultStatus(res) != PGRES_COPY_BOTH) |
263 | { |
264 | pg_log_error("could not send replication command \"%s\": %s" , |
265 | query->data, PQresultErrorMessage(res)); |
266 | PQclear(res); |
267 | goto error; |
268 | } |
269 | PQclear(res); |
270 | resetPQExpBuffer(query); |
271 | |
272 | if (verbose) |
273 | pg_log_info("streaming initiated" ); |
274 | |
275 | while (!time_to_abort) |
276 | { |
277 | int r; |
278 | int bytes_left; |
279 | int bytes_written; |
280 | TimestampTz now; |
281 | int hdr_len; |
282 | XLogRecPtr cur_record_lsn = InvalidXLogRecPtr; |
283 | |
284 | if (copybuf != NULL) |
285 | { |
286 | PQfreemem(copybuf); |
287 | copybuf = NULL; |
288 | } |
289 | |
290 | /* |
291 | * Potentially send a status message to the master |
292 | */ |
293 | now = feGetCurrentTimestamp(); |
294 | |
295 | if (outfd != -1 && |
296 | feTimestampDifferenceExceeds(output_last_fsync, now, |
297 | fsync_interval)) |
298 | { |
299 | if (!OutputFsync(now)) |
300 | goto error; |
301 | } |
302 | |
303 | if (standby_message_timeout > 0 && |
304 | feTimestampDifferenceExceeds(last_status, now, |
305 | standby_message_timeout)) |
306 | { |
307 | /* Time to send feedback! */ |
308 | if (!sendFeedback(conn, now, true, false)) |
309 | goto error; |
310 | |
311 | last_status = now; |
312 | } |
313 | |
314 | /* got SIGHUP, close output file */ |
315 | if (outfd != -1 && output_reopen && strcmp(outfile, "-" ) != 0) |
316 | { |
317 | now = feGetCurrentTimestamp(); |
318 | if (!OutputFsync(now)) |
319 | goto error; |
320 | close(outfd); |
321 | outfd = -1; |
322 | } |
323 | output_reopen = false; |
324 | |
325 | /* open the output file, if not open yet */ |
326 | if (outfd == -1) |
327 | { |
328 | struct stat statbuf; |
329 | |
330 | if (strcmp(outfile, "-" ) == 0) |
331 | outfd = fileno(stdout); |
332 | else |
333 | outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY, |
334 | S_IRUSR | S_IWUSR); |
335 | if (outfd == -1) |
336 | { |
337 | pg_log_error("could not open log file \"%s\": %m" , outfile); |
338 | goto error; |
339 | } |
340 | |
341 | if (fstat(outfd, &statbuf) != 0) |
342 | pg_log_error("could not stat file \"%s\": %m" , outfile); |
343 | |
344 | output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd); |
345 | } |
346 | |
347 | r = PQgetCopyData(conn, ©buf, 1); |
348 | if (r == 0) |
349 | { |
350 | /* |
351 | * In async mode, and no data available. We block on reading but |
352 | * not more than the specified timeout, so that we can send a |
353 | * response back to the client. |
354 | */ |
355 | fd_set input_mask; |
356 | TimestampTz message_target = 0; |
357 | TimestampTz fsync_target = 0; |
358 | struct timeval timeout; |
359 | struct timeval *timeoutptr = NULL; |
360 | |
361 | if (PQsocket(conn) < 0) |
362 | { |
363 | pg_log_error("invalid socket: %s" , PQerrorMessage(conn)); |
364 | goto error; |
365 | } |
366 | |
367 | FD_ZERO(&input_mask); |
368 | FD_SET(PQsocket(conn), &input_mask); |
369 | |
370 | /* Compute when we need to wakeup to send a keepalive message. */ |
371 | if (standby_message_timeout) |
372 | message_target = last_status + (standby_message_timeout - 1) * |
373 | ((int64) 1000); |
374 | |
375 | /* Compute when we need to wakeup to fsync the output file. */ |
376 | if (fsync_interval > 0 && output_needs_fsync) |
377 | fsync_target = output_last_fsync + (fsync_interval - 1) * |
378 | ((int64) 1000); |
379 | |
380 | /* Now compute when to wakeup. */ |
381 | if (message_target > 0 || fsync_target > 0) |
382 | { |
383 | TimestampTz targettime; |
384 | long secs; |
385 | int usecs; |
386 | |
387 | targettime = message_target; |
388 | |
389 | if (fsync_target > 0 && fsync_target < targettime) |
390 | targettime = fsync_target; |
391 | |
392 | feTimestampDifference(now, |
393 | targettime, |
394 | &secs, |
395 | &usecs); |
396 | if (secs <= 0) |
397 | timeout.tv_sec = 1; /* Always sleep at least 1 sec */ |
398 | else |
399 | timeout.tv_sec = secs; |
400 | timeout.tv_usec = usecs; |
401 | timeoutptr = &timeout; |
402 | } |
403 | |
404 | r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr); |
405 | if (r == 0 || (r < 0 && errno == EINTR)) |
406 | { |
407 | /* |
408 | * Got a timeout or signal. Continue the loop and either |
409 | * deliver a status packet to the server or just go back into |
410 | * blocking. |
411 | */ |
412 | continue; |
413 | } |
414 | else if (r < 0) |
415 | { |
416 | pg_log_error("select() failed: %m" ); |
417 | goto error; |
418 | } |
419 | |
420 | /* Else there is actually data on the socket */ |
421 | if (PQconsumeInput(conn) == 0) |
422 | { |
423 | pg_log_error("could not receive data from WAL stream: %s" , |
424 | PQerrorMessage(conn)); |
425 | goto error; |
426 | } |
427 | continue; |
428 | } |
429 | |
430 | /* End of copy stream */ |
431 | if (r == -1) |
432 | break; |
433 | |
434 | /* Failure while reading the copy stream */ |
435 | if (r == -2) |
436 | { |
437 | pg_log_error("could not read COPY data: %s" , |
438 | PQerrorMessage(conn)); |
439 | goto error; |
440 | } |
441 | |
442 | /* Check the message type. */ |
443 | if (copybuf[0] == 'k') |
444 | { |
445 | int pos; |
446 | bool replyRequested; |
447 | XLogRecPtr walEnd; |
448 | bool endposReached = false; |
449 | |
450 | /* |
451 | * Parse the keepalive message, enclosed in the CopyData message. |
452 | * We just check if the server requested a reply, and ignore the |
453 | * rest. |
454 | */ |
455 | pos = 1; /* skip msgtype 'k' */ |
456 | walEnd = fe_recvint64(©buf[pos]); |
457 | output_written_lsn = Max(walEnd, output_written_lsn); |
458 | |
459 | pos += 8; /* read walEnd */ |
460 | |
461 | pos += 8; /* skip sendTime */ |
462 | |
463 | if (r < pos + 1) |
464 | { |
465 | pg_log_error("streaming header too small: %d" , r); |
466 | goto error; |
467 | } |
468 | replyRequested = copybuf[pos]; |
469 | |
470 | if (endpos != InvalidXLogRecPtr && walEnd >= endpos) |
471 | { |
472 | /* |
473 | * If there's nothing to read on the socket until a keepalive |
474 | * we know that the server has nothing to send us; and if |
475 | * walEnd has passed endpos, we know nothing else can have |
476 | * committed before endpos. So we can bail out now. |
477 | */ |
478 | endposReached = true; |
479 | } |
480 | |
481 | /* Send a reply, if necessary */ |
482 | if (replyRequested || endposReached) |
483 | { |
484 | if (!flushAndSendFeedback(conn, &now)) |
485 | goto error; |
486 | last_status = now; |
487 | } |
488 | |
489 | if (endposReached) |
490 | { |
491 | prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr); |
492 | time_to_abort = true; |
493 | break; |
494 | } |
495 | |
496 | continue; |
497 | } |
498 | else if (copybuf[0] != 'w') |
499 | { |
500 | pg_log_error("unrecognized streaming header: \"%c\"" , |
501 | copybuf[0]); |
502 | goto error; |
503 | } |
504 | |
505 | /* |
506 | * Read the header of the XLogData message, enclosed in the CopyData |
507 | * message. We only need the WAL location field (dataStart), the rest |
508 | * of the header is ignored. |
509 | */ |
510 | hdr_len = 1; /* msgtype 'w' */ |
511 | hdr_len += 8; /* dataStart */ |
512 | hdr_len += 8; /* walEnd */ |
513 | hdr_len += 8; /* sendTime */ |
514 | if (r < hdr_len + 1) |
515 | { |
516 | pg_log_error("streaming header too small: %d" , r); |
517 | goto error; |
518 | } |
519 | |
520 | /* Extract WAL location for this block */ |
521 | cur_record_lsn = fe_recvint64(©buf[1]); |
522 | |
523 | if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos) |
524 | { |
525 | /* |
526 | * We've read past our endpoint, so prepare to go away being |
527 | * cautious about what happens to our output data. |
528 | */ |
529 | if (!flushAndSendFeedback(conn, &now)) |
530 | goto error; |
531 | prepareToTerminate(conn, endpos, false, cur_record_lsn); |
532 | time_to_abort = true; |
533 | break; |
534 | } |
535 | |
536 | output_written_lsn = Max(cur_record_lsn, output_written_lsn); |
537 | |
538 | bytes_left = r - hdr_len; |
539 | bytes_written = 0; |
540 | |
541 | /* signal that a fsync is needed */ |
542 | output_needs_fsync = true; |
543 | |
544 | while (bytes_left) |
545 | { |
546 | int ret; |
547 | |
548 | ret = write(outfd, |
549 | copybuf + hdr_len + bytes_written, |
550 | bytes_left); |
551 | |
552 | if (ret < 0) |
553 | { |
554 | pg_log_error("could not write %u bytes to log file \"%s\": %m" , |
555 | bytes_left, outfile); |
556 | goto error; |
557 | } |
558 | |
559 | /* Write was successful, advance our position */ |
560 | bytes_written += ret; |
561 | bytes_left -= ret; |
562 | } |
563 | |
564 | if (write(outfd, "\n" , 1) != 1) |
565 | { |
566 | pg_log_error("could not write %u bytes to log file \"%s\": %m" , |
567 | 1, outfile); |
568 | goto error; |
569 | } |
570 | |
571 | if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos) |
572 | { |
573 | /* endpos was exactly the record we just processed, we're done */ |
574 | if (!flushAndSendFeedback(conn, &now)) |
575 | goto error; |
576 | prepareToTerminate(conn, endpos, false, cur_record_lsn); |
577 | time_to_abort = true; |
578 | break; |
579 | } |
580 | } |
581 | |
582 | res = PQgetResult(conn); |
583 | if (PQresultStatus(res) == PGRES_COPY_OUT) |
584 | { |
585 | /* |
586 | * We're doing a client-initiated clean exit and have sent CopyDone to |
587 | * the server. We've already sent replay confirmation and fsync'd so |
588 | * we can just clean up the connection now. |
589 | */ |
590 | goto error; |
591 | } |
592 | else if (PQresultStatus(res) != PGRES_COMMAND_OK) |
593 | { |
594 | pg_log_error("unexpected termination of replication stream: %s" , |
595 | PQresultErrorMessage(res)); |
596 | goto error; |
597 | } |
598 | PQclear(res); |
599 | |
600 | if (outfd != -1 && strcmp(outfile, "-" ) != 0) |
601 | { |
602 | TimestampTz t = feGetCurrentTimestamp(); |
603 | |
604 | /* no need to jump to error on failure here, we're finishing anyway */ |
605 | OutputFsync(t); |
606 | |
607 | if (close(outfd) != 0) |
608 | pg_log_error("could not close file \"%s\": %m" , outfile); |
609 | } |
610 | outfd = -1; |
611 | error: |
612 | if (copybuf != NULL) |
613 | { |
614 | PQfreemem(copybuf); |
615 | copybuf = NULL; |
616 | } |
617 | destroyPQExpBuffer(query); |
618 | PQfinish(conn); |
619 | conn = NULL; |
620 | } |
621 | |
622 | /* |
623 | * Unfortunately we can't do sensible signal handling on windows... |
624 | */ |
625 | #ifndef WIN32 |
626 | |
627 | /* |
628 | * When sigint is called, just tell the system to exit at the next possible |
629 | * moment. |
630 | */ |
631 | static void |
632 | sigint_handler(int signum) |
633 | { |
634 | time_to_abort = true; |
635 | } |
636 | |
637 | /* |
638 | * Trigger the output file to be reopened. |
639 | */ |
640 | static void |
641 | sighup_handler(int signum) |
642 | { |
643 | output_reopen = true; |
644 | } |
645 | #endif |
646 | |
647 | |
648 | int |
649 | main(int argc, char **argv) |
650 | { |
651 | static struct option long_options[] = { |
652 | /* general options */ |
653 | {"file" , required_argument, NULL, 'f'}, |
654 | {"fsync-interval" , required_argument, NULL, 'F'}, |
655 | {"no-loop" , no_argument, NULL, 'n'}, |
656 | {"verbose" , no_argument, NULL, 'v'}, |
657 | {"version" , no_argument, NULL, 'V'}, |
658 | {"help" , no_argument, NULL, '?'}, |
659 | /* connection options */ |
660 | {"dbname" , required_argument, NULL, 'd'}, |
661 | {"host" , required_argument, NULL, 'h'}, |
662 | {"port" , required_argument, NULL, 'p'}, |
663 | {"username" , required_argument, NULL, 'U'}, |
664 | {"no-password" , no_argument, NULL, 'w'}, |
665 | {"password" , no_argument, NULL, 'W'}, |
666 | /* replication options */ |
667 | {"startpos" , required_argument, NULL, 'I'}, |
668 | {"endpos" , required_argument, NULL, 'E'}, |
669 | {"option" , required_argument, NULL, 'o'}, |
670 | {"plugin" , required_argument, NULL, 'P'}, |
671 | {"status-interval" , required_argument, NULL, 's'}, |
672 | {"slot" , required_argument, NULL, 'S'}, |
673 | /* action */ |
674 | {"create-slot" , no_argument, NULL, 1}, |
675 | {"start" , no_argument, NULL, 2}, |
676 | {"drop-slot" , no_argument, NULL, 3}, |
677 | {"if-not-exists" , no_argument, NULL, 4}, |
678 | {NULL, 0, NULL, 0} |
679 | }; |
680 | int c; |
681 | int option_index; |
682 | uint32 hi, |
683 | lo; |
684 | char *db_name; |
685 | |
686 | pg_logging_init(argv[0]); |
687 | progname = get_progname(argv[0]); |
688 | set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup" )); |
689 | |
690 | if (argc > 1) |
691 | { |
692 | if (strcmp(argv[1], "--help" ) == 0 || strcmp(argv[1], "-?" ) == 0) |
693 | { |
694 | usage(); |
695 | exit(0); |
696 | } |
697 | else if (strcmp(argv[1], "-V" ) == 0 || |
698 | strcmp(argv[1], "--version" ) == 0) |
699 | { |
700 | puts("pg_recvlogical (PostgreSQL) " PG_VERSION); |
701 | exit(0); |
702 | } |
703 | } |
704 | |
705 | while ((c = getopt_long(argc, argv, "E:f:F:nvd:h:p:U:wWI:o:P:s:S:" , |
706 | long_options, &option_index)) != -1) |
707 | { |
708 | switch (c) |
709 | { |
710 | /* general options */ |
711 | case 'f': |
712 | outfile = pg_strdup(optarg); |
713 | break; |
714 | case 'F': |
715 | fsync_interval = atoi(optarg) * 1000; |
716 | if (fsync_interval < 0) |
717 | { |
718 | pg_log_error("invalid fsync interval \"%s\"" , optarg); |
719 | exit(1); |
720 | } |
721 | break; |
722 | case 'n': |
723 | noloop = 1; |
724 | break; |
725 | case 'v': |
726 | verbose++; |
727 | break; |
728 | /* connection options */ |
729 | case 'd': |
730 | dbname = pg_strdup(optarg); |
731 | break; |
732 | case 'h': |
733 | dbhost = pg_strdup(optarg); |
734 | break; |
735 | case 'p': |
736 | if (atoi(optarg) <= 0) |
737 | { |
738 | pg_log_error("invalid port number \"%s\"" , optarg); |
739 | exit(1); |
740 | } |
741 | dbport = pg_strdup(optarg); |
742 | break; |
743 | case 'U': |
744 | dbuser = pg_strdup(optarg); |
745 | break; |
746 | case 'w': |
747 | dbgetpassword = -1; |
748 | break; |
749 | case 'W': |
750 | dbgetpassword = 1; |
751 | break; |
752 | /* replication options */ |
753 | case 'I': |
754 | if (sscanf(optarg, "%X/%X" , &hi, &lo) != 2) |
755 | { |
756 | pg_log_error("could not parse start position \"%s\"" , optarg); |
757 | exit(1); |
758 | } |
759 | startpos = ((uint64) hi) << 32 | lo; |
760 | break; |
761 | case 'E': |
762 | if (sscanf(optarg, "%X/%X" , &hi, &lo) != 2) |
763 | { |
764 | pg_log_error("could not parse end position \"%s\"" , optarg); |
765 | exit(1); |
766 | } |
767 | endpos = ((uint64) hi) << 32 | lo; |
768 | break; |
769 | case 'o': |
770 | { |
771 | char *data = pg_strdup(optarg); |
772 | char *val = strchr(data, '='); |
773 | |
774 | if (val != NULL) |
775 | { |
776 | /* remove =; separate data from val */ |
777 | *val = '\0'; |
778 | val++; |
779 | } |
780 | |
781 | noptions += 1; |
782 | options = pg_realloc(options, sizeof(char *) * noptions * 2); |
783 | |
784 | options[(noptions - 1) * 2] = data; |
785 | options[(noptions - 1) * 2 + 1] = val; |
786 | } |
787 | |
788 | break; |
789 | case 'P': |
790 | plugin = pg_strdup(optarg); |
791 | break; |
792 | case 's': |
793 | standby_message_timeout = atoi(optarg) * 1000; |
794 | if (standby_message_timeout < 0) |
795 | { |
796 | pg_log_error("invalid status interval \"%s\"" , optarg); |
797 | exit(1); |
798 | } |
799 | break; |
800 | case 'S': |
801 | replication_slot = pg_strdup(optarg); |
802 | break; |
803 | /* action */ |
804 | case 1: |
805 | do_create_slot = true; |
806 | break; |
807 | case 2: |
808 | do_start_slot = true; |
809 | break; |
810 | case 3: |
811 | do_drop_slot = true; |
812 | break; |
813 | case 4: |
814 | slot_exists_ok = true; |
815 | break; |
816 | |
817 | default: |
818 | |
819 | /* |
820 | * getopt_long already emitted a complaint |
821 | */ |
822 | fprintf(stderr, _("Try \"%s --help\" for more information.\n" ), |
823 | progname); |
824 | exit(1); |
825 | } |
826 | } |
827 | |
828 | /* |
829 | * Any non-option arguments? |
830 | */ |
831 | if (optind < argc) |
832 | { |
833 | pg_log_error("too many command-line arguments (first is \"%s\")" , |
834 | argv[optind]); |
835 | fprintf(stderr, _("Try \"%s --help\" for more information.\n" ), |
836 | progname); |
837 | exit(1); |
838 | } |
839 | |
840 | /* |
841 | * Required arguments |
842 | */ |
843 | if (replication_slot == NULL) |
844 | { |
845 | pg_log_error("no slot specified" ); |
846 | fprintf(stderr, _("Try \"%s --help\" for more information.\n" ), |
847 | progname); |
848 | exit(1); |
849 | } |
850 | |
851 | if (do_start_slot && outfile == NULL) |
852 | { |
853 | pg_log_error("no target file specified" ); |
854 | fprintf(stderr, _("Try \"%s --help\" for more information.\n" ), |
855 | progname); |
856 | exit(1); |
857 | } |
858 | |
859 | if (!do_drop_slot && dbname == NULL) |
860 | { |
861 | pg_log_error("no database specified" ); |
862 | fprintf(stderr, _("Try \"%s --help\" for more information.\n" ), |
863 | progname); |
864 | exit(1); |
865 | } |
866 | |
867 | if (!do_drop_slot && !do_create_slot && !do_start_slot) |
868 | { |
869 | pg_log_error("at least one action needs to be specified" ); |
870 | fprintf(stderr, _("Try \"%s --help\" for more information.\n" ), |
871 | progname); |
872 | exit(1); |
873 | } |
874 | |
875 | if (do_drop_slot && (do_create_slot || do_start_slot)) |
876 | { |
877 | pg_log_error("cannot use --create-slot or --start together with --drop-slot" ); |
878 | fprintf(stderr, _("Try \"%s --help\" for more information.\n" ), |
879 | progname); |
880 | exit(1); |
881 | } |
882 | |
883 | if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot)) |
884 | { |
885 | pg_log_error("cannot use --create-slot or --drop-slot together with --startpos" ); |
886 | fprintf(stderr, _("Try \"%s --help\" for more information.\n" ), |
887 | progname); |
888 | exit(1); |
889 | } |
890 | |
891 | if (endpos != InvalidXLogRecPtr && !do_start_slot) |
892 | { |
893 | pg_log_error("--endpos may only be specified with --start" ); |
894 | fprintf(stderr, _("Try \"%s --help\" for more information.\n" ), |
895 | progname); |
896 | exit(1); |
897 | } |
898 | |
899 | #ifndef WIN32 |
900 | pqsignal(SIGINT, sigint_handler); |
901 | pqsignal(SIGHUP, sighup_handler); |
902 | #endif |
903 | |
904 | /* |
905 | * Obtain a connection to server. This is not really necessary but it |
906 | * helps to get more precise error messages about authentication, required |
907 | * GUC parameters and such. |
908 | */ |
909 | conn = GetConnection(); |
910 | if (!conn) |
911 | /* Error message already written in GetConnection() */ |
912 | exit(1); |
913 | atexit(disconnect_atexit); |
914 | |
915 | /* |
916 | * Run IDENTIFY_SYSTEM to make sure we connected using a database specific |
917 | * replication connection. |
918 | */ |
919 | if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name)) |
920 | exit(1); |
921 | |
922 | if (db_name == NULL) |
923 | { |
924 | pg_log_error("could not establish database-specific replication connection" ); |
925 | exit(1); |
926 | } |
927 | |
928 | /* |
929 | * Set umask so that directories/files are created with the same |
930 | * permissions as directories/files in the source data directory. |
931 | * |
932 | * pg_mode_mask is set to owner-only by default and then updated in |
933 | * GetConnection() where we get the mode from the server-side with |
934 | * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm(). |
935 | */ |
936 | umask(pg_mode_mask); |
937 | |
938 | /* Drop a replication slot. */ |
939 | if (do_drop_slot) |
940 | { |
941 | if (verbose) |
942 | pg_log_info("dropping replication slot \"%s\"" , replication_slot); |
943 | |
944 | if (!DropReplicationSlot(conn, replication_slot)) |
945 | exit(1); |
946 | } |
947 | |
948 | /* Create a replication slot. */ |
949 | if (do_create_slot) |
950 | { |
951 | if (verbose) |
952 | pg_log_info("creating replication slot \"%s\"" , replication_slot); |
953 | |
954 | if (!CreateReplicationSlot(conn, replication_slot, plugin, false, |
955 | false, false, slot_exists_ok)) |
956 | exit(1); |
957 | startpos = InvalidXLogRecPtr; |
958 | } |
959 | |
960 | if (!do_start_slot) |
961 | exit(0); |
962 | |
963 | /* Stream loop */ |
964 | while (true) |
965 | { |
966 | StreamLogicalLog(); |
967 | if (time_to_abort) |
968 | { |
969 | /* |
970 | * We've been Ctrl-C'ed or reached an exit limit condition. That's |
971 | * not an error, so exit without an errorcode. |
972 | */ |
973 | exit(0); |
974 | } |
975 | else if (noloop) |
976 | { |
977 | pg_log_error("disconnected" ); |
978 | exit(1); |
979 | } |
980 | else |
981 | { |
982 | /* translator: check source for value for %d */ |
983 | pg_log_info("disconnected; waiting %d seconds to try again" , |
984 | RECONNECT_SLEEP_TIME); |
985 | pg_usleep(RECONNECT_SLEEP_TIME * 1000000); |
986 | } |
987 | } |
988 | } |
989 | |
990 | /* |
991 | * Fsync our output data, and send a feedback message to the server. Returns |
992 | * true if successful, false otherwise. |
993 | * |
994 | * If successful, *now is updated to the current timestamp just before sending |
995 | * feedback. |
996 | */ |
997 | static bool |
998 | flushAndSendFeedback(PGconn *conn, TimestampTz *now) |
999 | { |
1000 | /* flush data to disk, so that we send a recent flush pointer */ |
1001 | if (!OutputFsync(*now)) |
1002 | return false; |
1003 | *now = feGetCurrentTimestamp(); |
1004 | if (!sendFeedback(conn, *now, true, false)) |
1005 | return false; |
1006 | |
1007 | return true; |
1008 | } |
1009 | |
1010 | /* |
1011 | * Try to inform the server about our upcoming demise, but don't wait around or |
1012 | * retry on failure. |
1013 | */ |
1014 | static void |
1015 | prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn) |
1016 | { |
1017 | (void) PQputCopyEnd(conn, NULL); |
1018 | (void) PQflush(conn); |
1019 | |
1020 | if (verbose) |
1021 | { |
1022 | if (keepalive) |
1023 | pg_log_info("end position %X/%X reached by keepalive" , |
1024 | (uint32) (endpos >> 32), (uint32) endpos); |
1025 | else |
1026 | pg_log_info("end position %X/%X reached by WAL record at %X/%X" , |
1027 | (uint32) (endpos >> 32), (uint32) (endpos), |
1028 | (uint32) (lsn >> 32), (uint32) lsn); |
1029 | } |
1030 | } |
1031 | |