1/*-------------------------------------------------------------------------
2 *
3 * pg_publication.c
4 * publication C API manipulation
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * pg_publication.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "funcapi.h"
18#include "miscadmin.h"
19
20#include "access/genam.h"
21#include "access/heapam.h"
22#include "access/htup_details.h"
23#include "access/tableam.h"
24#include "access/xact.h"
25
26#include "catalog/catalog.h"
27#include "catalog/dependency.h"
28#include "catalog/index.h"
29#include "catalog/indexing.h"
30#include "catalog/namespace.h"
31#include "catalog/objectaccess.h"
32#include "catalog/objectaddress.h"
33#include "catalog/pg_type.h"
34#include "catalog/pg_publication.h"
35#include "catalog/pg_publication_rel.h"
36
37#include "utils/array.h"
38#include "utils/builtins.h"
39#include "utils/catcache.h"
40#include "utils/fmgroids.h"
41#include "utils/inval.h"
42#include "utils/lsyscache.h"
43#include "utils/rel.h"
44#include "utils/syscache.h"
45
46/*
47 * Check if relation can be in given publication and throws appropriate
48 * error if not.
49 */
50static void
51check_publication_add_relation(Relation targetrel)
52{
53 /* Give more specific error for partitioned tables */
54 if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE)
55 ereport(ERROR,
56 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
57 errmsg("\"%s\" is a partitioned table",
58 RelationGetRelationName(targetrel)),
59 errdetail("Adding partitioned tables to publications is not supported."),
60 errhint("You can add the table partitions individually.")));
61
62 /* Must be table */
63 if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION)
64 ereport(ERROR,
65 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
66 errmsg("\"%s\" is not a table",
67 RelationGetRelationName(targetrel)),
68 errdetail("Only tables can be added to publications.")));
69
70 /* Can't be system table */
71 if (IsCatalogRelation(targetrel))
72 ereport(ERROR,
73 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
74 errmsg("\"%s\" is a system table",
75 RelationGetRelationName(targetrel)),
76 errdetail("System tables cannot be added to publications.")));
77
78 /* UNLOGGED and TEMP relations cannot be part of publication. */
79 if (!RelationNeedsWAL(targetrel))
80 ereport(ERROR,
81 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
82 errmsg("table \"%s\" cannot be replicated",
83 RelationGetRelationName(targetrel)),
84 errdetail("Temporary and unlogged relations cannot be replicated.")));
85}
86
87/*
88 * Returns if relation represented by oid and Form_pg_class entry
89 * is publishable.
90 *
91 * Does same checks as the above, but does not need relation to be opened
92 * and also does not throw errors.
93 *
94 * XXX This also excludes all tables with relid < FirstNormalObjectId,
95 * ie all tables created during initdb. This mainly affects the preinstalled
96 * information_schema. IsCatalogRelationOid() only excludes tables with
97 * relid < FirstBootstrapObjectId, making that test rather redundant,
98 * but really we should get rid of the FirstNormalObjectId test not
99 * IsCatalogRelationOid. We can't do so today because we don't want
100 * information_schema tables to be considered publishable; but this test
101 * is really inadequate for that, since the information_schema could be
102 * dropped and reloaded and then it'll be considered publishable. The best
103 * long-term solution may be to add a "relispublishable" bool to pg_class,
104 * and depend on that instead of OID checks.
105 */
106static bool
107is_publishable_class(Oid relid, Form_pg_class reltuple)
108{
109 return reltuple->relkind == RELKIND_RELATION &&
110 !IsCatalogRelationOid(relid) &&
111 reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
112 relid >= FirstNormalObjectId;
113}
114
115/*
116 * Another variant of this, taking a Relation.
117 */
118bool
119is_publishable_relation(Relation rel)
120{
121 return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
122}
123
124
125/*
126 * SQL-callable variant of the above
127 *
128 * This returns null when the relation does not exist. This is intended to be
129 * used for example in psql to avoid gratuitous errors when there are
130 * concurrent catalog changes.
131 */
132Datum
133pg_relation_is_publishable(PG_FUNCTION_ARGS)
134{
135 Oid relid = PG_GETARG_OID(0);
136 HeapTuple tuple;
137 bool result;
138
139 tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
140 if (!HeapTupleIsValid(tuple))
141 PG_RETURN_NULL();
142 result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
143 ReleaseSysCache(tuple);
144 PG_RETURN_BOOL(result);
145}
146
147
148/*
149 * Insert new publication / relation mapping.
150 */
151ObjectAddress
152publication_add_relation(Oid pubid, Relation targetrel,
153 bool if_not_exists)
154{
155 Relation rel;
156 HeapTuple tup;
157 Datum values[Natts_pg_publication_rel];
158 bool nulls[Natts_pg_publication_rel];
159 Oid relid = RelationGetRelid(targetrel);
160 Oid prrelid;
161 Publication *pub = GetPublication(pubid);
162 ObjectAddress myself,
163 referenced;
164
165 rel = table_open(PublicationRelRelationId, RowExclusiveLock);
166
167 /*
168 * Check for duplicates. Note that this does not really prevent
169 * duplicates, it's here just to provide nicer error message in common
170 * case. The real protection is the unique key on the catalog.
171 */
172 if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
173 ObjectIdGetDatum(pubid)))
174 {
175 table_close(rel, RowExclusiveLock);
176
177 if (if_not_exists)
178 return InvalidObjectAddress;
179
180 ereport(ERROR,
181 (errcode(ERRCODE_DUPLICATE_OBJECT),
182 errmsg("relation \"%s\" is already member of publication \"%s\"",
183 RelationGetRelationName(targetrel), pub->name)));
184 }
185
186 check_publication_add_relation(targetrel);
187
188 /* Form a tuple. */
189 memset(values, 0, sizeof(values));
190 memset(nulls, false, sizeof(nulls));
191
192 prrelid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
193 Anum_pg_publication_rel_oid);
194 values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(prrelid);
195 values[Anum_pg_publication_rel_prpubid - 1] =
196 ObjectIdGetDatum(pubid);
197 values[Anum_pg_publication_rel_prrelid - 1] =
198 ObjectIdGetDatum(relid);
199
200 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
201
202 /* Insert tuple into catalog. */
203 CatalogTupleInsert(rel, tup);
204 heap_freetuple(tup);
205
206 ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
207
208 /* Add dependency on the publication */
209 ObjectAddressSet(referenced, PublicationRelationId, pubid);
210 recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
211
212 /* Add dependency on the relation */
213 ObjectAddressSet(referenced, RelationRelationId, relid);
214 recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
215
216 /* Close the table. */
217 table_close(rel, RowExclusiveLock);
218
219 /* Invalidate relcache so that publication info is rebuilt. */
220 CacheInvalidateRelcache(targetrel);
221
222 return myself;
223}
224
225
226/*
227 * Gets list of publication oids for a relation oid.
228 */
229List *
230GetRelationPublications(Oid relid)
231{
232 List *result = NIL;
233 CatCList *pubrellist;
234 int i;
235
236 /* Find all publications associated with the relation. */
237 pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
238 ObjectIdGetDatum(relid));
239 for (i = 0; i < pubrellist->n_members; i++)
240 {
241 HeapTuple tup = &pubrellist->members[i]->tuple;
242 Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
243
244 result = lappend_oid(result, pubid);
245 }
246
247 ReleaseSysCacheList(pubrellist);
248
249 return result;
250}
251
252/*
253 * Gets list of relation oids for a publication.
254 *
255 * This should only be used for normal publications, the FOR ALL TABLES
256 * should use GetAllTablesPublicationRelations().
257 */
258List *
259GetPublicationRelations(Oid pubid)
260{
261 List *result;
262 Relation pubrelsrel;
263 ScanKeyData scankey;
264 SysScanDesc scan;
265 HeapTuple tup;
266
267 /* Find all publications associated with the relation. */
268 pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
269
270 ScanKeyInit(&scankey,
271 Anum_pg_publication_rel_prpubid,
272 BTEqualStrategyNumber, F_OIDEQ,
273 ObjectIdGetDatum(pubid));
274
275 scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
276 true, NULL, 1, &scankey);
277
278 result = NIL;
279 while (HeapTupleIsValid(tup = systable_getnext(scan)))
280 {
281 Form_pg_publication_rel pubrel;
282
283 pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
284
285 result = lappend_oid(result, pubrel->prrelid);
286 }
287
288 systable_endscan(scan);
289 table_close(pubrelsrel, AccessShareLock);
290
291 return result;
292}
293
294/*
295 * Gets list of publication oids for publications marked as FOR ALL TABLES.
296 */
297List *
298GetAllTablesPublications(void)
299{
300 List *result;
301 Relation rel;
302 ScanKeyData scankey;
303 SysScanDesc scan;
304 HeapTuple tup;
305
306 /* Find all publications that are marked as for all tables. */
307 rel = table_open(PublicationRelationId, AccessShareLock);
308
309 ScanKeyInit(&scankey,
310 Anum_pg_publication_puballtables,
311 BTEqualStrategyNumber, F_BOOLEQ,
312 BoolGetDatum(true));
313
314 scan = systable_beginscan(rel, InvalidOid, false,
315 NULL, 1, &scankey);
316
317 result = NIL;
318 while (HeapTupleIsValid(tup = systable_getnext(scan)))
319 {
320 Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
321
322 result = lappend_oid(result, oid);
323 }
324
325 systable_endscan(scan);
326 table_close(rel, AccessShareLock);
327
328 return result;
329}
330
331/*
332 * Gets list of all relation published by FOR ALL TABLES publication(s).
333 */
334List *
335GetAllTablesPublicationRelations(void)
336{
337 Relation classRel;
338 ScanKeyData key[1];
339 TableScanDesc scan;
340 HeapTuple tuple;
341 List *result = NIL;
342
343 classRel = table_open(RelationRelationId, AccessShareLock);
344
345 ScanKeyInit(&key[0],
346 Anum_pg_class_relkind,
347 BTEqualStrategyNumber, F_CHAREQ,
348 CharGetDatum(RELKIND_RELATION));
349
350 scan = table_beginscan_catalog(classRel, 1, key);
351
352 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
353 {
354 Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
355 Oid relid = relForm->oid;
356
357 if (is_publishable_class(relid, relForm))
358 result = lappend_oid(result, relid);
359 }
360
361 table_endscan(scan);
362 table_close(classRel, AccessShareLock);
363
364 return result;
365}
366
367/*
368 * Get publication using oid
369 *
370 * The Publication struct and its data are palloc'ed here.
371 */
372Publication *
373GetPublication(Oid pubid)
374{
375 HeapTuple tup;
376 Publication *pub;
377 Form_pg_publication pubform;
378
379 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
380
381 if (!HeapTupleIsValid(tup))
382 elog(ERROR, "cache lookup failed for publication %u", pubid);
383
384 pubform = (Form_pg_publication) GETSTRUCT(tup);
385
386 pub = (Publication *) palloc(sizeof(Publication));
387 pub->oid = pubid;
388 pub->name = pstrdup(NameStr(pubform->pubname));
389 pub->alltables = pubform->puballtables;
390 pub->pubactions.pubinsert = pubform->pubinsert;
391 pub->pubactions.pubupdate = pubform->pubupdate;
392 pub->pubactions.pubdelete = pubform->pubdelete;
393 pub->pubactions.pubtruncate = pubform->pubtruncate;
394
395 ReleaseSysCache(tup);
396
397 return pub;
398}
399
400
401/*
402 * Get Publication using name.
403 */
404Publication *
405GetPublicationByName(const char *pubname, bool missing_ok)
406{
407 Oid oid;
408
409 oid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
410 CStringGetDatum(pubname));
411 if (!OidIsValid(oid))
412 {
413 if (missing_ok)
414 return NULL;
415
416 ereport(ERROR,
417 (errcode(ERRCODE_UNDEFINED_OBJECT),
418 errmsg("publication \"%s\" does not exist", pubname)));
419 }
420
421 return GetPublication(oid);
422}
423
424/*
425 * get_publication_oid - given a publication name, look up the OID
426 *
427 * If missing_ok is false, throw an error if name not found. If true, just
428 * return InvalidOid.
429 */
430Oid
431get_publication_oid(const char *pubname, bool missing_ok)
432{
433 Oid oid;
434
435 oid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
436 CStringGetDatum(pubname));
437 if (!OidIsValid(oid) && !missing_ok)
438 ereport(ERROR,
439 (errcode(ERRCODE_UNDEFINED_OBJECT),
440 errmsg("publication \"%s\" does not exist", pubname)));
441 return oid;
442}
443
444/*
445 * get_publication_name - given a publication Oid, look up the name
446 *
447 * If missing_ok is false, throw an error if name not found. If true, just
448 * return NULL.
449 */
450char *
451get_publication_name(Oid pubid, bool missing_ok)
452{
453 HeapTuple tup;
454 char *pubname;
455 Form_pg_publication pubform;
456
457 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
458
459 if (!HeapTupleIsValid(tup))
460 {
461 if (!missing_ok)
462 elog(ERROR, "cache lookup failed for publication %u", pubid);
463 return NULL;
464 }
465
466 pubform = (Form_pg_publication) GETSTRUCT(tup);
467 pubname = pstrdup(NameStr(pubform->pubname));
468
469 ReleaseSysCache(tup);
470
471 return pubname;
472}
473
474/*
475 * Returns Oids of tables in a publication.
476 */
477Datum
478pg_get_publication_tables(PG_FUNCTION_ARGS)
479{
480 FuncCallContext *funcctx;
481 char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
482 Publication *publication;
483 List *tables;
484 ListCell **lcp;
485
486 /* stuff done only on the first call of the function */
487 if (SRF_IS_FIRSTCALL())
488 {
489 MemoryContext oldcontext;
490
491 /* create a function context for cross-call persistence */
492 funcctx = SRF_FIRSTCALL_INIT();
493
494 /* switch to memory context appropriate for multiple function calls */
495 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
496
497 publication = GetPublicationByName(pubname, false);
498 if (publication->alltables)
499 tables = GetAllTablesPublicationRelations();
500 else
501 tables = GetPublicationRelations(publication->oid);
502 lcp = (ListCell **) palloc(sizeof(ListCell *));
503 *lcp = list_head(tables);
504 funcctx->user_fctx = (void *) lcp;
505
506 MemoryContextSwitchTo(oldcontext);
507 }
508
509 /* stuff done on every call of the function */
510 funcctx = SRF_PERCALL_SETUP();
511 lcp = (ListCell **) funcctx->user_fctx;
512
513 while (*lcp != NULL)
514 {
515 Oid relid = lfirst_oid(*lcp);
516
517 *lcp = lnext(*lcp);
518 SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
519 }
520
521 SRF_RETURN_DONE(funcctx);
522}
523