1/*-------------------------------------------------------------------------
2 *
3 * pg_backup_db.c
4 *
5 * Implements the basic DB functions used by the archiver.
6 *
7 * IDENTIFICATION
8 * src/bin/pg_dump/pg_backup_db.c
9 *
10 *-------------------------------------------------------------------------
11 */
12#include "postgres_fe.h"
13
14#include "fe_utils/connect.h"
15#include "fe_utils/string_utils.h"
16
17#include "dumputils.h"
18#include "parallel.h"
19#include "pg_backup_archiver.h"
20#include "pg_backup_db.h"
21#include "pg_backup_utils.h"
22
23#include <unistd.h>
24#include <ctype.h>
25#ifdef HAVE_TERMIOS_H
26#include <termios.h>
27#endif
28
29
30static void _check_database_version(ArchiveHandle *AH);
31static PGconn *_connectDB(ArchiveHandle *AH, const char *newdbname, const char *newUser);
32static void notice_processor(void *arg, const char *message);
33
34static void
35_check_database_version(ArchiveHandle *AH)
36{
37 const char *remoteversion_str;
38 int remoteversion;
39 PGresult *res;
40
41 remoteversion_str = PQparameterStatus(AH->connection, "server_version");
42 remoteversion = PQserverVersion(AH->connection);
43 if (remoteversion == 0 || !remoteversion_str)
44 fatal("could not get server_version from libpq");
45
46 AH->public.remoteVersionStr = pg_strdup(remoteversion_str);
47 AH->public.remoteVersion = remoteversion;
48 if (!AH->archiveRemoteVersion)
49 AH->archiveRemoteVersion = AH->public.remoteVersionStr;
50
51 if (remoteversion != PG_VERSION_NUM
52 && (remoteversion < AH->public.minRemoteVersion ||
53 remoteversion > AH->public.maxRemoteVersion))
54 {
55 pg_log_error("server version: %s; %s version: %s",
56 remoteversion_str, progname, PG_VERSION);
57 fatal("aborting because of server version mismatch");
58 }
59
60 /*
61 * When running against 9.0 or later, check if we are in recovery mode,
62 * which means we are on a hot standby.
63 */
64 if (remoteversion >= 90000)
65 {
66 res = ExecuteSqlQueryForSingleRow((Archive *) AH, "SELECT pg_catalog.pg_is_in_recovery()");
67
68 AH->public.isStandby = (strcmp(PQgetvalue(res, 0, 0), "t") == 0);
69 PQclear(res);
70 }
71 else
72 AH->public.isStandby = false;
73}
74
75/*
76 * Reconnect to the server. If dbname is not NULL, use that database,
77 * else the one associated with the archive handle. If username is
78 * not NULL, use that user name, else the one from the handle.
79 */
80void
81ReconnectToServer(ArchiveHandle *AH, const char *dbname, const char *username)
82{
83 PGconn *newConn;
84 const char *newdbname;
85 const char *newusername;
86
87 if (!dbname)
88 newdbname = PQdb(AH->connection);
89 else
90 newdbname = dbname;
91
92 if (!username)
93 newusername = PQuser(AH->connection);
94 else
95 newusername = username;
96
97 newConn = _connectDB(AH, newdbname, newusername);
98
99 /* Update ArchiveHandle's connCancel before closing old connection */
100 set_archive_cancel_info(AH, newConn);
101
102 PQfinish(AH->connection);
103 AH->connection = newConn;
104
105 /* Start strict; later phases may override this. */
106 PQclear(ExecuteSqlQueryForSingleRow((Archive *) AH,
107 ALWAYS_SECURE_SEARCH_PATH_SQL));
108}
109
110/*
111 * Connect to the db again.
112 *
113 * Note: it's not really all that sensible to use a single-entry password
114 * cache if the username keeps changing. In current usage, however, the
115 * username never does change, so one savedPassword is sufficient. We do
116 * update the cache on the off chance that the password has changed since the
117 * start of the run.
118 */
119static PGconn *
120_connectDB(ArchiveHandle *AH, const char *reqdb, const char *requser)
121{
122 PQExpBufferData connstr;
123 PGconn *newConn;
124 const char *newdb;
125 const char *newuser;
126 char *password;
127 char passbuf[100];
128 bool new_pass;
129
130 if (!reqdb)
131 newdb = PQdb(AH->connection);
132 else
133 newdb = reqdb;
134
135 if (!requser || strlen(requser) == 0)
136 newuser = PQuser(AH->connection);
137 else
138 newuser = requser;
139
140 pg_log_info("connecting to database \"%s\" as user \"%s\"",
141 newdb, newuser);
142
143 password = AH->savedPassword;
144
145 if (AH->promptPassword == TRI_YES && password == NULL)
146 {
147 simple_prompt("Password: ", passbuf, sizeof(passbuf), false);
148 password = passbuf;
149 }
150
151 initPQExpBuffer(&connstr);
152 appendPQExpBuffer(&connstr, "dbname=");
153 appendConnStrVal(&connstr, newdb);
154
155 do
156 {
157 const char *keywords[7];
158 const char *values[7];
159
160 keywords[0] = "host";
161 values[0] = PQhost(AH->connection);
162 keywords[1] = "port";
163 values[1] = PQport(AH->connection);
164 keywords[2] = "user";
165 values[2] = newuser;
166 keywords[3] = "password";
167 values[3] = password;
168 keywords[4] = "dbname";
169 values[4] = connstr.data;
170 keywords[5] = "fallback_application_name";
171 values[5] = progname;
172 keywords[6] = NULL;
173 values[6] = NULL;
174
175 new_pass = false;
176 newConn = PQconnectdbParams(keywords, values, true);
177
178 if (!newConn)
179 fatal("could not reconnect to database");
180
181 if (PQstatus(newConn) == CONNECTION_BAD)
182 {
183 if (!PQconnectionNeedsPassword(newConn))
184 fatal("could not reconnect to database: %s",
185 PQerrorMessage(newConn));
186 PQfinish(newConn);
187
188 if (password)
189 fprintf(stderr, "Password incorrect\n");
190
191 fprintf(stderr, "Connecting to %s as %s\n",
192 newdb, newuser);
193
194 if (AH->promptPassword != TRI_NO)
195 {
196 simple_prompt("Password: ", passbuf, sizeof(passbuf), false);
197 password = passbuf;
198 }
199 else
200 fatal("connection needs password");
201
202 new_pass = true;
203 }
204 } while (new_pass);
205
206 /*
207 * We want to remember connection's actual password, whether or not we got
208 * it by prompting. So we don't just store the password variable.
209 */
210 if (PQconnectionUsedPassword(newConn))
211 {
212 if (AH->savedPassword)
213 free(AH->savedPassword);
214 AH->savedPassword = pg_strdup(PQpass(newConn));
215 }
216
217 termPQExpBuffer(&connstr);
218
219 /* check for version mismatch */
220 _check_database_version(AH);
221
222 PQsetNoticeProcessor(newConn, notice_processor, NULL);
223
224 return newConn;
225}
226
227
228/*
229 * Make a database connection with the given parameters. The
230 * connection handle is returned, the parameters are stored in AHX.
231 * An interactive password prompt is automatically issued if required.
232 *
233 * Note: it's not really all that sensible to use a single-entry password
234 * cache if the username keeps changing. In current usage, however, the
235 * username never does change, so one savedPassword is sufficient.
236 */
237void
238ConnectDatabase(Archive *AHX,
239 const char *dbname,
240 const char *pghost,
241 const char *pgport,
242 const char *username,
243 trivalue prompt_password)
244{
245 ArchiveHandle *AH = (ArchiveHandle *) AHX;
246 char *password;
247 char passbuf[100];
248 bool new_pass;
249
250 if (AH->connection)
251 fatal("already connected to a database");
252
253 password = AH->savedPassword;
254
255 if (prompt_password == TRI_YES && password == NULL)
256 {
257 simple_prompt("Password: ", passbuf, sizeof(passbuf), false);
258 password = passbuf;
259 }
260 AH->promptPassword = prompt_password;
261
262 /*
263 * Start the connection. Loop until we have a password if requested by
264 * backend.
265 */
266 do
267 {
268 const char *keywords[7];
269 const char *values[7];
270
271 keywords[0] = "host";
272 values[0] = pghost;
273 keywords[1] = "port";
274 values[1] = pgport;
275 keywords[2] = "user";
276 values[2] = username;
277 keywords[3] = "password";
278 values[3] = password;
279 keywords[4] = "dbname";
280 values[4] = dbname;
281 keywords[5] = "fallback_application_name";
282 values[5] = progname;
283 keywords[6] = NULL;
284 values[6] = NULL;
285
286 new_pass = false;
287 AH->connection = PQconnectdbParams(keywords, values, true);
288
289 if (!AH->connection)
290 fatal("could not connect to database");
291
292 if (PQstatus(AH->connection) == CONNECTION_BAD &&
293 PQconnectionNeedsPassword(AH->connection) &&
294 password == NULL &&
295 prompt_password != TRI_NO)
296 {
297 PQfinish(AH->connection);
298 simple_prompt("Password: ", passbuf, sizeof(passbuf), false);
299 password = passbuf;
300 new_pass = true;
301 }
302 } while (new_pass);
303
304 /* check to see that the backend connection was successfully made */
305 if (PQstatus(AH->connection) == CONNECTION_BAD)
306 fatal("connection to database \"%s\" failed: %s",
307 PQdb(AH->connection) ? PQdb(AH->connection) : "",
308 PQerrorMessage(AH->connection));
309
310 /* Start strict; later phases may override this. */
311 PQclear(ExecuteSqlQueryForSingleRow((Archive *) AH,
312 ALWAYS_SECURE_SEARCH_PATH_SQL));
313
314 /*
315 * We want to remember connection's actual password, whether or not we got
316 * it by prompting. So we don't just store the password variable.
317 */
318 if (PQconnectionUsedPassword(AH->connection))
319 {
320 if (AH->savedPassword)
321 free(AH->savedPassword);
322 AH->savedPassword = pg_strdup(PQpass(AH->connection));
323 }
324
325 /* check for version mismatch */
326 _check_database_version(AH);
327
328 PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
329
330 /* arrange for SIGINT to issue a query cancel on this connection */
331 set_archive_cancel_info(AH, AH->connection);
332}
333
334/*
335 * Close the connection to the database and also cancel off the query if we
336 * have one running.
337 */
338void
339DisconnectDatabase(Archive *AHX)
340{
341 ArchiveHandle *AH = (ArchiveHandle *) AHX;
342 char errbuf[1];
343
344 if (!AH->connection)
345 return;
346
347 if (AH->connCancel)
348 {
349 /*
350 * If we have an active query, send a cancel before closing, ignoring
351 * any errors. This is of no use for a normal exit, but might be
352 * helpful during fatal().
353 */
354 if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
355 (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
356
357 /*
358 * Prevent signal handler from sending a cancel after this.
359 */
360 set_archive_cancel_info(AH, NULL);
361 }
362
363 PQfinish(AH->connection);
364 AH->connection = NULL;
365}
366
367PGconn *
368GetConnection(Archive *AHX)
369{
370 ArchiveHandle *AH = (ArchiveHandle *) AHX;
371
372 return AH->connection;
373}
374
375static void
376notice_processor(void *arg, const char *message)
377{
378 pg_log_generic(PG_LOG_INFO, "%s", message);
379}
380
381/* Like exit_fatal(), but with a complaint about a particular query. */
382static void
383die_on_query_failure(ArchiveHandle *AH, const char *query)
384{
385 pg_log_error("query failed: %s",
386 PQerrorMessage(AH->connection));
387 fatal("query was: %s", query);
388}
389
390void
391ExecuteSqlStatement(Archive *AHX, const char *query)
392{
393 ArchiveHandle *AH = (ArchiveHandle *) AHX;
394 PGresult *res;
395
396 res = PQexec(AH->connection, query);
397 if (PQresultStatus(res) != PGRES_COMMAND_OK)
398 die_on_query_failure(AH, query);
399 PQclear(res);
400}
401
402PGresult *
403ExecuteSqlQuery(Archive *AHX, const char *query, ExecStatusType status)
404{
405 ArchiveHandle *AH = (ArchiveHandle *) AHX;
406 PGresult *res;
407
408 res = PQexec(AH->connection, query);
409 if (PQresultStatus(res) != status)
410 die_on_query_failure(AH, query);
411 return res;
412}
413
414/*
415 * Execute an SQL query and verify that we got exactly one row back.
416 */
417PGresult *
418ExecuteSqlQueryForSingleRow(Archive *fout, const char *query)
419{
420 PGresult *res;
421 int ntups;
422
423 res = ExecuteSqlQuery(fout, query, PGRES_TUPLES_OK);
424
425 /* Expecting a single result only */
426 ntups = PQntuples(res);
427 if (ntups != 1)
428 fatal(ngettext("query returned %d row instead of one: %s",
429 "query returned %d rows instead of one: %s",
430 ntups),
431 ntups, query);
432
433 return res;
434}
435
436/*
437 * Convenience function to send a query.
438 * Monitors result to detect COPY statements
439 */
440static void
441ExecuteSqlCommand(ArchiveHandle *AH, const char *qry, const char *desc)
442{
443 PGconn *conn = AH->connection;
444 PGresult *res;
445
446#ifdef NOT_USED
447 fprintf(stderr, "Executing: '%s'\n\n", qry);
448#endif
449 res = PQexec(conn, qry);
450
451 switch (PQresultStatus(res))
452 {
453 case PGRES_COMMAND_OK:
454 case PGRES_TUPLES_OK:
455 case PGRES_EMPTY_QUERY:
456 /* A-OK */
457 break;
458 case PGRES_COPY_IN:
459 /* Assume this is an expected result */
460 AH->pgCopyIn = true;
461 break;
462 default:
463 /* trouble */
464 warn_or_exit_horribly(AH, "%s: %sCommand was: %s",
465 desc, PQerrorMessage(conn), qry);
466 break;
467 }
468
469 PQclear(res);
470}
471
472
473/*
474 * Process non-COPY table data (that is, INSERT commands).
475 *
476 * The commands have been run together as one long string for compressibility,
477 * and we are receiving them in bufferloads with arbitrary boundaries, so we
478 * have to locate command boundaries and save partial commands across calls.
479 * All state must be kept in AH->sqlparse, not in local variables of this
480 * routine. We assume that AH->sqlparse was filled with zeroes when created.
481 *
482 * We have to lex the data to the extent of identifying literals and quoted
483 * identifiers, so that we can recognize statement-terminating semicolons.
484 * We assume that INSERT data will not contain SQL comments, E'' literals,
485 * or dollar-quoted strings, so this is much simpler than a full SQL lexer.
486 *
487 * Note: when restoring from a pre-9.0 dump file, this code is also used to
488 * process BLOB COMMENTS data, which has the same problem of containing
489 * multiple SQL commands that might be split across bufferloads. Fortunately,
490 * that data won't contain anything complicated to lex either.
491 */
492static void
493ExecuteSimpleCommands(ArchiveHandle *AH, const char *buf, size_t bufLen)
494{
495 const char *qry = buf;
496 const char *eos = buf + bufLen;
497
498 /* initialize command buffer if first time through */
499 if (AH->sqlparse.curCmd == NULL)
500 AH->sqlparse.curCmd = createPQExpBuffer();
501
502 for (; qry < eos; qry++)
503 {
504 char ch = *qry;
505
506 /* For neatness, we skip any newlines between commands */
507 if (!(ch == '\n' && AH->sqlparse.curCmd->len == 0))
508 appendPQExpBufferChar(AH->sqlparse.curCmd, ch);
509
510 switch (AH->sqlparse.state)
511 {
512 case SQL_SCAN: /* Default state == 0, set in _allocAH */
513 if (ch == ';')
514 {
515 /*
516 * We've found the end of a statement. Send it and reset
517 * the buffer.
518 */
519 ExecuteSqlCommand(AH, AH->sqlparse.curCmd->data,
520 "could not execute query");
521 resetPQExpBuffer(AH->sqlparse.curCmd);
522 }
523 else if (ch == '\'')
524 {
525 AH->sqlparse.state = SQL_IN_SINGLE_QUOTE;
526 AH->sqlparse.backSlash = false;
527 }
528 else if (ch == '"')
529 {
530 AH->sqlparse.state = SQL_IN_DOUBLE_QUOTE;
531 }
532 break;
533
534 case SQL_IN_SINGLE_QUOTE:
535 /* We needn't handle '' specially */
536 if (ch == '\'' && !AH->sqlparse.backSlash)
537 AH->sqlparse.state = SQL_SCAN;
538 else if (ch == '\\' && !AH->public.std_strings)
539 AH->sqlparse.backSlash = !AH->sqlparse.backSlash;
540 else
541 AH->sqlparse.backSlash = false;
542 break;
543
544 case SQL_IN_DOUBLE_QUOTE:
545 /* We needn't handle "" specially */
546 if (ch == '"')
547 AH->sqlparse.state = SQL_SCAN;
548 break;
549 }
550 }
551}
552
553
554/*
555 * Implement ahwrite() for direct-to-DB restore
556 */
557int
558ExecuteSqlCommandBuf(Archive *AHX, const char *buf, size_t bufLen)
559{
560 ArchiveHandle *AH = (ArchiveHandle *) AHX;
561
562 if (AH->outputKind == OUTPUT_COPYDATA)
563 {
564 /*
565 * COPY data.
566 *
567 * We drop the data on the floor if libpq has failed to enter COPY
568 * mode; this allows us to behave reasonably when trying to continue
569 * after an error in a COPY command.
570 */
571 if (AH->pgCopyIn &&
572 PQputCopyData(AH->connection, buf, bufLen) <= 0)
573 fatal("error returned by PQputCopyData: %s",
574 PQerrorMessage(AH->connection));
575 }
576 else if (AH->outputKind == OUTPUT_OTHERDATA)
577 {
578 /*
579 * Table data expressed as INSERT commands; or, in old dump files,
580 * BLOB COMMENTS data (which is expressed as COMMENT ON commands).
581 */
582 ExecuteSimpleCommands(AH, buf, bufLen);
583 }
584 else
585 {
586 /*
587 * General SQL commands; we assume that commands will not be split
588 * across calls.
589 *
590 * In most cases the data passed to us will be a null-terminated
591 * string, but if it's not, we have to add a trailing null.
592 */
593 if (buf[bufLen] == '\0')
594 ExecuteSqlCommand(AH, buf, "could not execute query");
595 else
596 {
597 char *str = (char *) pg_malloc(bufLen + 1);
598
599 memcpy(str, buf, bufLen);
600 str[bufLen] = '\0';
601 ExecuteSqlCommand(AH, str, "could not execute query");
602 free(str);
603 }
604 }
605
606 return bufLen;
607}
608
609/*
610 * Terminate a COPY operation during direct-to-DB restore
611 */
612void
613EndDBCopyMode(Archive *AHX, const char *tocEntryTag)
614{
615 ArchiveHandle *AH = (ArchiveHandle *) AHX;
616
617 if (AH->pgCopyIn)
618 {
619 PGresult *res;
620
621 if (PQputCopyEnd(AH->connection, NULL) <= 0)
622 fatal("error returned by PQputCopyEnd: %s",
623 PQerrorMessage(AH->connection));
624
625 /* Check command status and return to normal libpq state */
626 res = PQgetResult(AH->connection);
627 if (PQresultStatus(res) != PGRES_COMMAND_OK)
628 warn_or_exit_horribly(AH, "COPY failed for table \"%s\": %s",
629 tocEntryTag, PQerrorMessage(AH->connection));
630 PQclear(res);
631
632 /* Do this to ensure we've pumped libpq back to idle state */
633 if (PQgetResult(AH->connection) != NULL)
634 pg_log_warning("unexpected extra results during COPY of table \"%s\"",
635 tocEntryTag);
636
637 AH->pgCopyIn = false;
638 }
639}
640
641void
642StartTransaction(Archive *AHX)
643{
644 ArchiveHandle *AH = (ArchiveHandle *) AHX;
645
646 ExecuteSqlCommand(AH, "BEGIN", "could not start database transaction");
647}
648
649void
650CommitTransaction(Archive *AHX)
651{
652 ArchiveHandle *AH = (ArchiveHandle *) AHX;
653
654 ExecuteSqlCommand(AH, "COMMIT", "could not commit database transaction");
655}
656
657void
658DropBlobIfExists(ArchiveHandle *AH, Oid oid)
659{
660 /*
661 * If we are not restoring to a direct database connection, we have to
662 * guess about how to detect whether the blob exists. Assume new-style.
663 */
664 if (AH->connection == NULL ||
665 PQserverVersion(AH->connection) >= 90000)
666 {
667 ahprintf(AH,
668 "SELECT pg_catalog.lo_unlink(oid) "
669 "FROM pg_catalog.pg_largeobject_metadata "
670 "WHERE oid = '%u';\n",
671 oid);
672 }
673 else
674 {
675 /* Restoring to pre-9.0 server, so do it the old way */
676 ahprintf(AH,
677 "SELECT CASE WHEN EXISTS("
678 "SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'"
679 ") THEN pg_catalog.lo_unlink('%u') END;\n",
680 oid, oid);
681 }
682}
683