1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * pg_subscription.c |
4 | * replication subscriptions |
5 | * |
6 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
7 | * Portions Copyright (c) 1994, Regents of the University of California |
8 | * |
9 | * IDENTIFICATION |
10 | * src/backend/catalog/pg_subscription.c |
11 | * |
12 | *------------------------------------------------------------------------- |
13 | */ |
14 | |
15 | #include "postgres.h" |
16 | |
17 | #include "miscadmin.h" |
18 | |
19 | #include "access/genam.h" |
20 | #include "access/heapam.h" |
21 | #include "access/htup_details.h" |
22 | #include "access/tableam.h" |
23 | #include "access/xact.h" |
24 | |
25 | #include "catalog/indexing.h" |
26 | #include "catalog/pg_type.h" |
27 | #include "catalog/pg_subscription.h" |
28 | #include "catalog/pg_subscription_rel.h" |
29 | |
30 | #include "nodes/makefuncs.h" |
31 | |
32 | #include "storage/lmgr.h" |
33 | |
34 | #include "utils/array.h" |
35 | #include "utils/builtins.h" |
36 | #include "utils/fmgroids.h" |
37 | #include "utils/pg_lsn.h" |
38 | #include "utils/rel.h" |
39 | #include "utils/syscache.h" |
40 | |
41 | |
42 | static List *textarray_to_stringlist(ArrayType *textarray); |
43 | |
44 | /* |
45 | * Fetch the subscription from the syscache. |
46 | */ |
47 | Subscription * |
48 | GetSubscription(Oid subid, bool missing_ok) |
49 | { |
50 | HeapTuple tup; |
51 | Subscription *sub; |
52 | Form_pg_subscription subform; |
53 | Datum datum; |
54 | bool isnull; |
55 | |
56 | tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid)); |
57 | |
58 | if (!HeapTupleIsValid(tup)) |
59 | { |
60 | if (missing_ok) |
61 | return NULL; |
62 | |
63 | elog(ERROR, "cache lookup failed for subscription %u" , subid); |
64 | } |
65 | |
66 | subform = (Form_pg_subscription) GETSTRUCT(tup); |
67 | |
68 | sub = (Subscription *) palloc(sizeof(Subscription)); |
69 | sub->oid = subid; |
70 | sub->dbid = subform->subdbid; |
71 | sub->name = pstrdup(NameStr(subform->subname)); |
72 | sub->owner = subform->subowner; |
73 | sub->enabled = subform->subenabled; |
74 | |
75 | /* Get conninfo */ |
76 | datum = SysCacheGetAttr(SUBSCRIPTIONOID, |
77 | tup, |
78 | Anum_pg_subscription_subconninfo, |
79 | &isnull); |
80 | Assert(!isnull); |
81 | sub->conninfo = TextDatumGetCString(datum); |
82 | |
83 | /* Get slotname */ |
84 | datum = SysCacheGetAttr(SUBSCRIPTIONOID, |
85 | tup, |
86 | Anum_pg_subscription_subslotname, |
87 | &isnull); |
88 | if (!isnull) |
89 | sub->slotname = pstrdup(NameStr(*DatumGetName(datum))); |
90 | else |
91 | sub->slotname = NULL; |
92 | |
93 | /* Get synccommit */ |
94 | datum = SysCacheGetAttr(SUBSCRIPTIONOID, |
95 | tup, |
96 | Anum_pg_subscription_subsynccommit, |
97 | &isnull); |
98 | Assert(!isnull); |
99 | sub->synccommit = TextDatumGetCString(datum); |
100 | |
101 | /* Get publications */ |
102 | datum = SysCacheGetAttr(SUBSCRIPTIONOID, |
103 | tup, |
104 | Anum_pg_subscription_subpublications, |
105 | &isnull); |
106 | Assert(!isnull); |
107 | sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum)); |
108 | |
109 | ReleaseSysCache(tup); |
110 | |
111 | return sub; |
112 | } |
113 | |
114 | /* |
115 | * Return number of subscriptions defined in given database. |
116 | * Used by dropdb() to check if database can indeed be dropped. |
117 | */ |
118 | int |
119 | CountDBSubscriptions(Oid dbid) |
120 | { |
121 | int nsubs = 0; |
122 | Relation rel; |
123 | ScanKeyData scankey; |
124 | SysScanDesc scan; |
125 | HeapTuple tup; |
126 | |
127 | rel = table_open(SubscriptionRelationId, RowExclusiveLock); |
128 | |
129 | ScanKeyInit(&scankey, |
130 | Anum_pg_subscription_subdbid, |
131 | BTEqualStrategyNumber, F_OIDEQ, |
132 | ObjectIdGetDatum(dbid)); |
133 | |
134 | scan = systable_beginscan(rel, InvalidOid, false, |
135 | NULL, 1, &scankey); |
136 | |
137 | while (HeapTupleIsValid(tup = systable_getnext(scan))) |
138 | nsubs++; |
139 | |
140 | systable_endscan(scan); |
141 | |
142 | table_close(rel, NoLock); |
143 | |
144 | return nsubs; |
145 | } |
146 | |
147 | /* |
148 | * Free memory allocated by subscription struct. |
149 | */ |
150 | void |
151 | FreeSubscription(Subscription *sub) |
152 | { |
153 | pfree(sub->name); |
154 | pfree(sub->conninfo); |
155 | if (sub->slotname) |
156 | pfree(sub->slotname); |
157 | list_free_deep(sub->publications); |
158 | pfree(sub); |
159 | } |
160 | |
161 | /* |
162 | * get_subscription_oid - given a subscription name, look up the OID |
163 | * |
164 | * If missing_ok is false, throw an error if name not found. If true, just |
165 | * return InvalidOid. |
166 | */ |
167 | Oid |
168 | get_subscription_oid(const char *subname, bool missing_ok) |
169 | { |
170 | Oid oid; |
171 | |
172 | oid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid, |
173 | MyDatabaseId, CStringGetDatum(subname)); |
174 | if (!OidIsValid(oid) && !missing_ok) |
175 | ereport(ERROR, |
176 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
177 | errmsg("subscription \"%s\" does not exist" , subname))); |
178 | return oid; |
179 | } |
180 | |
181 | /* |
182 | * get_subscription_name - given a subscription OID, look up the name |
183 | * |
184 | * If missing_ok is false, throw an error if name not found. If true, just |
185 | * return NULL. |
186 | */ |
187 | char * |
188 | get_subscription_name(Oid subid, bool missing_ok) |
189 | { |
190 | HeapTuple tup; |
191 | char *subname; |
192 | Form_pg_subscription subform; |
193 | |
194 | tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid)); |
195 | |
196 | if (!HeapTupleIsValid(tup)) |
197 | { |
198 | if (!missing_ok) |
199 | elog(ERROR, "cache lookup failed for subscription %u" , subid); |
200 | return NULL; |
201 | } |
202 | |
203 | subform = (Form_pg_subscription) GETSTRUCT(tup); |
204 | subname = pstrdup(NameStr(subform->subname)); |
205 | |
206 | ReleaseSysCache(tup); |
207 | |
208 | return subname; |
209 | } |
210 | |
211 | /* |
212 | * Convert text array to list of strings. |
213 | * |
214 | * Note: the resulting list of strings is pallocated here. |
215 | */ |
216 | static List * |
217 | textarray_to_stringlist(ArrayType *textarray) |
218 | { |
219 | Datum *elems; |
220 | int nelems, |
221 | i; |
222 | List *res = NIL; |
223 | |
224 | deconstruct_array(textarray, |
225 | TEXTOID, -1, false, 'i', |
226 | &elems, NULL, &nelems); |
227 | |
228 | if (nelems == 0) |
229 | return NIL; |
230 | |
231 | for (i = 0; i < nelems; i++) |
232 | res = lappend(res, makeString(TextDatumGetCString(elems[i]))); |
233 | |
234 | return res; |
235 | } |
236 | |
237 | /* |
238 | * Add new state record for a subscription table. |
239 | */ |
240 | void |
241 | AddSubscriptionRelState(Oid subid, Oid relid, char state, |
242 | XLogRecPtr sublsn) |
243 | { |
244 | Relation rel; |
245 | HeapTuple tup; |
246 | bool nulls[Natts_pg_subscription_rel]; |
247 | Datum values[Natts_pg_subscription_rel]; |
248 | |
249 | LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); |
250 | |
251 | rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); |
252 | |
253 | /* Try finding existing mapping. */ |
254 | tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP, |
255 | ObjectIdGetDatum(relid), |
256 | ObjectIdGetDatum(subid)); |
257 | if (HeapTupleIsValid(tup)) |
258 | elog(ERROR, "subscription table %u in subscription %u already exists" , |
259 | relid, subid); |
260 | |
261 | /* Form the tuple. */ |
262 | memset(values, 0, sizeof(values)); |
263 | memset(nulls, false, sizeof(nulls)); |
264 | values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid); |
265 | values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid); |
266 | values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); |
267 | if (sublsn != InvalidXLogRecPtr) |
268 | values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); |
269 | else |
270 | nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; |
271 | |
272 | tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); |
273 | |
274 | /* Insert tuple into catalog. */ |
275 | CatalogTupleInsert(rel, tup); |
276 | |
277 | heap_freetuple(tup); |
278 | |
279 | /* Cleanup. */ |
280 | table_close(rel, NoLock); |
281 | } |
282 | |
283 | /* |
284 | * Update the state of a subscription table. |
285 | */ |
286 | void |
287 | UpdateSubscriptionRelState(Oid subid, Oid relid, char state, |
288 | XLogRecPtr sublsn) |
289 | { |
290 | Relation rel; |
291 | HeapTuple tup; |
292 | bool nulls[Natts_pg_subscription_rel]; |
293 | Datum values[Natts_pg_subscription_rel]; |
294 | bool replaces[Natts_pg_subscription_rel]; |
295 | |
296 | LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); |
297 | |
298 | rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); |
299 | |
300 | /* Try finding existing mapping. */ |
301 | tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP, |
302 | ObjectIdGetDatum(relid), |
303 | ObjectIdGetDatum(subid)); |
304 | if (!HeapTupleIsValid(tup)) |
305 | elog(ERROR, "subscription table %u in subscription %u does not exist" , |
306 | relid, subid); |
307 | |
308 | /* Update the tuple. */ |
309 | memset(values, 0, sizeof(values)); |
310 | memset(nulls, false, sizeof(nulls)); |
311 | memset(replaces, false, sizeof(replaces)); |
312 | |
313 | replaces[Anum_pg_subscription_rel_srsubstate - 1] = true; |
314 | values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); |
315 | |
316 | replaces[Anum_pg_subscription_rel_srsublsn - 1] = true; |
317 | if (sublsn != InvalidXLogRecPtr) |
318 | values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); |
319 | else |
320 | nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; |
321 | |
322 | tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, |
323 | replaces); |
324 | |
325 | /* Update the catalog. */ |
326 | CatalogTupleUpdate(rel, &tup->t_self, tup); |
327 | |
328 | /* Cleanup. */ |
329 | table_close(rel, NoLock); |
330 | } |
331 | |
332 | /* |
333 | * Get state of subscription table. |
334 | * |
335 | * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true. |
336 | */ |
337 | char |
338 | GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, |
339 | bool missing_ok) |
340 | { |
341 | Relation rel; |
342 | HeapTuple tup; |
343 | char substate; |
344 | bool isnull; |
345 | Datum d; |
346 | |
347 | rel = table_open(SubscriptionRelRelationId, AccessShareLock); |
348 | |
349 | /* Try finding the mapping. */ |
350 | tup = SearchSysCache2(SUBSCRIPTIONRELMAP, |
351 | ObjectIdGetDatum(relid), |
352 | ObjectIdGetDatum(subid)); |
353 | |
354 | if (!HeapTupleIsValid(tup)) |
355 | { |
356 | if (missing_ok) |
357 | { |
358 | table_close(rel, AccessShareLock); |
359 | *sublsn = InvalidXLogRecPtr; |
360 | return SUBREL_STATE_UNKNOWN; |
361 | } |
362 | |
363 | elog(ERROR, "subscription table %u in subscription %u does not exist" , |
364 | relid, subid); |
365 | } |
366 | |
367 | /* Get the state. */ |
368 | d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, |
369 | Anum_pg_subscription_rel_srsubstate, &isnull); |
370 | Assert(!isnull); |
371 | substate = DatumGetChar(d); |
372 | d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, |
373 | Anum_pg_subscription_rel_srsublsn, &isnull); |
374 | if (isnull) |
375 | *sublsn = InvalidXLogRecPtr; |
376 | else |
377 | *sublsn = DatumGetLSN(d); |
378 | |
379 | /* Cleanup */ |
380 | ReleaseSysCache(tup); |
381 | table_close(rel, AccessShareLock); |
382 | |
383 | return substate; |
384 | } |
385 | |
386 | /* |
387 | * Drop subscription relation mapping. These can be for a particular |
388 | * subscription, or for a particular relation, or both. |
389 | */ |
390 | void |
391 | RemoveSubscriptionRel(Oid subid, Oid relid) |
392 | { |
393 | Relation rel; |
394 | TableScanDesc scan; |
395 | ScanKeyData skey[2]; |
396 | HeapTuple tup; |
397 | int nkeys = 0; |
398 | |
399 | rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); |
400 | |
401 | if (OidIsValid(subid)) |
402 | { |
403 | ScanKeyInit(&skey[nkeys++], |
404 | Anum_pg_subscription_rel_srsubid, |
405 | BTEqualStrategyNumber, |
406 | F_OIDEQ, |
407 | ObjectIdGetDatum(subid)); |
408 | } |
409 | |
410 | if (OidIsValid(relid)) |
411 | { |
412 | ScanKeyInit(&skey[nkeys++], |
413 | Anum_pg_subscription_rel_srrelid, |
414 | BTEqualStrategyNumber, |
415 | F_OIDEQ, |
416 | ObjectIdGetDatum(relid)); |
417 | } |
418 | |
419 | /* Do the search and delete what we found. */ |
420 | scan = table_beginscan_catalog(rel, nkeys, skey); |
421 | while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) |
422 | { |
423 | CatalogTupleDelete(rel, &tup->t_self); |
424 | } |
425 | table_endscan(scan); |
426 | |
427 | table_close(rel, RowExclusiveLock); |
428 | } |
429 | |
430 | |
431 | /* |
432 | * Get all relations for subscription. |
433 | * |
434 | * Returned list is palloc'ed in current memory context. |
435 | */ |
436 | List * |
437 | GetSubscriptionRelations(Oid subid) |
438 | { |
439 | List *res = NIL; |
440 | Relation rel; |
441 | HeapTuple tup; |
442 | int nkeys = 0; |
443 | ScanKeyData skey[2]; |
444 | SysScanDesc scan; |
445 | |
446 | rel = table_open(SubscriptionRelRelationId, AccessShareLock); |
447 | |
448 | ScanKeyInit(&skey[nkeys++], |
449 | Anum_pg_subscription_rel_srsubid, |
450 | BTEqualStrategyNumber, F_OIDEQ, |
451 | ObjectIdGetDatum(subid)); |
452 | |
453 | scan = systable_beginscan(rel, InvalidOid, false, |
454 | NULL, nkeys, skey); |
455 | |
456 | while (HeapTupleIsValid(tup = systable_getnext(scan))) |
457 | { |
458 | Form_pg_subscription_rel subrel; |
459 | SubscriptionRelState *relstate; |
460 | |
461 | subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); |
462 | |
463 | relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState)); |
464 | relstate->relid = subrel->srrelid; |
465 | relstate->state = subrel->srsubstate; |
466 | relstate->lsn = subrel->srsublsn; |
467 | |
468 | res = lappend(res, relstate); |
469 | } |
470 | |
471 | /* Cleanup */ |
472 | systable_endscan(scan); |
473 | table_close(rel, AccessShareLock); |
474 | |
475 | return res; |
476 | } |
477 | |
478 | /* |
479 | * Get all relations for subscription that are not in a ready state. |
480 | * |
481 | * Returned list is palloc'ed in current memory context. |
482 | */ |
483 | List * |
484 | GetSubscriptionNotReadyRelations(Oid subid) |
485 | { |
486 | List *res = NIL; |
487 | Relation rel; |
488 | HeapTuple tup; |
489 | int nkeys = 0; |
490 | ScanKeyData skey[2]; |
491 | SysScanDesc scan; |
492 | |
493 | rel = table_open(SubscriptionRelRelationId, AccessShareLock); |
494 | |
495 | ScanKeyInit(&skey[nkeys++], |
496 | Anum_pg_subscription_rel_srsubid, |
497 | BTEqualStrategyNumber, F_OIDEQ, |
498 | ObjectIdGetDatum(subid)); |
499 | |
500 | ScanKeyInit(&skey[nkeys++], |
501 | Anum_pg_subscription_rel_srsubstate, |
502 | BTEqualStrategyNumber, F_CHARNE, |
503 | CharGetDatum(SUBREL_STATE_READY)); |
504 | |
505 | scan = systable_beginscan(rel, InvalidOid, false, |
506 | NULL, nkeys, skey); |
507 | |
508 | while (HeapTupleIsValid(tup = systable_getnext(scan))) |
509 | { |
510 | Form_pg_subscription_rel subrel; |
511 | SubscriptionRelState *relstate; |
512 | |
513 | subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); |
514 | |
515 | relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState)); |
516 | relstate->relid = subrel->srrelid; |
517 | relstate->state = subrel->srsubstate; |
518 | relstate->lsn = subrel->srsublsn; |
519 | |
520 | res = lappend(res, relstate); |
521 | } |
522 | |
523 | /* Cleanup */ |
524 | systable_endscan(scan); |
525 | table_close(rel, AccessShareLock); |
526 | |
527 | return res; |
528 | } |
529 | |