| 1 | /*------------------------------------------------------------------------- |
| 2 | * relation.c |
| 3 | * PostgreSQL logical replication |
| 4 | * |
| 5 | * Copyright (c) 2016-2019, PostgreSQL Global Development Group |
| 6 | * |
| 7 | * IDENTIFICATION |
| 8 | * src/backend/replication/logical/relation.c |
| 9 | * |
| 10 | * NOTES |
| 11 | * This file contains helper functions for logical replication relation |
| 12 | * mapping cache. |
| 13 | * |
| 14 | *------------------------------------------------------------------------- |
| 15 | */ |
| 16 | |
| 17 | #include "postgres.h" |
| 18 | |
| 19 | #include "access/sysattr.h" |
| 20 | #include "access/table.h" |
| 21 | #include "catalog/namespace.h" |
| 22 | #include "catalog/pg_subscription_rel.h" |
| 23 | #include "executor/executor.h" |
| 24 | #include "nodes/makefuncs.h" |
| 25 | #include "replication/logicalrelation.h" |
| 26 | #include "replication/worker_internal.h" |
| 27 | #include "utils/builtins.h" |
| 28 | #include "utils/inval.h" |
| 29 | #include "utils/lsyscache.h" |
| 30 | #include "utils/memutils.h" |
| 31 | #include "utils/syscache.h" |
| 32 | |
| 33 | static MemoryContext LogicalRepRelMapContext = NULL; |
| 34 | |
| 35 | static HTAB *LogicalRepRelMap = NULL; |
| 36 | static HTAB *LogicalRepTypMap = NULL; |
| 37 | |
| 38 | |
| 39 | /* |
| 40 | * Relcache invalidation callback for our relation map cache. |
| 41 | */ |
| 42 | static void |
| 43 | logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) |
| 44 | { |
| 45 | LogicalRepRelMapEntry *entry; |
| 46 | |
| 47 | /* Just to be sure. */ |
| 48 | if (LogicalRepRelMap == NULL) |
| 49 | return; |
| 50 | |
| 51 | if (reloid != InvalidOid) |
| 52 | { |
| 53 | HASH_SEQ_STATUS status; |
| 54 | |
| 55 | hash_seq_init(&status, LogicalRepRelMap); |
| 56 | |
| 57 | /* TODO, use inverse lookup hashtable? */ |
| 58 | while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) |
| 59 | { |
| 60 | if (entry->localreloid == reloid) |
| 61 | { |
| 62 | entry->localreloid = InvalidOid; |
| 63 | hash_seq_term(&status); |
| 64 | break; |
| 65 | } |
| 66 | } |
| 67 | } |
| 68 | else |
| 69 | { |
| 70 | /* invalidate all cache entries */ |
| 71 | HASH_SEQ_STATUS status; |
| 72 | |
| 73 | hash_seq_init(&status, LogicalRepRelMap); |
| 74 | |
| 75 | while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) |
| 76 | entry->localreloid = InvalidOid; |
| 77 | } |
| 78 | } |
| 79 | |
| 80 | /* |
| 81 | * Initialize the relation map cache. |
| 82 | */ |
| 83 | static void |
| 84 | logicalrep_relmap_init(void) |
| 85 | { |
| 86 | HASHCTL ctl; |
| 87 | |
| 88 | if (!LogicalRepRelMapContext) |
| 89 | LogicalRepRelMapContext = |
| 90 | AllocSetContextCreate(CacheMemoryContext, |
| 91 | "LogicalRepRelMapContext" , |
| 92 | ALLOCSET_DEFAULT_SIZES); |
| 93 | |
| 94 | /* Initialize the relation hash table. */ |
| 95 | MemSet(&ctl, 0, sizeof(ctl)); |
| 96 | ctl.keysize = sizeof(LogicalRepRelId); |
| 97 | ctl.entrysize = sizeof(LogicalRepRelMapEntry); |
| 98 | ctl.hcxt = LogicalRepRelMapContext; |
| 99 | |
| 100 | LogicalRepRelMap = hash_create("logicalrep relation map cache" , 128, &ctl, |
| 101 | HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); |
| 102 | |
| 103 | /* Initialize the type hash table. */ |
| 104 | MemSet(&ctl, 0, sizeof(ctl)); |
| 105 | ctl.keysize = sizeof(Oid); |
| 106 | ctl.entrysize = sizeof(LogicalRepTyp); |
| 107 | ctl.hcxt = LogicalRepRelMapContext; |
| 108 | |
| 109 | /* This will usually be small. */ |
| 110 | LogicalRepTypMap = hash_create("logicalrep type map cache" , 2, &ctl, |
| 111 | HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); |
| 112 | |
| 113 | /* Watch for invalidation events. */ |
| 114 | CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb, |
| 115 | (Datum) 0); |
| 116 | } |
| 117 | |
| 118 | /* |
| 119 | * Free the entry of a relation map cache. |
| 120 | */ |
| 121 | static void |
| 122 | logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) |
| 123 | { |
| 124 | LogicalRepRelation *remoterel; |
| 125 | |
| 126 | remoterel = &entry->remoterel; |
| 127 | |
| 128 | pfree(remoterel->nspname); |
| 129 | pfree(remoterel->relname); |
| 130 | |
| 131 | if (remoterel->natts > 0) |
| 132 | { |
| 133 | int i; |
| 134 | |
| 135 | for (i = 0; i < remoterel->natts; i++) |
| 136 | pfree(remoterel->attnames[i]); |
| 137 | |
| 138 | pfree(remoterel->attnames); |
| 139 | pfree(remoterel->atttyps); |
| 140 | } |
| 141 | bms_free(remoterel->attkeys); |
| 142 | |
| 143 | if (entry->attrmap) |
| 144 | pfree(entry->attrmap); |
| 145 | } |
| 146 | |
| 147 | /* |
| 148 | * Add new entry or update existing entry in the relation map cache. |
| 149 | * |
| 150 | * Called when new relation mapping is sent by the publisher to update |
| 151 | * our expected view of incoming data from said publisher. |
| 152 | */ |
| 153 | void |
| 154 | logicalrep_relmap_update(LogicalRepRelation *remoterel) |
| 155 | { |
| 156 | MemoryContext oldctx; |
| 157 | LogicalRepRelMapEntry *entry; |
| 158 | bool found; |
| 159 | int i; |
| 160 | |
| 161 | if (LogicalRepRelMap == NULL) |
| 162 | logicalrep_relmap_init(); |
| 163 | |
| 164 | /* |
| 165 | * HASH_ENTER returns the existing entry if present or creates a new one. |
| 166 | */ |
| 167 | entry = hash_search(LogicalRepRelMap, (void *) &remoterel->remoteid, |
| 168 | HASH_ENTER, &found); |
| 169 | |
| 170 | if (found) |
| 171 | logicalrep_relmap_free_entry(entry); |
| 172 | |
| 173 | memset(entry, 0, sizeof(LogicalRepRelMapEntry)); |
| 174 | |
| 175 | /* Make cached copy of the data */ |
| 176 | oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); |
| 177 | entry->remoterel.remoteid = remoterel->remoteid; |
| 178 | entry->remoterel.nspname = pstrdup(remoterel->nspname); |
| 179 | entry->remoterel.relname = pstrdup(remoterel->relname); |
| 180 | entry->remoterel.natts = remoterel->natts; |
| 181 | entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); |
| 182 | entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); |
| 183 | for (i = 0; i < remoterel->natts; i++) |
| 184 | { |
| 185 | entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); |
| 186 | entry->remoterel.atttyps[i] = remoterel->atttyps[i]; |
| 187 | } |
| 188 | entry->remoterel.replident = remoterel->replident; |
| 189 | entry->remoterel.attkeys = bms_copy(remoterel->attkeys); |
| 190 | MemoryContextSwitchTo(oldctx); |
| 191 | } |
| 192 | |
| 193 | /* |
| 194 | * Find attribute index in TupleDesc struct by attribute name. |
| 195 | * |
| 196 | * Returns -1 if not found. |
| 197 | */ |
| 198 | static int |
| 199 | logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname) |
| 200 | { |
| 201 | int i; |
| 202 | |
| 203 | for (i = 0; i < remoterel->natts; i++) |
| 204 | { |
| 205 | if (strcmp(remoterel->attnames[i], attname) == 0) |
| 206 | return i; |
| 207 | } |
| 208 | |
| 209 | return -1; |
| 210 | } |
| 211 | |
| 212 | /* |
| 213 | * Open the local relation associated with the remote one. |
| 214 | * |
| 215 | * Optionally rebuilds the Relcache mapping if it was invalidated |
| 216 | * by local DDL. |
| 217 | */ |
| 218 | LogicalRepRelMapEntry * |
| 219 | logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) |
| 220 | { |
| 221 | LogicalRepRelMapEntry *entry; |
| 222 | bool found; |
| 223 | |
| 224 | if (LogicalRepRelMap == NULL) |
| 225 | logicalrep_relmap_init(); |
| 226 | |
| 227 | /* Search for existing entry. */ |
| 228 | entry = hash_search(LogicalRepRelMap, (void *) &remoteid, |
| 229 | HASH_FIND, &found); |
| 230 | |
| 231 | if (!found) |
| 232 | elog(ERROR, "no relation map entry for remote relation ID %u" , |
| 233 | remoteid); |
| 234 | |
| 235 | /* Need to update the local cache? */ |
| 236 | if (!OidIsValid(entry->localreloid)) |
| 237 | { |
| 238 | Oid relid; |
| 239 | int i; |
| 240 | int found; |
| 241 | Bitmapset *idkey; |
| 242 | TupleDesc desc; |
| 243 | LogicalRepRelation *remoterel; |
| 244 | MemoryContext oldctx; |
| 245 | |
| 246 | remoterel = &entry->remoterel; |
| 247 | |
| 248 | /* Try to find and lock the relation by name. */ |
| 249 | relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname, |
| 250 | remoterel->relname, -1), |
| 251 | lockmode, true); |
| 252 | if (!OidIsValid(relid)) |
| 253 | ereport(ERROR, |
| 254 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| 255 | errmsg("logical replication target relation \"%s.%s\" does not exist" , |
| 256 | remoterel->nspname, remoterel->relname))); |
| 257 | entry->localrel = table_open(relid, NoLock); |
| 258 | |
| 259 | /* Check for supported relkind. */ |
| 260 | CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind, |
| 261 | remoterel->nspname, remoterel->relname); |
| 262 | |
| 263 | /* |
| 264 | * Build the mapping of local attribute numbers to remote attribute |
| 265 | * numbers and validate that we don't miss any replicated columns as |
| 266 | * that would result in potentially unwanted data loss. |
| 267 | */ |
| 268 | desc = RelationGetDescr(entry->localrel); |
| 269 | oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); |
| 270 | entry->attrmap = palloc(desc->natts * sizeof(int)); |
| 271 | MemoryContextSwitchTo(oldctx); |
| 272 | |
| 273 | found = 0; |
| 274 | for (i = 0; i < desc->natts; i++) |
| 275 | { |
| 276 | int attnum; |
| 277 | Form_pg_attribute attr = TupleDescAttr(desc, i); |
| 278 | |
| 279 | if (attr->attisdropped || attr->attgenerated) |
| 280 | { |
| 281 | entry->attrmap[i] = -1; |
| 282 | continue; |
| 283 | } |
| 284 | |
| 285 | attnum = logicalrep_rel_att_by_name(remoterel, |
| 286 | NameStr(attr->attname)); |
| 287 | |
| 288 | entry->attrmap[i] = attnum; |
| 289 | if (attnum >= 0) |
| 290 | found++; |
| 291 | } |
| 292 | |
| 293 | /* TODO, detail message with names of missing columns */ |
| 294 | if (found < remoterel->natts) |
| 295 | ereport(ERROR, |
| 296 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| 297 | errmsg("logical replication target relation \"%s.%s\" is missing " |
| 298 | "some replicated columns" , |
| 299 | remoterel->nspname, remoterel->relname))); |
| 300 | |
| 301 | /* |
| 302 | * Check that replica identity matches. We allow for stricter replica |
| 303 | * identity (fewer columns) on subscriber as that will not stop us |
| 304 | * from finding unique tuple. IE, if publisher has identity |
| 305 | * (id,timestamp) and subscriber just (id) this will not be a problem, |
| 306 | * but in the opposite scenario it will. |
| 307 | * |
| 308 | * Don't throw any error here just mark the relation entry as not |
| 309 | * updatable, as replica identity is only for updates and deletes but |
| 310 | * inserts can be replicated even without it. |
| 311 | */ |
| 312 | entry->updatable = true; |
| 313 | idkey = RelationGetIndexAttrBitmap(entry->localrel, |
| 314 | INDEX_ATTR_BITMAP_IDENTITY_KEY); |
| 315 | /* fallback to PK if no replica identity */ |
| 316 | if (idkey == NULL) |
| 317 | { |
| 318 | idkey = RelationGetIndexAttrBitmap(entry->localrel, |
| 319 | INDEX_ATTR_BITMAP_PRIMARY_KEY); |
| 320 | |
| 321 | /* |
| 322 | * If no replica identity index and no PK, the published table |
| 323 | * must have replica identity FULL. |
| 324 | */ |
| 325 | if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL) |
| 326 | entry->updatable = false; |
| 327 | } |
| 328 | |
| 329 | i = -1; |
| 330 | while ((i = bms_next_member(idkey, i)) >= 0) |
| 331 | { |
| 332 | int attnum = i + FirstLowInvalidHeapAttributeNumber; |
| 333 | |
| 334 | if (!AttrNumberIsForUserDefinedAttr(attnum)) |
| 335 | ereport(ERROR, |
| 336 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| 337 | errmsg("logical replication target relation \"%s.%s\" uses " |
| 338 | "system columns in REPLICA IDENTITY index" , |
| 339 | remoterel->nspname, remoterel->relname))); |
| 340 | |
| 341 | attnum = AttrNumberGetAttrOffset(attnum); |
| 342 | |
| 343 | if (!bms_is_member(entry->attrmap[attnum], remoterel->attkeys)) |
| 344 | { |
| 345 | entry->updatable = false; |
| 346 | break; |
| 347 | } |
| 348 | } |
| 349 | |
| 350 | entry->localreloid = relid; |
| 351 | } |
| 352 | else |
| 353 | entry->localrel = table_open(entry->localreloid, lockmode); |
| 354 | |
| 355 | if (entry->state != SUBREL_STATE_READY) |
| 356 | entry->state = GetSubscriptionRelState(MySubscription->oid, |
| 357 | entry->localreloid, |
| 358 | &entry->statelsn, |
| 359 | true); |
| 360 | |
| 361 | return entry; |
| 362 | } |
| 363 | |
| 364 | /* |
| 365 | * Close the previously opened logical relation. |
| 366 | */ |
| 367 | void |
| 368 | logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode) |
| 369 | { |
| 370 | table_close(rel->localrel, lockmode); |
| 371 | rel->localrel = NULL; |
| 372 | } |
| 373 | |
| 374 | /* |
| 375 | * Free the type map cache entry data. |
| 376 | */ |
| 377 | static void |
| 378 | logicalrep_typmap_free_entry(LogicalRepTyp *entry) |
| 379 | { |
| 380 | pfree(entry->nspname); |
| 381 | pfree(entry->typname); |
| 382 | } |
| 383 | |
| 384 | /* |
| 385 | * Add new entry or update existing entry in the type map cache. |
| 386 | */ |
| 387 | void |
| 388 | logicalrep_typmap_update(LogicalRepTyp *remotetyp) |
| 389 | { |
| 390 | MemoryContext oldctx; |
| 391 | LogicalRepTyp *entry; |
| 392 | bool found; |
| 393 | |
| 394 | if (LogicalRepTypMap == NULL) |
| 395 | logicalrep_relmap_init(); |
| 396 | |
| 397 | /* |
| 398 | * HASH_ENTER returns the existing entry if present or creates a new one. |
| 399 | */ |
| 400 | entry = hash_search(LogicalRepTypMap, (void *) &remotetyp->remoteid, |
| 401 | HASH_ENTER, &found); |
| 402 | |
| 403 | if (found) |
| 404 | logicalrep_typmap_free_entry(entry); |
| 405 | |
| 406 | /* Make cached copy of the data */ |
| 407 | entry->remoteid = remotetyp->remoteid; |
| 408 | oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); |
| 409 | entry->nspname = pstrdup(remotetyp->nspname); |
| 410 | entry->typname = pstrdup(remotetyp->typname); |
| 411 | MemoryContextSwitchTo(oldctx); |
| 412 | } |
| 413 | |
| 414 | /* |
| 415 | * Fetch type name from the cache by remote type OID. |
| 416 | * |
| 417 | * Return a substitute value if we cannot find the data type; no message is |
| 418 | * sent to the log in that case, because this is used by error callback |
| 419 | * already. |
| 420 | */ |
| 421 | char * |
| 422 | logicalrep_typmap_gettypname(Oid remoteid) |
| 423 | { |
| 424 | LogicalRepTyp *entry; |
| 425 | bool found; |
| 426 | |
| 427 | /* Internal types are mapped directly. */ |
| 428 | if (remoteid < FirstGenbkiObjectId) |
| 429 | { |
| 430 | if (!get_typisdefined(remoteid)) |
| 431 | { |
| 432 | /* |
| 433 | * This can be caused by having a publisher with a higher |
| 434 | * PostgreSQL major version than the subscriber. |
| 435 | */ |
| 436 | return psprintf("unrecognized %u" , remoteid); |
| 437 | } |
| 438 | |
| 439 | return format_type_be(remoteid); |
| 440 | } |
| 441 | |
| 442 | if (LogicalRepTypMap == NULL) |
| 443 | { |
| 444 | /* |
| 445 | * If the typemap is not initialized yet, we cannot possibly attempt |
| 446 | * to search the hash table; but there's no way we know the type |
| 447 | * locally yet, since we haven't received a message about this type, |
| 448 | * so this is the best we can do. |
| 449 | */ |
| 450 | return psprintf("unrecognized %u" , remoteid); |
| 451 | } |
| 452 | |
| 453 | /* search the mapping */ |
| 454 | entry = hash_search(LogicalRepTypMap, (void *) &remoteid, |
| 455 | HASH_FIND, &found); |
| 456 | if (!found) |
| 457 | return psprintf("unrecognized %u" , remoteid); |
| 458 | |
| 459 | Assert(OidIsValid(entry->remoteid)); |
| 460 | return psprintf("%s.%s" , entry->nspname, entry->typname); |
| 461 | } |
| 462 | |