1/*
2 * reorderbuffer.h
3 * PostgreSQL logical replay/reorder buffer management.
4 *
5 * Copyright (c) 2012-2019, PostgreSQL Global Development Group
6 *
7 * src/include/replication/reorderbuffer.h
8 */
9#ifndef REORDERBUFFER_H
10#define REORDERBUFFER_H
11
12#include "access/htup_details.h"
13#include "lib/ilist.h"
14#include "storage/sinval.h"
15#include "utils/hsearch.h"
16#include "utils/relcache.h"
17#include "utils/snapshot.h"
18#include "utils/timestamp.h"
19
20/* an individual tuple, stored in one chunk of memory */
21typedef struct ReorderBufferTupleBuf
22{
23 /* position in preallocated list */
24 slist_node node;
25
26 /* tuple header, the interesting bit for users of logical decoding */
27 HeapTupleData tuple;
28
29 /* pre-allocated size of tuple buffer, different from tuple size */
30 Size alloc_tuple_size;
31
32 /* actual tuple data follows */
33} ReorderBufferTupleBuf;
34
/*
 * ReorderBufferTupleBufData
 *		Pointer to the tuple data stored in a ReorderBufferTupleBuf.
 *
 * The data starts at the first MAXALIGN'ed address past the struct itself.
 *
 * The argument is parenthesized before it is cast, so callers may pass any
 * pointer expression (e.g. "bufs + 1"); without the parentheses the cast to
 * char * would bind to the first operand only and the remaining arithmetic
 * would be done in bytes instead of elements.
 */
#define ReorderBufferTupleBufData(p) \
	((HeapTupleHeader) MAXALIGN(((char *) (p)) + sizeof(ReorderBufferTupleBuf)))
38
/*
 * ReorderBufferChangeType
 *
 * The kinds of change that can be handed to a 'change' callback.
 *
 * Snapshots, CommandIds and ComboCids are kept in the same list as the user
 * visible INSERT/UPDATE/DELETE changes, for efficiency and simplicity.
 * Changes whose action is one of the *_INTERNAL_* values are never shown to
 * users of the decoding facilities.
 *
 * INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM describe "speculative
 * insertions" and their confirmation, respectively, as used by
 * INSERT .. ON CONFLICT .. UPDATE.  Users of logical decoding do not have to
 * care about them.
 */
enum ReorderBufferChangeType
{
	/* changes visible to users of logical decoding */
	REORDER_BUFFER_CHANGE_INSERT,
	REORDER_BUFFER_CHANGE_UPDATE,
	REORDER_BUFFER_CHANGE_DELETE,
	REORDER_BUFFER_CHANGE_MESSAGE,	/* message with arbitrary data */

	/* internal bookkeeping entries */
	REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,	/* new snapshot */
	REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,	/* new command id */
	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,	/* new cid mapping */
	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, /* speculative insertion */
	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,	/* ... its confirmation */

