1/*-------------------------------------------------------------------------
2 *
3 * message.c
4 * Generic logical messages.
5 *
6 * Copyright (c) 2013-2019, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/logical/message.c
10 *
11 * NOTES
12 *
13 * Generic logical messages allow XLOG logging of arbitrary binary blobs that
14 * get passed to the logical decoding plugin. In normal XLOG processing they
15 * are same as NOOP.
16 *
17 * These messages can be either transactional or non-transactional.
18 * Transactional messages are part of current transaction and will be sent to
19 * decoding plugin using in a same way as DML operations.
20 * Non-transactional messages are sent to the plugin at the time when the
21 * logical decoding reads them from XLOG. This also means that transactional
22 * messages won't be delivered if the transaction was rolled back but the
23 * non-transactional one will always be delivered.
24 *
25 * Every message carries prefix to avoid conflicts between different decoding
26 * plugins. The plugin authors must take extra care to use unique prefix,
27 * good options seems to be for example to use the name of the extension.
28 *
29 * ---------------------------------------------------------------------------
30 */
31
32#include "postgres.h"
33
34#include "miscadmin.h"
35
36#include "access/xact.h"
37
38#include "catalog/indexing.h"
39
40#include "nodes/execnodes.h"
41
42#include "replication/message.h"
43#include "replication/logical.h"
44
45#include "utils/memutils.h"
46
47/*
48 * Write logical decoding message into XLog.
49 */
50XLogRecPtr
51LogLogicalMessage(const char *prefix, const char *message, size_t size,
52 bool transactional)
53{
54 xl_logical_message xlrec;
55
56 /*
57 * Force xid to be allocated if we're emitting a transactional message.
58 */
59 if (transactional)
60 {
61 Assert(IsTransactionState());
62 GetCurrentTransactionId();
63 }
64
65 xlrec.dbId = MyDatabaseId;
66 xlrec.transactional = transactional;
67 xlrec.prefix_size = strlen(prefix) + 1;
68 xlrec.message_size = size;
69
70 XLogBeginInsert();
71 XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
72 XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
73 XLogRegisterData(unconstify(char *, message), size);
74
75 /* allow origin filtering */
76 XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
77
78 return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
79}
80
81/*
82 * Redo is basically just noop for logical decoding messages.
83 */
84void
85logicalmsg_redo(XLogReaderState *record)
86{
87 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
88
89 if (info != XLOG_LOGICAL_MESSAGE)
90 elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
91
92 /* This is only interesting for logical decoding, see decode.c. */
93}
94