1/*-------------------------------------------------------------------------
2 *
3 * dbcommands.c
4 * Database management commands (create/drop database).
5 *
 * Note: database creation/destruction commands use exclusive locks on
 * the database objects (as expressed by LockSharedObject()) to avoid
 * stepping on each other's toes. Formerly we used table-level locks
 * on pg_database, but that's too coarse-grained.
10 *
11 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
12 * Portions Copyright (c) 1994, Regents of the University of California
13 *
14 *
15 * IDENTIFICATION
16 * src/backend/commands/dbcommands.c
17 *
18 *-------------------------------------------------------------------------
19 */
20#include "postgres.h"
21
22#include <fcntl.h>
23#include <unistd.h>
24#include <sys/stat.h>
25
26#include "access/genam.h"
27#include "access/heapam.h"
28#include "access/htup_details.h"
29#include "access/tableam.h"
30#include "access/xact.h"
31#include "access/xloginsert.h"
32#include "access/xlogutils.h"
33#include "catalog/catalog.h"
34#include "catalog/dependency.h"
35#include "catalog/indexing.h"
36#include "catalog/objectaccess.h"
37#include "catalog/pg_authid.h"
38#include "catalog/pg_database.h"
39#include "catalog/pg_db_role_setting.h"
40#include "catalog/pg_subscription.h"
41#include "catalog/pg_tablespace.h"
42#include "commands/comment.h"
43#include "commands/dbcommands.h"
44#include "commands/dbcommands_xlog.h"
45#include "commands/defrem.h"
46#include "commands/seclabel.h"
47#include "commands/tablespace.h"
48#include "mb/pg_wchar.h"
49#include "miscadmin.h"
50#include "pgstat.h"
51#include "postmaster/bgwriter.h"
52#include "replication/slot.h"
53#include "storage/copydir.h"
54#include "storage/fd.h"
55#include "storage/lmgr.h"
56#include "storage/ipc.h"
57#include "storage/md.h"
58#include "storage/procarray.h"
59#include "storage/smgr.h"
60#include "utils/acl.h"
61#include "utils/builtins.h"
62#include "utils/fmgroids.h"
63#include "utils/pg_locale.h"
64#include "utils/snapmgr.h"
65#include "utils/syscache.h"
66
67
/*
 * State handed to createdb_failure_callback().  createdb() fills this in
 * and passes its address (wrapped in a Datum) when it registers the
 * callback, so that error cleanup knows which databases are involved.
 */
typedef struct
{
	Oid			src_dboid;		/* source (template) DB */
	Oid			dest_dboid;		/* DB we are trying to create */
} createdb_failure_params;
73
/*
 * Counterpart of createdb_failure_params for moving a database between
 * tablespaces.  NOTE(review): presumably consumed by
 * movedb_failure_callback(); its definition is not visible in this part of
 * the file -- confirm.
 */
typedef struct
{
	Oid			dest_dboid;		/* DB we are trying to move */
	Oid			dest_tsoid;		/* tablespace we are trying to move to */
} movedb_failure_params;
79
/* non-export function prototypes */

/* error-cleanup callback registered by createdb() around the file copy */
static void createdb_failure_callback(int code, Datum arg);
static void movedb(const char *dbname, const char *tblspcname);
static void movedb_failure_callback(int code, Datum arg);

/*
 * Look up a database by name, taking the given lock on it.  Callers in this
 * file treat a false result as "no such database"; any output pointer may
 * be passed as NULL when the caller does not need that value (see dropdb).
 */
static bool get_db_info(const char *name, LOCKMODE lockmode,
						Oid *dbIdP, Oid *ownerIdP,
						int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
						Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
						MultiXactId *dbMinMultiP,
						Oid *dbTablespace, char **dbCollate, char **dbCtype);
static bool have_createdb_privilege(void);

/* removes the per-tablespace subdirectories belonging to a database */
static void remove_dbtablespaces(Oid db_id);

/* used by createdb() to reject candidate OIDs that clash with existing files */
static bool check_db_file_conflict(Oid db_id);

