1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * message.c |
4 | * Generic logical messages. |
5 | * |
6 | * Copyright (c) 2013-2019, PostgreSQL Global Development Group |
7 | * |
8 | * IDENTIFICATION |
9 | * src/backend/replication/logical/message.c |
10 | * |
11 | * NOTES |
12 | * |
13 | * Generic logical messages allow XLOG logging of arbitrary binary blobs that |
 * get passed to the logical decoding plugin. In normal XLOG processing they
 * are the same as a NOOP.
16 | * |
17 | * These messages can be either transactional or non-transactional. |
 * Transactional messages are part of the current transaction and are sent to
 * the decoding plugin in the same way as DML operations.
20 | * Non-transactional messages are sent to the plugin at the time when the |
21 | * logical decoding reads them from XLOG. This also means that transactional |
22 | * messages won't be delivered if the transaction was rolled back but the |
23 | * non-transactional one will always be delivered. |
24 | * |
 * Every message carries a prefix to avoid conflicts between different decoding
 * plugins. Plugin authors must take extra care to use a unique prefix; a
 * good option is, for example, to use the name of the extension.
28 | * |
29 | * --------------------------------------------------------------------------- |
30 | */ |
31 | |
32 | #include "postgres.h" |
33 | |
34 | #include "miscadmin.h" |
35 | |
36 | #include "access/xact.h" |
37 | |
38 | #include "catalog/indexing.h" |
39 | |
40 | #include "nodes/execnodes.h" |
41 | |
42 | #include "replication/message.h" |
43 | #include "replication/logical.h" |
44 | |
45 | #include "utils/memutils.h" |
46 | |
47 | /* |
48 | * Write logical decoding message into XLog. |
49 | */ |
50 | XLogRecPtr |
51 | LogLogicalMessage(const char *prefix, const char *message, size_t size, |
52 | bool transactional) |
53 | { |
54 | xl_logical_message xlrec; |
55 | |
56 | /* |
57 | * Force xid to be allocated if we're emitting a transactional message. |
58 | */ |
59 | if (transactional) |
60 | { |
61 | Assert(IsTransactionState()); |
62 | GetCurrentTransactionId(); |
63 | } |
64 | |
65 | xlrec.dbId = MyDatabaseId; |
66 | xlrec.transactional = transactional; |
67 | xlrec.prefix_size = strlen(prefix) + 1; |
68 | xlrec.message_size = size; |
69 | |
70 | XLogBeginInsert(); |
71 | XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage); |
72 | XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size); |
73 | XLogRegisterData(unconstify(char *, message), size); |
74 | |
75 | /* allow origin filtering */ |
76 | XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); |
77 | |
78 | return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE); |
79 | } |
80 | |
81 | /* |
82 | * Redo is basically just noop for logical decoding messages. |
83 | */ |
84 | void |
85 | logicalmsg_redo(XLogReaderState *record) |
86 | { |
87 | uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; |
88 | |
89 | if (info != XLOG_LOGICAL_MESSAGE) |
90 | elog(PANIC, "logicalmsg_redo: unknown op code %u" , info); |
91 | |
92 | /* This is only interesting for logical decoding, see decode.c. */ |
93 | } |
94 | |