1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * streamutil.c - utility functions for pg_basebackup, pg_receivewal and |
4 | * pg_recvlogical |
5 | * |
6 | * Author: Magnus Hagander <magnus@hagander.net> |
7 | * |
8 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
9 | * |
10 | * IDENTIFICATION |
11 | * src/bin/pg_basebackup/streamutil.c |
12 | *------------------------------------------------------------------------- |
13 | */ |
14 | |
15 | #include "postgres_fe.h" |
16 | |
17 | #include <sys/time.h> |
18 | #include <unistd.h> |
19 | |
20 | /* local includes */ |
21 | #include "receivelog.h" |
22 | #include "streamutil.h" |
23 | |
24 | #include "access/xlog_internal.h" |
25 | #include "common/fe_memutils.h" |
26 | #include "common/file_perm.h" |
27 | #include "common/logging.h" |
28 | #include "datatype/timestamp.h" |
29 | #include "fe_utils/connect.h" |
30 | #include "port/pg_bswap.h" |
31 | #include "pqexpbuffer.h" |
32 | |
33 | #define ERRCODE_DUPLICATE_OBJECT "42710" |
34 | |
35 | uint32 WalSegSz; |
36 | |
37 | static bool RetrieveDataDirCreatePerm(PGconn *conn); |
38 | |
39 | /* SHOW command for replication connection was introduced in version 10 */ |
40 | #define MINIMUM_VERSION_FOR_SHOW_CMD 100000 |
41 | |
42 | /* |
43 | * Group access is supported from version 11. |
44 | */ |
45 | #define MINIMUM_VERSION_FOR_GROUP_ACCESS 110000 |
46 | |
47 | const char *progname; |
48 | char *connection_string = NULL; |
49 | char *dbhost = NULL; |
50 | char *dbuser = NULL; |
51 | char *dbport = NULL; |
52 | char *dbname = NULL; |
53 | int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */ |
54 | static bool have_password = false; |
55 | static char password[100]; |
56 | PGconn *conn = NULL; |
57 | |
58 | /* |
59 | * Connect to the server. Returns a valid PGconn pointer if connected, |
60 | * or NULL on non-permanent error. On permanent error, the function will |
61 | * call exit(1) directly. |
62 | */ |
63 | PGconn * |
64 | GetConnection(void) |
65 | { |
66 | PGconn *tmpconn; |
67 | int argcount = 7; /* dbname, replication, fallback_app_name, |
68 | * host, user, port, password */ |
69 | int i; |
70 | const char **keywords; |
71 | const char **values; |
72 | const char *tmpparam; |
73 | bool need_password; |
74 | PQconninfoOption *conn_opts = NULL; |
75 | PQconninfoOption *conn_opt; |
76 | char *err_msg = NULL; |
77 | |
78 | /* pg_recvlogical uses dbname only; others use connection_string only. */ |
79 | Assert(dbname == NULL || connection_string == NULL); |
80 | |
81 | /* |
82 | * Merge the connection info inputs given in form of connection string, |
83 | * options and default values (dbname=replication, replication=true, etc.) |
84 | * Explicitly discard any dbname value in the connection string; |
85 | * otherwise, PQconnectdbParams() would interpret that value as being |
86 | * itself a connection string. |
87 | */ |
88 | i = 0; |
89 | if (connection_string) |
90 | { |
91 | conn_opts = PQconninfoParse(connection_string, &err_msg); |
92 | if (conn_opts == NULL) |
93 | { |
94 | pg_log_error("%s" , err_msg); |
95 | exit(1); |
96 | } |
97 | |
98 | for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++) |
99 | { |
100 | if (conn_opt->val != NULL && conn_opt->val[0] != '\0' && |
101 | strcmp(conn_opt->keyword, "dbname" ) != 0) |
102 | argcount++; |
103 | } |
104 | |
105 | keywords = pg_malloc0((argcount + 1) * sizeof(*keywords)); |
106 | values = pg_malloc0((argcount + 1) * sizeof(*values)); |
107 | |
108 | for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++) |
109 | { |
110 | if (conn_opt->val != NULL && conn_opt->val[0] != '\0' && |
111 | strcmp(conn_opt->keyword, "dbname" ) != 0) |
112 | { |
113 | keywords[i] = conn_opt->keyword; |
114 | values[i] = conn_opt->val; |
115 | i++; |
116 | } |
117 | } |
118 | } |
119 | else |
120 | { |
121 | keywords = pg_malloc0((argcount + 1) * sizeof(*keywords)); |
122 | values = pg_malloc0((argcount + 1) * sizeof(*values)); |
123 | } |
124 | |
125 | keywords[i] = "dbname" ; |
126 | values[i] = dbname == NULL ? "replication" : dbname; |
127 | i++; |
128 | keywords[i] = "replication" ; |
129 | values[i] = dbname == NULL ? "true" : "database" ; |
130 | i++; |
131 | keywords[i] = "fallback_application_name" ; |
132 | values[i] = progname; |
133 | i++; |
134 | |
135 | if (dbhost) |
136 | { |
137 | keywords[i] = "host" ; |
138 | values[i] = dbhost; |
139 | i++; |
140 | } |
141 | if (dbuser) |
142 | { |
143 | keywords[i] = "user" ; |
144 | values[i] = dbuser; |
145 | i++; |
146 | } |
147 | if (dbport) |
148 | { |
149 | keywords[i] = "port" ; |
150 | values[i] = dbport; |
151 | i++; |
152 | } |
153 | |
154 | /* If -W was given, force prompt for password, but only the first time */ |
155 | need_password = (dbgetpassword == 1 && !have_password); |
156 | |
157 | do |
158 | { |
159 | /* Get a new password if appropriate */ |
160 | if (need_password) |
161 | { |
162 | simple_prompt("Password: " , password, sizeof(password), false); |
163 | have_password = true; |
164 | need_password = false; |
165 | } |
166 | |
167 | /* Use (or reuse, on a subsequent connection) password if we have it */ |
168 | if (have_password) |
169 | { |
170 | keywords[i] = "password" ; |
171 | values[i] = password; |
172 | } |
173 | else |
174 | { |
175 | keywords[i] = NULL; |
176 | values[i] = NULL; |
177 | } |
178 | |
179 | tmpconn = PQconnectdbParams(keywords, values, true); |
180 | |
181 | /* |
182 | * If there is too little memory even to allocate the PGconn object |
183 | * and PQconnectdbParams returns NULL, we call exit(1) directly. |
184 | */ |
185 | if (!tmpconn) |
186 | { |
187 | pg_log_error("could not connect to server" ); |
188 | exit(1); |
189 | } |
190 | |
191 | /* If we need a password and -w wasn't given, loop back and get one */ |
192 | if (PQstatus(tmpconn) == CONNECTION_BAD && |
193 | PQconnectionNeedsPassword(tmpconn) && |
194 | dbgetpassword != -1) |
195 | { |
196 | PQfinish(tmpconn); |
197 | need_password = true; |
198 | } |
199 | } |
200 | while (need_password); |
201 | |
202 | if (PQstatus(tmpconn) != CONNECTION_OK) |
203 | { |
204 | pg_log_error("could not connect to server: %s" , |
205 | PQerrorMessage(tmpconn)); |
206 | PQfinish(tmpconn); |
207 | free(values); |
208 | free(keywords); |
209 | if (conn_opts) |
210 | PQconninfoFree(conn_opts); |
211 | return NULL; |
212 | } |
213 | |
214 | /* Connection ok! */ |
215 | free(values); |
216 | free(keywords); |
217 | if (conn_opts) |
218 | PQconninfoFree(conn_opts); |
219 | |
220 | /* |
221 | * Set always-secure search path, so malicious users can't get control. |
222 | * The capacity to run normal SQL queries was added in PostgreSQL 10, so |
223 | * the search path cannot be changed (by us or attackers) on earlier |
224 | * versions. |
225 | */ |
226 | if (dbname != NULL && PQserverVersion(tmpconn) >= 100000) |
227 | { |
228 | PGresult *res; |
229 | |
230 | res = PQexec(tmpconn, ALWAYS_SECURE_SEARCH_PATH_SQL); |
231 | if (PQresultStatus(res) != PGRES_TUPLES_OK) |
232 | { |
233 | pg_log_error("could not clear search_path: %s" , |
234 | PQerrorMessage(tmpconn)); |
235 | PQclear(res); |
236 | PQfinish(tmpconn); |
237 | exit(1); |
238 | } |
239 | PQclear(res); |
240 | } |
241 | |
242 | /* |
243 | * Ensure we have the same value of integer_datetimes (now always "on") as |
244 | * the server we are connecting to. |
245 | */ |
246 | tmpparam = PQparameterStatus(tmpconn, "integer_datetimes" ); |
247 | if (!tmpparam) |
248 | { |
249 | pg_log_error("could not determine server setting for integer_datetimes" ); |
250 | PQfinish(tmpconn); |
251 | exit(1); |
252 | } |
253 | |
254 | if (strcmp(tmpparam, "on" ) != 0) |
255 | { |
256 | pg_log_error("integer_datetimes compile flag does not match server" ); |
257 | PQfinish(tmpconn); |
258 | exit(1); |
259 | } |
260 | |
261 | /* |
262 | * Retrieve the source data directory mode and use it to construct a umask |
263 | * for creating directories and files. |
264 | */ |
265 | if (!RetrieveDataDirCreatePerm(tmpconn)) |
266 | { |
267 | PQfinish(tmpconn); |
268 | exit(1); |
269 | } |
270 | |
271 | return tmpconn; |
272 | } |
273 | |
274 | /* |
275 | * From version 10, explicitly set wal segment size using SHOW wal_segment_size |
276 | * since ControlFile is not accessible here. |
277 | */ |
278 | bool |
279 | RetrieveWalSegSize(PGconn *conn) |
280 | { |
281 | PGresult *res; |
282 | char xlog_unit[3]; |
283 | int xlog_val, |
284 | multiplier = 1; |
285 | |
286 | /* check connection existence */ |
287 | Assert(conn != NULL); |
288 | |
289 | /* for previous versions set the default xlog seg size */ |
290 | if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_SHOW_CMD) |
291 | { |
292 | WalSegSz = DEFAULT_XLOG_SEG_SIZE; |
293 | return true; |
294 | } |
295 | |
296 | res = PQexec(conn, "SHOW wal_segment_size" ); |
297 | if (PQresultStatus(res) != PGRES_TUPLES_OK) |
298 | { |
299 | pg_log_error("could not send replication command \"%s\": %s" , |
300 | "SHOW wal_segment_size" , PQerrorMessage(conn)); |
301 | |
302 | PQclear(res); |
303 | return false; |
304 | } |
305 | if (PQntuples(res) != 1 || PQnfields(res) < 1) |
306 | { |
307 | pg_log_error("could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields" , |
308 | PQntuples(res), PQnfields(res), 1, 1); |
309 | |
310 | PQclear(res); |
311 | return false; |
312 | } |
313 | |
314 | /* fetch xlog value and unit from the result */ |
315 | if (sscanf(PQgetvalue(res, 0, 0), "%d%s" , &xlog_val, xlog_unit) != 2) |
316 | { |
317 | pg_log_error("WAL segment size could not be parsed" ); |
318 | return false; |
319 | } |
320 | |
321 | /* set the multiplier based on unit to convert xlog_val to bytes */ |
322 | if (strcmp(xlog_unit, "MB" ) == 0) |
323 | multiplier = 1024 * 1024; |
324 | else if (strcmp(xlog_unit, "GB" ) == 0) |
325 | multiplier = 1024 * 1024 * 1024; |
326 | |
327 | /* convert and set WalSegSz */ |
328 | WalSegSz = xlog_val * multiplier; |
329 | |
330 | if (!IsValidWalSegSize(WalSegSz)) |
331 | { |
332 | pg_log_error(ngettext("WAL segment size must be a power of two between 1 MB and 1 GB, but the remote server reported a value of %d byte" , |
333 | "WAL segment size must be a power of two between 1 MB and 1 GB, but the remote server reported a value of %d bytes" , |
334 | WalSegSz), |
335 | WalSegSz); |
336 | return false; |
337 | } |
338 | |
339 | PQclear(res); |
340 | return true; |
341 | } |
342 | |
343 | /* |
344 | * RetrieveDataDirCreatePerm |
345 | * |
346 | * This function is used to determine the privileges on the server's PG data |
347 | * directory and, based on that, set what the permissions will be for |
348 | * directories and files we create. |
349 | * |
350 | * PG11 added support for (optionally) group read/execute rights to be set on |
351 | * the data directory. Prior to PG11, only the owner was allowed to have rights |
352 | * on the data directory. |
353 | */ |
354 | static bool |
355 | RetrieveDataDirCreatePerm(PGconn *conn) |
356 | { |
357 | PGresult *res; |
358 | int data_directory_mode; |
359 | |
360 | /* check connection existence */ |
361 | Assert(conn != NULL); |
362 | |
363 | /* for previous versions leave the default group access */ |
364 | if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_GROUP_ACCESS) |
365 | return true; |
366 | |
367 | res = PQexec(conn, "SHOW data_directory_mode" ); |
368 | if (PQresultStatus(res) != PGRES_TUPLES_OK) |
369 | { |
370 | pg_log_error("could not send replication command \"%s\": %s" , |
371 | "SHOW data_directory_mode" , PQerrorMessage(conn)); |
372 | |
373 | PQclear(res); |
374 | return false; |
375 | } |
376 | if (PQntuples(res) != 1 || PQnfields(res) < 1) |
377 | { |
378 | pg_log_error("could not fetch group access flag: got %d rows and %d fields, expected %d rows and %d or more fields" , |
379 | PQntuples(res), PQnfields(res), 1, 1); |
380 | |
381 | PQclear(res); |
382 | return false; |
383 | } |
384 | |
385 | if (sscanf(PQgetvalue(res, 0, 0), "%o" , &data_directory_mode) != 1) |
386 | { |
387 | pg_log_error("group access flag could not be parsed: %s" , |
388 | PQgetvalue(res, 0, 0)); |
389 | |
390 | PQclear(res); |
391 | return false; |
392 | } |
393 | |
394 | SetDataDirectoryCreatePerm(data_directory_mode); |
395 | |
396 | PQclear(res); |
397 | return true; |
398 | } |
399 | |
400 | /* |
401 | * Run IDENTIFY_SYSTEM through a given connection and give back to caller |
402 | * some result information if requested: |
403 | * - System identifier |
404 | * - Current timeline ID |
405 | * - Start LSN position |
406 | * - Database name (NULL in servers prior to 9.4) |
407 | */ |
408 | bool |
409 | RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, |
410 | XLogRecPtr *startpos, char **db_name) |
411 | { |
412 | PGresult *res; |
413 | uint32 hi, |
414 | lo; |
415 | |
416 | /* Check connection existence */ |
417 | Assert(conn != NULL); |
418 | |
419 | res = PQexec(conn, "IDENTIFY_SYSTEM" ); |
420 | if (PQresultStatus(res) != PGRES_TUPLES_OK) |
421 | { |
422 | pg_log_error("could not send replication command \"%s\": %s" , |
423 | "IDENTIFY_SYSTEM" , PQerrorMessage(conn)); |
424 | |
425 | PQclear(res); |
426 | return false; |
427 | } |
428 | if (PQntuples(res) != 1 || PQnfields(res) < 3) |
429 | { |
430 | pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields" , |
431 | PQntuples(res), PQnfields(res), 1, 3); |
432 | |
433 | PQclear(res); |
434 | return false; |
435 | } |
436 | |
437 | /* Get system identifier */ |
438 | if (sysid != NULL) |
439 | *sysid = pg_strdup(PQgetvalue(res, 0, 0)); |
440 | |
441 | /* Get timeline ID to start streaming from */ |
442 | if (starttli != NULL) |
443 | *starttli = atoi(PQgetvalue(res, 0, 1)); |
444 | |
445 | /* Get LSN start position if necessary */ |
446 | if (startpos != NULL) |
447 | { |
448 | if (sscanf(PQgetvalue(res, 0, 2), "%X/%X" , &hi, &lo) != 2) |
449 | { |
450 | pg_log_error("could not parse write-ahead log location \"%s\"" , |
451 | PQgetvalue(res, 0, 2)); |
452 | |
453 | PQclear(res); |
454 | return false; |
455 | } |
456 | *startpos = ((uint64) hi) << 32 | lo; |
457 | } |
458 | |
459 | /* Get database name, only available in 9.4 and newer versions */ |
460 | if (db_name != NULL) |
461 | { |
462 | *db_name = NULL; |
463 | if (PQserverVersion(conn) >= 90400) |
464 | { |
465 | if (PQnfields(res) < 4) |
466 | { |
467 | pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields" , |
468 | PQntuples(res), PQnfields(res), 1, 4); |
469 | |
470 | PQclear(res); |
471 | return false; |
472 | } |
473 | if (!PQgetisnull(res, 0, 3)) |
474 | *db_name = pg_strdup(PQgetvalue(res, 0, 3)); |
475 | } |
476 | } |
477 | |
478 | PQclear(res); |
479 | return true; |
480 | } |
481 | |
482 | /* |
483 | * Create a replication slot for the given connection. This function |
484 | * returns true in case of success. |
485 | */ |
486 | bool |
487 | CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, |
488 | bool is_temporary, bool is_physical, bool reserve_wal, |
489 | bool slot_exists_ok) |
490 | { |
491 | PQExpBuffer query; |
492 | PGresult *res; |
493 | |
494 | query = createPQExpBuffer(); |
495 | |
496 | Assert((is_physical && plugin == NULL) || |
497 | (!is_physical && plugin != NULL)); |
498 | Assert(slot_name != NULL); |
499 | |
500 | /* Build query */ |
501 | appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"" , slot_name); |
502 | if (is_temporary) |
503 | appendPQExpBuffer(query, " TEMPORARY" ); |
504 | if (is_physical) |
505 | { |
506 | appendPQExpBuffer(query, " PHYSICAL" ); |
507 | if (reserve_wal) |
508 | appendPQExpBuffer(query, " RESERVE_WAL" ); |
509 | } |
510 | else |
511 | { |
512 | appendPQExpBuffer(query, " LOGICAL \"%s\"" , plugin); |
513 | if (PQserverVersion(conn) >= 100000) |
514 | /* pg_recvlogical doesn't use an exported snapshot, so suppress */ |
515 | appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT" ); |
516 | } |
517 | |
518 | res = PQexec(conn, query->data); |
519 | if (PQresultStatus(res) != PGRES_TUPLES_OK) |
520 | { |
521 | const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); |
522 | |
523 | if (slot_exists_ok && |
524 | sqlstate && |
525 | strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0) |
526 | { |
527 | destroyPQExpBuffer(query); |
528 | PQclear(res); |
529 | return true; |
530 | } |
531 | else |
532 | { |
533 | pg_log_error("could not send replication command \"%s\": %s" , |
534 | query->data, PQerrorMessage(conn)); |
535 | |
536 | destroyPQExpBuffer(query); |
537 | PQclear(res); |
538 | return false; |
539 | } |
540 | } |
541 | |
542 | if (PQntuples(res) != 1 || PQnfields(res) != 4) |
543 | { |
544 | pg_log_error("could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields" , |
545 | slot_name, |
546 | PQntuples(res), PQnfields(res), 1, 4); |
547 | |
548 | destroyPQExpBuffer(query); |
549 | PQclear(res); |
550 | return false; |
551 | } |
552 | |
553 | destroyPQExpBuffer(query); |
554 | PQclear(res); |
555 | return true; |
556 | } |
557 | |
558 | /* |
559 | * Drop a replication slot for the given connection. This function |
560 | * returns true in case of success. |
561 | */ |
562 | bool |
563 | DropReplicationSlot(PGconn *conn, const char *slot_name) |
564 | { |
565 | PQExpBuffer query; |
566 | PGresult *res; |
567 | |
568 | Assert(slot_name != NULL); |
569 | |
570 | query = createPQExpBuffer(); |
571 | |
572 | /* Build query */ |
573 | appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"" , |
574 | slot_name); |
575 | res = PQexec(conn, query->data); |
576 | if (PQresultStatus(res) != PGRES_COMMAND_OK) |
577 | { |
578 | pg_log_error("could not send replication command \"%s\": %s" , |
579 | query->data, PQerrorMessage(conn)); |
580 | |
581 | destroyPQExpBuffer(query); |
582 | PQclear(res); |
583 | return false; |
584 | } |
585 | |
586 | if (PQntuples(res) != 0 || PQnfields(res) != 0) |
587 | { |
588 | pg_log_error("could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields" , |
589 | slot_name, |
590 | PQntuples(res), PQnfields(res), 0, 0); |
591 | |
592 | destroyPQExpBuffer(query); |
593 | PQclear(res); |
594 | return false; |
595 | } |
596 | |
597 | destroyPQExpBuffer(query); |
598 | PQclear(res); |
599 | return true; |
600 | } |
601 | |
602 | |
603 | /* |
604 | * Frontend version of GetCurrentTimestamp(), since we are not linked with |
605 | * backend code. |
606 | */ |
607 | TimestampTz |
608 | feGetCurrentTimestamp(void) |
609 | { |
610 | TimestampTz result; |
611 | struct timeval tp; |
612 | |
613 | gettimeofday(&tp, NULL); |
614 | |
615 | result = (TimestampTz) tp.tv_sec - |
616 | ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); |
617 | result = (result * USECS_PER_SEC) + tp.tv_usec; |
618 | |
619 | return result; |
620 | } |
621 | |
622 | /* |
623 | * Frontend version of TimestampDifference(), since we are not linked with |
624 | * backend code. |
625 | */ |
626 | void |
627 | feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, |
628 | long *secs, int *microsecs) |
629 | { |
630 | TimestampTz diff = stop_time - start_time; |
631 | |
632 | if (diff <= 0) |
633 | { |
634 | *secs = 0; |
635 | *microsecs = 0; |
636 | } |
637 | else |
638 | { |
639 | *secs = (long) (diff / USECS_PER_SEC); |
640 | *microsecs = (int) (diff % USECS_PER_SEC); |
641 | } |
642 | } |
643 | |
644 | /* |
645 | * Frontend version of TimestampDifferenceExceeds(), since we are not |
646 | * linked with backend code. |
647 | */ |
648 | bool |
649 | feTimestampDifferenceExceeds(TimestampTz start_time, |
650 | TimestampTz stop_time, |
651 | int msec) |
652 | { |
653 | TimestampTz diff = stop_time - start_time; |
654 | |
655 | return (diff >= msec * INT64CONST(1000)); |
656 | } |
657 | |
658 | /* |
659 | * Converts an int64 to network byte order. |
660 | */ |
661 | void |
662 | fe_sendint64(int64 i, char *buf) |
663 | { |
664 | uint64 n64 = pg_hton64(i); |
665 | |
666 | memcpy(buf, &n64, sizeof(n64)); |
667 | } |
668 | |
669 | /* |
670 | * Converts an int64 from network byte order to native format. |
671 | */ |
672 | int64 |
673 | fe_recvint64(char *buf) |
674 | { |
675 | uint64 n64; |
676 | |
677 | memcpy(&n64, buf, sizeof(n64)); |
678 | |
679 | return pg_ntoh64(n64); |
680 | } |
681 | |