1/*-------------------------------------------------------------------------
2 *
3 * publicationcmds.c
4 * publication manipulation
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * publicationcmds.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "funcapi.h"
18#include "miscadmin.h"
19
20#include "access/genam.h"
21#include "access/htup_details.h"
22#include "access/table.h"
23#include "access/xact.h"
24
25#include "catalog/catalog.h"
26#include "catalog/indexing.h"
27#include "catalog/namespace.h"
28#include "catalog/objectaccess.h"
29#include "catalog/objectaddress.h"
30#include "catalog/pg_inherits.h"
31#include "catalog/pg_type.h"
32#include "catalog/pg_publication.h"
33#include "catalog/pg_publication_rel.h"
34
35#include "commands/dbcommands.h"
36#include "commands/defrem.h"
37#include "commands/event_trigger.h"
38#include "commands/publicationcmds.h"
39
40#include "utils/array.h"
41#include "utils/builtins.h"
42#include "utils/catcache.h"
43#include "utils/fmgroids.h"
44#include "utils/inval.h"
45#include "utils/lsyscache.h"
46#include "utils/rel.h"
47#include "utils/syscache.h"
48#include "utils/varlena.h"
49
50/* Same as MAXNUMMESSAGES in sinvaladt.c */
51#define MAX_RELCACHE_INVAL_MSGS 4096
52
53static List *OpenTableList(List *tables);
54static void CloseTableList(List *rels);
55static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
56 AlterPublicationStmt *stmt);
57static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
58
59static void
60parse_publication_options(List *options,
61 bool *publish_given,
62 bool *publish_insert,
63 bool *publish_update,
64 bool *publish_delete,
65 bool *publish_truncate)
66{
67 ListCell *lc;
68
69 *publish_given = false;
70
71 /* Defaults are true */
72 *publish_insert = true;
73 *publish_update = true;
74 *publish_delete = true;
75 *publish_truncate = true;
76
77 /* Parse options */
78 foreach(lc, options)
79 {
80 DefElem *defel = (DefElem *) lfirst(lc);
81
82 if (strcmp(defel->defname, "publish") == 0)
83 {
84 char *publish;
85 List *publish_list;
86 ListCell *lc;
87
88 if (*publish_given)
89 ereport(ERROR,
90 (errcode(ERRCODE_SYNTAX_ERROR),
91 errmsg("conflicting or redundant options")));
92
93 /*
94 * If publish option was given only the explicitly listed actions
95 * should be published.
96 */
97 *publish_insert = false;
98 *publish_update = false;
99 *publish_delete = false;
100 *publish_truncate = false;
101
102 *publish_given = true;
103 publish = defGetString(defel);
104
105 if (!SplitIdentifierString(publish, ',', &publish_list))
106 ereport(ERROR,
107 (errcode(ERRCODE_SYNTAX_ERROR),
108 errmsg("invalid list syntax for \"publish\" option")));
109
110 /* Process the option list. */
111 foreach(lc, publish_list)
112 {
113 char *publish_opt = (char *) lfirst(lc);
114
115 if (strcmp(publish_opt, "insert") == 0)
116 *publish_insert = true;
117 else if (strcmp(publish_opt, "update") == 0)
118 *publish_update = true;
119 else if (strcmp(publish_opt, "delete") == 0)
120 *publish_delete = true;
121 else if (strcmp(publish_opt, "truncate") == 0)
122 *publish_truncate = true;
123 else
124 ereport(ERROR,
125 (errcode(ERRCODE_SYNTAX_ERROR),
126 errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
127 }
128 }
129 else
130 ereport(ERROR,
131 (errcode(ERRCODE_SYNTAX_ERROR),
132 errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
133 }
134}
135
136/*
137 * Create new publication.
138 */
139ObjectAddress
140CreatePublication(CreatePublicationStmt *stmt)
141{
142 Relation rel;
143 ObjectAddress myself;
144 Oid puboid;
145 bool nulls[Natts_pg_publication];
146 Datum values[Natts_pg_publication];
147 HeapTuple tup;
148 bool publish_given;
149 bool publish_insert;
150 bool publish_update;
151 bool publish_delete;
152 bool publish_truncate;
153 AclResult aclresult;
154
155 /* must have CREATE privilege on database */
156 aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
157 if (aclresult != ACLCHECK_OK)
158 aclcheck_error(aclresult, OBJECT_DATABASE,
159 get_database_name(MyDatabaseId));
160
161 /* FOR ALL TABLES requires superuser */
162 if (stmt->for_all_tables && !superuser())
163 ereport(ERROR,
164 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
165 (errmsg("must be superuser to create FOR ALL TABLES publication"))));
166
167 rel = table_open(PublicationRelationId, RowExclusiveLock);
168
169 /* Check if name is used */
170 puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
171 CStringGetDatum(stmt->pubname));
172 if (OidIsValid(puboid))
173 {
174 ereport(ERROR,
175 (errcode(ERRCODE_DUPLICATE_OBJECT),
176 errmsg("publication \"%s\" already exists",
177 stmt->pubname)));
178 }
179
180 /* Form a tuple. */
181 memset(values, 0, sizeof(values));
182 memset(nulls, false, sizeof(nulls));
183
184 values[Anum_pg_publication_pubname - 1] =
185 DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
186 values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
187
188 parse_publication_options(stmt->options,
189 &publish_given, &publish_insert,
190 &publish_update, &publish_delete,
191 &publish_truncate);
192
193 puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
194 Anum_pg_publication_oid);
195 values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
196 values[Anum_pg_publication_puballtables - 1] =
197 BoolGetDatum(stmt->for_all_tables);
198 values[Anum_pg_publication_pubinsert - 1] =
199 BoolGetDatum(publish_insert);
200 values[Anum_pg_publication_pubupdate - 1] =
201 BoolGetDatum(publish_update);
202 values[Anum_pg_publication_pubdelete - 1] =
203 BoolGetDatum(publish_delete);
204 values[Anum_pg_publication_pubtruncate - 1] =
205 BoolGetDatum(publish_truncate);
206
207 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
208
209 /* Insert tuple into catalog. */
210 CatalogTupleInsert(rel, tup);
211 heap_freetuple(tup);
212
213 recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
214
215 ObjectAddressSet(myself, PublicationRelationId, puboid);
216
217 /* Make the changes visible. */
218 CommandCounterIncrement();
219
220 if (stmt->tables)
221 {
222 List *rels;
223
224 Assert(list_length(stmt->tables) > 0);
225
226 rels = OpenTableList(stmt->tables);
227 PublicationAddTables(puboid, rels, true, NULL);
228 CloseTableList(rels);
229 }
230
231 table_close(rel, RowExclusiveLock);
232
233 InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
234
235 return myself;
236}
237
238/*
239 * Change options of a publication.
240 */
241static void
242AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
243 HeapTuple tup)
244{
245 bool nulls[Natts_pg_publication];
246 bool replaces[Natts_pg_publication];
247 Datum values[Natts_pg_publication];
248 bool publish_given;
249 bool publish_insert;
250 bool publish_update;
251 bool publish_delete;
252 bool publish_truncate;
253 ObjectAddress obj;
254 Form_pg_publication pubform;
255
256 parse_publication_options(stmt->options,
257 &publish_given, &publish_insert,
258 &publish_update, &publish_delete,
259 &publish_truncate);
260
261 /* Everything ok, form a new tuple. */
262 memset(values, 0, sizeof(values));
263 memset(nulls, false, sizeof(nulls));
264 memset(replaces, false, sizeof(replaces));
265
266 if (publish_given)
267 {
268 values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert);
269 replaces[Anum_pg_publication_pubinsert - 1] = true;
270
271 values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update);
272 replaces[Anum_pg_publication_pubupdate - 1] = true;
273
274 values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
275 replaces[Anum_pg_publication_pubdelete - 1] = true;
276
277 values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate);
278 replaces[Anum_pg_publication_pubtruncate - 1] = true;
279 }
280
281 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
282 replaces);
283
284 /* Update the catalog. */
285 CatalogTupleUpdate(rel, &tup->t_self, tup);
286
287 CommandCounterIncrement();
288
289 pubform = (Form_pg_publication) GETSTRUCT(tup);
290
291 /* Invalidate the relcache. */
292 if (pubform->puballtables)
293 {
294 CacheInvalidateRelcacheAll();
295 }
296 else
297 {
298 List *relids = GetPublicationRelations(pubform->oid);
299
300 /*
301 * We don't want to send too many individual messages, at some point
302 * it's cheaper to just reset whole relcache.
303 */
304 if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
305 {
306 ListCell *lc;
307
308 foreach(lc, relids)
309 {
310 Oid relid = lfirst_oid(lc);
311
312 CacheInvalidateRelcacheByRelid(relid);
313 }
314 }
315 else
316 CacheInvalidateRelcacheAll();
317 }
318
319 ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
320 EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
321 (Node *) stmt);
322
323 InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
324}
325
326/*
327 * Add or remove table to/from publication.
328 */
329static void
330AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
331 HeapTuple tup)
332{
333 List *rels = NIL;
334 Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
335 Oid pubid = pubform->oid;
336
337 /* Check that user is allowed to manipulate the publication tables. */
338 if (pubform->puballtables)
339 ereport(ERROR,
340 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
341 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
342 NameStr(pubform->pubname)),
343 errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
344
345 Assert(list_length(stmt->tables) > 0);
346
347 rels = OpenTableList(stmt->tables);
348
349 if (stmt->tableAction == DEFELEM_ADD)
350 PublicationAddTables(pubid, rels, false, stmt);
351 else if (stmt->tableAction == DEFELEM_DROP)
352 PublicationDropTables(pubid, rels, false);
353 else /* DEFELEM_SET */
354 {
355 List *oldrelids = GetPublicationRelations(pubid);
356 List *delrels = NIL;
357 ListCell *oldlc;
358
359 /* Calculate which relations to drop. */
360 foreach(oldlc, oldrelids)
361 {
362 Oid oldrelid = lfirst_oid(oldlc);
363 ListCell *newlc;
364 bool found = false;
365
366 foreach(newlc, rels)
367 {
368 Relation newrel = (Relation) lfirst(newlc);
369
370 if (RelationGetRelid(newrel) == oldrelid)
371 {
372 found = true;
373 break;
374 }
375 }
376
377 if (!found)
378 {
379 Relation oldrel = table_open(oldrelid,
380 ShareUpdateExclusiveLock);
381
382 delrels = lappend(delrels, oldrel);
383 }
384 }
385
386 /* And drop them. */
387 PublicationDropTables(pubid, delrels, true);
388
389 /*
390 * Don't bother calculating the difference for adding, we'll catch and
391 * skip existing ones when doing catalog update.
392 */
393 PublicationAddTables(pubid, rels, true, stmt);
394
395 CloseTableList(delrels);
396 }
397
398 CloseTableList(rels);
399}
400
401/*
402 * Alter the existing publication.
403 *
404 * This is dispatcher function for AlterPublicationOptions and
405 * AlterPublicationTables.
406 */
407void
408AlterPublication(AlterPublicationStmt *stmt)
409{
410 Relation rel;
411 HeapTuple tup;
412 Form_pg_publication pubform;
413
414 rel = table_open(PublicationRelationId, RowExclusiveLock);
415
416 tup = SearchSysCacheCopy1(PUBLICATIONNAME,
417 CStringGetDatum(stmt->pubname));
418
419 if (!HeapTupleIsValid(tup))
420 ereport(ERROR,
421 (errcode(ERRCODE_UNDEFINED_OBJECT),
422 errmsg("publication \"%s\" does not exist",
423 stmt->pubname)));
424
425 pubform = (Form_pg_publication) GETSTRUCT(tup);
426
427 /* must be owner */
428 if (!pg_publication_ownercheck(pubform->oid, GetUserId()))
429 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
430 stmt->pubname);
431
432 if (stmt->options)
433 AlterPublicationOptions(stmt, rel, tup);
434 else
435 AlterPublicationTables(stmt, rel, tup);
436
437 /* Cleanup. */
438 heap_freetuple(tup);
439 table_close(rel, RowExclusiveLock);
440}
441
442/*
443 * Drop publication by OID
444 */
445void
446RemovePublicationById(Oid pubid)
447{
448 Relation rel;
449 HeapTuple tup;
450
451 rel = table_open(PublicationRelationId, RowExclusiveLock);
452
453 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
454
455 if (!HeapTupleIsValid(tup))
456 elog(ERROR, "cache lookup failed for publication %u", pubid);
457
458 CatalogTupleDelete(rel, &tup->t_self);
459
460 ReleaseSysCache(tup);
461
462 table_close(rel, RowExclusiveLock);
463}
464
465/*
466 * Remove relation from publication by mapping OID.
467 */
468void
469RemovePublicationRelById(Oid proid)
470{
471 Relation rel;
472 HeapTuple tup;
473 Form_pg_publication_rel pubrel;
474
475 rel = table_open(PublicationRelRelationId, RowExclusiveLock);
476
477 tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
478
479 if (!HeapTupleIsValid(tup))
480 elog(ERROR, "cache lookup failed for publication table %u",
481 proid);
482
483 pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
484
485 /* Invalidate relcache so that publication info is rebuilt. */
486 CacheInvalidateRelcacheByRelid(pubrel->prrelid);
487
488 CatalogTupleDelete(rel, &tup->t_self);
489
490 ReleaseSysCache(tup);
491
492 table_close(rel, RowExclusiveLock);
493}
494
495/*
496 * Open relations specified by a RangeVar list.
497 * The returned tables are locked in ShareUpdateExclusiveLock mode.
498 */
499static List *
500OpenTableList(List *tables)
501{
502 List *relids = NIL;
503 List *rels = NIL;
504 ListCell *lc;
505
506 /*
507 * Open, share-lock, and check all the explicitly-specified relations
508 */
509 foreach(lc, tables)
510 {
511 RangeVar *rv = castNode(RangeVar, lfirst(lc));
512 bool recurse = rv->inh;
513 Relation rel;
514 Oid myrelid;
515
516 /* Allow query cancel in case this takes a long time */
517 CHECK_FOR_INTERRUPTS();
518
519 rel = table_openrv(rv, ShareUpdateExclusiveLock);
520 myrelid = RelationGetRelid(rel);
521
522 /*
523 * Filter out duplicates if user specifies "foo, foo".
524 *
525 * Note that this algorithm is known to not be very efficient (O(N^2))
526 * but given that it only works on list of tables given to us by user
527 * it's deemed acceptable.
528 */
529 if (list_member_oid(relids, myrelid))
530 {
531 table_close(rel, ShareUpdateExclusiveLock);
532 continue;
533 }
534
535 rels = lappend(rels, rel);
536 relids = lappend_oid(relids, myrelid);
537
538 /* Add children of this rel, if requested */
539 if (recurse)
540 {
541 List *children;
542 ListCell *child;
543
544 children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
545 NULL);
546
547 foreach(child, children)
548 {
549 Oid childrelid = lfirst_oid(child);
550
551 /* Allow query cancel in case this takes a long time */
552 CHECK_FOR_INTERRUPTS();
553
554 /*
555 * Skip duplicates if user specified both parent and child
556 * tables.
557 */
558 if (list_member_oid(relids, childrelid))
559 continue;
560
561 /* find_all_inheritors already got lock */
562 rel = table_open(childrelid, NoLock);
563 rels = lappend(rels, rel);
564 relids = lappend_oid(relids, childrelid);
565 }
566 }
567 }
568
569 list_free(relids);
570
571 return rels;
572}
573
574/*
575 * Close all relations in the list.
576 */
577static void
578CloseTableList(List *rels)
579{
580 ListCell *lc;
581
582 foreach(lc, rels)
583 {
584 Relation rel = (Relation) lfirst(lc);
585
586 table_close(rel, NoLock);
587 }
588}
589
590/*
591 * Add listed tables to the publication.
592 */
593static void
594PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
595 AlterPublicationStmt *stmt)
596{
597 ListCell *lc;
598
599 Assert(!stmt || !stmt->for_all_tables);
600
601 foreach(lc, rels)
602 {
603 Relation rel = (Relation) lfirst(lc);
604 ObjectAddress obj;
605
606 /* Must be owner of the table or superuser. */
607 if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
608 aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
609 RelationGetRelationName(rel));
610
611 obj = publication_add_relation(pubid, rel, if_not_exists);
612 if (stmt)
613 {
614 EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
615 (Node *) stmt);
616
617 InvokeObjectPostCreateHook(PublicationRelRelationId,
618 obj.objectId, 0);
619 }
620 }
621}
622
623/*
624 * Remove listed tables from the publication.
625 */
626static void
627PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
628{
629 ObjectAddress obj;
630 ListCell *lc;
631 Oid prid;
632
633 foreach(lc, rels)
634 {
635 Relation rel = (Relation) lfirst(lc);
636 Oid relid = RelationGetRelid(rel);
637
638 prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
639 ObjectIdGetDatum(relid),
640 ObjectIdGetDatum(pubid));
641 if (!OidIsValid(prid))
642 {
643 if (missing_ok)
644 continue;
645
646 ereport(ERROR,
647 (errcode(ERRCODE_UNDEFINED_OBJECT),
648 errmsg("relation \"%s\" is not part of the publication",
649 RelationGetRelationName(rel))));
650 }
651
652 ObjectAddressSet(obj, PublicationRelRelationId, prid);
653 performDeletion(&obj, DROP_CASCADE, 0);
654 }
655}
656
657/*
658 * Internal workhorse for changing a publication owner
659 */
660static void
661AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
662{
663 Form_pg_publication form;
664
665 form = (Form_pg_publication) GETSTRUCT(tup);
666
667 if (form->pubowner == newOwnerId)
668 return;
669
670 if (!superuser())
671 {
672 AclResult aclresult;
673
674 /* Must be owner */
675 if (!pg_publication_ownercheck(form->oid, GetUserId()))
676 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
677 NameStr(form->pubname));
678
679 /* Must be able to become new owner */
680 check_is_member_of_role(GetUserId(), newOwnerId);
681
682 /* New owner must have CREATE privilege on database */
683 aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
684 if (aclresult != ACLCHECK_OK)
685 aclcheck_error(aclresult, OBJECT_DATABASE,
686 get_database_name(MyDatabaseId));
687
688 if (form->puballtables && !superuser_arg(newOwnerId))
689 ereport(ERROR,
690 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
691 errmsg("permission denied to change owner of publication \"%s\"",
692 NameStr(form->pubname)),
693 errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
694 }
695
696 form->pubowner = newOwnerId;
697 CatalogTupleUpdate(rel, &tup->t_self, tup);
698
699 /* Update owner dependency reference */
700 changeDependencyOnOwner(PublicationRelationId,
701 form->oid,
702 newOwnerId);
703
704 InvokeObjectPostAlterHook(PublicationRelationId,
705 form->oid, 0);
706}
707
708/*
709 * Change publication owner -- by name
710 */
711ObjectAddress
712AlterPublicationOwner(const char *name, Oid newOwnerId)
713{
714 Oid subid;
715 HeapTuple tup;
716 Relation rel;
717 ObjectAddress address;
718 Form_pg_publication pubform;
719
720 rel = table_open(PublicationRelationId, RowExclusiveLock);
721
722 tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
723
724 if (!HeapTupleIsValid(tup))
725 ereport(ERROR,
726 (errcode(ERRCODE_UNDEFINED_OBJECT),
727 errmsg("publication \"%s\" does not exist", name)));
728
729 pubform = (Form_pg_publication) GETSTRUCT(tup);
730 subid = pubform->oid;
731
732 AlterPublicationOwner_internal(rel, tup, newOwnerId);
733
734 ObjectAddressSet(address, PublicationRelationId, subid);
735
736 heap_freetuple(tup);
737
738 table_close(rel, RowExclusiveLock);
739
740 return address;
741}
742
743/*
744 * Change publication owner -- by OID
745 */
746void
747AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
748{
749 HeapTuple tup;
750 Relation rel;
751
752 rel = table_open(PublicationRelationId, RowExclusiveLock);
753
754 tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
755
756 if (!HeapTupleIsValid(tup))
757 ereport(ERROR,
758 (errcode(ERRCODE_UNDEFINED_OBJECT),
759 errmsg("publication with OID %u does not exist", subid)));
760
761 AlterPublicationOwner_internal(rel, tup, newOwnerId);
762
763 heap_freetuple(tup);
764
765 table_close(rel, RowExclusiveLock);
766}
767