1/*-------------------------------------------------------------------------
2 *
3 * pgoutput.c
4 * Logical Replication output plugin
5 *
6 * Copyright (c) 2012-2019, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/pgoutput/pgoutput.c
10 *
11 *-------------------------------------------------------------------------
12 */
13#include "postgres.h"
14
15#include "catalog/pg_publication.h"
16
17#include "replication/logical.h"
18#include "replication/logicalproto.h"
19#include "replication/origin.h"
20#include "replication/pgoutput.h"
21
22#include "utils/inval.h"
23#include "utils/int8.h"
24#include "utils/memutils.h"
25#include "utils/syscache.h"
26#include "utils/varlena.h"
27
28PG_MODULE_MAGIC;
29
30extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
31
32static void pgoutput_startup(LogicalDecodingContext *ctx,
33 OutputPluginOptions *opt, bool is_init);
34static void pgoutput_shutdown(LogicalDecodingContext *ctx);
35static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
36 ReorderBufferTXN *txn);
37static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
38 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
39static void pgoutput_change(LogicalDecodingContext *ctx,
40 ReorderBufferTXN *txn, Relation rel,
41 ReorderBufferChange *change);
42static void pgoutput_truncate(LogicalDecodingContext *ctx,
43 ReorderBufferTXN *txn, int nrelations, Relation relations[],
44 ReorderBufferChange *change);
45static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
46 RepOriginId origin_id);
47
48static bool publications_valid;
49
50static List *LoadPublications(List *pubnames);
51static void publication_invalidation_cb(Datum arg, int cacheid,
52 uint32 hashvalue);
53
54/* Entry in the map used to remember which relation schemas we sent. */
55typedef struct RelationSyncEntry
56{
57 Oid relid; /* relation oid */
58 bool schema_sent; /* did we send the schema? */
59 bool replicate_valid;
60 PublicationActions pubactions;
61} RelationSyncEntry;
62
63/* Map used to remember which relation schemas we sent. */
64static HTAB *RelationSyncCache = NULL;
65
66static void init_rel_sync_cache(MemoryContext decoding_context);
67static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
68static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
69static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
70 uint32 hashvalue);
71
72/*
73 * Specify output plugin callbacks
74 */
75void
76_PG_output_plugin_init(OutputPluginCallbacks *cb)
77{
78 AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
79
80 cb->startup_cb = pgoutput_startup;
81 cb->begin_cb = pgoutput_begin_txn;
82 cb->change_cb = pgoutput_change;
83 cb->truncate_cb = pgoutput_truncate;
84 cb->commit_cb = pgoutput_commit_txn;
85 cb->filter_by_origin_cb = pgoutput_origin_filter;
86 cb->shutdown_cb = pgoutput_shutdown;
87}
88
89static void
90parse_output_parameters(List *options, uint32 *protocol_version,
91 List **publication_names)
92{
93 ListCell *lc;
94 bool protocol_version_given = false;
95 bool publication_names_given = false;
96
97 foreach(lc, options)
98 {
99 DefElem *defel = (DefElem *) lfirst(lc);
100
101 Assert(defel->arg == NULL || IsA(defel->arg, String));
102
103 /* Check each param, whether or not we recognize it */
104 if (strcmp(defel->defname, "proto_version") == 0)
105 {
106 int64 parsed;
107
108 if (protocol_version_given)
109 ereport(ERROR,
110 (errcode(ERRCODE_SYNTAX_ERROR),
111 errmsg("conflicting or redundant options")));
112 protocol_version_given = true;
113
114 if (!scanint8(strVal(defel->arg), true, &parsed))
115 ereport(ERROR,
116 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
117 errmsg("invalid proto_version")));
118
119 if (parsed > PG_UINT32_MAX || parsed < 0)
120 ereport(ERROR,
121 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
122 errmsg("proto_version \"%s\" out of range",
123 strVal(defel->arg))));
124
125 *protocol_version = (uint32) parsed;
126 }
127 else if (strcmp(defel->defname, "publication_names") == 0)
128 {
129 if (publication_names_given)
130 ereport(ERROR,
131 (errcode(ERRCODE_SYNTAX_ERROR),
132 errmsg("conflicting or redundant options")));
133 publication_names_given = true;
134
135 if (!SplitIdentifierString(strVal(defel->arg), ',',
136 publication_names))
137 ereport(ERROR,
138 (errcode(ERRCODE_INVALID_NAME),
139 errmsg("invalid publication_names syntax")));
140 }
141 else
142 elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
143 }
144}
145
146/*
147 * Initialize this plugin
148 */
149static void
150pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
151 bool is_init)
152{
153 PGOutputData *data = palloc0(sizeof(PGOutputData));
154
155 /* Create our memory context for private allocations. */
156 data->context = AllocSetContextCreate(ctx->context,
157 "logical replication output context",
158 ALLOCSET_DEFAULT_SIZES);
159
160 ctx->output_plugin_private = data;
161
162 /* This plugin uses binary protocol. */
163 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
164
165 /*
166 * This is replication start and not slot initialization.
167 *
168 * Parse and validate options passed by the client.
169 */
170 if (!is_init)
171 {
172 /* Parse the params and ERROR if we see any we don't recognize */
173 parse_output_parameters(ctx->output_plugin_options,
174 &data->protocol_version,
175 &data->publication_names);
176
177 /* Check if we support requested protocol */
178 if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
179 ereport(ERROR,
180 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
181 errmsg("client sent proto_version=%d but we only support protocol %d or lower",
182 data->protocol_version, LOGICALREP_PROTO_VERSION_NUM)));
183
184 if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
185 ereport(ERROR,
186 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
187 errmsg("client sent proto_version=%d but we only support protocol %d or higher",
188 data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
189
190 if (list_length(data->publication_names) < 1)
191 ereport(ERROR,
192 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
193 errmsg("publication_names parameter missing")));
194
195 /* Init publication state. */
196 data->publications = NIL;
197 publications_valid = false;
198 CacheRegisterSyscacheCallback(PUBLICATIONOID,
199 publication_invalidation_cb,
200 (Datum) 0);
201
202 /* Initialize relation schema cache. */
203 init_rel_sync_cache(CacheMemoryContext);
204 }
205}
206
207/*
208 * BEGIN callback
209 */
210static void
211pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
212{
213 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
214
215 OutputPluginPrepareWrite(ctx, !send_replication_origin);
216 logicalrep_write_begin(ctx->out, txn);
217
218 if (send_replication_origin)
219 {
220 char *origin;
221
222 /* Message boundary */
223 OutputPluginWrite(ctx, false);
224 OutputPluginPrepareWrite(ctx, true);
225
226 /*----------
227 * XXX: which behaviour do we want here?
228 *
229 * Alternatives:
230 * - don't send origin message if origin name not found
231 * (that's what we do now)
232 * - throw error - that will break replication, not good
233 * - send some special "unknown" origin
234 *----------
235 */
236 if (replorigin_by_oid(txn->origin_id, true, &origin))
237 logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
238 }
239
240 OutputPluginWrite(ctx, true);
241}
242
243/*
244 * COMMIT callback
245 */
246static void
247pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
248 XLogRecPtr commit_lsn)
249{
250 OutputPluginUpdateProgress(ctx);
251
252 OutputPluginPrepareWrite(ctx, true);
253 logicalrep_write_commit(ctx->out, txn, commit_lsn);
254 OutputPluginWrite(ctx, true);
255}
256
257/*
258 * Write the relation schema if the current schema hasn't been sent yet.
259 */
260static void
261maybe_send_schema(LogicalDecodingContext *ctx,
262 Relation relation, RelationSyncEntry *relentry)
263{
264 if (!relentry->schema_sent)
265 {
266 TupleDesc desc;
267 int i;
268
269 desc = RelationGetDescr(relation);
270
271 /*
272 * Write out type info if needed. We do that only for user-created
273 * types. We use FirstGenbkiObjectId as the cutoff, so that we only
274 * consider objects with hand-assigned OIDs to be "built in", not for
275 * instance any function or type defined in the information_schema.
276 * This is important because only hand-assigned OIDs can be expected
277 * to remain stable across major versions.
278 */
279 for (i = 0; i < desc->natts; i++)
280 {
281 Form_pg_attribute att = TupleDescAttr(desc, i);
282
283 if (att->attisdropped || att->attgenerated)
284 continue;
285
286 if (att->atttypid < FirstGenbkiObjectId)
287 continue;
288
289 OutputPluginPrepareWrite(ctx, false);
290 logicalrep_write_typ(ctx->out, att->atttypid);
291 OutputPluginWrite(ctx, false);
292 }
293
294 OutputPluginPrepareWrite(ctx, false);
295 logicalrep_write_rel(ctx->out, relation);
296 OutputPluginWrite(ctx, false);
297 relentry->schema_sent = true;
298 }
299}
300
301/*
302 * Sends the decoded DML over wire.
303 */
304static void
305pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
306 Relation relation, ReorderBufferChange *change)
307{
308 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
309 MemoryContext old;
310 RelationSyncEntry *relentry;
311
312 if (!is_publishable_relation(relation))
313 return;
314
315 relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
316
317 /* First check the table filter */
318 switch (change->action)
319 {
320 case REORDER_BUFFER_CHANGE_INSERT:
321 if (!relentry->pubactions.pubinsert)
322 return;
323 break;
324 case REORDER_BUFFER_CHANGE_UPDATE:
325 if (!relentry->pubactions.pubupdate)
326 return;
327 break;
328 case REORDER_BUFFER_CHANGE_DELETE:
329 if (!relentry->pubactions.pubdelete)
330 return;
331 break;
332 default:
333 Assert(false);
334 }
335
336 /* Avoid leaking memory by using and resetting our own context */
337 old = MemoryContextSwitchTo(data->context);
338
339 maybe_send_schema(ctx, relation, relentry);
340
341 /* Send the data */
342 switch (change->action)
343 {
344 case REORDER_BUFFER_CHANGE_INSERT:
345 OutputPluginPrepareWrite(ctx, true);
346 logicalrep_write_insert(ctx->out, relation,
347 &change->data.tp.newtuple->tuple);
348 OutputPluginWrite(ctx, true);
349 break;
350 case REORDER_BUFFER_CHANGE_UPDATE:
351 {
352 HeapTuple oldtuple = change->data.tp.oldtuple ?
353 &change->data.tp.oldtuple->tuple : NULL;
354
355 OutputPluginPrepareWrite(ctx, true);
356 logicalrep_write_update(ctx->out, relation, oldtuple,
357 &change->data.tp.newtuple->tuple);
358 OutputPluginWrite(ctx, true);
359 break;
360 }
361 case REORDER_BUFFER_CHANGE_DELETE:
362 if (change->data.tp.oldtuple)
363 {
364 OutputPluginPrepareWrite(ctx, true);
365 logicalrep_write_delete(ctx->out, relation,
366 &change->data.tp.oldtuple->tuple);
367 OutputPluginWrite(ctx, true);
368 }
369 else
370 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
371 break;
372 default:
373 Assert(false);
374 }
375
376 /* Cleanup */
377 MemoryContextSwitchTo(old);
378 MemoryContextReset(data->context);
379}
380
381static void
382pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
383 int nrelations, Relation relations[], ReorderBufferChange *change)
384{
385 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
386 MemoryContext old;
387 RelationSyncEntry *relentry;
388 int i;
389 int nrelids;
390 Oid *relids;
391
392 old = MemoryContextSwitchTo(data->context);
393
394 relids = palloc0(nrelations * sizeof(Oid));
395 nrelids = 0;
396
397 for (i = 0; i < nrelations; i++)
398 {
399 Relation relation = relations[i];
400 Oid relid = RelationGetRelid(relation);
401
402 if (!is_publishable_relation(relation))
403 continue;
404
405 relentry = get_rel_sync_entry(data, relid);
406
407 if (!relentry->pubactions.pubtruncate)
408 continue;
409
410 relids[nrelids++] = relid;
411 maybe_send_schema(ctx, relation, relentry);
412 }
413
414 if (nrelids > 0)
415 {
416 OutputPluginPrepareWrite(ctx, true);
417 logicalrep_write_truncate(ctx->out,
418 nrelids,
419 relids,
420 change->data.truncate.cascade,
421 change->data.truncate.restart_seqs);
422 OutputPluginWrite(ctx, true);
423 }
424
425 MemoryContextSwitchTo(old);
426 MemoryContextReset(data->context);
427}
428
429/*
430 * Currently we always forward.
431 */
432static bool
433pgoutput_origin_filter(LogicalDecodingContext *ctx,
434 RepOriginId origin_id)
435{
436 return false;
437}
438
439/*
440 * Shutdown the output plugin.
441 *
442 * Note, we don't need to clean the data->context as it's child context
443 * of the ctx->context so it will be cleaned up by logical decoding machinery.
444 */
445static void
446pgoutput_shutdown(LogicalDecodingContext *ctx)
447{
448 if (RelationSyncCache)
449 {
450 hash_destroy(RelationSyncCache);
451 RelationSyncCache = NULL;
452 }
453}
454
455/*
456 * Load publications from the list of publication names.
457 */
458static List *
459LoadPublications(List *pubnames)
460{
461 List *result = NIL;
462 ListCell *lc;
463
464 foreach(lc, pubnames)
465 {
466 char *pubname = (char *) lfirst(lc);
467 Publication *pub = GetPublicationByName(pubname, false);
468
469 result = lappend(result, pub);
470 }
471
472 return result;
473}
474
475/*
476 * Publication cache invalidation callback.
477 */
478static void
479publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
480{
481 publications_valid = false;
482
483 /*
484 * Also invalidate per-relation cache so that next time the filtering info
485 * is checked it will be updated with the new publication settings.
486 */
487 rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
488}
489
490/*
491 * Initialize the relation schema sync cache for a decoding session.
492 *
493 * The hash table is destroyed at the end of a decoding session. While
494 * relcache invalidations still exist and will still be invoked, they
495 * will just see the null hash table global and take no action.
496 */
497static void
498init_rel_sync_cache(MemoryContext cachectx)
499{
500 HASHCTL ctl;
501 MemoryContext old_ctxt;
502
503 if (RelationSyncCache != NULL)
504 return;
505
506 /* Make a new hash table for the cache */
507 MemSet(&ctl, 0, sizeof(ctl));
508 ctl.keysize = sizeof(Oid);
509 ctl.entrysize = sizeof(RelationSyncEntry);
510 ctl.hcxt = cachectx;
511
512 old_ctxt = MemoryContextSwitchTo(cachectx);
513 RelationSyncCache = hash_create("logical replication output relation cache",
514 128, &ctl,
515 HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
516 (void) MemoryContextSwitchTo(old_ctxt);
517
518 Assert(RelationSyncCache != NULL);
519
520 CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
521 CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
522 rel_sync_cache_publication_cb,
523 (Datum) 0);
524}
525
526/*
527 * Find or create entry in the relation schema cache.
528 */
529static RelationSyncEntry *
530get_rel_sync_entry(PGOutputData *data, Oid relid)
531{
532 RelationSyncEntry *entry;
533 bool found;
534 MemoryContext oldctx;
535
536 Assert(RelationSyncCache != NULL);
537
538 /* Find cached function info, creating if not found */
539 oldctx = MemoryContextSwitchTo(CacheMemoryContext);
540 entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
541 (void *) &relid,
542 HASH_ENTER, &found);
543 MemoryContextSwitchTo(oldctx);
544 Assert(entry != NULL);
545
546 /* Not found means schema wasn't sent */
547 if (!found || !entry->replicate_valid)
548 {
549 List *pubids = GetRelationPublications(relid);
550 ListCell *lc;
551
552 /* Reload publications if needed before use. */
553 if (!publications_valid)
554 {
555 oldctx = MemoryContextSwitchTo(CacheMemoryContext);
556 if (data->publications)
557 list_free_deep(data->publications);
558
559 data->publications = LoadPublications(data->publication_names);
560 MemoryContextSwitchTo(oldctx);
561 publications_valid = true;
562 }
563
564 /*
565 * Build publication cache. We can't use one provided by relcache as
566 * relcache considers all publications given relation is in, but here
567 * we only need to consider ones that the subscriber requested.
568 */
569 entry->pubactions.pubinsert = entry->pubactions.pubupdate =
570 entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
571
572 foreach(lc, data->publications)
573 {
574 Publication *pub = lfirst(lc);
575
576 if (pub->alltables || list_member_oid(pubids, pub->oid))
577 {
578 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
579 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
580 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
581 entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
582 }
583
584 if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
585 entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
586 break;
587 }
588
589 list_free(pubids);
590
591 entry->replicate_valid = true;
592 }
593
594 if (!found)
595 entry->schema_sent = false;
596
597 return entry;
598}
599
600/*
601 * Relcache invalidation callback
602 */
603static void
604rel_sync_cache_relation_cb(Datum arg, Oid relid)
605{
606 RelationSyncEntry *entry;
607
608 /*
609 * We can get here if the plugin was used in SQL interface as the
610 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
611 * is no way to unregister the relcache invalidation callback.
612 */
613 if (RelationSyncCache == NULL)
614 return;
615
616 /*
617 * Nobody keeps pointers to entries in this hash table around outside
618 * logical decoding callback calls - but invalidation events can come in
619 * *during* a callback if we access the relcache in the callback. Because
620 * of that we must mark the cache entry as invalid but not remove it from
621 * the hash while it could still be referenced, then prune it at a later
622 * safe point.
623 *
624 * Getting invalidations for relations that aren't in the table is
625 * entirely normal, since there's no way to unregister for an invalidation
626 * event. So we don't care if it's found or not.
627 */
628 entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
629 HASH_FIND, NULL);
630
631 /*
632 * Reset schema sent status as the relation definition may have changed.
633 */
634 if (entry != NULL)
635 entry->schema_sent = false;
636}
637
638/*
639 * Publication relation map syscache invalidation callback
640 */
641static void
642rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
643{
644 HASH_SEQ_STATUS status;
645 RelationSyncEntry *entry;
646
647 /*
648 * We can get here if the plugin was used in SQL interface as the
649 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
650 * is no way to unregister the relcache invalidation callback.
651 */
652 if (RelationSyncCache == NULL)
653 return;
654
655 /*
656 * There is no way to find which entry in our cache the hash belongs to so
657 * mark the whole cache as invalid.
658 */
659 hash_seq_init(&status, RelationSyncCache);
660 while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
661 entry->replicate_valid = false;
662}
663