1/*-------------------------------------------------------------------------
2 *
3 * vacuumdb
4 *
5 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
6 * Portions Copyright (c) 1994, Regents of the University of California
7 *
8 * src/bin/scripts/vacuumdb.c
9 *
10 *-------------------------------------------------------------------------
11 */
12
#ifdef WIN32
#define FD_SETSIZE 1024			/* must set before winsock2.h is included */
#endif

#include "postgres_fe.h"

#include <limits.h>

#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#include "catalog/pg_class_d.h"

#include "common.h"
#include "common/logging.h"
#include "fe_utils/connect.h"
#include "fe_utils/simple_list.h"
#include "fe_utils/string_utils.h"


#define ERRCODE_UNDEFINED_TABLE "42P01"
33
/* Parallel vacuuming stuff */

/*
 * One open database connection plus its idle/busy state.
 *
 * vacuum_one_database() allocates one slot per connection it uses; in
 * non-parallel mode only the first slot is used.  isFree is set when a slot
 * is initialized and when GetIdleSlot() sees the connection drain its
 * results; it is cleared when a command is dispatched in parallel mode.
 */
typedef struct ParallelSlot
{
	PGconn	   *connection;		/* One connection */
	bool		isFree;			/* Is it known to be idle? */
} ParallelSlot;
40
/*
 * vacuum options controlled by user flags
 *
 * main() zeroes this struct and then sets fields from the command line;
 * prepare_vacuum_command() turns them into VACUUM/ANALYZE options, and
 * vacuum_one_database() uses the age thresholds to filter the table list.
 */
typedef struct vacuumingOptions
{
	bool		analyze_only;	/* -Z / --analyze-only, --analyze-in-stages */
	bool		verbose;		/* -v / --verbose */
	bool		and_analyze;	/* -z / --analyze */
	bool		full;			/* -f / --full */
	bool		freeze;			/* -F / --freeze */
	bool		disable_page_skipping;	/* --disable-page-skipping */
	bool		skip_locked;	/* --skip-locked */
	int			min_xid_age;	/* --min-xid-age; 0 means no filter */
	int			min_mxid_age;	/* --min-mxid-age; 0 means no filter */
} vacuumingOptions;
54
55
/* Forward declarations of this file's static functions. */
static void vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
								int stage,
								SimpleStringList *tables,
								const char *host, const char *port,
								const char *username, enum trivalue prompt_password,
								int concurrentCons,
								const char *progname, bool echo, bool quiet);

static void vacuum_all_databases(vacuumingOptions *vacopts,
								 bool analyze_in_stages,
								 const char *maintenance_db,
								 const char *host, const char *port,
								 const char *username, enum trivalue prompt_password,
								 int concurrentCons,
								 const char *progname, bool echo, bool quiet);

static void prepare_vacuum_command(PQExpBuffer sql, int serverVersion,
								   vacuumingOptions *vacopts, const char *table);

static void run_vacuum_command(PGconn *conn, const char *sql, bool echo,
							   const char *table, const char *progname, bool async);

static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots,
								 const char *progname);

static bool ProcessQueryResult(PGconn *conn, PGresult *result,
							   const char *progname);

static bool GetQueryResult(PGconn *conn, const char *progname);

static void DisconnectDatabase(ParallelSlot *slot);

static int	select_loop(int maxFd, fd_set *workerset, bool *aborting);

static void init_slot(ParallelSlot *slot, PGconn *conn);

static void help(const char *progname);

/*
 * For analyze-in-stages mode.  ANALYZE_NO_STAGE is passed as the 'stage' of
 * a normal (non-staged) run; staged runs use stages 0 .. ANALYZE_NUM_STAGES-1,
 * which index the per-stage command/message arrays in vacuum_one_database().
 */
