1 | /*------------------------------------------------------------------------- |
2 | * relation.c |
3 | * PostgreSQL logical replication |
4 | * |
5 | * Copyright (c) 2016-2019, PostgreSQL Global Development Group |
6 | * |
7 | * IDENTIFICATION |
8 | * src/backend/replication/logical/relation.c |
9 | * |
10 | * NOTES |
11 | * This file contains helper functions for logical replication relation |
12 | * mapping cache. |
13 | * |
14 | *------------------------------------------------------------------------- |
15 | */ |
16 | |
17 | #include "postgres.h" |
18 | |
19 | #include "access/sysattr.h" |
20 | #include "access/table.h" |
21 | #include "catalog/namespace.h" |
22 | #include "catalog/pg_subscription_rel.h" |
23 | #include "executor/executor.h" |
24 | #include "nodes/makefuncs.h" |
25 | #include "replication/logicalrelation.h" |
26 | #include "replication/worker_internal.h" |
27 | #include "utils/builtins.h" |
28 | #include "utils/inval.h" |
29 | #include "utils/lsyscache.h" |
30 | #include "utils/memutils.h" |
31 | #include "utils/syscache.h" |
32 | |
33 | static MemoryContext LogicalRepRelMapContext = NULL; |
34 | |
35 | static HTAB *LogicalRepRelMap = NULL; |
36 | static HTAB *LogicalRepTypMap = NULL; |
37 | |
38 | |
39 | /* |
40 | * Relcache invalidation callback for our relation map cache. |
41 | */ |
42 | static void |
43 | logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) |
44 | { |
45 | LogicalRepRelMapEntry *entry; |
46 | |
47 | /* Just to be sure. */ |
48 | if (LogicalRepRelMap == NULL) |
49 | return; |
50 | |
51 | if (reloid != InvalidOid) |
52 | { |
53 | HASH_SEQ_STATUS status; |
54 | |
55 | hash_seq_init(&status, LogicalRepRelMap); |
56 | |
57 | /* TODO, use inverse lookup hashtable? */ |
58 | while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) |
59 | { |
60 | if (entry->localreloid == reloid) |
61 | { |
62 | entry->localreloid = InvalidOid; |
63 | hash_seq_term(&status); |
64 | break; |
65 | } |
66 | } |
67 | } |
68 | else |
69 | { |
70 | /* invalidate all cache entries */ |
71 | HASH_SEQ_STATUS status; |
72 | |
73 | hash_seq_init(&status, LogicalRepRelMap); |
74 | |
75 | while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) |
76 | entry->localreloid = InvalidOid; |
77 | } |
78 | } |
79 | |
80 | /* |
81 | * Initialize the relation map cache. |
82 | */ |
83 | static void |
84 | logicalrep_relmap_init(void) |
85 | { |
86 | HASHCTL ctl; |
87 | |
88 | if (!LogicalRepRelMapContext) |
89 | LogicalRepRelMapContext = |
90 | AllocSetContextCreate(CacheMemoryContext, |
91 | "LogicalRepRelMapContext" , |
92 | ALLOCSET_DEFAULT_SIZES); |
93 | |
94 | /* Initialize the relation hash table. */ |
95 | MemSet(&ctl, 0, sizeof(ctl)); |
96 | ctl.keysize = sizeof(LogicalRepRelId); |
97 | ctl.entrysize = sizeof(LogicalRepRelMapEntry); |
98 | ctl.hcxt = LogicalRepRelMapContext; |
99 | |
100 | LogicalRepRelMap = hash_create("logicalrep relation map cache" , 128, &ctl, |
101 | HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); |
102 | |
103 | /* Initialize the type hash table. */ |
104 | MemSet(&ctl, 0, sizeof(ctl)); |
105 | ctl.keysize = sizeof(Oid); |
106 | ctl.entrysize = sizeof(LogicalRepTyp); |
107 | ctl.hcxt = LogicalRepRelMapContext; |
108 | |
109 | /* This will usually be small. */ |
110 | LogicalRepTypMap = hash_create("logicalrep type map cache" , 2, &ctl, |
111 | HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); |
112 | |
113 | /* Watch for invalidation events. */ |
114 | CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb, |
115 | (Datum) 0); |
116 | } |
117 | |
118 | /* |
119 | * Free the entry of a relation map cache. |
120 | */ |
121 | static void |
122 | logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) |
123 | { |
124 | LogicalRepRelation *remoterel; |
125 | |
126 | remoterel = &entry->remoterel; |
127 | |
128 | pfree(remoterel->nspname); |
129 | pfree(remoterel->relname); |
130 | |
131 | if (remoterel->natts > 0) |
132 | { |
133 | int i; |
134 | |
135 | for (i = 0; i < remoterel->natts; i++) |
136 | pfree(remoterel->attnames[i]); |
137 | |
138 | pfree(remoterel->attnames); |
139 | pfree(remoterel->atttyps); |
140 | } |
141 | bms_free(remoterel->attkeys); |
142 | |
143 | if (entry->attrmap) |
144 | pfree(entry->attrmap); |
145 | } |
146 | |
147 | /* |
148 | * Add new entry or update existing entry in the relation map cache. |
149 | * |
150 | * Called when new relation mapping is sent by the publisher to update |
151 | * our expected view of incoming data from said publisher. |
152 | */ |
153 | void |
154 | logicalrep_relmap_update(LogicalRepRelation *remoterel) |
155 | { |
156 | MemoryContext oldctx; |
157 | LogicalRepRelMapEntry *entry; |
158 | bool found; |
159 | int i; |
160 | |
161 | if (LogicalRepRelMap == NULL) |
162 | logicalrep_relmap_init(); |
163 | |
164 | /* |
165 | * HASH_ENTER returns the existing entry if present or creates a new one. |
166 | */ |
167 | entry = hash_search(LogicalRepRelMap, (void *) &remoterel->remoteid, |
168 | HASH_ENTER, &found); |
169 | |
170 | if (found) |
171 | logicalrep_relmap_free_entry(entry); |
172 | |
173 | memset(entry, 0, sizeof(LogicalRepRelMapEntry)); |
174 | |
175 | /* Make cached copy of the data */ |
176 | oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); |
177 | entry->remoterel.remoteid = remoterel->remoteid; |
178 | entry->remoterel.nspname = pstrdup(remoterel->nspname); |
179 | entry->remoterel.relname = pstrdup(remoterel->relname); |
180 | entry->remoterel.natts = remoterel->natts; |
181 | entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); |
182 | entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); |
183 | for (i = 0; i < remoterel->natts; i++) |
184 | { |
185 | entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); |
186 | entry->remoterel.atttyps[i] = remoterel->atttyps[i]; |
187 | } |
188 | entry->remoterel.replident = remoterel->replident; |
189 | entry->remoterel.attkeys = bms_copy(remoterel->attkeys); |
190 | MemoryContextSwitchTo(oldctx); |
191 | } |
192 | |
193 | /* |
194 | * Find attribute index in TupleDesc struct by attribute name. |
195 | * |
196 | * Returns -1 if not found. |
197 | */ |
198 | static int |
199 | logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname) |
200 | { |
201 | int i; |
202 | |
203 | for (i = 0; i < remoterel->natts; i++) |
204 | { |
205 | if (strcmp(remoterel->attnames[i], attname) == 0) |
206 | return i; |
207 | } |
208 | |
209 | return -1; |
210 | } |
211 | |
212 | /* |
213 | * Open the local relation associated with the remote one. |
214 | * |
215 | * Optionally rebuilds the Relcache mapping if it was invalidated |
216 | * by local DDL. |
217 | */ |
218 | LogicalRepRelMapEntry * |
219 | logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) |
220 | { |
221 | LogicalRepRelMapEntry *entry; |
222 | bool found; |
223 | |
224 | if (LogicalRepRelMap == NULL) |
225 | logicalrep_relmap_init(); |
226 | |
227 | /* Search for existing entry. */ |
228 | entry = hash_search(LogicalRepRelMap, (void *) &remoteid, |
229 | HASH_FIND, &found); |
230 | |
231 | if (!found) |
232 | elog(ERROR, "no relation map entry for remote relation ID %u" , |
233 | remoteid); |
234 | |
235 | /* Need to update the local cache? */ |
236 | if (!OidIsValid(entry->localreloid)) |
237 | { |
238 | Oid relid; |
239 | int i; |
240 | int found; |
241 | Bitmapset *idkey; |
242 | TupleDesc desc; |
243 | LogicalRepRelation *remoterel; |
244 | MemoryContext oldctx; |
245 | |
246 | remoterel = &entry->remoterel; |
247 | |
248 | /* Try to find and lock the relation by name. */ |
249 | relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname, |
250 | remoterel->relname, -1), |
251 | lockmode, true); |
252 | if (!OidIsValid(relid)) |
253 | ereport(ERROR, |
254 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
255 | errmsg("logical replication target relation \"%s.%s\" does not exist" , |
256 | remoterel->nspname, remoterel->relname))); |
257 | entry->localrel = table_open(relid, NoLock); |
258 | |
259 | /* Check for supported relkind. */ |
260 | CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind, |
261 | remoterel->nspname, remoterel->relname); |
262 | |
263 | /* |
264 | * Build the mapping of local attribute numbers to remote attribute |
265 | * numbers and validate that we don't miss any replicated columns as |
266 | * that would result in potentially unwanted data loss. |
267 | */ |
268 | desc = RelationGetDescr(entry->localrel); |
269 | oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); |
270 | entry->attrmap = palloc(desc->natts * sizeof(int)); |
271 | MemoryContextSwitchTo(oldctx); |
272 | |
273 | found = 0; |
274 | for (i = 0; i < desc->natts; i++) |
275 | { |
276 | int attnum; |
277 | Form_pg_attribute attr = TupleDescAttr(desc, i); |
278 | |
279 | if (attr->attisdropped || attr->attgenerated) |
280 | { |
281 | entry->attrmap[i] = -1; |
282 | continue; |
283 | } |
284 | |
285 | attnum = logicalrep_rel_att_by_name(remoterel, |
286 | NameStr(attr->attname)); |
287 | |
288 | entry->attrmap[i] = attnum; |
289 | if (attnum >= 0) |
290 | found++; |
291 | } |
292 | |
293 | /* TODO, detail message with names of missing columns */ |
294 | if (found < remoterel->natts) |
295 | ereport(ERROR, |
296 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
297 | errmsg("logical replication target relation \"%s.%s\" is missing " |
298 | "some replicated columns" , |
299 | remoterel->nspname, remoterel->relname))); |
300 | |
301 | /* |
302 | * Check that replica identity matches. We allow for stricter replica |
303 | * identity (fewer columns) on subscriber as that will not stop us |
304 | * from finding unique tuple. IE, if publisher has identity |
305 | * (id,timestamp) and subscriber just (id) this will not be a problem, |
306 | * but in the opposite scenario it will. |
307 | * |
308 | * Don't throw any error here just mark the relation entry as not |
309 | * updatable, as replica identity is only for updates and deletes but |
310 | * inserts can be replicated even without it. |
311 | */ |
312 | entry->updatable = true; |
313 | idkey = RelationGetIndexAttrBitmap(entry->localrel, |
314 | INDEX_ATTR_BITMAP_IDENTITY_KEY); |
315 | /* fallback to PK if no replica identity */ |
316 | if (idkey == NULL) |
317 | { |
318 | idkey = RelationGetIndexAttrBitmap(entry->localrel, |
319 | INDEX_ATTR_BITMAP_PRIMARY_KEY); |
320 | |
321 | /* |
322 | * If no replica identity index and no PK, the published table |
323 | * must have replica identity FULL. |
324 | */ |
325 | if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL) |
326 | entry->updatable = false; |
327 | } |
328 | |
329 | i = -1; |
330 | while ((i = bms_next_member(idkey, i)) >= 0) |
331 | { |
332 | int attnum = i + FirstLowInvalidHeapAttributeNumber; |
333 | |
334 | if (!AttrNumberIsForUserDefinedAttr(attnum)) |
335 | ereport(ERROR, |
336 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
337 | errmsg("logical replication target relation \"%s.%s\" uses " |
338 | "system columns in REPLICA IDENTITY index" , |
339 | remoterel->nspname, remoterel->relname))); |
340 | |
341 | attnum = AttrNumberGetAttrOffset(attnum); |
342 | |
343 | if (!bms_is_member(entry->attrmap[attnum], remoterel->attkeys)) |
344 | { |
345 | entry->updatable = false; |
346 | break; |
347 | } |
348 | } |
349 | |
350 | entry->localreloid = relid; |
351 | } |
352 | else |
353 | entry->localrel = table_open(entry->localreloid, lockmode); |
354 | |
355 | if (entry->state != SUBREL_STATE_READY) |
356 | entry->state = GetSubscriptionRelState(MySubscription->oid, |
357 | entry->localreloid, |
358 | &entry->statelsn, |
359 | true); |
360 | |
361 | return entry; |
362 | } |
363 | |
364 | /* |
365 | * Close the previously opened logical relation. |
366 | */ |
367 | void |
368 | logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode) |
369 | { |
370 | table_close(rel->localrel, lockmode); |
371 | rel->localrel = NULL; |
372 | } |
373 | |
374 | /* |
375 | * Free the type map cache entry data. |
376 | */ |
377 | static void |
378 | logicalrep_typmap_free_entry(LogicalRepTyp *entry) |
379 | { |
380 | pfree(entry->nspname); |
381 | pfree(entry->typname); |
382 | } |
383 | |
384 | /* |
385 | * Add new entry or update existing entry in the type map cache. |
386 | */ |
387 | void |
388 | logicalrep_typmap_update(LogicalRepTyp *remotetyp) |
389 | { |
390 | MemoryContext oldctx; |
391 | LogicalRepTyp *entry; |
392 | bool found; |
393 | |
394 | if (LogicalRepTypMap == NULL) |
395 | logicalrep_relmap_init(); |
396 | |
397 | /* |
398 | * HASH_ENTER returns the existing entry if present or creates a new one. |
399 | */ |
400 | entry = hash_search(LogicalRepTypMap, (void *) &remotetyp->remoteid, |
401 | HASH_ENTER, &found); |
402 | |
403 | if (found) |
404 | logicalrep_typmap_free_entry(entry); |
405 | |
406 | /* Make cached copy of the data */ |
407 | entry->remoteid = remotetyp->remoteid; |
408 | oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); |
409 | entry->nspname = pstrdup(remotetyp->nspname); |
410 | entry->typname = pstrdup(remotetyp->typname); |
411 | MemoryContextSwitchTo(oldctx); |
412 | } |
413 | |
414 | /* |
415 | * Fetch type name from the cache by remote type OID. |
416 | * |
417 | * Return a substitute value if we cannot find the data type; no message is |
418 | * sent to the log in that case, because this is used by error callback |
419 | * already. |
420 | */ |
421 | char * |
422 | logicalrep_typmap_gettypname(Oid remoteid) |
423 | { |
424 | LogicalRepTyp *entry; |
425 | bool found; |
426 | |
427 | /* Internal types are mapped directly. */ |
428 | if (remoteid < FirstGenbkiObjectId) |
429 | { |
430 | if (!get_typisdefined(remoteid)) |
431 | { |
432 | /* |
433 | * This can be caused by having a publisher with a higher |
434 | * PostgreSQL major version than the subscriber. |
435 | */ |
436 | return psprintf("unrecognized %u" , remoteid); |
437 | } |
438 | |
439 | return format_type_be(remoteid); |
440 | } |
441 | |
442 | if (LogicalRepTypMap == NULL) |
443 | { |
444 | /* |
445 | * If the typemap is not initialized yet, we cannot possibly attempt |
446 | * to search the hash table; but there's no way we know the type |
447 | * locally yet, since we haven't received a message about this type, |
448 | * so this is the best we can do. |
449 | */ |
450 | return psprintf("unrecognized %u" , remoteid); |
451 | } |
452 | |
453 | /* search the mapping */ |
454 | entry = hash_search(LogicalRepTypMap, (void *) &remoteid, |
455 | HASH_FIND, &found); |
456 | if (!found) |
457 | return psprintf("unrecognized %u" , remoteid); |
458 | |
459 | Assert(OidIsValid(entry->remoteid)); |
460 | return psprintf("%s.%s" , entry->nspname, entry->typname); |
461 | } |
462 | |