1/*-------------------------------------------------------------------------
2 *
3 * pg_subscription.c
4 * replication subscriptions
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/catalog/pg_subscription.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "miscadmin.h"
18
19#include "access/genam.h"
20#include "access/heapam.h"
21#include "access/htup_details.h"
22#include "access/tableam.h"
23#include "access/xact.h"
24
25#include "catalog/indexing.h"
26#include "catalog/pg_type.h"
27#include "catalog/pg_subscription.h"
28#include "catalog/pg_subscription_rel.h"
29
30#include "nodes/makefuncs.h"
31
32#include "storage/lmgr.h"
33
34#include "utils/array.h"
35#include "utils/builtins.h"
36#include "utils/fmgroids.h"
37#include "utils/pg_lsn.h"
38#include "utils/rel.h"
39#include "utils/syscache.h"
40
41
42static List *textarray_to_stringlist(ArrayType *textarray);
43
44/*
45 * Fetch the subscription from the syscache.
46 */
47Subscription *
48GetSubscription(Oid subid, bool missing_ok)
49{
50 HeapTuple tup;
51 Subscription *sub;
52 Form_pg_subscription subform;
53 Datum datum;
54 bool isnull;
55
56 tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
57
58 if (!HeapTupleIsValid(tup))
59 {
60 if (missing_ok)
61 return NULL;
62
63 elog(ERROR, "cache lookup failed for subscription %u", subid);
64 }
65
66 subform = (Form_pg_subscription) GETSTRUCT(tup);
67
68 sub = (Subscription *) palloc(sizeof(Subscription));
69 sub->oid = subid;
70 sub->dbid = subform->subdbid;
71 sub->name = pstrdup(NameStr(subform->subname));
72 sub->owner = subform->subowner;
73 sub->enabled = subform->subenabled;
74
75 /* Get conninfo */
76 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
77 tup,
78 Anum_pg_subscription_subconninfo,
79 &isnull);
80 Assert(!isnull);
81 sub->conninfo = TextDatumGetCString(datum);
82
83 /* Get slotname */
84 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
85 tup,
86 Anum_pg_subscription_subslotname,
87 &isnull);
88 if (!isnull)
89 sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
90 else
91 sub->slotname = NULL;
92
93 /* Get synccommit */
94 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
95 tup,
96 Anum_pg_subscription_subsynccommit,
97 &isnull);
98 Assert(!isnull);
99 sub->synccommit = TextDatumGetCString(datum);
100
101 /* Get publications */
102 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
103 tup,
104 Anum_pg_subscription_subpublications,
105 &isnull);
106 Assert(!isnull);
107 sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
108
109 ReleaseSysCache(tup);
110
111 return sub;
112}
113
114/*
115 * Return number of subscriptions defined in given database.
116 * Used by dropdb() to check if database can indeed be dropped.
117 */
118int
119CountDBSubscriptions(Oid dbid)
120{
121 int nsubs = 0;
122 Relation rel;
123 ScanKeyData scankey;
124 SysScanDesc scan;
125 HeapTuple tup;
126
127 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
128
129 ScanKeyInit(&scankey,
130 Anum_pg_subscription_subdbid,
131 BTEqualStrategyNumber, F_OIDEQ,
132 ObjectIdGetDatum(dbid));
133
134 scan = systable_beginscan(rel, InvalidOid, false,
135 NULL, 1, &scankey);
136
137 while (HeapTupleIsValid(tup = systable_getnext(scan)))
138 nsubs++;
139
140 systable_endscan(scan);
141
142 table_close(rel, NoLock);
143
144 return nsubs;
145}
146
147/*
148 * Free memory allocated by subscription struct.
149 */
150void
151FreeSubscription(Subscription *sub)
152{
153 pfree(sub->name);
154 pfree(sub->conninfo);
155 if (sub->slotname)
156 pfree(sub->slotname);
157 list_free_deep(sub->publications);
158 pfree(sub);
159}
160
161/*
162 * get_subscription_oid - given a subscription name, look up the OID
163 *
164 * If missing_ok is false, throw an error if name not found. If true, just
165 * return InvalidOid.
166 */
167Oid
168get_subscription_oid(const char *subname, bool missing_ok)
169{
170 Oid oid;
171
172 oid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
173 MyDatabaseId, CStringGetDatum(subname));
174 if (!OidIsValid(oid) && !missing_ok)
175 ereport(ERROR,
176 (errcode(ERRCODE_UNDEFINED_OBJECT),
177 errmsg("subscription \"%s\" does not exist", subname)));
178 return oid;
179}
180
181/*
182 * get_subscription_name - given a subscription OID, look up the name
183 *
184 * If missing_ok is false, throw an error if name not found. If true, just
185 * return NULL.
186 */
187char *
188get_subscription_name(Oid subid, bool missing_ok)
189{
190 HeapTuple tup;
191 char *subname;
192 Form_pg_subscription subform;
193
194 tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
195
196 if (!HeapTupleIsValid(tup))
197 {
198 if (!missing_ok)
199 elog(ERROR, "cache lookup failed for subscription %u", subid);
200 return NULL;
201 }
202
203 subform = (Form_pg_subscription) GETSTRUCT(tup);
204 subname = pstrdup(NameStr(subform->subname));
205
206 ReleaseSysCache(tup);
207
208 return subname;
209}
210
211/*
212 * Convert text array to list of strings.
213 *
214 * Note: the resulting list of strings is pallocated here.
215 */
216static List *
217textarray_to_stringlist(ArrayType *textarray)
218{
219 Datum *elems;
220 int nelems,
221 i;
222 List *res = NIL;
223
224 deconstruct_array(textarray,
225 TEXTOID, -1, false, 'i',
226 &elems, NULL, &nelems);
227
228 if (nelems == 0)
229 return NIL;
230
231 for (i = 0; i < nelems; i++)
232 res = lappend(res, makeString(TextDatumGetCString(elems[i])));
233
234 return res;
235}
236
237/*
238 * Add new state record for a subscription table.
239 */
240void
241AddSubscriptionRelState(Oid subid, Oid relid, char state,
242 XLogRecPtr sublsn)
243{
244 Relation rel;
245 HeapTuple tup;
246 bool nulls[Natts_pg_subscription_rel];
247 Datum values[Natts_pg_subscription_rel];
248
249 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
250
251 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
252
253 /* Try finding existing mapping. */
254 tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
255 ObjectIdGetDatum(relid),
256 ObjectIdGetDatum(subid));
257 if (HeapTupleIsValid(tup))
258 elog(ERROR, "subscription table %u in subscription %u already exists",
259 relid, subid);
260
261 /* Form the tuple. */
262 memset(values, 0, sizeof(values));
263 memset(nulls, false, sizeof(nulls));
264 values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
265 values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
266 values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
267 if (sublsn != InvalidXLogRecPtr)
268 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
269 else
270 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
271
272 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
273
274 /* Insert tuple into catalog. */
275 CatalogTupleInsert(rel, tup);
276
277 heap_freetuple(tup);
278
279 /* Cleanup. */
280 table_close(rel, NoLock);
281}
282
283/*
284 * Update the state of a subscription table.
285 */
286void
287UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
288 XLogRecPtr sublsn)
289{
290 Relation rel;
291 HeapTuple tup;
292 bool nulls[Natts_pg_subscription_rel];
293 Datum values[Natts_pg_subscription_rel];
294 bool replaces[Natts_pg_subscription_rel];
295
296 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
297
298 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
299
300 /* Try finding existing mapping. */
301 tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
302 ObjectIdGetDatum(relid),
303 ObjectIdGetDatum(subid));
304 if (!HeapTupleIsValid(tup))
305 elog(ERROR, "subscription table %u in subscription %u does not exist",
306 relid, subid);
307
308 /* Update the tuple. */
309 memset(values, 0, sizeof(values));
310 memset(nulls, false, sizeof(nulls));
311 memset(replaces, false, sizeof(replaces));
312
313 replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
314 values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
315
316 replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
317 if (sublsn != InvalidXLogRecPtr)
318 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
319 else
320 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
321
322 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
323 replaces);
324
325 /* Update the catalog. */
326 CatalogTupleUpdate(rel, &tup->t_self, tup);
327
328 /* Cleanup. */
329 table_close(rel, NoLock);
330}
331
332/*
333 * Get state of subscription table.
334 *
335 * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true.
336 */
337char
338GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
339 bool missing_ok)
340{
341 Relation rel;
342 HeapTuple tup;
343 char substate;
344 bool isnull;
345 Datum d;
346
347 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
348
349 /* Try finding the mapping. */
350 tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
351 ObjectIdGetDatum(relid),
352 ObjectIdGetDatum(subid));
353
354 if (!HeapTupleIsValid(tup))
355 {
356 if (missing_ok)
357 {
358 table_close(rel, AccessShareLock);
359 *sublsn = InvalidXLogRecPtr;
360 return SUBREL_STATE_UNKNOWN;
361 }
362
363 elog(ERROR, "subscription table %u in subscription %u does not exist",
364 relid, subid);
365 }
366
367 /* Get the state. */
368 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
369 Anum_pg_subscription_rel_srsubstate, &isnull);
370 Assert(!isnull);
371 substate = DatumGetChar(d);
372 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
373 Anum_pg_subscription_rel_srsublsn, &isnull);
374 if (isnull)
375 *sublsn = InvalidXLogRecPtr;
376 else
377 *sublsn = DatumGetLSN(d);
378
379 /* Cleanup */
380 ReleaseSysCache(tup);
381 table_close(rel, AccessShareLock);
382
383 return substate;
384}
385
386/*
387 * Drop subscription relation mapping. These can be for a particular
388 * subscription, or for a particular relation, or both.
389 */
390void
391RemoveSubscriptionRel(Oid subid, Oid relid)
392{
393 Relation rel;
394 TableScanDesc scan;
395 ScanKeyData skey[2];
396 HeapTuple tup;
397 int nkeys = 0;
398
399 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
400
401 if (OidIsValid(subid))
402 {
403 ScanKeyInit(&skey[nkeys++],
404 Anum_pg_subscription_rel_srsubid,
405 BTEqualStrategyNumber,
406 F_OIDEQ,
407 ObjectIdGetDatum(subid));
408 }
409
410 if (OidIsValid(relid))
411 {
412 ScanKeyInit(&skey[nkeys++],
413 Anum_pg_subscription_rel_srrelid,
414 BTEqualStrategyNumber,
415 F_OIDEQ,
416 ObjectIdGetDatum(relid));
417 }
418
419 /* Do the search and delete what we found. */
420 scan = table_beginscan_catalog(rel, nkeys, skey);
421 while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
422 {
423 CatalogTupleDelete(rel, &tup->t_self);
424 }
425 table_endscan(scan);
426
427 table_close(rel, RowExclusiveLock);
428}
429
430
431/*
432 * Get all relations for subscription.
433 *
434 * Returned list is palloc'ed in current memory context.
435 */
436List *
437GetSubscriptionRelations(Oid subid)
438{
439 List *res = NIL;
440 Relation rel;
441 HeapTuple tup;
442 int nkeys = 0;
443 ScanKeyData skey[2];
444 SysScanDesc scan;
445
446 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
447
448 ScanKeyInit(&skey[nkeys++],
449 Anum_pg_subscription_rel_srsubid,
450 BTEqualStrategyNumber, F_OIDEQ,
451 ObjectIdGetDatum(subid));
452
453 scan = systable_beginscan(rel, InvalidOid, false,
454 NULL, nkeys, skey);
455
456 while (HeapTupleIsValid(tup = systable_getnext(scan)))
457 {
458 Form_pg_subscription_rel subrel;
459 SubscriptionRelState *relstate;
460
461 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
462
463 relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
464 relstate->relid = subrel->srrelid;
465 relstate->state = subrel->srsubstate;
466 relstate->lsn = subrel->srsublsn;
467
468 res = lappend(res, relstate);
469 }
470
471 /* Cleanup */
472 systable_endscan(scan);
473 table_close(rel, AccessShareLock);
474
475 return res;
476}
477
478/*
479 * Get all relations for subscription that are not in a ready state.
480 *
481 * Returned list is palloc'ed in current memory context.
482 */
483List *
484GetSubscriptionNotReadyRelations(Oid subid)
485{
486 List *res = NIL;
487 Relation rel;
488 HeapTuple tup;
489 int nkeys = 0;
490 ScanKeyData skey[2];
491 SysScanDesc scan;
492
493 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
494
495 ScanKeyInit(&skey[nkeys++],
496 Anum_pg_subscription_rel_srsubid,
497 BTEqualStrategyNumber, F_OIDEQ,
498 ObjectIdGetDatum(subid));
499
500 ScanKeyInit(&skey[nkeys++],
501 Anum_pg_subscription_rel_srsubstate,
502 BTEqualStrategyNumber, F_CHARNE,
503 CharGetDatum(SUBREL_STATE_READY));
504
505 scan = systable_beginscan(rel, InvalidOid, false,
506 NULL, nkeys, skey);
507
508 while (HeapTupleIsValid(tup = systable_getnext(scan)))
509 {
510 Form_pg_subscription_rel subrel;
511 SubscriptionRelState *relstate;
512
513 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
514
515 relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
516 relstate->relid = subrel->srrelid;
517 relstate->state = subrel->srsubstate;
518 relstate->lsn = subrel->srsublsn;
519
520 res = lappend(res, relstate);
521 }
522
523 /* Cleanup */
524 systable_endscan(scan);
525 table_close(rel, AccessShareLock);
526
527 return res;
528}
529