#define ANALYZE_NO_STAGE	-1
#define ANALYZE_NUM_STAGES	3
97
98
99int
100main(int argc, char *argv[])
101{
102 static struct option long_options[] = {
103 {"host", required_argument, NULL, 'h'},
104 {"port", required_argument, NULL, 'p'},
105 {"username", required_argument, NULL, 'U'},
106 {"no-password", no_argument, NULL, 'w'},
107 {"password", no_argument, NULL, 'W'},
108 {"echo", no_argument, NULL, 'e'},
109 {"quiet", no_argument, NULL, 'q'},
110 {"dbname", required_argument, NULL, 'd'},
111 {"analyze", no_argument, NULL, 'z'},
112 {"analyze-only", no_argument, NULL, 'Z'},
113 {"freeze", no_argument, NULL, 'F'},
114 {"all", no_argument, NULL, 'a'},
115 {"table", required_argument, NULL, 't'},
116 {"full", no_argument, NULL, 'f'},
117 {"verbose", no_argument, NULL, 'v'},
118 {"jobs", required_argument, NULL, 'j'},
119 {"maintenance-db", required_argument, NULL, 2},
120 {"analyze-in-stages", no_argument, NULL, 3},
121 {"disable-page-skipping", no_argument, NULL, 4},
122 {"skip-locked", no_argument, NULL, 5},
123 {"min-xid-age", required_argument, NULL, 6},
124 {"min-mxid-age", required_argument, NULL, 7},
125 {NULL, 0, NULL, 0}
126 };
127
128 const char *progname;
129 int optindex;
130 int c;
131 const char *dbname = NULL;
132 const char *maintenance_db = NULL;
133 char *host = NULL;
134 char *port = NULL;
135 char *username = NULL;
136 enum trivalue prompt_password = TRI_DEFAULT;
137 bool echo = false;
138 bool quiet = false;
139 vacuumingOptions vacopts;
140 bool analyze_in_stages = false;
141 bool alldb = false;
142 SimpleStringList tables = {NULL, NULL};
143 int concurrentCons = 1;
144 int tbl_count = 0;
145
146 /* initialize options to all false */
147 memset(&vacopts, 0, sizeof(vacopts));
148
149 pg_logging_init(argv[0]);
150 progname = get_progname(argv[0]);
151 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
152
153 handle_help_version_opts(argc, argv, "vacuumdb", help);
154
155 while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fvj:", long_options, &optindex)) != -1)
156 {
157 switch (c)
158 {
159 case 'h':
160 host = pg_strdup(optarg);
161 break;
162 case 'p':
163 port = pg_strdup(optarg);
164 break;
165 case 'U':
166 username = pg_strdup(optarg);
167 break;
168 case 'w':
169 prompt_password = TRI_NO;
170 break;
171 case 'W':
172 prompt_password = TRI_YES;
173 break;
174 case 'e':
175 echo = true;
176 break;
177 case 'q':
178 quiet = true;
179 break;
180 case 'd':
181 dbname = pg_strdup(optarg);
182 break;
183 case 'z':
184 vacopts.and_analyze = true;
185 break;
186 case 'Z':
187 vacopts.analyze_only = true;
188 break;
189 case 'F':
190 vacopts.freeze = true;
191 break;
192 case 'a':
193 alldb = true;
194 break;
195 case 't':
196 {
197 simple_string_list_append(&tables, optarg);
198 tbl_count++;
199 break;
200 }
201 case 'f':
202 vacopts.full = true;
203 break;
204 case 'v':
205 vacopts.verbose = true;
206 break;
207 case 'j':
208 concurrentCons = atoi(optarg);
209 if (concurrentCons <= 0)
210 {
211 pg_log_error("number of parallel jobs must be at least 1");
212 exit(1);
213 }
214 break;
215 case 2:
216 maintenance_db = pg_strdup(optarg);
217 break;
218 case 3:
219 analyze_in_stages = vacopts.analyze_only = true;
220 break;
221 case 4:
222 vacopts.disable_page_skipping = true;
223 break;
224 case 5:
225 vacopts.skip_locked = true;
226 break;
227 case 6:
228 vacopts.min_xid_age = atoi(optarg);
229 if (vacopts.min_xid_age <= 0)
230 {
231 pg_log_error("minimum transaction ID age must be at least 1");
232 exit(1);
233 }
234 break;
235 case 7:
236 vacopts.min_mxid_age = atoi(optarg);
237 if (vacopts.min_mxid_age <= 0)
238 {
239 pg_log_error("minimum multixact ID age must be at least 1");
240 exit(1);
241 }
242 break;
243 default:
244 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
245 exit(1);
246 }
247 }
248
249 /*
250 * Non-option argument specifies database name as long as it wasn't
251 * already specified with -d / --dbname
252 */
253 if (optind < argc && dbname == NULL)
254 {
255 dbname = argv[optind];
256 optind++;
257 }
258
259 if (optind < argc)
260 {
261 pg_log_error("too many command-line arguments (first is \"%s\")",
262 argv[optind]);
263 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
264 exit(1);
265 }
266
267 if (vacopts.analyze_only)
268 {
269 if (vacopts.full)
270 {
271 pg_log_error("cannot use the \"%s\" option when performing only analyze",
272 "full");
273 exit(1);
274 }
275 if (vacopts.freeze)
276 {
277 pg_log_error("cannot use the \"%s\" option when performing only analyze",
278 "freeze");
279 exit(1);
280 }
281 if (vacopts.disable_page_skipping)
282 {
283 pg_log_error("cannot use the \"%s\" option when performing only analyze",
284 "disable-page-skipping");
285 exit(1);
286 }
287 /* allow 'and_analyze' with 'analyze_only' */
288 }
289
290 setup_cancel_handler();
291
292 /* Avoid opening extra connections. */
293 if (tbl_count && (concurrentCons > tbl_count))
294 concurrentCons = tbl_count;
295
296 if (alldb)
297 {
298 if (dbname)
299 {
300 pg_log_error("cannot vacuum all databases and a specific one at the same time");
301 exit(1);
302 }
303 if (tables.head != NULL)
304 {
305 pg_log_error("cannot vacuum specific table(s) in all databases");
306 exit(1);
307 }
308
309 vacuum_all_databases(&vacopts,
310 analyze_in_stages,
311 maintenance_db,
312 host, port, username, prompt_password,
313 concurrentCons,
314 progname, echo, quiet);
315 }
316 else
317 {
318 if (dbname == NULL)
319 {
320 if (getenv("PGDATABASE"))
321 dbname = getenv("PGDATABASE");
322 else if (getenv("PGUSER"))
323 dbname = getenv("PGUSER");
324 else
325 dbname = get_user_name_or_exit(progname);
326 }
327
328 if (analyze_in_stages)
329 {
330 int stage;
331
332 for (stage = 0; stage < ANALYZE_NUM_STAGES; stage++)
333 {
334 vacuum_one_database(dbname, &vacopts,
335 stage,
336 &tables,
337 host, port, username, prompt_password,
338 concurrentCons,
339 progname, echo, quiet);
340 }
341 }
342 else
343 vacuum_one_database(dbname, &vacopts,
344 ANALYZE_NO_STAGE,
345 &tables,
346 host, port, username, prompt_password,
347 concurrentCons,
348 progname, echo, quiet);
349 }
350
351 exit(0);
352}
353
354/*
355 * vacuum_one_database
356 *
357 * Process tables in the given database. If the 'tables' list is empty,
358 * process all tables in the database.
359 *
360 * Note that this function is only concerned with running exactly one stage
361 * when in analyze-in-stages mode; caller must iterate on us if necessary.
362 *
363 * If concurrentCons is > 1, multiple connections are used to vacuum tables
364 * in parallel. In this case and if the table list is empty, we first obtain
365 * a list of tables from the database.
366 */
367static void
368vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
369 int stage,
370 SimpleStringList *tables,
371 const char *host, const char *port,
372 const char *username, enum trivalue prompt_password,
373 int concurrentCons,
374 const char *progname, bool echo, bool quiet)
375{
376 PQExpBufferData sql;
377 PQExpBufferData buf;
378 PQExpBufferData catalog_query;
379 PGresult *res;
380 PGconn *conn;
381 SimpleStringListCell *cell;
382 ParallelSlot *slots;
383 SimpleStringList dbtables = {NULL, NULL};
384 int i;
385 int ntups;
386 bool failed = false;
387 bool parallel = concurrentCons > 1;
388 bool tables_listed = false;
389 bool has_where = false;
390 const char *stage_commands[] = {
391 "SET default_statistics_target=1; SET vacuum_cost_delay=0;",
392 "SET default_statistics_target=10; RESET vacuum_cost_delay;",
393 "RESET default_statistics_target;"
394 };
395 const char *stage_messages[] = {
396 gettext_noop("Generating minimal optimizer statistics (1 target)"),
397 gettext_noop("Generating medium optimizer statistics (10 targets)"),
398 gettext_noop("Generating default (full) optimizer statistics")
399 };
400
401 Assert(stage == ANALYZE_NO_STAGE ||
402 (stage >= 0 && stage < ANALYZE_NUM_STAGES));
403
404 conn = connectDatabase(dbname, host, port, username, prompt_password,
405 progname, echo, false, true);
406
407 if (vacopts->disable_page_skipping && PQserverVersion(conn) < 90600)
408 {
409 PQfinish(conn);
410 pg_log_error("cannot use the \"%s\" option on server versions older than PostgreSQL %s",
411 "disable-page-skipping", "9.6");
412 exit(1);
413 }
414
415 if (vacopts->skip_locked && PQserverVersion(conn) < 120000)
416 {
417 PQfinish(conn);
418 pg_log_error("cannot use the \"%s\" option on server versions older than PostgreSQL %s",
419 "skip-locked", "12");
420 exit(1);
421 }
422
423 if (vacopts->min_xid_age != 0 && PQserverVersion(conn) < 90600)
424 {
425 pg_log_error("cannot use the \"%s\" option on server versions older than PostgreSQL %s",
426 "--min-xid-age", "9.6");
427 exit(1);
428 }
429
430 if (vacopts->min_mxid_age != 0 && PQserverVersion(conn) < 90600)
431 {
432 pg_log_error("cannot use the \"%s\" option on server versions older than PostgreSQL %s",
433 "--min-mxid-age", "9.6");
434 exit(1);
435 }
436
437 if (!quiet)
438 {
439 if (stage != ANALYZE_NO_STAGE)
440 printf(_("%s: processing database \"%s\": %s\n"),
441 progname, PQdb(conn), _(stage_messages[stage]));
442 else
443 printf(_("%s: vacuuming database \"%s\"\n"),
444 progname, PQdb(conn));
445 fflush(stdout);
446 }
447
448 /*
449 * Prepare the list of tables to process by querying the catalogs.
450 *
451 * Since we execute the constructed query with the default search_path
452 * (which could be unsafe), everything in this query MUST be fully
453 * qualified.
454 *
455 * First, build a WITH clause for the catalog query if any tables were
456 * specified, with a set of values made of relation names and their
457 * optional set of columns. This is used to match any provided column
458 * lists with the generated qualified identifiers and to filter for the
459 * tables provided via --table. If a listed table does not exist, the
460 * catalog query will fail.
461 */
462 initPQExpBuffer(&catalog_query);
463 for (cell = tables ? tables->head : NULL; cell; cell = cell->next)
464 {
465 char *just_table;
466 const char *just_columns;
467
468 /*
469 * Split relation and column names given by the user, this is used to
470 * feed the CTE with values on which are performed pre-run validity
471 * checks as well. For now these happen only on the relation name.
472 */
473 splitTableColumnsSpec(cell->val, PQclientEncoding(conn),
474 &just_table, &just_columns);
475
476 if (!tables_listed)
477 {
478 appendPQExpBuffer(&catalog_query,
479 "WITH listed_tables (table_oid, column_list) "
480 "AS (\n VALUES (");
481 tables_listed = true;
482 }
483 else
484 appendPQExpBuffer(&catalog_query, ",\n (");
485
486 appendStringLiteralConn(&catalog_query, just_table, conn);
487 appendPQExpBuffer(&catalog_query, "::pg_catalog.regclass, ");
488
489 if (just_columns && just_columns[0] != '\0')
490 appendStringLiteralConn(&catalog_query, just_columns, conn);
491 else
492 appendPQExpBufferStr(&catalog_query, "NULL");
493
494 appendPQExpBufferStr(&catalog_query, "::pg_catalog.text)");
495
496 pg_free(just_table);
497 }
498
499 /* Finish formatting the CTE */
500 if (tables_listed)
501 appendPQExpBuffer(&catalog_query, "\n)\n");
502
503 appendPQExpBuffer(&catalog_query, "SELECT c.relname, ns.nspname");
504
505 if (tables_listed)
506 appendPQExpBuffer(&catalog_query, ", listed_tables.column_list");
507
508 appendPQExpBuffer(&catalog_query,
509 " FROM pg_catalog.pg_class c\n"
510 " JOIN pg_catalog.pg_namespace ns"
511 " ON c.relnamespace OPERATOR(pg_catalog.=) ns.oid\n"
512 " LEFT JOIN pg_catalog.pg_class t"
513 " ON c.reltoastrelid OPERATOR(pg_catalog.=) t.oid\n");
514
515 /* Used to match the tables listed by the user */
516 if (tables_listed)
517 appendPQExpBuffer(&catalog_query, " JOIN listed_tables"
518 " ON listed_tables.table_oid OPERATOR(pg_catalog.=) c.oid\n");
519
520 /*
521 * If no tables were listed, filter for the relevant relation types. If
522 * tables were given via --table, don't bother filtering by relation type.
523 * Instead, let the server decide whether a given relation can be
524 * processed in which case the user will know about it.
525 */
526 if (!tables_listed)
527 {
528 appendPQExpBuffer(&catalog_query, " WHERE c.relkind OPERATOR(pg_catalog.=) ANY (array["
529 CppAsString2(RELKIND_RELATION) ", "
530 CppAsString2(RELKIND_MATVIEW) "])\n");
531 has_where = true;
532 }
533
534 /*
535 * For --min-xid-age and --min-mxid-age, the age of the relation is the
536 * greatest of the ages of the main relation and its associated TOAST
537 * table. The commands generated by vacuumdb will also process the TOAST
538 * table for the relation if necessary, so it does not need to be
539 * considered separately.
540 */
541 if (vacopts->min_xid_age != 0)
542 {
543 appendPQExpBuffer(&catalog_query,
544 " %s GREATEST(pg_catalog.age(c.relfrozenxid),"
545 " pg_catalog.age(t.relfrozenxid)) "
546 " OPERATOR(pg_catalog.>=) '%d'::pg_catalog.int4\n"
547 " AND c.relfrozenxid OPERATOR(pg_catalog.!=)"
548 " '0'::pg_catalog.xid\n",
549 has_where ? "AND" : "WHERE", vacopts->min_xid_age);
550 has_where = true;
551 }
552
553 if (vacopts->min_mxid_age != 0)
554 {
555 appendPQExpBuffer(&catalog_query,
556 " %s GREATEST(pg_catalog.mxid_age(c.relminmxid),"
557 " pg_catalog.mxid_age(t.relminmxid)) OPERATOR(pg_catalog.>=)"
558 " '%d'::pg_catalog.int4\n"
559 " AND c.relminmxid OPERATOR(pg_catalog.!=)"
560 " '0'::pg_catalog.xid\n",
561 has_where ? "AND" : "WHERE", vacopts->min_mxid_age);
562 has_where = true;
563 }
564
565 /*
566 * Execute the catalog query. We use the default search_path for this
567 * query for consistency with table lookups done elsewhere by the user.
568 */
569 appendPQExpBuffer(&catalog_query, " ORDER BY c.relpages DESC;");
570 executeCommand(conn, "RESET search_path;", progname, echo);
571 res = executeQuery(conn, catalog_query.data, progname, echo);
572 termPQExpBuffer(&catalog_query);
573 PQclear(executeQuery(conn, ALWAYS_SECURE_SEARCH_PATH_SQL,
574 progname, echo));
575
576 /*
577 * If no rows are returned, there are no matching tables, so we are done.
578 */
579 ntups = PQntuples(res);
580 if (ntups == 0)
581 {
582 PQclear(res);
583 PQfinish(conn);
584 return;
585 }
586
587 /*
588 * Build qualified identifiers for each table, including the column list
589 * if given.
590 */
591 initPQExpBuffer(&buf);
592 for (i = 0; i < ntups; i++)
593 {
594 appendPQExpBufferStr(&buf,
595 fmtQualifiedId(PQgetvalue(res, i, 1),
596 PQgetvalue(res, i, 0)));
597
598 if (tables_listed && !PQgetisnull(res, i, 2))
599 appendPQExpBufferStr(&buf, PQgetvalue(res, i, 2));
600
601 simple_string_list_append(&dbtables, buf.data);
602 resetPQExpBuffer(&buf);
603 }
604 termPQExpBuffer(&buf);
605 PQclear(res);
606
607 /*
608 * If there are more connections than vacuumable relations, we don't need
609 * to use them all.
610 */
611 if (parallel)
612 {
613 if (concurrentCons > ntups)
614 concurrentCons = ntups;
615 if (concurrentCons <= 1)
616 parallel = false;
617 }
618
619 /*
620 * Setup the database connections. We reuse the connection we already have
621 * for the first slot. If not in parallel mode, the first slot in the
622 * array contains the connection.
623 */
624 if (concurrentCons <= 0)
625 concurrentCons = 1;
626 slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons);
627 init_slot(slots, conn);
628 if (parallel)
629 {
630 for (i = 1; i < concurrentCons; i++)
631 {
632 conn = connectDatabase(dbname, host, port, username, prompt_password,
633 progname, echo, false, true);
634
635 /*
636 * Fail and exit immediately if trying to use a socket in an
637 * unsupported range. POSIX requires open(2) to use the lowest
638 * unused file descriptor and the hint given relies on that.
639 */
640 if (PQsocket(conn) >= FD_SETSIZE)
641 {
642 pg_log_fatal("too many jobs for this platform -- try %d", i);
643 exit(1);
644 }
645
646 init_slot(slots + i, conn);
647 }
648 }
649
650 /*
651 * Prepare all the connections to run the appropriate analyze stage, if
652 * caller requested that mode.
653 */
654 if (stage != ANALYZE_NO_STAGE)
655 {
656 int j;
657
658 /* We already emitted the message above */
659
660 for (j = 0; j < concurrentCons; j++)
661 executeCommand((slots + j)->connection,
662 stage_commands[stage], progname, echo);
663 }
664
665 initPQExpBuffer(&sql);
666
667 cell = dbtables.head;
668 do
669 {
670 const char *tabname = cell->val;
671 ParallelSlot *free_slot;
672
673 if (CancelRequested)
674 {
675 failed = true;
676 goto finish;
677 }
678
679 /*
680 * Get the connection slot to use. If in parallel mode, here we wait
681 * for one connection to become available if none already is. In
682 * non-parallel mode we simply use the only slot we have, which we
683 * know to be free.
684 */
685 if (parallel)
686 {
687 /*
688 * Get a free slot, waiting until one becomes free if none
689 * currently is.
690 */
691 free_slot = GetIdleSlot(slots, concurrentCons, progname);
692 if (!free_slot)
693 {
694 failed = true;
695 goto finish;
696 }
697
698 free_slot->isFree = false;
699 }
700 else
701 free_slot = slots;
702
703 prepare_vacuum_command(&sql, PQserverVersion(free_slot->connection),
704 vacopts, tabname);
705
706 /*
707 * Execute the vacuum. If not in parallel mode, this terminates the
708 * program in case of an error. (The parallel case handles query
709 * errors in ProcessQueryResult through GetIdleSlot.)
710 */
711 run_vacuum_command(free_slot->connection, sql.data,
712 echo, tabname, progname, parallel);
713
714 cell = cell->next;
715 } while (cell != NULL);
716
717 if (parallel)
718 {
719 int j;
720
721 /* wait for all connections to finish */
722 for (j = 0; j < concurrentCons; j++)
723 {
724 if (!GetQueryResult((slots + j)->connection, progname))
725 {
726 failed = true;
727 goto finish;
728 }
729 }
730 }
731
732finish:
733 for (i = 0; i < concurrentCons; i++)
734 DisconnectDatabase(slots + i);
735 pfree(slots);
736
737 termPQExpBuffer(&sql);
738
739 if (failed)
740 exit(1);
741}
742
743/*
744 * Vacuum/analyze all connectable databases.
745 *
746 * In analyze-in-stages mode, we process all databases in one stage before
747 * moving on to the next stage. That ensure minimal stats are available
748 * quickly everywhere before generating more detailed ones.
749 */
750static void
751vacuum_all_databases(vacuumingOptions *vacopts,
752 bool analyze_in_stages,
753 const char *maintenance_db, const char *host,
754 const char *port, const char *username,
755 enum trivalue prompt_password,
756 int concurrentCons,
757 const char *progname, bool echo, bool quiet)
758{
759 PGconn *conn;
760 PGresult *result;
761 PQExpBufferData connstr;
762 int stage;
763 int i;
764
765 conn = connectMaintenanceDatabase(maintenance_db, host, port, username,
766 prompt_password, progname, echo);
767 result = executeQuery(conn,
768 "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;",
769 progname, echo);
770 PQfinish(conn);
771
772 initPQExpBuffer(&connstr);
773 if (analyze_in_stages)
774 {
775 /*
776 * When analyzing all databases in stages, we analyze them all in the
777 * fastest stage first, so that initial statistics become available
778 * for all of them as soon as possible.
779 *
780 * This means we establish several times as many connections, but
781 * that's a secondary consideration.
782 */
783 for (stage = 0; stage < ANALYZE_NUM_STAGES; stage++)
784 {
785 for (i = 0; i < PQntuples(result); i++)
786 {
787 resetPQExpBuffer(&connstr);
788 appendPQExpBuffer(&connstr, "dbname=");
789 appendConnStrVal(&connstr, PQgetvalue(result, i, 0));
790
791 vacuum_one_database(connstr.data, vacopts,
792 stage,
793 NULL,
794 host, port, username, prompt_password,
795 concurrentCons,
796 progname, echo, quiet);
797 }
798 }
799 }
800 else
801 {
802 for (i = 0; i < PQntuples(result); i++)
803 {
804 resetPQExpBuffer(&connstr);
805 appendPQExpBuffer(&connstr, "dbname=");
806 appendConnStrVal(&connstr, PQgetvalue(result, i, 0));
807
808 vacuum_one_database(connstr.data, vacopts,
809 ANALYZE_NO_STAGE,
810 NULL,
811 host, port, username, prompt_password,
812 concurrentCons,
813 progname, echo, quiet);
814 }
815 }
816 termPQExpBuffer(&connstr);
817
818 PQclear(result);
819}
820
821/*
822 * Construct a vacuum/analyze command to run based on the given options, in the
823 * given string buffer, which may contain previous garbage.
824 *
825 * The table name used must be already properly quoted. The command generated
826 * depends on the server version involved and it is semicolon-terminated.
827 */
828static void
829prepare_vacuum_command(PQExpBuffer sql, int serverVersion,
830 vacuumingOptions *vacopts, const char *table)
831{
832 const char *paren = " (";
833 const char *comma = ", ";
834 const char *sep = paren;
835
836 resetPQExpBuffer(sql);
837
838 if (vacopts->analyze_only)
839 {
840 appendPQExpBufferStr(sql, "ANALYZE");
841
842 /* parenthesized grammar of ANALYZE is supported since v11 */
843 if (serverVersion >= 110000)
844 {
845 if (vacopts->skip_locked)
846 {
847 /* SKIP_LOCKED is supported since v12 */
848 Assert(serverVersion >= 120000);
849 appendPQExpBuffer(sql, "%sSKIP_LOCKED", sep);
850 sep = comma;
851 }
852 if (vacopts->verbose)
853 {
854 appendPQExpBuffer(sql, "%sVERBOSE", sep);
855 sep = comma;
856 }
857 if (sep != paren)
858 appendPQExpBufferChar(sql, ')');
859 }
860 else
861 {
862 if (vacopts->verbose)
863 appendPQExpBufferStr(sql, " VERBOSE");
864 }
865 }
866 else
867 {
868 appendPQExpBufferStr(sql, "VACUUM");
869
870 /* parenthesized grammar of VACUUM is supported since v9.0 */
871 if (serverVersion >= 90000)
872 {
873 if (vacopts->disable_page_skipping)
874 {
875 /* DISABLE_PAGE_SKIPPING is supported since v9.6 */
876 Assert(serverVersion >= 90600);
877 appendPQExpBuffer(sql, "%sDISABLE_PAGE_SKIPPING", sep);
878 sep = comma;
879 }
880 if (vacopts->skip_locked)
881 {
882 /* SKIP_LOCKED is supported since v12 */
883 Assert(serverVersion >= 120000);
884 appendPQExpBuffer(sql, "%sSKIP_LOCKED", sep);
885 sep = comma;
886 }
887 if (vacopts->full)
888 {
889 appendPQExpBuffer(sql, "%sFULL", sep);
890 sep = comma;
891 }
892 if (vacopts->freeze)
893 {
894 appendPQExpBuffer(sql, "%sFREEZE", sep);
895 sep = comma;
896 }
897 if (vacopts->verbose)
898 {
899 appendPQExpBuffer(sql, "%sVERBOSE", sep);
900 sep = comma;
901 }
902 if (vacopts->and_analyze)
903 {
904 appendPQExpBuffer(sql, "%sANALYZE", sep);
905 sep = comma;
906 }
907 if (sep != paren)
908 appendPQExpBufferChar(sql, ')');
909 }
910 else
911 {
912 if (vacopts->full)
913 appendPQExpBufferStr(sql, " FULL");
914 if (vacopts->freeze)
915 appendPQExpBufferStr(sql, " FREEZE");
916 if (vacopts->verbose)
917 appendPQExpBufferStr(sql, " VERBOSE");
918 if (vacopts->and_analyze)
919 appendPQExpBufferStr(sql, " ANALYZE");
920 }
921 }
922
923 appendPQExpBuffer(sql, " %s;", table);
924}
925
926/*
927 * Send a vacuum/analyze command to the server. In async mode, return after
928 * sending the command; else, wait for it to finish.
929 *
930 * Any errors during command execution are reported to stderr. If async is
931 * false, this function exits the program after reporting the error.
932 */
933static void
934run_vacuum_command(PGconn *conn, const char *sql, bool echo,
935 const char *table, const char *progname, bool async)
936{
937 bool status;
938
939 if (async)
940 {
941 if (echo)
942 printf("%s\n", sql);
943
944 status = PQsendQuery(conn, sql) == 1;
945 }
946 else
947 status = executeMaintenanceCommand(conn, sql, echo);
948
949 if (!status)
950 {
951 if (table)
952 pg_log_error("vacuuming of table \"%s\" in database \"%s\" failed: %s",
953 table, PQdb(conn), PQerrorMessage(conn));
954 else
955 pg_log_error("vacuuming of database \"%s\" failed: %s",
956 PQdb(conn), PQerrorMessage(conn));
957
958 if (!async)
959 {
960 PQfinish(conn);
961 exit(1);
962 }
963 }
964}
965
/*
 * GetIdleSlot
 *		Return a connection slot that is ready to execute a command.
 *
 * We return the first slot we find that is marked isFree, if one is;
 * otherwise, we loop on select() until one socket becomes available.  When
 * this happens, we read the whole set and mark as free all sockets that
 * become available.
 *
 * The returned slot is still marked isFree; the caller clears that flag when
 * it dispatches a command on the slot.
 *
 * If an error occurs, NULL is returned.  That happens when select_loop()
 * reports an abort (cancel request or select() failure), or when
 * ProcessQueryResult() rejects a command result.
 */
