1/*-------------------------------------------------------------------------
2 * relation.c
3 * PostgreSQL logical replication
4 *
5 * Copyright (c) 2016-2019, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/relation.c
9 *
10 * NOTES
11 * This file contains helper functions for logical replication relation
12 * mapping cache.
13 *
14 *-------------------------------------------------------------------------
15 */
16
17#include "postgres.h"
18
19#include "access/sysattr.h"
20#include "access/table.h"
21#include "catalog/namespace.h"
22#include "catalog/pg_subscription_rel.h"
23#include "executor/executor.h"
24#include "nodes/makefuncs.h"
25#include "replication/logicalrelation.h"
26#include "replication/worker_internal.h"
27#include "utils/builtins.h"
28#include "utils/inval.h"
29#include "utils/lsyscache.h"
30#include "utils/memutils.h"
31#include "utils/syscache.h"
32
33static MemoryContext LogicalRepRelMapContext = NULL;
34
35static HTAB *LogicalRepRelMap = NULL;
36static HTAB *LogicalRepTypMap = NULL;
37
38
39/*
40 * Relcache invalidation callback for our relation map cache.
41 */
42static void
43logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
44{
45 LogicalRepRelMapEntry *entry;
46
47 /* Just to be sure. */
48 if (LogicalRepRelMap == NULL)
49 return;
50
51 if (reloid != InvalidOid)
52 {
53 HASH_SEQ_STATUS status;
54
55 hash_seq_init(&status, LogicalRepRelMap);
56
57 /* TODO, use inverse lookup hashtable? */
58 while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
59 {
60 if (entry->localreloid == reloid)
61 {
62 entry->localreloid = InvalidOid;
63 hash_seq_term(&status);
64 break;
65 }
66 }
67 }
68 else
69 {
70 /* invalidate all cache entries */
71 HASH_SEQ_STATUS status;
72
73 hash_seq_init(&status, LogicalRepRelMap);
74
75 while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
76 entry->localreloid = InvalidOid;
77 }
78}
79
80/*
81 * Initialize the relation map cache.
82 */
83static void
84logicalrep_relmap_init(void)
85{
86 HASHCTL ctl;
87
88 if (!LogicalRepRelMapContext)
89 LogicalRepRelMapContext =
90 AllocSetContextCreate(CacheMemoryContext,
91 "LogicalRepRelMapContext",
92 ALLOCSET_DEFAULT_SIZES);
93
94 /* Initialize the relation hash table. */
95 MemSet(&ctl, 0, sizeof(ctl));
96 ctl.keysize = sizeof(LogicalRepRelId);
97 ctl.entrysize = sizeof(LogicalRepRelMapEntry);
98 ctl.hcxt = LogicalRepRelMapContext;
99
100 LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl,
101 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
102
103 /* Initialize the type hash table. */
104 MemSet(&ctl, 0, sizeof(ctl));
105 ctl.keysize = sizeof(Oid);
106 ctl.entrysize = sizeof(LogicalRepTyp);
107 ctl.hcxt = LogicalRepRelMapContext;
108
109 /* This will usually be small. */
110 LogicalRepTypMap = hash_create("logicalrep type map cache", 2, &ctl,
111 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
112
113 /* Watch for invalidation events. */
114 CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
115 (Datum) 0);
116}
117
118/*
119 * Free the entry of a relation map cache.
120 */
121static void
122logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
123{
124 LogicalRepRelation *remoterel;
125
126 remoterel = &entry->remoterel;
127
128 pfree(remoterel->nspname);
129 pfree(remoterel->relname);
130
131 if (remoterel->natts > 0)
132 {
133 int i;
134
135 for (i = 0; i < remoterel->natts; i++)
136 pfree(remoterel->attnames[i]);
137
138 pfree(remoterel->attnames);
139 pfree(remoterel->atttyps);
140 }
141 bms_free(remoterel->attkeys);
142
143 if (entry->attrmap)
144 pfree(entry->attrmap);
145}
146
147/*
148 * Add new entry or update existing entry in the relation map cache.
149 *
150 * Called when new relation mapping is sent by the publisher to update
151 * our expected view of incoming data from said publisher.
152 */
153void
154logicalrep_relmap_update(LogicalRepRelation *remoterel)
155{
156 MemoryContext oldctx;
157 LogicalRepRelMapEntry *entry;
158 bool found;
159 int i;
160
161 if (LogicalRepRelMap == NULL)
162 logicalrep_relmap_init();
163
164 /*
165 * HASH_ENTER returns the existing entry if present or creates a new one.
166 */
167 entry = hash_search(LogicalRepRelMap, (void *) &remoterel->remoteid,
168 HASH_ENTER, &found);
169
170 if (found)
171 logicalrep_relmap_free_entry(entry);
172
173 memset(entry, 0, sizeof(LogicalRepRelMapEntry));
174
175 /* Make cached copy of the data */
176 oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
177 entry->remoterel.remoteid = remoterel->remoteid;
178 entry->remoterel.nspname = pstrdup(remoterel->nspname);
179 entry->remoterel.relname = pstrdup(remoterel->relname);
180 entry->remoterel.natts = remoterel->natts;
181 entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
182 entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
183 for (i = 0; i < remoterel->natts; i++)
184 {
185 entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
186 entry->remoterel.atttyps[i] = remoterel->atttyps[i];
187 }
188 entry->remoterel.replident = remoterel->replident;
189 entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
190 MemoryContextSwitchTo(oldctx);
191}
192
193/*
194 * Find attribute index in TupleDesc struct by attribute name.
195 *
196 * Returns -1 if not found.
197 */
198static int
199logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
200{
201 int i;
202
203 for (i = 0; i < remoterel->natts; i++)
204 {
205 if (strcmp(remoterel->attnames[i], attname) == 0)
206 return i;
207 }
208
209 return -1;
210}
211
212/*
213 * Open the local relation associated with the remote one.
214 *
215 * Optionally rebuilds the Relcache mapping if it was invalidated
216 * by local DDL.
217 */
218LogicalRepRelMapEntry *
219logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
220{
221 LogicalRepRelMapEntry *entry;
222 bool found;
223
224 if (LogicalRepRelMap == NULL)
225 logicalrep_relmap_init();
226
227 /* Search for existing entry. */
228 entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
229 HASH_FIND, &found);
230
231 if (!found)
232 elog(ERROR, "no relation map entry for remote relation ID %u",
233 remoteid);
234
235 /* Need to update the local cache? */
236 if (!OidIsValid(entry->localreloid))
237 {
238 Oid relid;
239 int i;
240 int found;
241 Bitmapset *idkey;
242 TupleDesc desc;
243 LogicalRepRelation *remoterel;
244 MemoryContext oldctx;
245
246 remoterel = &entry->remoterel;
247
248 /* Try to find and lock the relation by name. */
249 relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
250 remoterel->relname, -1),
251 lockmode, true);
252 if (!OidIsValid(relid))
253 ereport(ERROR,
254 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
255 errmsg("logical replication target relation \"%s.%s\" does not exist",
256 remoterel->nspname, remoterel->relname)));
257 entry->localrel = table_open(relid, NoLock);
258
259 /* Check for supported relkind. */
260 CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
261 remoterel->nspname, remoterel->relname);
262
263 /*
264 * Build the mapping of local attribute numbers to remote attribute
265 * numbers and validate that we don't miss any replicated columns as
266 * that would result in potentially unwanted data loss.
267 */
268 desc = RelationGetDescr(entry->localrel);
269 oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
270 entry->attrmap = palloc(desc->natts * sizeof(int));
271 MemoryContextSwitchTo(oldctx);
272
273 found = 0;
274 for (i = 0; i < desc->natts; i++)
275 {
276 int attnum;
277 Form_pg_attribute attr = TupleDescAttr(desc, i);
278
279 if (attr->attisdropped || attr->attgenerated)
280 {
281 entry->attrmap[i] = -1;
282 continue;
283 }
284
285 attnum = logicalrep_rel_att_by_name(remoterel,
286 NameStr(attr->attname));
287
288 entry->attrmap[i] = attnum;
289 if (attnum >= 0)
290 found++;
291 }
292
293 /* TODO, detail message with names of missing columns */
294 if (found < remoterel->natts)
295 ereport(ERROR,
296 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
297 errmsg("logical replication target relation \"%s.%s\" is missing "
298 "some replicated columns",
299 remoterel->nspname, remoterel->relname)));
300
301 /*
302 * Check that replica identity matches. We allow for stricter replica
303 * identity (fewer columns) on subscriber as that will not stop us
304 * from finding unique tuple. IE, if publisher has identity
305 * (id,timestamp) and subscriber just (id) this will not be a problem,
306 * but in the opposite scenario it will.
307 *
308 * Don't throw any error here just mark the relation entry as not
309 * updatable, as replica identity is only for updates and deletes but
310 * inserts can be replicated even without it.
311 */
312 entry->updatable = true;
313 idkey = RelationGetIndexAttrBitmap(entry->localrel,
314 INDEX_ATTR_BITMAP_IDENTITY_KEY);
315 /* fallback to PK if no replica identity */
316 if (idkey == NULL)
317 {
318 idkey = RelationGetIndexAttrBitmap(entry->localrel,
319 INDEX_ATTR_BITMAP_PRIMARY_KEY);
320
321 /*
322 * If no replica identity index and no PK, the published table
323 * must have replica identity FULL.
324 */
325 if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
326 entry->updatable = false;
327 }
328
329 i = -1;
330 while ((i = bms_next_member(idkey, i)) >= 0)
331 {
332 int attnum = i + FirstLowInvalidHeapAttributeNumber;
333
334 if (!AttrNumberIsForUserDefinedAttr(attnum))
335 ereport(ERROR,
336 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
337 errmsg("logical replication target relation \"%s.%s\" uses "
338 "system columns in REPLICA IDENTITY index",
339 remoterel->nspname, remoterel->relname)));
340
341 attnum = AttrNumberGetAttrOffset(attnum);
342
343 if (!bms_is_member(entry->attrmap[attnum], remoterel->attkeys))
344 {
345 entry->updatable = false;
346 break;
347 }
348 }
349
350 entry->localreloid = relid;
351 }
352 else
353 entry->localrel = table_open(entry->localreloid, lockmode);
354
355 if (entry->state != SUBREL_STATE_READY)
356 entry->state = GetSubscriptionRelState(MySubscription->oid,
357 entry->localreloid,
358 &entry->statelsn,
359 true);
360
361 return entry;
362}
363
364/*
365 * Close the previously opened logical relation.
366 */
367void
368logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
369{
370 table_close(rel->localrel, lockmode);
371 rel->localrel = NULL;
372}
373
374/*
375 * Free the type map cache entry data.
376 */
377static void
378logicalrep_typmap_free_entry(LogicalRepTyp *entry)
379{
380 pfree(entry->nspname);
381 pfree(entry->typname);
382}
383
384/*
385 * Add new entry or update existing entry in the type map cache.
386 */
387void
388logicalrep_typmap_update(LogicalRepTyp *remotetyp)
389{
390 MemoryContext oldctx;
391 LogicalRepTyp *entry;
392 bool found;
393
394 if (LogicalRepTypMap == NULL)
395 logicalrep_relmap_init();
396
397 /*
398 * HASH_ENTER returns the existing entry if present or creates a new one.
399 */
400 entry = hash_search(LogicalRepTypMap, (void *) &remotetyp->remoteid,
401 HASH_ENTER, &found);
402
403 if (found)
404 logicalrep_typmap_free_entry(entry);
405
406 /* Make cached copy of the data */
407 entry->remoteid = remotetyp->remoteid;
408 oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
409 entry->nspname = pstrdup(remotetyp->nspname);
410 entry->typname = pstrdup(remotetyp->typname);
411 MemoryContextSwitchTo(oldctx);
412}
413
414/*
415 * Fetch type name from the cache by remote type OID.
416 *
417 * Return a substitute value if we cannot find the data type; no message is
418 * sent to the log in that case, because this is used by error callback
419 * already.
420 */
421char *
422logicalrep_typmap_gettypname(Oid remoteid)
423{
424 LogicalRepTyp *entry;
425 bool found;
426
427 /* Internal types are mapped directly. */
428 if (remoteid < FirstGenbkiObjectId)
429 {
430 if (!get_typisdefined(remoteid))
431 {
432 /*
433 * This can be caused by having a publisher with a higher
434 * PostgreSQL major version than the subscriber.
435 */
436 return psprintf("unrecognized %u", remoteid);
437 }
438
439 return format_type_be(remoteid);
440 }
441
442 if (LogicalRepTypMap == NULL)
443 {
444 /*
445 * If the typemap is not initialized yet, we cannot possibly attempt
446 * to search the hash table; but there's no way we know the type
447 * locally yet, since we haven't received a message about this type,
448 * so this is the best we can do.
449 */
450 return psprintf("unrecognized %u", remoteid);
451 }
452
453 /* search the mapping */
454 entry = hash_search(LogicalRepTypMap, (void *) &remoteid,
455 HASH_FIND, &found);
456 if (!found)
457 return psprintf("unrecognized %u", remoteid);
458
459 Assert(OidIsValid(entry->remoteid));
460 return psprintf("%s.%s", entry->nspname, entry->typname);
461}
462