1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * pg_publication.c |
4 | * publication C API manipulation |
5 | * |
6 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
7 | * Portions Copyright (c) 1994, Regents of the University of California |
8 | * |
9 | * IDENTIFICATION |
10 | * pg_publication.c |
11 | * |
12 | *------------------------------------------------------------------------- |
13 | */ |
14 | |
15 | #include "postgres.h" |
16 | |
17 | #include "funcapi.h" |
18 | #include "miscadmin.h" |
19 | |
20 | #include "access/genam.h" |
21 | #include "access/heapam.h" |
22 | #include "access/htup_details.h" |
23 | #include "access/tableam.h" |
24 | #include "access/xact.h" |
25 | |
26 | #include "catalog/catalog.h" |
27 | #include "catalog/dependency.h" |
28 | #include "catalog/index.h" |
29 | #include "catalog/indexing.h" |
30 | #include "catalog/namespace.h" |
31 | #include "catalog/objectaccess.h" |
32 | #include "catalog/objectaddress.h" |
33 | #include "catalog/pg_type.h" |
34 | #include "catalog/pg_publication.h" |
35 | #include "catalog/pg_publication_rel.h" |
36 | |
37 | #include "utils/array.h" |
38 | #include "utils/builtins.h" |
39 | #include "utils/catcache.h" |
40 | #include "utils/fmgroids.h" |
41 | #include "utils/inval.h" |
42 | #include "utils/lsyscache.h" |
43 | #include "utils/rel.h" |
44 | #include "utils/syscache.h" |
45 | |
46 | /* |
47 | * Check if relation can be in given publication and throws appropriate |
48 | * error if not. |
49 | */ |
50 | static void |
51 | check_publication_add_relation(Relation targetrel) |
52 | { |
53 | /* Give more specific error for partitioned tables */ |
54 | if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE) |
55 | ereport(ERROR, |
56 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
57 | errmsg("\"%s\" is a partitioned table" , |
58 | RelationGetRelationName(targetrel)), |
59 | errdetail("Adding partitioned tables to publications is not supported." ), |
60 | errhint("You can add the table partitions individually." ))); |
61 | |
62 | /* Must be table */ |
63 | if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION) |
64 | ereport(ERROR, |
65 | (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
66 | errmsg("\"%s\" is not a table" , |
67 | RelationGetRelationName(targetrel)), |
68 | errdetail("Only tables can be added to publications." ))); |
69 | |
70 | /* Can't be system table */ |
71 | if (IsCatalogRelation(targetrel)) |
72 | ereport(ERROR, |
73 | (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
74 | errmsg("\"%s\" is a system table" , |
75 | RelationGetRelationName(targetrel)), |
76 | errdetail("System tables cannot be added to publications." ))); |
77 | |
78 | /* UNLOGGED and TEMP relations cannot be part of publication. */ |
79 | if (!RelationNeedsWAL(targetrel)) |
80 | ereport(ERROR, |
81 | (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
82 | errmsg("table \"%s\" cannot be replicated" , |
83 | RelationGetRelationName(targetrel)), |
84 | errdetail("Temporary and unlogged relations cannot be replicated." ))); |
85 | } |
86 | |
87 | /* |
88 | * Returns if relation represented by oid and Form_pg_class entry |
89 | * is publishable. |
90 | * |
91 | * Does same checks as the above, but does not need relation to be opened |
92 | * and also does not throw errors. |
93 | * |
94 | * XXX This also excludes all tables with relid < FirstNormalObjectId, |
95 | * ie all tables created during initdb. This mainly affects the preinstalled |
96 | * information_schema. IsCatalogRelationOid() only excludes tables with |
97 | * relid < FirstBootstrapObjectId, making that test rather redundant, |
98 | * but really we should get rid of the FirstNormalObjectId test not |
99 | * IsCatalogRelationOid. We can't do so today because we don't want |
100 | * information_schema tables to be considered publishable; but this test |
101 | * is really inadequate for that, since the information_schema could be |
102 | * dropped and reloaded and then it'll be considered publishable. The best |
103 | * long-term solution may be to add a "relispublishable" bool to pg_class, |
104 | * and depend on that instead of OID checks. |
105 | */ |
106 | static bool |
107 | is_publishable_class(Oid relid, Form_pg_class reltuple) |
108 | { |
109 | return reltuple->relkind == RELKIND_RELATION && |
110 | !IsCatalogRelationOid(relid) && |
111 | reltuple->relpersistence == RELPERSISTENCE_PERMANENT && |
112 | relid >= FirstNormalObjectId; |
113 | } |
114 | |
115 | /* |
116 | * Another variant of this, taking a Relation. |
117 | */ |
118 | bool |
119 | is_publishable_relation(Relation rel) |
120 | { |
121 | return is_publishable_class(RelationGetRelid(rel), rel->rd_rel); |
122 | } |
123 | |
124 | |
125 | /* |
126 | * SQL-callable variant of the above |
127 | * |
128 | * This returns null when the relation does not exist. This is intended to be |
129 | * used for example in psql to avoid gratuitous errors when there are |
130 | * concurrent catalog changes. |
131 | */ |
132 | Datum |
133 | pg_relation_is_publishable(PG_FUNCTION_ARGS) |
134 | { |
135 | Oid relid = PG_GETARG_OID(0); |
136 | HeapTuple tuple; |
137 | bool result; |
138 | |
139 | tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); |
140 | if (!HeapTupleIsValid(tuple)) |
141 | PG_RETURN_NULL(); |
142 | result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple)); |
143 | ReleaseSysCache(tuple); |
144 | PG_RETURN_BOOL(result); |
145 | } |
146 | |
147 | |
148 | /* |
149 | * Insert new publication / relation mapping. |
150 | */ |
151 | ObjectAddress |
152 | publication_add_relation(Oid pubid, Relation targetrel, |
153 | bool if_not_exists) |
154 | { |
155 | Relation rel; |
156 | HeapTuple tup; |
157 | Datum values[Natts_pg_publication_rel]; |
158 | bool nulls[Natts_pg_publication_rel]; |
159 | Oid relid = RelationGetRelid(targetrel); |
160 | Oid prrelid; |
161 | Publication *pub = GetPublication(pubid); |
162 | ObjectAddress myself, |
163 | referenced; |
164 | |
165 | rel = table_open(PublicationRelRelationId, RowExclusiveLock); |
166 | |
167 | /* |
168 | * Check for duplicates. Note that this does not really prevent |
169 | * duplicates, it's here just to provide nicer error message in common |
170 | * case. The real protection is the unique key on the catalog. |
171 | */ |
172 | if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), |
173 | ObjectIdGetDatum(pubid))) |
174 | { |
175 | table_close(rel, RowExclusiveLock); |
176 | |
177 | if (if_not_exists) |
178 | return InvalidObjectAddress; |
179 | |
180 | ereport(ERROR, |
181 | (errcode(ERRCODE_DUPLICATE_OBJECT), |
182 | errmsg("relation \"%s\" is already member of publication \"%s\"" , |
183 | RelationGetRelationName(targetrel), pub->name))); |
184 | } |
185 | |
186 | check_publication_add_relation(targetrel); |
187 | |
188 | /* Form a tuple. */ |
189 | memset(values, 0, sizeof(values)); |
190 | memset(nulls, false, sizeof(nulls)); |
191 | |
192 | prrelid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId, |
193 | Anum_pg_publication_rel_oid); |
194 | values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(prrelid); |
195 | values[Anum_pg_publication_rel_prpubid - 1] = |
196 | ObjectIdGetDatum(pubid); |
197 | values[Anum_pg_publication_rel_prrelid - 1] = |
198 | ObjectIdGetDatum(relid); |
199 | |
200 | tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); |
201 | |
202 | /* Insert tuple into catalog. */ |
203 | CatalogTupleInsert(rel, tup); |
204 | heap_freetuple(tup); |
205 | |
206 | ObjectAddressSet(myself, PublicationRelRelationId, prrelid); |
207 | |
208 | /* Add dependency on the publication */ |
209 | ObjectAddressSet(referenced, PublicationRelationId, pubid); |
210 | recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); |
211 | |
212 | /* Add dependency on the relation */ |
213 | ObjectAddressSet(referenced, RelationRelationId, relid); |
214 | recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); |
215 | |
216 | /* Close the table. */ |
217 | table_close(rel, RowExclusiveLock); |
218 | |
219 | /* Invalidate relcache so that publication info is rebuilt. */ |
220 | CacheInvalidateRelcache(targetrel); |
221 | |
222 | return myself; |
223 | } |
224 | |
225 | |
226 | /* |
227 | * Gets list of publication oids for a relation oid. |
228 | */ |
229 | List * |
230 | GetRelationPublications(Oid relid) |
231 | { |
232 | List *result = NIL; |
233 | CatCList *pubrellist; |
234 | int i; |
235 | |
236 | /* Find all publications associated with the relation. */ |
237 | pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP, |
238 | ObjectIdGetDatum(relid)); |
239 | for (i = 0; i < pubrellist->n_members; i++) |
240 | { |
241 | HeapTuple tup = &pubrellist->members[i]->tuple; |
242 | Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid; |
243 | |
244 | result = lappend_oid(result, pubid); |
245 | } |
246 | |
247 | ReleaseSysCacheList(pubrellist); |
248 | |
249 | return result; |
250 | } |
251 | |
252 | /* |
253 | * Gets list of relation oids for a publication. |
254 | * |
255 | * This should only be used for normal publications, the FOR ALL TABLES |
256 | * should use GetAllTablesPublicationRelations(). |
257 | */ |
258 | List * |
259 | GetPublicationRelations(Oid pubid) |
260 | { |
261 | List *result; |
262 | Relation pubrelsrel; |
263 | ScanKeyData scankey; |
264 | SysScanDesc scan; |
265 | HeapTuple tup; |
266 | |
267 | /* Find all publications associated with the relation. */ |
268 | pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock); |
269 | |
270 | ScanKeyInit(&scankey, |
271 | Anum_pg_publication_rel_prpubid, |
272 | BTEqualStrategyNumber, F_OIDEQ, |
273 | ObjectIdGetDatum(pubid)); |
274 | |
275 | scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId, |
276 | true, NULL, 1, &scankey); |
277 | |
278 | result = NIL; |
279 | while (HeapTupleIsValid(tup = systable_getnext(scan))) |
280 | { |
281 | Form_pg_publication_rel pubrel; |
282 | |
283 | pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); |
284 | |
285 | result = lappend_oid(result, pubrel->prrelid); |
286 | } |
287 | |
288 | systable_endscan(scan); |
289 | table_close(pubrelsrel, AccessShareLock); |
290 | |
291 | return result; |
292 | } |
293 | |
294 | /* |
295 | * Gets list of publication oids for publications marked as FOR ALL TABLES. |
296 | */ |
297 | List * |
298 | GetAllTablesPublications(void) |
299 | { |
300 | List *result; |
301 | Relation rel; |
302 | ScanKeyData scankey; |
303 | SysScanDesc scan; |
304 | HeapTuple tup; |
305 | |
306 | /* Find all publications that are marked as for all tables. */ |
307 | rel = table_open(PublicationRelationId, AccessShareLock); |
308 | |
309 | ScanKeyInit(&scankey, |
310 | Anum_pg_publication_puballtables, |
311 | BTEqualStrategyNumber, F_BOOLEQ, |
312 | BoolGetDatum(true)); |
313 | |
314 | scan = systable_beginscan(rel, InvalidOid, false, |
315 | NULL, 1, &scankey); |
316 | |
317 | result = NIL; |
318 | while (HeapTupleIsValid(tup = systable_getnext(scan))) |
319 | { |
320 | Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid; |
321 | |
322 | result = lappend_oid(result, oid); |
323 | } |
324 | |
325 | systable_endscan(scan); |
326 | table_close(rel, AccessShareLock); |
327 | |
328 | return result; |
329 | } |
330 | |
331 | /* |
332 | * Gets list of all relation published by FOR ALL TABLES publication(s). |
333 | */ |
334 | List * |
335 | GetAllTablesPublicationRelations(void) |
336 | { |
337 | Relation classRel; |
338 | ScanKeyData key[1]; |
339 | TableScanDesc scan; |
340 | HeapTuple tuple; |
341 | List *result = NIL; |
342 | |
343 | classRel = table_open(RelationRelationId, AccessShareLock); |
344 | |
345 | ScanKeyInit(&key[0], |
346 | Anum_pg_class_relkind, |
347 | BTEqualStrategyNumber, F_CHAREQ, |
348 | CharGetDatum(RELKIND_RELATION)); |
349 | |
350 | scan = table_beginscan_catalog(classRel, 1, key); |
351 | |
352 | while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) |
353 | { |
354 | Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); |
355 | Oid relid = relForm->oid; |
356 | |
357 | if (is_publishable_class(relid, relForm)) |
358 | result = lappend_oid(result, relid); |
359 | } |
360 | |
361 | table_endscan(scan); |
362 | table_close(classRel, AccessShareLock); |
363 | |
364 | return result; |
365 | } |
366 | |
367 | /* |
368 | * Get publication using oid |
369 | * |
370 | * The Publication struct and its data are palloc'ed here. |
371 | */ |
372 | Publication * |
373 | GetPublication(Oid pubid) |
374 | { |
375 | HeapTuple tup; |
376 | Publication *pub; |
377 | Form_pg_publication pubform; |
378 | |
379 | tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); |
380 | |
381 | if (!HeapTupleIsValid(tup)) |
382 | elog(ERROR, "cache lookup failed for publication %u" , pubid); |
383 | |
384 | pubform = (Form_pg_publication) GETSTRUCT(tup); |
385 | |
386 | pub = (Publication *) palloc(sizeof(Publication)); |
387 | pub->oid = pubid; |
388 | pub->name = pstrdup(NameStr(pubform->pubname)); |
389 | pub->alltables = pubform->puballtables; |
390 | pub->pubactions.pubinsert = pubform->pubinsert; |
391 | pub->pubactions.pubupdate = pubform->pubupdate; |
392 | pub->pubactions.pubdelete = pubform->pubdelete; |
393 | pub->pubactions.pubtruncate = pubform->pubtruncate; |
394 | |
395 | ReleaseSysCache(tup); |
396 | |
397 | return pub; |
398 | } |
399 | |
400 | |
401 | /* |
402 | * Get Publication using name. |
403 | */ |
404 | Publication * |
405 | GetPublicationByName(const char *pubname, bool missing_ok) |
406 | { |
407 | Oid oid; |
408 | |
409 | oid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid, |
410 | CStringGetDatum(pubname)); |
411 | if (!OidIsValid(oid)) |
412 | { |
413 | if (missing_ok) |
414 | return NULL; |
415 | |
416 | ereport(ERROR, |
417 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
418 | errmsg("publication \"%s\" does not exist" , pubname))); |
419 | } |
420 | |
421 | return GetPublication(oid); |
422 | } |
423 | |
424 | /* |
425 | * get_publication_oid - given a publication name, look up the OID |
426 | * |
427 | * If missing_ok is false, throw an error if name not found. If true, just |
428 | * return InvalidOid. |
429 | */ |
430 | Oid |
431 | get_publication_oid(const char *pubname, bool missing_ok) |
432 | { |
433 | Oid oid; |
434 | |
435 | oid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid, |
436 | CStringGetDatum(pubname)); |
437 | if (!OidIsValid(oid) && !missing_ok) |
438 | ereport(ERROR, |
439 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
440 | errmsg("publication \"%s\" does not exist" , pubname))); |
441 | return oid; |
442 | } |
443 | |
444 | /* |
445 | * get_publication_name - given a publication Oid, look up the name |
446 | * |
447 | * If missing_ok is false, throw an error if name not found. If true, just |
448 | * return NULL. |
449 | */ |
450 | char * |
451 | get_publication_name(Oid pubid, bool missing_ok) |
452 | { |
453 | HeapTuple tup; |
454 | char *pubname; |
455 | Form_pg_publication pubform; |
456 | |
457 | tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); |
458 | |
459 | if (!HeapTupleIsValid(tup)) |
460 | { |
461 | if (!missing_ok) |
462 | elog(ERROR, "cache lookup failed for publication %u" , pubid); |
463 | return NULL; |
464 | } |
465 | |
466 | pubform = (Form_pg_publication) GETSTRUCT(tup); |
467 | pubname = pstrdup(NameStr(pubform->pubname)); |
468 | |
469 | ReleaseSysCache(tup); |
470 | |
471 | return pubname; |
472 | } |
473 | |
474 | /* |
475 | * Returns Oids of tables in a publication. |
476 | */ |
477 | Datum |
478 | pg_get_publication_tables(PG_FUNCTION_ARGS) |
479 | { |
480 | FuncCallContext *funcctx; |
481 | char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0)); |
482 | Publication *publication; |
483 | List *tables; |
484 | ListCell **lcp; |
485 | |
486 | /* stuff done only on the first call of the function */ |
487 | if (SRF_IS_FIRSTCALL()) |
488 | { |
489 | MemoryContext oldcontext; |
490 | |
491 | /* create a function context for cross-call persistence */ |
492 | funcctx = SRF_FIRSTCALL_INIT(); |
493 | |
494 | /* switch to memory context appropriate for multiple function calls */ |
495 | oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); |
496 | |
497 | publication = GetPublicationByName(pubname, false); |
498 | if (publication->alltables) |
499 | tables = GetAllTablesPublicationRelations(); |
500 | else |
501 | tables = GetPublicationRelations(publication->oid); |
502 | lcp = (ListCell **) palloc(sizeof(ListCell *)); |
503 | *lcp = list_head(tables); |
504 | funcctx->user_fctx = (void *) lcp; |
505 | |
506 | MemoryContextSwitchTo(oldcontext); |
507 | } |
508 | |
509 | /* stuff done on every call of the function */ |
510 | funcctx = SRF_PERCALL_SETUP(); |
511 | lcp = (ListCell **) funcctx->user_fctx; |
512 | |
513 | while (*lcp != NULL) |
514 | { |
515 | Oid relid = lfirst_oid(*lcp); |
516 | |
517 | *lcp = lnext(*lcp); |
518 | SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid)); |
519 | } |
520 | |
521 | SRF_RETURN_DONE(funcctx); |
522 | } |
523 | |