static ParallelSlot *
GetIdleSlot(ParallelSlot slots[], int numslots,
			const char *progname)
{
	int			i;
	int			firstFree = -1;

	/* Any connection already known free? */
	for (i = 0; i < numslots; i++)
	{
		if (slots[i].isFree)
			return slots + i;
	}

	/*
	 * No free slot found, so wait until one of the connections has finished
	 * its task and return the available slot.
	 */
	while (firstFree < 0)
	{
		fd_set		slotset;
		int			maxFd = 0;
		bool		aborting;

		/* We must reconstruct the fd_set for each call to select_loop */
		FD_ZERO(&slotset);

		for (i = 0; i < numslots; i++)
		{
			int			sock = PQsocket(slots[i].connection);

			/*
			 * We don't really expect any connections to lose their sockets
			 * after startup, but just in case, cope by ignoring them.
			 */
			if (sock < 0)
				continue;

			FD_SET(sock, &slotset);
			if (sock > maxFd)
				maxFd = sock;
		}

		/* While blocked in select(), route cancel requests to slot zero. */
		SetCancelConn(slots->connection);
		i = select_loop(maxFd, &slotset, &aborting);
		ResetCancelConn();

		if (aborting)
		{
			/*
			 * We set the cancel-receiving connection to the one in the zeroth
			 * slot above, so fetch the error from there.
			 */
			GetQueryResult(slots->connection, progname);
			return NULL;
		}
		Assert(i != 0);

		/*
		 * Scan every slot, not just the readable ones: each connection whose
		 * results have been fully drained is marked free, and the lowest such
		 * index is remembered in firstFree.
		 */
		for (i = 0; i < numslots; i++)
		{
			int			sock = PQsocket(slots[i].connection);

			if (sock >= 0 && FD_ISSET(sock, &slotset))
			{
				/* select() says input is available, so consume it */
				PQconsumeInput(slots[i].connection);
			}

			/* Collect result(s) as long as any are available */
			while (!PQisBusy(slots[i].connection))
			{
				PGresult   *result = PQgetResult(slots[i].connection);

				if (result != NULL)
				{
					/* Check and discard the command result */
					if (!ProcessQueryResult(slots[i].connection, result,
											progname))
						return NULL;
				}
				else
				{
					/* This connection has become idle */
					slots[i].isFree = true;
					if (firstFree < 0)
						firstFree = i;
					break;
				}
			}
		}
	}

	return slots + firstFree;
}
1071
1072/*
1073 * ProcessQueryResult
1074 *
1075 * Process (and delete) a query result. Returns true if there's no error,
1076 * false otherwise -- but errors about trying to vacuum a missing relation
1077 * are reported and subsequently ignored.
1078 */
1079static bool
1080ProcessQueryResult(PGconn *conn, PGresult *result, const char *progname)
1081{
1082 /*
1083 * If it's an error, report it. Errors about a missing table are harmless
1084 * so we continue processing; but die for other errors.
1085 */
1086 if (PQresultStatus(result) != PGRES_COMMAND_OK)
1087 {
1088 char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
1089
1090 pg_log_error("vacuuming of database \"%s\" failed: %s",
1091 PQdb(conn), PQerrorMessage(conn));
1092
1093 if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
1094 {
1095 PQclear(result);
1096 return false;
1097 }
1098 }
1099
1100 PQclear(result);
1101 return true;
1102}
1103
1104/*
1105 * GetQueryResult
1106 *
1107 * Pump the conn till it's dry of results; return false if any are errors.
1108 * Note that this will block if the conn is busy.
1109 */
1110static bool
1111GetQueryResult(PGconn *conn, const char *progname)
1112{
1113 bool ok = true;
1114 PGresult *result;
1115
1116 SetCancelConn(conn);
1117 while ((result = PQgetResult(conn)) != NULL)
1118 {
1119 if (!ProcessQueryResult(conn, result, progname))
1120 ok = false;
1121 }
1122 ResetCancelConn();
1123 return ok;
1124}
1125
1126/*
1127 * DisconnectDatabase
1128 * Disconnect the connection associated with the given slot
1129 */
1130static void
1131DisconnectDatabase(ParallelSlot *slot)
1132{
1133 char errbuf[256];
1134
1135 if (!slot->connection)
1136 return;
1137
1138 if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE)
1139 {
1140 PGcancel *cancel;
1141
1142 if ((cancel = PQgetCancel(slot->connection)))
1143 {
1144 (void) PQcancel(cancel, errbuf, sizeof(errbuf));
1145 PQfreeCancel(cancel);
1146 }
1147 }
1148
1149 PQfinish(slot->connection);
1150 slot->connection = NULL;
1151}
1152
1153/*
1154 * Loop on select() until a descriptor from the given set becomes readable.
1155 *
1156 * If we get a cancel request while we're waiting, we forego all further
1157 * processing and set the *aborting flag to true. The return value must be
1158 * ignored in this case. Otherwise, *aborting is set to false.
1159 */
1160static int
1161select_loop(int maxFd, fd_set *workerset, bool *aborting)
1162{
1163 int i;
1164 fd_set saveSet = *workerset;
1165
1166 if (CancelRequested)
1167 {
1168 *aborting = true;
1169 return -1;
1170 }
1171 else
1172 *aborting = false;
1173
1174 for (;;)
1175 {
1176 /*
1177 * On Windows, we need to check once in a while for cancel requests;
1178 * on other platforms we rely on select() returning when interrupted.
1179 */
1180 struct timeval *tvp;
1181#ifdef WIN32
1182 struct timeval tv = {0, 1000000};
1183
1184 tvp = &tv;
1185#else
1186 tvp = NULL;
1187#endif
1188
1189 *workerset = saveSet;
1190 i = select(maxFd + 1, workerset, NULL, NULL, tvp);
1191
1192#ifdef WIN32
1193 if (i == SOCKET_ERROR)
1194 {
1195 i = -1;
1196
1197 if (WSAGetLastError() == WSAEINTR)
1198 errno = EINTR;
1199 }
1200#endif
1201
1202 if (i < 0 && errno == EINTR)
1203 continue; /* ignore this */
1204 if (i < 0 || CancelRequested)
1205 *aborting = true; /* but not this */
1206 if (i == 0)
1207 continue; /* timeout (Win32 only) */
1208 break;
1209 }
1210
1211 return i;
1212}
1213
1214static void
1215init_slot(ParallelSlot *slot, PGconn *conn)
1216{
1217 slot->connection = conn;
1218 /* Initially assume connection is idle */
1219 slot->isFree = true;
1220}
1221
/*
 * Print vacuumdb's usage summary (options and connection options) to stdout.
 *
 * Each line is a separate _()-wrapped literal so it can be translated
 * individually.
 */