/* builds the errdetail for "database is being accessed by other users" errors */
static int	errdetail_busy_db(int notherbackends, int npreparedxacts);
94
95
96/*
97 * CREATE DATABASE
98 */
99Oid
100createdb(ParseState *pstate, const CreatedbStmt *stmt)
101{
102 TableScanDesc scan;
103 Relation rel;
104 Oid src_dboid;
105 Oid src_owner;
106 int src_encoding;
107 char *src_collate;
108 char *src_ctype;
109 bool src_istemplate;
110 bool src_allowconn;
111 Oid src_lastsysoid;
112 TransactionId src_frozenxid;
113 MultiXactId src_minmxid;
114 Oid src_deftablespace;
115 volatile Oid dst_deftablespace;
116 Relation pg_database_rel;
117 HeapTuple tuple;
118 Datum new_record[Natts_pg_database];
119 bool new_record_nulls[Natts_pg_database];
120 Oid dboid;
121 Oid datdba;
122 ListCell *option;
123 DefElem *dtablespacename = NULL;
124 DefElem *downer = NULL;
125 DefElem *dtemplate = NULL;
126 DefElem *dencoding = NULL;
127 DefElem *dcollate = NULL;
128 DefElem *dctype = NULL;
129 DefElem *distemplate = NULL;
130 DefElem *dallowconnections = NULL;
131 DefElem *dconnlimit = NULL;
132 char *dbname = stmt->dbname;
133 char *dbowner = NULL;
134 const char *dbtemplate = NULL;
135 char *dbcollate = NULL;
136 char *dbctype = NULL;
137 char *canonname;
138 int encoding = -1;
139 bool dbistemplate = false;
140 bool dballowconnections = true;
141 int dbconnlimit = -1;
142 int notherbackends;
143 int npreparedxacts;
144 createdb_failure_params fparms;
145
146 /* Extract options from the statement node tree */
147 foreach(option, stmt->options)
148 {
149 DefElem *defel = (DefElem *) lfirst(option);
150
151 if (strcmp(defel->defname, "tablespace") == 0)
152 {
153 if (dtablespacename)
154 ereport(ERROR,
155 (errcode(ERRCODE_SYNTAX_ERROR),
156 errmsg("conflicting or redundant options"),
157 parser_errposition(pstate, defel->location)));
158 dtablespacename = defel;
159 }
160 else if (strcmp(defel->defname, "owner") == 0)
161 {
162 if (downer)
163 ereport(ERROR,
164 (errcode(ERRCODE_SYNTAX_ERROR),
165 errmsg("conflicting or redundant options"),
166 parser_errposition(pstate, defel->location)));
167 downer = defel;
168 }
169 else if (strcmp(defel->defname, "template") == 0)
170 {
171 if (dtemplate)
172 ereport(ERROR,
173 (errcode(ERRCODE_SYNTAX_ERROR),
174 errmsg("conflicting or redundant options"),
175 parser_errposition(pstate, defel->location)));
176 dtemplate = defel;
177 }
178 else if (strcmp(defel->defname, "encoding") == 0)
179 {
180 if (dencoding)
181 ereport(ERROR,
182 (errcode(ERRCODE_SYNTAX_ERROR),
183 errmsg("conflicting or redundant options"),
184 parser_errposition(pstate, defel->location)));
185 dencoding = defel;
186 }
187 else if (strcmp(defel->defname, "lc_collate") == 0)
188 {
189 if (dcollate)
190 ereport(ERROR,
191 (errcode(ERRCODE_SYNTAX_ERROR),
192 errmsg("conflicting or redundant options"),
193 parser_errposition(pstate, defel->location)));
194 dcollate = defel;
195 }
196 else if (strcmp(defel->defname, "lc_ctype") == 0)
197 {
198 if (dctype)
199 ereport(ERROR,
200 (errcode(ERRCODE_SYNTAX_ERROR),
201 errmsg("conflicting or redundant options"),
202 parser_errposition(pstate, defel->location)));
203 dctype = defel;
204 }
205 else if (strcmp(defel->defname, "is_template") == 0)
206 {
207 if (distemplate)
208 ereport(ERROR,
209 (errcode(ERRCODE_SYNTAX_ERROR),
210 errmsg("conflicting or redundant options"),
211 parser_errposition(pstate, defel->location)));
212 distemplate = defel;
213 }
214 else if (strcmp(defel->defname, "allow_connections") == 0)
215 {
216 if (dallowconnections)
217 ereport(ERROR,
218 (errcode(ERRCODE_SYNTAX_ERROR),
219 errmsg("conflicting or redundant options"),
220 parser_errposition(pstate, defel->location)));
221 dallowconnections = defel;
222 }
223 else if (strcmp(defel->defname, "connection_limit") == 0)
224 {
225 if (dconnlimit)
226 ereport(ERROR,
227 (errcode(ERRCODE_SYNTAX_ERROR),
228 errmsg("conflicting or redundant options"),
229 parser_errposition(pstate, defel->location)));
230 dconnlimit = defel;
231 }
232 else if (strcmp(defel->defname, "location") == 0)
233 {
234 ereport(WARNING,
235 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
236 errmsg("LOCATION is not supported anymore"),
237 errhint("Consider using tablespaces instead."),
238 parser_errposition(pstate, defel->location)));
239 }
240 else
241 ereport(ERROR,
242 (errcode(ERRCODE_SYNTAX_ERROR),
243 errmsg("option \"%s\" not recognized", defel->defname),
244 parser_errposition(pstate, defel->location)));
245 }
246
247 if (downer && downer->arg)
248 dbowner = defGetString(downer);
249 if (dtemplate && dtemplate->arg)
250 dbtemplate = defGetString(dtemplate);
251 if (dencoding && dencoding->arg)
252 {
253 const char *encoding_name;
254
255 if (IsA(dencoding->arg, Integer))
256 {
257 encoding = defGetInt32(dencoding);
258 encoding_name = pg_encoding_to_char(encoding);
259 if (strcmp(encoding_name, "") == 0 ||
260 pg_valid_server_encoding(encoding_name) < 0)
261 ereport(ERROR,
262 (errcode(ERRCODE_UNDEFINED_OBJECT),
263 errmsg("%d is not a valid encoding code",
264 encoding),
265 parser_errposition(pstate, dencoding->location)));
266 }
267 else
268 {
269 encoding_name = defGetString(dencoding);
270 encoding = pg_valid_server_encoding(encoding_name);
271 if (encoding < 0)
272 ereport(ERROR,
273 (errcode(ERRCODE_UNDEFINED_OBJECT),
274 errmsg("%s is not a valid encoding name",
275 encoding_name),
276 parser_errposition(pstate, dencoding->location)));
277 }
278 }
279 if (dcollate && dcollate->arg)
280 dbcollate = defGetString(dcollate);
281 if (dctype && dctype->arg)
282 dbctype = defGetString(dctype);
283 if (distemplate && distemplate->arg)
284 dbistemplate = defGetBoolean(distemplate);
285 if (dallowconnections && dallowconnections->arg)
286 dballowconnections = defGetBoolean(dallowconnections);
287 if (dconnlimit && dconnlimit->arg)
288 {
289 dbconnlimit = defGetInt32(dconnlimit);
290 if (dbconnlimit < -1)
291 ereport(ERROR,
292 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
293 errmsg("invalid connection limit: %d", dbconnlimit)));
294 }
295
296 /* obtain OID of proposed owner */
297 if (dbowner)
298 datdba = get_role_oid(dbowner, false);
299 else
300 datdba = GetUserId();
301
302 /*
303 * To create a database, must have createdb privilege and must be able to
304 * become the target role (this does not imply that the target role itself
305 * must have createdb privilege). The latter provision guards against
306 * "giveaway" attacks. Note that a superuser will always have both of
307 * these privileges a fortiori.
308 */
309 if (!have_createdb_privilege())
310 ereport(ERROR,
311 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
312 errmsg("permission denied to create database")));
313
314 check_is_member_of_role(GetUserId(), datdba);
315
316 /*
317 * Lookup database (template) to be cloned, and obtain share lock on it.
318 * ShareLock allows two CREATE DATABASEs to work from the same template
319 * concurrently, while ensuring no one is busy dropping it in parallel
320 * (which would be Very Bad since we'd likely get an incomplete copy
321 * without knowing it). This also prevents any new connections from being
322 * made to the source until we finish copying it, so we can be sure it
323 * won't change underneath us.
324 */
325 if (!dbtemplate)
326 dbtemplate = "template1"; /* Default template database name */
327
328 if (!get_db_info(dbtemplate, ShareLock,
329 &src_dboid, &src_owner, &src_encoding,
330 &src_istemplate, &src_allowconn, &src_lastsysoid,
331 &src_frozenxid, &src_minmxid, &src_deftablespace,
332 &src_collate, &src_ctype))
333 ereport(ERROR,
334 (errcode(ERRCODE_UNDEFINED_DATABASE),
335 errmsg("template database \"%s\" does not exist",
336 dbtemplate)));
337
338 /*
339 * Permission check: to copy a DB that's not marked datistemplate, you
340 * must be superuser or the owner thereof.
341 */
342 if (!src_istemplate)
343 {
344 if (!pg_database_ownercheck(src_dboid, GetUserId()))
345 ereport(ERROR,
346 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
347 errmsg("permission denied to copy database \"%s\"",
348 dbtemplate)));
349 }
350
351 /* If encoding or locales are defaulted, use source's setting */
352 if (encoding < 0)
353 encoding = src_encoding;
354 if (dbcollate == NULL)
355 dbcollate = src_collate;
356 if (dbctype == NULL)
357 dbctype = src_ctype;
358
359 /* Some encodings are client only */
360 if (!PG_VALID_BE_ENCODING(encoding))
361 ereport(ERROR,
362 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
363 errmsg("invalid server encoding %d", encoding)));
364
365 /* Check that the chosen locales are valid, and get canonical spellings */
366 if (!check_locale(LC_COLLATE, dbcollate, &canonname))
367 ereport(ERROR,
368 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
369 errmsg("invalid locale name: \"%s\"", dbcollate)));
370 dbcollate = canonname;
371 if (!check_locale(LC_CTYPE, dbctype, &canonname))
372 ereport(ERROR,
373 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
374 errmsg("invalid locale name: \"%s\"", dbctype)));
375 dbctype = canonname;
376
377 check_encoding_locale_matches(encoding, dbcollate, dbctype);
378
379 /*
380 * Check that the new encoding and locale settings match the source
381 * database. We insist on this because we simply copy the source data ---
382 * any non-ASCII data would be wrongly encoded, and any indexes sorted
383 * according to the source locale would be wrong.
384 *
385 * However, we assume that template0 doesn't contain any non-ASCII data
386 * nor any indexes that depend on collation or ctype, so template0 can be
387 * used as template for creating a database with any encoding or locale.
388 */
389 if (strcmp(dbtemplate, "template0") != 0)
390 {
391 if (encoding != src_encoding)
392 ereport(ERROR,
393 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
394 errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
395 pg_encoding_to_char(encoding),
396 pg_encoding_to_char(src_encoding)),
397 errhint("Use the same encoding as in the template database, or use template0 as template.")));
398
399 if (strcmp(dbcollate, src_collate) != 0)
400 ereport(ERROR,
401 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
402 errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
403 dbcollate, src_collate),
404 errhint("Use the same collation as in the template database, or use template0 as template.")));
405
406 if (strcmp(dbctype, src_ctype) != 0)
407 ereport(ERROR,
408 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
409 errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
410 dbctype, src_ctype),
411 errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
412 }
413
414 /* Resolve default tablespace for new database */
415 if (dtablespacename && dtablespacename->arg)
416 {
417 char *tablespacename;
418 AclResult aclresult;
419
420 tablespacename = defGetString(dtablespacename);
421 dst_deftablespace = get_tablespace_oid(tablespacename, false);
422 /* check permissions */
423 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
424 ACL_CREATE);
425 if (aclresult != ACLCHECK_OK)
426 aclcheck_error(aclresult, OBJECT_TABLESPACE,
427 tablespacename);
428
429 /* pg_global must never be the default tablespace */
430 if (dst_deftablespace == GLOBALTABLESPACE_OID)
431 ereport(ERROR,
432 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
433 errmsg("pg_global cannot be used as default tablespace")));
434
435 /*
436 * If we are trying to change the default tablespace of the template,
437 * we require that the template not have any files in the new default
438 * tablespace. This is necessary because otherwise the copied
439 * database would contain pg_class rows that refer to its default
440 * tablespace both explicitly (by OID) and implicitly (as zero), which
441 * would cause problems. For example another CREATE DATABASE using
442 * the copied database as template, and trying to change its default
443 * tablespace again, would yield outright incorrect results (it would
444 * improperly move tables to the new default tablespace that should
445 * stay in the same tablespace).
446 */
447 if (dst_deftablespace != src_deftablespace)
448 {
449 char *srcpath;
450 struct stat st;
451
452 srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
453
454 if (stat(srcpath, &st) == 0 &&
455 S_ISDIR(st.st_mode) &&
456 !directory_is_empty(srcpath))
457 ereport(ERROR,
458 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
459 errmsg("cannot assign new default tablespace \"%s\"",
460 tablespacename),
461 errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
462 dbtemplate)));
463 pfree(srcpath);
464 }
465 }
466 else
467 {
468 /* Use template database's default tablespace */
469 dst_deftablespace = src_deftablespace;
470 /* Note there is no additional permission check in this path */
471 }
472
473 /*
474 * If built with appropriate switch, whine when regression-testing
475 * conventions for database names are violated. But don't complain during
476 * initdb.
477 */
478#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
479 if (IsUnderPostmaster && strstr(dbname, "regression") == NULL)
480 elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
481#endif
482
483 /*
484 * Check for db name conflict. This is just to give a more friendly error
485 * message than "unique index violation". There's a race condition but
486 * we're willing to accept the less friendly message in that case.
487 */
488 if (OidIsValid(get_database_oid(dbname, true)))
489 ereport(ERROR,
490 (errcode(ERRCODE_DUPLICATE_DATABASE),
491 errmsg("database \"%s\" already exists", dbname)));
492
493 /*
494 * The source DB can't have any active backends, except this one
495 * (exception is to allow CREATE DB while connected to template1).
496 * Otherwise we might copy inconsistent data.
497 *
498 * This should be last among the basic error checks, because it involves
499 * potential waiting; we may as well throw an error first if we're gonna
500 * throw one.
501 */
502 if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
503 ereport(ERROR,
504 (errcode(ERRCODE_OBJECT_IN_USE),
505 errmsg("source database \"%s\" is being accessed by other users",
506 dbtemplate),
507 errdetail_busy_db(notherbackends, npreparedxacts)));
508
509 /*
510 * Select an OID for the new database, checking that it doesn't have a
511 * filename conflict with anything already existing in the tablespace
512 * directories.
513 */
514 pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);
515
516 do
517 {
518 dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
519 Anum_pg_database_oid);
520 } while (check_db_file_conflict(dboid));
521
522 /*
523 * Insert a new tuple into pg_database. This establishes our ownership of
524 * the new database name (anyone else trying to insert the same name will
525 * block on the unique index, and fail after we commit).
526 */
527
528 /* Form tuple */
529 MemSet(new_record, 0, sizeof(new_record));
530 MemSet(new_record_nulls, false, sizeof(new_record_nulls));
531
532 new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
533 new_record[Anum_pg_database_datname - 1] =
534 DirectFunctionCall1(namein, CStringGetDatum(dbname));
535 new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
536 new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
537 new_record[Anum_pg_database_datcollate - 1] =
538 DirectFunctionCall1(namein, CStringGetDatum(dbcollate));
539 new_record[Anum_pg_database_datctype - 1] =
540 DirectFunctionCall1(namein, CStringGetDatum(dbctype));
541 new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
542 new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
543 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
544 new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
545 new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
546 new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
547 new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
548
549 /*
550 * We deliberately set datacl to default (NULL), rather than copying it
551 * from the template database. Copying it would be a bad idea when the
552 * owner is not the same as the template's owner.
553 */
554 new_record_nulls[Anum_pg_database_datacl - 1] = true;
555
556 tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
557 new_record, new_record_nulls);
558
559 CatalogTupleInsert(pg_database_rel, tuple);
560
561 /*
562 * Now generate additional catalog entries associated with the new DB
563 */
564
565 /* Register owner dependency */
566 recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
567
568 /* Create pg_shdepend entries for objects within database */
569 copyTemplateDependencies(src_dboid, dboid);
570
571 /* Post creation hook for new database */
572 InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
573
574 /*
575 * Force a checkpoint before starting the copy. This will force all dirty
576 * buffers, including those of unlogged tables, out to disk, to ensure
577 * source database is up-to-date on disk for the copy.
578 * FlushDatabaseBuffers() would suffice for that, but we also want to
579 * process any pending unlink requests. Otherwise, if a checkpoint
580 * happened while we're copying files, a file might be deleted just when
581 * we're about to copy it, causing the lstat() call in copydir() to fail
582 * with ENOENT.
583 */
584 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
585 | CHECKPOINT_FLUSH_ALL);
586
587 /*
588 * Once we start copying subdirectories, we need to be able to clean 'em
589 * up if we fail. Use an ENSURE block to make sure this happens. (This
590 * is not a 100% solution, because of the possibility of failure during
591 * transaction commit after we leave this routine, but it should handle
592 * most scenarios.)
593 */
594 fparms.src_dboid = src_dboid;
595 fparms.dest_dboid = dboid;
596 PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
597 PointerGetDatum(&fparms));
598 {
599 /*
600 * Iterate through all tablespaces of the template database, and copy
601 * each one to the new database.
602 */
603 rel = table_open(TableSpaceRelationId, AccessShareLock);
604 scan = table_beginscan_catalog(rel, 0, NULL);
605 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
606 {
607 Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
608 Oid srctablespace = spaceform->oid;
609 Oid dsttablespace;
610 char *srcpath;
611 char *dstpath;
612 struct stat st;
613
614 /* No need to copy global tablespace */
615 if (srctablespace == GLOBALTABLESPACE_OID)
616 continue;
617
618 srcpath = GetDatabasePath(src_dboid, srctablespace);
619
620 if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
621 directory_is_empty(srcpath))
622 {
623 /* Assume we can ignore it */
624 pfree(srcpath);
625 continue;
626 }
627
628 if (srctablespace == src_deftablespace)
629 dsttablespace = dst_deftablespace;
630 else
631 dsttablespace = srctablespace;
632
633 dstpath = GetDatabasePath(dboid, dsttablespace);
634
635 /*
636 * Copy this subdirectory to the new location
637 *
638 * We don't need to copy subdirectories
639 */
640 copydir(srcpath, dstpath, false);
641
642 /* Record the filesystem change in XLOG */
643 {
644 xl_dbase_create_rec xlrec;
645
646 xlrec.db_id = dboid;
647 xlrec.tablespace_id = dsttablespace;
648 xlrec.src_db_id = src_dboid;
649 xlrec.src_tablespace_id = srctablespace;
650
651 XLogBeginInsert();
652 XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
653
654 (void) XLogInsert(RM_DBASE_ID,
655 XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
656 }
657 }
658 table_endscan(scan);
659 table_close(rel, AccessShareLock);
660
661 /*
662 * We force a checkpoint before committing. This effectively means
663 * that committed XLOG_DBASE_CREATE operations will never need to be
664 * replayed (at least not in ordinary crash recovery; we still have to
665 * make the XLOG entry for the benefit of PITR operations). This
666 * avoids two nasty scenarios:
667 *
668 * #1: When PITR is off, we don't XLOG the contents of newly created
669 * indexes; therefore the drop-and-recreate-whole-directory behavior
670 * of DBASE_CREATE replay would lose such indexes.
671 *
672 * #2: Since we have to recopy the source database during DBASE_CREATE
673 * replay, we run the risk of copying changes in it that were
674 * committed after the original CREATE DATABASE command but before the
675 * system crash that led to the replay. This is at least unexpected
676 * and at worst could lead to inconsistencies, eg duplicate table
677 * names.
678 *
679 * (Both of these were real bugs in releases 8.0 through 8.0.3.)
680 *
681 * In PITR replay, the first of these isn't an issue, and the second
682 * is only a risk if the CREATE DATABASE and subsequent template
683 * database change both occur while a base backup is being taken.
684 * There doesn't seem to be much we can do about that except document
685 * it as a limitation.
686 *
687 * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
688 * we can avoid this.
689 */
690 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
691
692 /*
693 * Close pg_database, but keep lock till commit.
694 */
695 table_close(pg_database_rel, NoLock);
696
697 /*
698 * Force synchronous commit, thus minimizing the window between
699 * creation of the database files and committal of the transaction. If
700 * we crash before committing, we'll have a DB that's taking up disk
701 * space but is not in pg_database, which is not good.
702 */
703 ForceSyncCommit();
704 }
705 PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
706 PointerGetDatum(&fparms));
707
708 return dboid;
709}
710
711/*
712 * Check whether chosen encoding matches chosen locale settings. This
713 * restriction is necessary because libc's locale-specific code usually
714 * fails when presented with data in an encoding it's not expecting. We
715 * allow mismatch in four cases:
716 *
717 * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
718 * which works with any encoding.
719 *
720 * 2. locale encoding = -1, which means that we couldn't determine the
721 * locale's encoding and have to trust the user to get it right.
722 *
723 * 3. selected encoding is UTF8 and platform is win32. This is because
724 * UTF8 is a pseudo codepage that is supported in all locales since it's
725 * converted to UTF16 before being used.
726 *
727 * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
728 * is risky but we have historically allowed it --- notably, the
729 * regression tests require it.
730 *
731 * Note: if you change this policy, fix initdb to match.
732 */
733void
734check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
735{
736 int ctype_encoding = pg_get_encoding_from_locale(ctype, true);
737 int collate_encoding = pg_get_encoding_from_locale(collate, true);
738
739 if (!(ctype_encoding == encoding ||
740 ctype_encoding == PG_SQL_ASCII ||
741 ctype_encoding == -1 ||
742#ifdef WIN32
743 encoding == PG_UTF8 ||
744#endif
745 (encoding == PG_SQL_ASCII && superuser())))
746 ereport(ERROR,
747 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
748 errmsg("encoding \"%s\" does not match locale \"%s\"",
749 pg_encoding_to_char(encoding),
750 ctype),
751 errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
752 pg_encoding_to_char(ctype_encoding))));
753
754 if (!(collate_encoding == encoding ||
755 collate_encoding == PG_SQL_ASCII ||
756 collate_encoding == -1 ||
757#ifdef WIN32
758 encoding == PG_UTF8 ||
759#endif
760 (encoding == PG_SQL_ASCII && superuser())))
761 ereport(ERROR,
762 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
763 errmsg("encoding \"%s\" does not match locale \"%s\"",
764 pg_encoding_to_char(encoding),
765 collate),
766 errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
767 pg_encoding_to_char(collate_encoding))));
768}
769
770/* Error cleanup callback for createdb */
771static void
772createdb_failure_callback(int code, Datum arg)
773{
774 createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
775
776 /*
777 * Release lock on source database before doing recursive remove. This is
778 * not essential but it seems desirable to release the lock as soon as
779 * possible.
780 */
781 UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
782
783 /* Throw away any successfully copied subdirectories */
784 remove_dbtablespaces(fparms->dest_dboid);
785}
786
787
788/*
789 * DROP DATABASE
790 */
791void
792dropdb(const char *dbname, bool missing_ok)
793{
794 Oid db_id;
795 bool db_istemplate;
796 Relation pgdbrel;
797 HeapTuple tup;
798 int notherbackends;
799 int npreparedxacts;
800 int nslots,
801 nslots_active;
802 int nsubscriptions;
803
804 /*
805 * Look up the target database's OID, and get exclusive lock on it. We
806 * need this to ensure that no new backend starts up in the target
807 * database while we are deleting it (see postinit.c), and that no one is
808 * using it as a CREATE DATABASE template or trying to delete it for
809 * themselves.
810 */
811 pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
812
813 if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
814 &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
815 {
816 if (!missing_ok)
817 {
818 ereport(ERROR,
819 (errcode(ERRCODE_UNDEFINED_DATABASE),
820 errmsg("database \"%s\" does not exist", dbname)));
821 }
822 else
823 {
824 /* Close pg_database, release the lock, since we changed nothing */
825 table_close(pgdbrel, RowExclusiveLock);
826 ereport(NOTICE,
827 (errmsg("database \"%s\" does not exist, skipping",
828 dbname)));
829 return;
830 }
831 }
832
833 /*
834 * Permission checks
835 */
836 if (!pg_database_ownercheck(db_id, GetUserId()))
837 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
838 dbname);
839
840 /* DROP hook for the database being removed */
841 InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
842
843 /*
844 * Disallow dropping a DB that is marked istemplate. This is just to
845 * prevent people from accidentally dropping template0 or template1; they
846 * can do so if they're really determined ...
847 */
848 if (db_istemplate)
849 ereport(ERROR,
850 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
851 errmsg("cannot drop a template database")));
852
853 /* Obviously can't drop my own database */
854 if (db_id == MyDatabaseId)
855 ereport(ERROR,
856 (errcode(ERRCODE_OBJECT_IN_USE),
857 errmsg("cannot drop the currently open database")));
858
859 /*
860 * Check whether there are active logical slots that refer to the
861 * to-be-dropped database. The database lock we are holding prevents the
862 * creation of new slots using the database or existing slots becoming
863 * active.
864 */
865 (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
866 if (nslots_active)
867 {
868 ereport(ERROR,
869 (errcode(ERRCODE_OBJECT_IN_USE),
870 errmsg("database \"%s\" is used by an active logical replication slot",
871 dbname),
872 errdetail_plural("There is %d active slot.",
873 "There are %d active slots.",
874 nslots_active, nslots_active)));
875 }
876
877 /*
878 * Check for other backends in the target database. (Because we hold the
879 * database lock, no new ones can start after this.)
880 *
881 * As in CREATE DATABASE, check this after other error conditions.
882 */
883 if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
884 ereport(ERROR,
885 (errcode(ERRCODE_OBJECT_IN_USE),
886 errmsg("database \"%s\" is being accessed by other users",
887 dbname),
888 errdetail_busy_db(notherbackends, npreparedxacts)));
889
890 /*
891 * Check if there are subscriptions defined in the target database.
892 *
893 * We can't drop them automatically because they might be holding
894 * resources in other databases/instances.
895 */
896 if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
897 ereport(ERROR,
898 (errcode(ERRCODE_OBJECT_IN_USE),
899 errmsg("database \"%s\" is being used by logical replication subscription",
900 dbname),
901 errdetail_plural("There is %d subscription.",
902 "There are %d subscriptions.",
903 nsubscriptions, nsubscriptions)));
904
905 /*
906 * Remove the database's tuple from pg_database.
907 */
908 tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id));
909 if (!HeapTupleIsValid(tup))
910 elog(ERROR, "cache lookup failed for database %u", db_id);
911
912 CatalogTupleDelete(pgdbrel, &tup->t_self);
913
914 ReleaseSysCache(tup);
915
916 /*
917 * Delete any comments or security labels associated with the database.
918 */
919 DeleteSharedComments(db_id, DatabaseRelationId);
920 DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
921
922 /*
923 * Remove settings associated with this database
924 */
925 DropSetting(db_id, InvalidOid);
926
927 /*
928 * Remove shared dependency references for the database.
929 */
930 dropDatabaseDependencies(db_id);
931
932 /*
933 * Drop db-specific replication slots.
934 */
935 ReplicationSlotsDropDBSlots(db_id);
936
937 /*
938 * Drop pages for this database that are in the shared buffer cache. This
939 * is important to ensure that no remaining backend tries to write out a
940 * dirty buffer to the dead database later...
941 */
942 DropDatabaseBuffers(db_id);
943
944 /*
945 * Tell the stats collector to forget it immediately, too.
946 */
947 pgstat_drop_database(db_id);
948
949 /*
950 * Tell checkpointer to forget any pending fsync and unlink requests for
951 * files in the database; else the fsyncs will fail at next checkpoint, or
952 * worse, it will delete files that belong to a newly created database
953 * with the same OID.
954 */
955 ForgetDatabaseSyncRequests(db_id);
956
957 /*
958 * Force a checkpoint to make sure the checkpointer has received the
959 * message sent by ForgetDatabaseSyncRequests. On Windows, this also
960 * ensures that background procs don't hold any open files, which would
961 * cause rmdir() to fail.
962 */
963 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
964
965 /*
966 * Remove all tablespace subdirs belonging to the database.
967 */
968 remove_dbtablespaces(db_id);
969
970 /*
971 * Close pg_database, but keep lock till commit.
972 */
973 table_close(pgdbrel, NoLock);
974
975 /*
976 * Force synchronous commit, thus minimizing the window between removal of
977 * the database files and committal of the transaction. If we crash before
978 * committing, we'll have a DB that's gone on disk but still there
979 * according to pg_database, which is not good.
980 */
981 ForceSyncCommit();
982}
983
984
985/*
986 * Rename database
987 */
988ObjectAddress
989RenameDatabase(const char *oldname, const char *newname)
990{
991 Oid db_id;
992 HeapTuple newtup;
993 Relation rel;
994 int notherbackends;
995 int npreparedxacts;
996 ObjectAddress address;
997
998 /*
999 * Look up the target database's OID, and get exclusive lock on it. We
1000 * need this for the same reasons as DROP DATABASE.
1001 */
1002 rel = table_open(DatabaseRelationId, RowExclusiveLock);
1003
1004 if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
1005 NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1006 ereport(ERROR,
1007 (errcode(ERRCODE_UNDEFINED_DATABASE),
1008 errmsg("database \"%s\" does not exist", oldname)));
1009
1010 /* must be owner */
1011 if (!pg_database_ownercheck(db_id, GetUserId()))
1012 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1013 oldname);
1014
1015 /* must have createdb rights */
1016 if (!have_createdb_privilege())
1017 ereport(ERROR,
1018 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1019 errmsg("permission denied to rename database")));
1020
1021 /*
1022 * If built with appropriate switch, whine when regression-testing
1023 * conventions for database names are violated.
1024 */
1025#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1026 if (strstr(newname, "regression") == NULL)
1027 elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1028#endif
1029
1030 /*
1031 * Make sure the new name doesn't exist. See notes for same error in
1032 * CREATE DATABASE.
1033 */
1034 if (OidIsValid(get_database_oid(newname, true)))
1035 ereport(ERROR,
1036 (errcode(ERRCODE_DUPLICATE_DATABASE),
1037 errmsg("database \"%s\" already exists", newname)));
1038
1039 /*
1040 * XXX Client applications probably store the current database somewhere,
1041 * so renaming it could cause confusion. On the other hand, there may not
1042 * be an actual problem besides a little confusion, so think about this
1043 * and decide.
1044 */
1045 if (db_id == MyDatabaseId)
1046 ereport(ERROR,
1047 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1048 errmsg("current database cannot be renamed")));
1049
1050 /*
1051 * Make sure the database does not have active sessions. This is the same
1052 * concern as above, but applied to other sessions.
1053 *
1054 * As in CREATE DATABASE, check this after other error conditions.
1055 */
1056 if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1057 ereport(ERROR,
1058 (errcode(ERRCODE_OBJECT_IN_USE),
1059 errmsg("database \"%s\" is being accessed by other users",
1060 oldname),
1061 errdetail_busy_db(notherbackends, npreparedxacts)));
1062
1063 /* rename */
1064 newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
1065 if (!HeapTupleIsValid(newtup))
1066 elog(ERROR, "cache lookup failed for database %u", db_id);
1067 namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
1068 CatalogTupleUpdate(rel, &newtup->t_self, newtup);
1069
1070 InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1071
1072 ObjectAddressSet(address, DatabaseRelationId, db_id);
1073
1074 /*
1075 * Close pg_database, but keep lock till commit.
1076 */
1077 table_close(rel, NoLock);
1078
1079 return address;
1080}
1081
1082
1083/*
1084 * ALTER DATABASE SET TABLESPACE
1085 */
1086static void
1087movedb(const char *dbname, const char *tblspcname)
1088{
1089 Oid db_id;
1090 Relation pgdbrel;
1091 int notherbackends;
1092 int npreparedxacts;
1093 HeapTuple oldtuple,
1094 newtuple;
1095 Oid src_tblspcoid,
1096 dst_tblspcoid;
1097 Datum new_record[Natts_pg_database];
1098 bool new_record_nulls[Natts_pg_database];
1099 bool new_record_repl[Natts_pg_database];
1100 ScanKeyData scankey;
1101 SysScanDesc sysscan;
1102 AclResult aclresult;
1103 char *src_dbpath;
1104 char *dst_dbpath;
1105 DIR *dstdir;
1106 struct dirent *xlde;
1107 movedb_failure_params fparms;
1108
1109 /*
1110 * Look up the target database's OID, and get exclusive lock on it. We
1111 * need this to ensure that no new backend starts up in the database while
1112 * we are moving it, and that no one is using it as a CREATE DATABASE
1113 * template or trying to delete it.
1114 */
1115 pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
1116
1117 if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1118 NULL, NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL))
1119 ereport(ERROR,
1120 (errcode(ERRCODE_UNDEFINED_DATABASE),
1121 errmsg("database \"%s\" does not exist", dbname)));
1122
1123 /*
1124 * We actually need a session lock, so that the lock will persist across
1125 * the commit/restart below. (We could almost get away with letting the
1126 * lock be released at commit, except that someone could try to move
1127 * relations of the DB back into the old directory while we rmtree() it.)
1128 */
1129 LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1130 AccessExclusiveLock);
1131
1132 /*
1133 * Permission checks
1134 */
1135 if (!pg_database_ownercheck(db_id, GetUserId()))
1136 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1137 dbname);
1138
1139 /*
1140 * Obviously can't move the tables of my own database
1141 */
1142 if (db_id == MyDatabaseId)
1143 ereport(ERROR,
1144 (errcode(ERRCODE_OBJECT_IN_USE),
1145 errmsg("cannot change the tablespace of the currently open database")));
1146
1147 /*
1148 * Get tablespace's oid
1149 */
1150 dst_tblspcoid = get_tablespace_oid(tblspcname, false);
1151
1152 /*
1153 * Permission checks
1154 */
1155 aclresult = pg_tablespace_aclcheck(dst_tblspcoid, GetUserId(),
1156 ACL_CREATE);
1157 if (aclresult != ACLCHECK_OK)
1158 aclcheck_error(aclresult, OBJECT_TABLESPACE,
1159 tblspcname);
1160
1161 /*
1162 * pg_global must never be the default tablespace
1163 */
1164 if (dst_tblspcoid == GLOBALTABLESPACE_OID)
1165 ereport(ERROR,
1166 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1167 errmsg("pg_global cannot be used as default tablespace")));
1168
1169 /*
1170 * No-op if same tablespace
1171 */
1172 if (src_tblspcoid == dst_tblspcoid)
1173 {
1174 table_close(pgdbrel, NoLock);
1175 UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1176 AccessExclusiveLock);
1177 return;
1178 }
1179
1180 /*
1181 * Check for other backends in the target database. (Because we hold the
1182 * database lock, no new ones can start after this.)
1183 *
1184 * As in CREATE DATABASE, check this after other error conditions.
1185 */
1186 if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1187 ereport(ERROR,
1188 (errcode(ERRCODE_OBJECT_IN_USE),
1189 errmsg("database \"%s\" is being accessed by other users",
1190 dbname),
1191 errdetail_busy_db(notherbackends, npreparedxacts)));
1192
1193 /*
1194 * Get old and new database paths
1195 */
1196 src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
1197 dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
1198
1199 /*
1200 * Force a checkpoint before proceeding. This will force all dirty
1201 * buffers, including those of unlogged tables, out to disk, to ensure
1202 * source database is up-to-date on disk for the copy.
1203 * FlushDatabaseBuffers() would suffice for that, but we also want to
1204 * process any pending unlink requests. Otherwise, the check for existing
1205 * files in the target directory might fail unnecessarily, not to mention
1206 * that the copy might fail due to source files getting deleted under it.
1207 * On Windows, this also ensures that background procs don't hold any open
1208 * files, which would cause rmdir() to fail.
1209 */
1210 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
1211 | CHECKPOINT_FLUSH_ALL);
1212
1213 /*
1214 * Now drop all buffers holding data of the target database; they should
1215 * no longer be dirty so DropDatabaseBuffers is safe.
1216 *
1217 * It might seem that we could just let these buffers age out of shared
1218 * buffers naturally, since they should not get referenced anymore. The
1219 * problem with that is that if the user later moves the database back to
1220 * its original tablespace, any still-surviving buffers would appear to
1221 * contain valid data again --- but they'd be missing any changes made in
1222 * the database while it was in the new tablespace. In any case, freeing
1223 * buffers that should never be used again seems worth the cycles.
1224 *
1225 * Note: it'd be sufficient to get rid of buffers matching db_id and
1226 * src_tblspcoid, but bufmgr.c presently provides no API for that.
1227 */
1228 DropDatabaseBuffers(db_id);
1229
1230 /*
1231 * Check for existence of files in the target directory, i.e., objects of
1232 * this database that are already in the target tablespace. We can't
1233 * allow the move in such a case, because we would need to change those
1234 * relations' pg_class.reltablespace entries to zero, and we don't have
1235 * access to the DB's pg_class to do so.
1236 */
1237 dstdir = AllocateDir(dst_dbpath);
1238 if (dstdir != NULL)
1239 {
1240 while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
1241 {
1242 if (strcmp(xlde->d_name, ".") == 0 ||
1243 strcmp(xlde->d_name, "..") == 0)
1244 continue;
1245
1246 ereport(ERROR,
1247 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1248 errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
1249 dbname, tblspcname),
1250 errhint("You must move them back to the database's default tablespace before using this command.")));
1251 }
1252
1253 FreeDir(dstdir);
1254
1255 /*
1256 * The directory exists but is empty. We must remove it before using
1257 * the copydir function.
1258 */
1259 if (rmdir(dst_dbpath) != 0)
1260 elog(ERROR, "could not remove directory \"%s\": %m",
1261 dst_dbpath);
1262 }
1263
1264 /*
1265 * Use an ENSURE block to make sure we remove the debris if the copy fails
1266 * (eg, due to out-of-disk-space). This is not a 100% solution, because
1267 * of the possibility of failure during transaction commit, but it should
1268 * handle most scenarios.
1269 */
1270 fparms.dest_dboid = db_id;
1271 fparms.dest_tsoid = dst_tblspcoid;
1272 PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1273 PointerGetDatum(&fparms));
1274 {
1275 /*
1276 * Copy files from the old tablespace to the new one
1277 */
1278 copydir(src_dbpath, dst_dbpath, false);
1279
1280 /*
1281 * Record the filesystem change in XLOG
1282 */
1283 {
1284 xl_dbase_create_rec xlrec;
1285
1286 xlrec.db_id = db_id;
1287 xlrec.tablespace_id = dst_tblspcoid;
1288 xlrec.src_db_id = db_id;
1289 xlrec.src_tablespace_id = src_tblspcoid;
1290
1291 XLogBeginInsert();
1292 XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
1293
1294 (void) XLogInsert(RM_DBASE_ID,
1295 XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
1296 }
1297
1298 /*
1299 * Update the database's pg_database tuple
1300 */
1301 ScanKeyInit(&scankey,
1302 Anum_pg_database_datname,
1303 BTEqualStrategyNumber, F_NAMEEQ,
1304 CStringGetDatum(dbname));
1305 sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
1306 NULL, 1, &scankey);
1307 oldtuple = systable_getnext(sysscan);
1308 if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */
1309 ereport(ERROR,
1310 (errcode(ERRCODE_UNDEFINED_DATABASE),
1311 errmsg("database \"%s\" does not exist", dbname)));
1312
1313 MemSet(new_record, 0, sizeof(new_record));
1314 MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1315 MemSet(new_record_repl, false, sizeof(new_record_repl));
1316
1317 new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
1318 new_record_repl[Anum_pg_database_dattablespace - 1] = true;
1319
1320 newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
1321 new_record,
1322 new_record_nulls, new_record_repl);
1323 CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
1324
1325 InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1326
1327 systable_endscan(sysscan);
1328
1329 /*
1330 * Force another checkpoint here. As in CREATE DATABASE, this is to
1331 * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
1332 * operation, which would cause us to lose any unlogged operations
1333 * done in the new DB tablespace before the next checkpoint.
1334 */
1335 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1336
1337 /*
1338 * Force synchronous commit, thus minimizing the window between
1339 * copying the database files and committal of the transaction. If we
1340 * crash before committing, we'll leave an orphaned set of files on
1341 * disk, which is not fatal but not good either.
1342 */
1343 ForceSyncCommit();
1344
1345 /*
1346 * Close pg_database, but keep lock till commit.
1347 */
1348 table_close(pgdbrel, NoLock);
1349 }
1350 PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1351 PointerGetDatum(&fparms));
1352
1353 /*
1354 * Commit the transaction so that the pg_database update is committed. If
1355 * we crash while removing files, the database won't be corrupt, we'll
1356 * just leave some orphaned files in the old directory.
1357 *
1358 * (This is OK because we know we aren't inside a transaction block.)
1359 *
1360 * XXX would it be safe/better to do this inside the ensure block? Not
1361 * convinced it's a good idea; consider elog just after the transaction
1362 * really commits.
1363 */
1364 PopActiveSnapshot();
1365 CommitTransactionCommand();
1366
1367 /* Start new transaction for the remaining work; don't need a snapshot */
1368 StartTransactionCommand();
1369
1370 /*
1371 * Remove files from the old tablespace
1372 */
1373 if (!rmtree(src_dbpath, true))
1374 ereport(WARNING,
1375 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1376 src_dbpath)));
1377
1378 /*
1379 * Record the filesystem change in XLOG
1380 */
1381 {
1382 xl_dbase_drop_rec xlrec;
1383
1384 xlrec.db_id = db_id;
1385 xlrec.tablespace_id = src_tblspcoid;
1386
1387 XLogBeginInsert();
1388 XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
1389
1390 (void) XLogInsert(RM_DBASE_ID,
1391 XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
1392 }
1393
1394 /* Now it's safe to release the database lock */
1395 UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1396 AccessExclusiveLock);
1397}
1398
1399/* Error cleanup callback for movedb */
1400static void
1401movedb_failure_callback(int code, Datum arg)
1402{
1403 movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
1404 char *dstpath;
1405
1406 /* Get rid of anything we managed to copy to the target directory */
1407 dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
1408
1409 (void) rmtree(dstpath, true);
1410}
1411
1412
1413/*
1414 * ALTER DATABASE name ...
1415 */
1416Oid
1417AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
1418{
1419 Relation rel;
1420 Oid dboid;
1421 HeapTuple tuple,
1422 newtuple;
1423 Form_pg_database datform;
1424 ScanKeyData scankey;
1425 SysScanDesc scan;
1426 ListCell *option;
1427 bool dbistemplate = false;
1428 bool dballowconnections = true;
1429 int dbconnlimit = -1;
1430 DefElem *distemplate = NULL;
1431 DefElem *dallowconnections = NULL;
1432 DefElem *dconnlimit = NULL;
1433 DefElem *dtablespace = NULL;
1434 Datum new_record[Natts_pg_database];
1435 bool new_record_nulls[Natts_pg_database];
1436 bool new_record_repl[Natts_pg_database];
1437
1438 /* Extract options from the statement node tree */
1439 foreach(option, stmt->options)
1440 {
1441 DefElem *defel = (DefElem *) lfirst(option);
1442
1443 if (strcmp(defel->defname, "is_template") == 0)
1444 {
1445 if (distemplate)
1446 ereport(ERROR,
1447 (errcode(ERRCODE_SYNTAX_ERROR),
1448 errmsg("conflicting or redundant options"),
1449 parser_errposition(pstate, defel->location)));
1450 distemplate = defel;
1451 }
1452 else if (strcmp(defel->defname, "allow_connections") == 0)
1453 {
1454 if (dallowconnections)
1455 ereport(ERROR,
1456 (errcode(ERRCODE_SYNTAX_ERROR),
1457 errmsg("conflicting or redundant options"),
1458 parser_errposition(pstate, defel->location)));
1459 dallowconnections = defel;
1460 }
1461 else if (strcmp(defel->defname, "connection_limit") == 0)
1462 {
1463 if (dconnlimit)
1464 ereport(ERROR,
1465 (errcode(ERRCODE_SYNTAX_ERROR),
1466 errmsg("conflicting or redundant options"),
1467 parser_errposition(pstate, defel->location)));
1468 dconnlimit = defel;
1469 }
1470 else if (strcmp(defel->defname, "tablespace") == 0)
1471 {
1472 if (dtablespace)
1473 ereport(ERROR,
1474 (errcode(ERRCODE_SYNTAX_ERROR),
1475 errmsg("conflicting or redundant options"),
1476 parser_errposition(pstate, defel->location)));
1477 dtablespace = defel;
1478 }
1479 else
1480 ereport(ERROR,
1481 (errcode(ERRCODE_SYNTAX_ERROR),
1482 errmsg("option \"%s\" not recognized", defel->defname),
1483 parser_errposition(pstate, defel->location)));
1484 }
1485
1486 if (dtablespace)
1487 {
1488 /*
1489 * While the SET TABLESPACE syntax doesn't allow any other options,
1490 * somebody could write "WITH TABLESPACE ...". Forbid any other
1491 * options from being specified in that case.
1492 */
1493 if (list_length(stmt->options) != 1)
1494 ereport(ERROR,
1495 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1496 errmsg("option \"%s\" cannot be specified with other options",
1497 dtablespace->defname),
1498 parser_errposition(pstate, dtablespace->location)));
1499 /* this case isn't allowed within a transaction block */
1500 PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
1501 movedb(stmt->dbname, defGetString(dtablespace));
1502 return InvalidOid;
1503 }
1504
1505 if (distemplate && distemplate->arg)
1506 dbistemplate = defGetBoolean(distemplate);
1507 if (dallowconnections && dallowconnections->arg)
1508 dballowconnections = defGetBoolean(dallowconnections);
1509 if (dconnlimit && dconnlimit->arg)
1510 {
1511 dbconnlimit = defGetInt32(dconnlimit);
1512 if (dbconnlimit < -1)
1513 ereport(ERROR,
1514 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1515 errmsg("invalid connection limit: %d", dbconnlimit)));
1516 }
1517
1518 /*
1519 * Get the old tuple. We don't need a lock on the database per se,
1520 * because we're not going to do anything that would mess up incoming
1521 * connections.
1522 */
1523 rel = table_open(DatabaseRelationId, RowExclusiveLock);
1524 ScanKeyInit(&scankey,
1525 Anum_pg_database_datname,
1526 BTEqualStrategyNumber, F_NAMEEQ,
1527 CStringGetDatum(stmt->dbname));
1528 scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1529 NULL, 1, &scankey);
1530 tuple = systable_getnext(scan);
1531 if (!HeapTupleIsValid(tuple))
1532 ereport(ERROR,
1533 (errcode(ERRCODE_UNDEFINED_DATABASE),
1534 errmsg("database \"%s\" does not exist", stmt->dbname)));
1535
1536 datform = (Form_pg_database) GETSTRUCT(tuple);
1537 dboid = datform->oid;
1538
1539 if (!pg_database_ownercheck(dboid, GetUserId()))
1540 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1541 stmt->dbname);
1542
1543 /*
1544 * In order to avoid getting locked out and having to go through
1545 * standalone mode, we refuse to disallow connections to the database
1546 * we're currently connected to. Lockout can still happen with concurrent
1547 * sessions but the likeliness of that is not high enough to worry about.
1548 */
1549 if (!dballowconnections && dboid == MyDatabaseId)
1550 ereport(ERROR,
1551 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1552 errmsg("cannot disallow connections for current database")));
1553
1554 /*
1555 * Build an updated tuple, perusing the information just obtained
1556 */
1557 MemSet(new_record, 0, sizeof(new_record));
1558 MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1559 MemSet(new_record_repl, false, sizeof(new_record_repl));
1560
1561 if (distemplate)
1562 {
1563 new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
1564 new_record_repl[Anum_pg_database_datistemplate - 1] = true;
1565 }
1566 if (dallowconnections)
1567 {
1568 new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
1569 new_record_repl[Anum_pg_database_datallowconn - 1] = true;
1570 }
1571 if (dconnlimit)
1572 {
1573 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
1574 new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
1575 }
1576
1577 newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
1578 new_record_nulls, new_record_repl);
1579 CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
1580
1581 InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
1582
1583 systable_endscan(scan);
1584
1585 /* Close pg_database, but keep lock till commit */
1586 table_close(rel, NoLock);
1587
1588 return dboid;
1589}
1590
1591
1592/*
1593 * ALTER DATABASE name SET ...
1594 */
1595Oid
1596AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
1597{
1598 Oid datid = get_database_oid(stmt->dbname, false);
1599
1600 /*
1601 * Obtain a lock on the database and make sure it didn't go away in the
1602 * meantime.
1603 */
1604 shdepLockAndCheckObject(DatabaseRelationId, datid);
1605
1606 if (!pg_database_ownercheck(datid, GetUserId()))
1607 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1608 stmt->dbname);
1609
1610 AlterSetting(datid, InvalidOid, stmt->setstmt);
1611
1612 UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
1613
1614 return datid;
1615}
1616
1617
1618/*
1619 * ALTER DATABASE name OWNER TO newowner
1620 */
1621ObjectAddress
1622AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
1623{
1624 Oid db_id;
1625 HeapTuple tuple;
1626 Relation rel;
1627 ScanKeyData scankey;
1628 SysScanDesc scan;
1629 Form_pg_database datForm;
1630 ObjectAddress address;
1631
1632 /*
1633 * Get the old tuple. We don't need a lock on the database per se,
1634 * because we're not going to do anything that would mess up incoming
1635 * connections.
1636 */
1637 rel = table_open(DatabaseRelationId, RowExclusiveLock);
1638 ScanKeyInit(&scankey,
1639 Anum_pg_database_datname,
1640 BTEqualStrategyNumber, F_NAMEEQ,
1641 CStringGetDatum(dbname));
1642 scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1643 NULL, 1, &scankey);
1644 tuple = systable_getnext(scan);
1645 if (!HeapTupleIsValid(tuple))
1646 ereport(ERROR,
1647 (errcode(ERRCODE_UNDEFINED_DATABASE),
1648 errmsg("database \"%s\" does not exist", dbname)));
1649
1650 datForm = (Form_pg_database) GETSTRUCT(tuple);
1651 db_id = datForm->oid;
1652
1653 /*
1654 * If the new owner is the same as the existing owner, consider the
1655 * command to have succeeded. This is to be consistent with other
1656 * objects.
1657 */
1658 if (datForm->datdba != newOwnerId)
1659 {
1660 Datum repl_val[Natts_pg_database];
1661 bool repl_null[Natts_pg_database];
1662 bool repl_repl[Natts_pg_database];
1663 Acl *newAcl;
1664 Datum aclDatum;
1665 bool isNull;
1666 HeapTuple newtuple;
1667
1668 /* Otherwise, must be owner of the existing object */
1669 if (!pg_database_ownercheck(db_id, GetUserId()))
1670 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1671 dbname);
1672
1673 /* Must be able to become new owner */
1674 check_is_member_of_role(GetUserId(), newOwnerId);
1675
1676 /*
1677 * must have createdb rights
1678 *
1679 * NOTE: This is different from other alter-owner checks in that the
1680 * current user is checked for createdb privileges instead of the
1681 * destination owner. This is consistent with the CREATE case for
1682 * databases. Because superusers will always have this right, we need
1683 * no special case for them.
1684 */
1685 if (!have_createdb_privilege())
1686 ereport(ERROR,
1687 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1688 errmsg("permission denied to change owner of database")));
1689
1690 memset(repl_null, false, sizeof(repl_null));
1691 memset(repl_repl, false, sizeof(repl_repl));
1692
1693 repl_repl[Anum_pg_database_datdba - 1] = true;
1694 repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
1695
1696 /*
1697 * Determine the modified ACL for the new owner. This is only
1698 * necessary when the ACL is non-null.
1699 */
1700 aclDatum = heap_getattr(tuple,
1701 Anum_pg_database_datacl,
1702 RelationGetDescr(rel),
1703 &isNull);
1704 if (!isNull)
1705 {
1706 newAcl = aclnewowner(DatumGetAclP(aclDatum),
1707 datForm->datdba, newOwnerId);
1708 repl_repl[Anum_pg_database_datacl - 1] = true;
1709 repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
1710 }
1711
1712 newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
1713 CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
1714
1715 heap_freetuple(newtuple);
1716
1717 /* Update owner dependency reference */
1718 changeDependencyOnOwner(DatabaseRelationId, db_id, newOwnerId);
1719 }
1720
1721 InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1722
1723 ObjectAddressSet(address, DatabaseRelationId, db_id);
1724
1725 systable_endscan(scan);
1726
1727 /* Close pg_database, but keep lock till commit */
1728 table_close(rel, NoLock);
1729
1730 return address;
1731}
1732
1733
1734/*
1735 * Helper functions
1736 */
1737
1738/*
1739 * Look up info about the database named "name". If the database exists,
1740 * obtain the specified lock type on it, fill in any of the remaining
1741 * parameters that aren't NULL, and return true. If no such database,
1742 * return false.
1743 */
1744static bool
1745get_db_info(const char *name, LOCKMODE lockmode,
1746 Oid *dbIdP, Oid *ownerIdP,
1747 int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
1748 Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
1749 MultiXactId *dbMinMultiP,
1750 Oid *dbTablespace, char **dbCollate, char **dbCtype)
1751{
1752 bool result = false;
1753 Relation relation;
1754
1755 AssertArg(name);
1756
1757 /* Caller may wish to grab a better lock on pg_database beforehand... */
1758 relation = table_open(DatabaseRelationId, AccessShareLock);
1759
1760 /*
1761 * Loop covers the rare case where the database is renamed before we can
1762 * lock it. We try again just in case we can find a new one of the same
1763 * name.
1764 */
1765 for (;;)
1766 {
1767 ScanKeyData scanKey;
1768 SysScanDesc scan;
1769 HeapTuple tuple;
1770 Oid dbOid;
1771
1772 /*
1773 * there's no syscache for database-indexed-by-name, so must do it the
1774 * hard way
1775 */
1776 ScanKeyInit(&scanKey,
1777 Anum_pg_database_datname,
1778 BTEqualStrategyNumber, F_NAMEEQ,
1779 CStringGetDatum(name));
1780
1781 scan = systable_beginscan(relation, DatabaseNameIndexId, true,
1782 NULL, 1, &scanKey);
1783
1784 tuple = systable_getnext(scan);
1785
1786 if (!HeapTupleIsValid(tuple))
1787 {
1788 /* definitely no database of that name */
1789 systable_endscan(scan);
1790 break;
1791 }
1792
1793 dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
1794
1795 systable_endscan(scan);
1796
1797 /*
1798 * Now that we have a database OID, we can try to lock the DB.
1799 */
1800 if (lockmode != NoLock)
1801 LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1802
1803 /*
1804 * And now, re-fetch the tuple by OID. If it's still there and still
1805 * the same name, we win; else, drop the lock and loop back to try
1806 * again.
1807 */
1808 tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
1809 if (HeapTupleIsValid(tuple))
1810 {
1811 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1812
1813 if (strcmp(name, NameStr(dbform->datname)) == 0)
1814 {
1815 /* oid of the database */
1816 if (dbIdP)
1817 *dbIdP = dbOid;
1818 /* oid of the owner */
1819 if (ownerIdP)
1820 *ownerIdP = dbform->datdba;
1821 /* character encoding */
1822 if (encodingP)
1823 *encodingP = dbform->encoding;
1824 /* allowed as template? */
1825 if (dbIsTemplateP)
1826 *dbIsTemplateP = dbform->datistemplate;
1827 /* allowing connections? */
1828 if (dbAllowConnP)
1829 *dbAllowConnP = dbform->datallowconn;
1830 /* last system OID used in database */
1831 if (dbLastSysOidP)
1832 *dbLastSysOidP = dbform->datlastsysoid;
1833 /* limit of frozen XIDs */
1834 if (dbFrozenXidP)
1835 *dbFrozenXidP = dbform->datfrozenxid;
1836 /* minimum MultixactId */
1837 if (dbMinMultiP)
1838 *dbMinMultiP = dbform->datminmxid;
1839 /* default tablespace for this database */
1840 if (dbTablespace)
1841 *dbTablespace = dbform->dattablespace;
1842 /* default locale settings for this database */
1843 if (dbCollate)
1844 *dbCollate = pstrdup(NameStr(dbform->datcollate));
1845 if (dbCtype)
1846 *dbCtype = pstrdup(NameStr(dbform->datctype));
1847 ReleaseSysCache(tuple);
1848 result = true;
1849 break;
1850 }
1851 /* can only get here if it was just renamed */
1852 ReleaseSysCache(tuple);
1853 }
1854
1855 if (lockmode != NoLock)
1856 UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1857 }
1858
1859 table_close(relation, AccessShareLock);
1860
1861 return result;
1862}
1863
1864/* Check if current user has createdb privileges */
1865static bool
1866have_createdb_privilege(void)
1867{
1868 bool result = false;
1869 HeapTuple utup;
1870
1871 /* Superusers can always do everything */
1872 if (superuser())
1873 return true;
1874
1875 utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
1876 if (HeapTupleIsValid(utup))
1877 {
1878 result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
1879 ReleaseSysCache(utup);
1880 }
1881 return result;
1882}
1883
1884/*
1885 * Remove tablespace directories
1886 *
1887 * We don't know what tablespaces db_id is using, so iterate through all
1888 * tablespaces removing <tablespace>/db_id
1889 */
1890static void
1891remove_dbtablespaces(Oid db_id)
1892{
1893 Relation rel;
1894 TableScanDesc scan;
1895 HeapTuple tuple;
1896
1897 rel = table_open(TableSpaceRelationId, AccessShareLock);
1898 scan = table_beginscan_catalog(rel, 0, NULL);
1899 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1900 {
1901 Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
1902 Oid dsttablespace = spcform->oid;
1903 char *dstpath;
1904 struct stat st;
1905
1906 /* Don't mess with the global tablespace */
1907 if (dsttablespace == GLOBALTABLESPACE_OID)
1908 continue;
1909
1910 dstpath = GetDatabasePath(db_id, dsttablespace);
1911
1912 if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1913 {
1914 /* Assume we can ignore it */
1915 pfree(dstpath);
1916 continue;
1917 }
1918
1919 if (!rmtree(dstpath, true))
1920 ereport(WARNING,
1921 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1922 dstpath)));
1923
1924 /* Record the filesystem change in XLOG */
1925 {
1926 xl_dbase_drop_rec xlrec;
1927
1928 xlrec.db_id = db_id;
1929 xlrec.tablespace_id = dsttablespace;
1930
1931 XLogBeginInsert();
1932 XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
1933
1934 (void) XLogInsert(RM_DBASE_ID,
1935 XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
1936 }
1937
1938 pfree(dstpath);
1939 }
1940
1941 table_endscan(scan);
1942 table_close(rel, AccessShareLock);
1943}
1944
1945/*
1946 * Check for existing files that conflict with a proposed new DB OID;
1947 * return true if there are any
1948 *
1949 * If there were a subdirectory in any tablespace matching the proposed new
1950 * OID, we'd get a create failure due to the duplicate name ... and then we'd
1951 * try to remove that already-existing subdirectory during the cleanup in
1952 * remove_dbtablespaces. Nuking existing files seems like a bad idea, so
1953 * instead we make this extra check before settling on the OID of the new
1954 * database. This exactly parallels what GetNewRelFileNode() does for table
1955 * relfilenode values.
1956 */
1957static bool
1958check_db_file_conflict(Oid db_id)
1959{
1960 bool result = false;
1961 Relation rel;
1962 TableScanDesc scan;
1963 HeapTuple tuple;
1964
1965 rel = table_open(TableSpaceRelationId, AccessShareLock);
1966 scan = table_beginscan_catalog(rel, 0, NULL);
1967 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1968 {
1969 Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
1970 Oid dsttablespace = spcform->oid;
1971 char *dstpath;
1972 struct stat st;
1973
1974 /* Don't mess with the global tablespace */
1975 if (dsttablespace == GLOBALTABLESPACE_OID)
1976 continue;
1977
1978 dstpath = GetDatabasePath(db_id, dsttablespace);
1979
1980 if (lstat(dstpath, &st) == 0)
1981 {
1982 /* Found a conflicting file (or directory, whatever) */
1983 pfree(dstpath);
1984 result = true;
1985 break;
1986 }
1987
1988 pfree(dstpath);
1989 }
1990
1991 table_endscan(scan);
1992 table_close(rel, AccessShareLock);
1993
1994 return result;
1995}
1996
1997/*
1998 * Issue a suitable errdetail message for a busy database
1999 */
2000static int
2001errdetail_busy_db(int notherbackends, int npreparedxacts)
2002{
2003 if (notherbackends > 0 && npreparedxacts > 0)
2004
2005 /*
2006 * We don't deal with singular versus plural here, since gettext
2007 * doesn't support multiple plurals in one string.
2008 */
2009 errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
2010 notherbackends, npreparedxacts);
2011 else if (notherbackends > 0)
2012 errdetail_plural("There is %d other session using the database.",
2013 "There are %d other sessions using the database.",
2014 notherbackends,
2015 notherbackends);
2016 else
2017 errdetail_plural("There is %d prepared transaction using the database.",
2018 "There are %d prepared transactions using the database.",
2019 npreparedxacts,
2020 npreparedxacts);
2021 return 0; /* just to keep ereport macro happy */
2022}
2023
2024/*
2025 * get_database_oid - given a database name, look up the OID
2026 *
2027 * If missing_ok is false, throw an error if database name not found. If
2028 * true, just return InvalidOid.
2029 */
2030Oid
2031get_database_oid(const char *dbname, bool missing_ok)
2032{
2033 Relation pg_database;
2034 ScanKeyData entry[1];
2035 SysScanDesc scan;
2036 HeapTuple dbtuple;
2037 Oid oid;
2038
2039 /*
2040 * There's no syscache for pg_database indexed by name, so we must look
2041 * the hard way.
2042 */
2043 pg_database = table_open(DatabaseRelationId, AccessShareLock);
2044 ScanKeyInit(&entry[0],
2045 Anum_pg_database_datname,
2046 BTEqualStrategyNumber, F_NAMEEQ,
2047 CStringGetDatum(dbname));
2048 scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
2049 NULL, 1, entry);
2050
2051 dbtuple = systable_getnext(scan);
2052
2053 /* We assume that there can be at most one matching tuple */
2054 if (HeapTupleIsValid(dbtuple))
2055 oid = ((Form_pg_database) GETSTRUCT(dbtuple))->oid;
2056 else
2057 oid = InvalidOid;
2058
2059 systable_endscan(scan);
2060 table_close(pg_database, AccessShareLock);
2061
2062 if (!OidIsValid(oid) && !missing_ok)
2063 ereport(ERROR,
2064 (errcode(ERRCODE_UNDEFINED_DATABASE),
2065 errmsg("database \"%s\" does not exist",
2066 dbname)));
2067
2068 return oid;
2069}
2070
2071
2072/*
2073 * get_database_name - given a database OID, look up the name
2074 *
2075 * Returns a palloc'd string, or NULL if no such database.
2076 */
2077char *
2078get_database_name(Oid dbid)
2079{
2080 HeapTuple dbtuple;
2081 char *result;
2082
2083 dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
2084 if (HeapTupleIsValid(dbtuple))
2085 {
2086 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
2087 ReleaseSysCache(dbtuple);
2088 }
2089 else
2090 result = NULL;
2091
2092 return result;
2093}
2094
2095/*
2096 * DATABASE resource manager's routines
2097 */
2098void
2099dbase_redo(XLogReaderState *record)
2100{
2101 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
2102
2103 /* Backup blocks are not used in dbase records */
2104 Assert(!XLogRecHasAnyBlockRefs(record));
2105
2106 if (info == XLOG_DBASE_CREATE)
2107 {
2108 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
2109 char *src_path;
2110 char *dst_path;
2111 struct stat st;
2112
2113 src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
2114 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
2115
2116 /*
2117 * Our theory for replaying a CREATE is to forcibly drop the target
2118 * subdirectory if present, then re-copy the source data. This may be
2119 * more work than needed, but it is simple to implement.
2120 */
2121 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
2122 {
2123 if (!rmtree(dst_path, true))
2124 /* If this failed, copydir() below is going to error. */
2125 ereport(WARNING,
2126 (errmsg("some useless files may be left behind in old database directory \"%s\"",
2127 dst_path)));
2128 }
2129
2130 /*
2131 * Force dirty buffers out to disk, to ensure source database is
2132 * up-to-date for the copy.
2133 */
2134 FlushDatabaseBuffers(xlrec->src_db_id);
2135
2136 /*
2137 * Copy this subdirectory to the new location
2138 *
2139 * We don't need to copy subdirectories
2140 */
2141 copydir(src_path, dst_path, false);
2142 }
2143 else if (info == XLOG_DBASE_DROP)
2144 {
2145 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
2146 char *dst_path;
2147
2148 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
2149
2150 if (InHotStandby)
2151 {
2152 /*
2153 * Lock database while we resolve conflicts to ensure that
2154 * InitPostgres() cannot fully re-execute concurrently. This
2155 * avoids backends re-connecting automatically to same database,
2156 * which can happen in some cases.
2157 *
2158 * This will lock out walsenders trying to connect to db-specific
2159 * slots for logical decoding too, so it's safe for us to drop
2160 * slots.
2161 */
2162 LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
2163 ResolveRecoveryConflictWithDatabase(xlrec->db_id);
2164 }
2165
2166 /* Drop any database-specific replication slots */
2167 ReplicationSlotsDropDBSlots(xlrec->db_id);
2168
2169 /* Drop pages for this database that are in the shared buffer cache */
2170 DropDatabaseBuffers(xlrec->db_id);
2171
2172 /* Also, clean out any fsync requests that might be pending in md.c */
2173 ForgetDatabaseSyncRequests(xlrec->db_id);
2174
2175 /* Clean out the xlog relcache too */
2176 XLogDropDatabase(xlrec->db_id);
2177
2178 /* And remove the physical files */
2179 if (!rmtree(dst_path, true))
2180 ereport(WARNING,
2181 (errmsg("some useless files may be left behind in old database directory \"%s\"",
2182 dst_path)));
2183
2184 if (InHotStandby)
2185 {
2186 /*
2187 * Release locks prior to commit. XXX There is a race condition
2188 * here that may allow backends to reconnect, but the window for
2189 * this is small because the gap between here and commit is mostly
2190 * fairly small and it is unlikely that people will be dropping
2191 * databases that we are trying to connect to anyway.
2192 */
2193 UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
2194 }
2195 }
2196 else
2197 elog(PANIC, "dbase_redo: unknown op code %u", info);
2198}
2199