	/* one set of relations to be truncated */
	REORDER_BUFFER_CHANGE_TRUNCATE
};
65
66/*
67 * a single 'change', can be an insert (with one tuple), an update (old, new),
68 * or a delete (old).
69 *
70 * The same struct is also used internally for other purposes but that should
71 * never be visible outside reorderbuffer.c.
72 */
73typedef struct ReorderBufferChange
74{
75 XLogRecPtr lsn;
76
77 /* The type of change. */
78 enum ReorderBufferChangeType action;
79
80 RepOriginId origin_id;
81
82 /*
83 * Context data for the change. Which part of the union is valid depends
84 * on action.
85 */
86 union
87 {
88 /* Old, new tuples when action == *_INSERT|UPDATE|DELETE */
89 struct
90 {
91 /* relation that has been changed */
92 RelFileNode relnode;
93
94 /* no previously reassembled toast chunks are necessary anymore */
95 bool clear_toast_afterwards;
96
97 /* valid for DELETE || UPDATE */
98 ReorderBufferTupleBuf *oldtuple;
99 /* valid for INSERT || UPDATE */
100 ReorderBufferTupleBuf *newtuple;
101 } tp;
102
103 /*
104 * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing one
105 * set of relations to be truncated.
106 */
107 struct
108 {
109 Size nrelids;
110 bool cascade;
111 bool restart_seqs;
112 Oid *relids;
113 } truncate;
114
115 /* Message with arbitrary data. */
116 struct
117 {
118 char *prefix;
119 Size message_size;
120 char *message;
121 } msg;
122
123 /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
124 Snapshot snapshot;
125
126 /*
127 * New command id for existing snapshot in a catalog changing tx. Set
128 * when action == *_INTERNAL_COMMAND_ID.
129 */
130 CommandId command_id;
131
132 /*
133 * New cid mapping for catalog changing transaction, set when action
134 * == *_INTERNAL_TUPLECID.
135 */
136 struct
137 {
138 RelFileNode node;
139 ItemPointerData tid;
140 CommandId cmin;
141 CommandId cmax;
142 CommandId combocid;
143 } tuplecid;
144 } data;
145
146 /*
147 * While in use this is how a change is linked into a transactions,
148 * otherwise it's the preallocated list.
149 */
150 dlist_node node;
151} ReorderBufferChange;
152
153typedef struct ReorderBufferTXN
154{
155 /*
156 * The transactions transaction id, can be a toplevel or sub xid.
157 */
158 TransactionId xid;
159
160 /* did the TX have catalog changes */
161 bool has_catalog_changes;
162
163 /* Do we know this is a subxact? Xid of top-level txn if so */
164 bool is_known_as_subxact;
165 TransactionId toplevel_xid;
166
167 /*
168 * LSN of the first data carrying, WAL record with knowledge about this
169 * xid. This is allowed to *not* be first record adorned with this xid, if
170 * the previous records aren't relevant for logical decoding.
171 */
172 XLogRecPtr first_lsn;
173
174 /* ----
175 * LSN of the record that lead to this xact to be committed or
176 * aborted. This can be a
177 * * plain commit record
178 * * plain commit record, of a parent transaction
179 * * prepared transaction commit
180 * * plain abort record
181 * * prepared transaction abort
182 * * error during decoding
183 * * for a crashed transaction, the LSN of the last change, regardless of
184 * what it was.
185 * ----
186 */
187 XLogRecPtr final_lsn;
188
189 /*
190 * LSN pointing to the end of the commit record + 1.
191 */
192 XLogRecPtr end_lsn;
193
194 /*
195 * LSN of the last lsn at which snapshot information reside, so we can
196 * restart decoding from there and fully recover this transaction from
197 * WAL.
198 */
199 XLogRecPtr restart_decoding_lsn;
200
201 /* origin of the change that caused this transaction */
202 RepOriginId origin_id;
203 XLogRecPtr origin_lsn;
204
205 /*
206 * Commit time, only known when we read the actual commit record.
207 */
208 TimestampTz commit_time;
209
210 /*
211 * The base snapshot is used to decode all changes until either this
212 * transaction modifies the catalog, or another catalog-modifying
213 * transaction commits.
214 */
215 Snapshot base_snapshot;
216 XLogRecPtr base_snapshot_lsn;
217 dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */
218
219 /*
220 * How many ReorderBufferChange's do we have in this txn.
221 *
222 * Changes in subtransactions are *not* included but tracked separately.
223 */
224 uint64 nentries;
225
226 /*
227 * How many of the above entries are stored in memory in contrast to being
228 * spilled to disk.
229 */
230 uint64 nentries_mem;
231
232 /*
233 * Has this transaction been spilled to disk? It's not always possible to
234 * deduce that fact by comparing nentries with nentries_mem, because e.g.
235 * subtransactions of a large transaction might get serialized together
236 * with the parent - if they're restored to memory they'd have
237 * nentries_mem == nentries.
238 */
239 bool serialized;
240
241 /*
242 * List of ReorderBufferChange structs, including new Snapshots and new
243 * CommandIds
244 */
245 dlist_head changes;
246
247 /*
248 * List of (relation, ctid) => (cmin, cmax) mappings for catalog tuples.
249 * Those are always assigned to the toplevel transaction. (Keep track of
250 * #entries to create a hash of the right size)
251 */
252 dlist_head tuplecids;
253 uint64 ntuplecids;
254
255 /*
256 * On-demand built hash for looking up the above values.
257 */
258 HTAB *tuplecid_hash;
259
260 /*
261 * Hash containing (potentially partial) toast entries. NULL if no toast
262 * tuples have been found for the current change.
263 */
264 HTAB *toast_hash;
265
266 /*
267 * non-hierarchical list of subtransactions that are *not* aborted. Only
268 * used in toplevel transactions.
269 */
270 dlist_head subtxns;
271 uint32 nsubtxns;
272
273 /*
274 * Stored cache invalidations. This is not a linked list because we get
275 * all the invalidations at once.
276 */
277 uint32 ninvalidations;
278 SharedInvalidationMessage *invalidations;
279
280 /* ---
281 * Position in one of three lists:
282 * * list of subtransactions if we are *known* to be subxact
283 * * list of toplevel xacts (can be an as-yet unknown subxact)
284 * * list of preallocated ReorderBufferTXNs (if unused)
285 * ---
286 */
287 dlist_node node;
288
289} ReorderBufferTXN;
290
291/* so we can define the callbacks used inside struct ReorderBuffer itself */
292typedef struct ReorderBuffer ReorderBuffer;
293
294/* change callback signature */
295typedef void (*ReorderBufferApplyChangeCB) (ReorderBuffer *rb,
296 ReorderBufferTXN *txn,
297 Relation relation,
298 ReorderBufferChange *change);
299
300/* truncate callback signature */
301typedef void (*ReorderBufferApplyTruncateCB) (ReorderBuffer *rb,
302 ReorderBufferTXN *txn,
303 int nrelations,
304 Relation relations[],
305 ReorderBufferChange *change);
306
307/* begin callback signature */
308typedef void (*ReorderBufferBeginCB) (ReorderBuffer *rb,
309 ReorderBufferTXN *txn);
310
311/* commit callback signature */
312typedef void (*ReorderBufferCommitCB) (ReorderBuffer *rb,
313 ReorderBufferTXN *txn,
314 XLogRecPtr commit_lsn);
315
316/* message callback signature */
317typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
318 ReorderBufferTXN *txn,
319 XLogRecPtr message_lsn,
320 bool transactional,
321 const char *prefix, Size sz,
322 const char *message);
323
324struct ReorderBuffer
325{
326 /*
327 * xid => ReorderBufferTXN lookup table
328 */
329 HTAB *by_txn;
330
331 /*
332 * Transactions that could be a toplevel xact, ordered by LSN of the first
333 * record bearing that xid.
334 */
335 dlist_head toplevel_by_lsn;
336
337 /*
338 * Transactions and subtransactions that have a base snapshot, ordered by
339 * LSN of the record which caused us to first obtain the base snapshot.
340 * This is not the same as toplevel_by_lsn, because we only set the base
341 * snapshot on the first logical-decoding-relevant record (eg. heap
342 * writes), whereas the initial LSN could be set by other operations.
343 */
344 dlist_head txns_by_base_snapshot_lsn;
345
346 /*
347 * one-entry sized cache for by_txn. Very frequently the same txn gets
348 * looked up over and over again.
349 */
350 TransactionId by_txn_last_xid;
351 ReorderBufferTXN *by_txn_last_txn;
352
353 /*
354 * Callbacks to be called when a transactions commits.
355 */
356 ReorderBufferBeginCB begin;
357 ReorderBufferApplyChangeCB apply_change;
358 ReorderBufferApplyTruncateCB apply_truncate;
359 ReorderBufferCommitCB commit;
360 ReorderBufferMessageCB message;
361
362 /*
363 * Pointer that will be passed untouched to the callbacks.
364 */
365 void *private_data;
366
367 /*
368 * Saved output plugin option
369 */
370 bool output_rewrites;
371
372 /*
373 * Private memory context.
374 */
375 MemoryContext context;
376
377 /*
378 * Memory contexts for specific types objects
379 */
380 MemoryContext change_context;
381 MemoryContext txn_context;
382 MemoryContext tup_context;
383
384 XLogRecPtr current_restart_decoding_lsn;
385
386 /* buffer for disk<->memory conversions */
387 char *outbuf;
388 Size outbufsize;
389};
390
391
392ReorderBuffer *ReorderBufferAllocate(void);
393void ReorderBufferFree(ReorderBuffer *);
394
395ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len);
396void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
397ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
398void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
399
400Oid *ReorderBufferGetRelids(ReorderBuffer *, int nrelids);
401void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids);
402
403void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
404void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
405 bool transactional, const char *prefix,
406 Size message_size, const char *message);
407void ReorderBufferCommit(ReorderBuffer *, TransactionId,
408 XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
409 TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
410void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
411void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
412 XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
413void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
414void ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid);
415void ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
416
417void ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
418void ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
419void ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
420 CommandId cid);
421void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
422 RelFileNode node, ItemPointerData pt,
423 CommandId cmin, CommandId cmax, CommandId combocid);
424void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
425 Size nmsgs, SharedInvalidationMessage *msgs);
426void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
427 SharedInvalidationMessage *invalidations);
428void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
429void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
430bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
431bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
432
433ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
434TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
435
436void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
437
438void StartupReorderBuffer(void);
439
440#endif
441