static void
help(const char *progname)
{
	printf(_("%s cleans and analyzes a PostgreSQL database.\n\n"), progname);
	printf(_("Usage:\n"));
	printf(_(" %s [OPTION]... [DBNAME]\n"), progname);
	printf(_("\nOptions:\n"));
	printf(_(" -a, --all vacuum all databases\n"));
	printf(_(" -d, --dbname=DBNAME database to vacuum\n"));
	printf(_(" --disable-page-skipping disable all page-skipping behavior\n"));
	printf(_(" -e, --echo show the commands being sent to the server\n"));
	printf(_(" -f, --full do full vacuuming\n"));
	printf(_(" -F, --freeze freeze row transaction information\n"));
	printf(_(" -j, --jobs=NUM use this many concurrent connections to vacuum\n"));
	printf(_(" --min-mxid-age=MXID_AGE minimum multixact ID age of tables to vacuum\n"));
	printf(_(" --min-xid-age=XID_AGE minimum transaction ID age of tables to vacuum\n"));
	printf(_(" -q, --quiet don't write any messages\n"));
	printf(_(" --skip-locked skip relations that cannot be immediately locked\n"));
	printf(_(" -t, --table='TABLE[(COLUMNS)]' vacuum specific table(s) only\n"));
	printf(_(" -v, --verbose write a lot of output\n"));
	printf(_(" -V, --version output version information, then exit\n"));
	printf(_(" -z, --analyze update optimizer statistics\n"));
	printf(_(" -Z, --analyze-only only update optimizer statistics; no vacuum\n"));
	printf(_(" --analyze-in-stages only update optimizer statistics, in multiple\n"
			 " stages for faster results; no vacuum\n"));
	printf(_(" -?, --help show this help, then exit\n"));
	printf(_("\nConnection options:\n"));
	printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
	printf(_(" -p, --port=PORT database server port\n"));
	printf(_(" -U, --username=USERNAME user name to connect as\n"));
	printf(_(" -w, --no-password never prompt for password\n"));
	printf(_(" -W, --password force password prompt\n"));
	printf(_(" --maintenance-db=DBNAME alternate maintenance database\n"));
	printf(_("\nRead the description of the SQL command VACUUM for details.\n"));
	printf(_("\nReport bugs to <pgsql-bugs@lists.postgresql.org>.\n"));
}
1258