1/* -------------------------------------------------------------------------
2 *
3 * decode.c
4 * This module decodes WAL records read using xlogreader.h's APIs for the
5 * purpose of logical decoding by passing information to the
6 * reorderbuffer module (containing the actual changes) and to the
7 * snapbuild module to build a fitting catalog snapshot (to be able to
8 * properly decode the changes in the reorderbuffer).
9 *
10 * NOTE:
11 * This basically tries to handle all low level xlog stuff for
12 * reorderbuffer.c and snapbuild.c. There's some minor leakage where a
13 * specific record's struct is used to pass data along, but those just
14 * happen to contain the right amount of data in a convenient
15 * format. There isn't and shouldn't be much intelligence about the
16 * contents of records in here except turning them into a more usable
17 * format.
18 *
19 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
20 * Portions Copyright (c) 1994, Regents of the University of California
21 *
22 * IDENTIFICATION
23 * src/backend/replication/logical/decode.c
24 *
25 * -------------------------------------------------------------------------
26 */
27#include "postgres.h"
28
29#include "access/heapam.h"
30#include "access/heapam_xlog.h"
31#include "access/transam.h"
32#include "access/xact.h"
33#include "access/xlog_internal.h"
34#include "access/xlogutils.h"
35#include "access/xlogreader.h"
36#include "access/xlogrecord.h"
37
38#include "catalog/pg_control.h"
39
40#include "replication/decode.h"
41#include "replication/logical.h"
42#include "replication/message.h"
43#include "replication/reorderbuffer.h"
44#include "replication/origin.h"
45#include "replication/snapbuild.h"
46
47#include "storage/standby.h"
48
/*
 * Bundle describing the WAL record currently being decoded; it is filled in
 * by LogicalDecodingProcessRecord() and handed to every Decode* routine.
 */
typedef struct XLogRecordBuffer
{
    XLogRecPtr origptr;         /* copied from the reader's ReadRecPtr */
    XLogRecPtr endptr;          /* copied from the reader's EndRecPtr */
    XLogReaderState *record;    /* reader state holding the decoded record */
} XLogRecordBuffer;
55
/*
 * RMGR handlers: one per resource manager whose records matter for logical
 * decoding; LogicalDecodingProcessRecord() dispatches to these.
 */
static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);

/*
 * Handlers for individual record types (or groups of them), called from the
 * heap and heap2 RMGR handlers above.
 */
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);

/*
 * Commit/abort handling shared by the plain and the two-phase ("PREPARED")
 * forms of the records; 'xid' is the transaction being finished.
 */
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
             xl_xact_parsed_commit *parsed, TransactionId xid);
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
            xl_xact_parsed_abort *parsed, TransactionId xid);

/* common function to decode tuples logged by insert, update and delete */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
79
80/*
81 * Take every XLogReadRecord()ed record and perform the actions required to
82 * decode it using the output plugin already setup in the logical decoding
83 * context.
84 *
85 * NB: Note that every record's xid needs to be processed by reorderbuffer
86 * (xids contained in the content of records are not relevant for this rule).
87 * That means that for records which'd otherwise not go through the
88 * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
89 * call ReorderBufferProcessXid for each record type by default, because
90 * e.g. empty xacts can be handled more efficiently if there's no previous
91 * state for them.
92 *
93 * We also support the ability to fast forward thru records, skipping some
94 * record types completely - see individual record types for details.
95 */
96void
97LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
98{
99 XLogRecordBuffer buf;
100
101 buf.origptr = ctx->reader->ReadRecPtr;
102 buf.endptr = ctx->reader->EndRecPtr;
103 buf.record = record;
104
105 /* cast so we get a warning when new rmgrs are added */
106 switch ((RmgrIds) XLogRecGetRmid(record))
107 {
108 /*
109 * Rmgrs we care about for logical decoding. Add new rmgrs in
110 * rmgrlist.h's order.
111 */
112 case RM_XLOG_ID:
113 DecodeXLogOp(ctx, &buf);
114 break;
115
116 case RM_XACT_ID:
117 DecodeXactOp(ctx, &buf);
118 break;
119
120 case RM_STANDBY_ID:
121 DecodeStandbyOp(ctx, &buf);
122 break;
123
124 case RM_HEAP2_ID:
125 DecodeHeap2Op(ctx, &buf);
126 break;
127
128 case RM_HEAP_ID:
129 DecodeHeapOp(ctx, &buf);
130 break;
131
132 case RM_LOGICALMSG_ID:
133 DecodeLogicalMsgOp(ctx, &buf);
134 break;
135
136 /*
137 * Rmgrs irrelevant for logical decoding; they describe stuff not
138 * represented in logical decoding. Add new rmgrs in rmgrlist.h's
139 * order.
140 */
141 case RM_SMGR_ID:
142 case RM_CLOG_ID:
143 case RM_DBASE_ID:
144 case RM_TBLSPC_ID:
145 case RM_MULTIXACT_ID:
146 case RM_RELMAP_ID:
147 case RM_BTREE_ID:
148 case RM_HASH_ID:
149 case RM_GIN_ID:
150 case RM_GIST_ID:
151 case RM_SEQ_ID:
152 case RM_SPGIST_ID:
153 case RM_BRIN_ID:
154 case RM_COMMIT_TS_ID:
155 case RM_REPLORIGIN_ID:
156 case RM_GENERIC_ID:
157 /* just deal with xid, and done */
158 ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
159 buf.origptr);
160 break;
161 case RM_NEXT_ID:
162 elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
163 }
164}
165
166/*
167 * Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer().
168 */
169static void
170DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
171{
172 SnapBuild *builder = ctx->snapshot_builder;
173 uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
174
175 ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
176 buf->origptr);
177
178 switch (info)
179 {
180 /* this is also used in END_OF_RECOVERY checkpoints */
181 case XLOG_CHECKPOINT_SHUTDOWN:
182 case XLOG_END_OF_RECOVERY:
183 SnapBuildSerializationPoint(builder, buf->origptr);
184
185 break;
186 case XLOG_CHECKPOINT_ONLINE:
187
188 /*
189 * a RUNNING_XACTS record will have been logged near to this, we
190 * can restart from there.
191 */
192 break;
193 case XLOG_NOOP:
194 case XLOG_NEXTOID:
195 case XLOG_SWITCH:
196 case XLOG_BACKUP_END:
197 case XLOG_PARAMETER_CHANGE:
198 case XLOG_RESTORE_POINT:
199 case XLOG_FPW_CHANGE:
200 case XLOG_FPI_FOR_HINT:
201 case XLOG_FPI:
202 break;
203 default:
204 elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
205 }
206}
207
208/*
209 * Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer().
210 */
211static void
212DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
213{
214 SnapBuild *builder = ctx->snapshot_builder;
215 ReorderBuffer *reorder = ctx->reorder;
216 XLogReaderState *r = buf->record;
217 uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
218
219 /*
220 * If the snapshot isn't yet fully built, we cannot decode anything, so
221 * bail out.
222 *
223 * However, it's critical to process XLOG_XACT_ASSIGNMENT records even
224 * when the snapshot is being built: it is possible to get later records
225 * that require subxids to be properly assigned.
226 */
227 if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT &&
228 info != XLOG_XACT_ASSIGNMENT)
229 return;
230
231 switch (info)
232 {
233 case XLOG_XACT_COMMIT:
234 case XLOG_XACT_COMMIT_PREPARED:
235 {
236 xl_xact_commit *xlrec;
237 xl_xact_parsed_commit parsed;
238 TransactionId xid;
239
240 xlrec = (xl_xact_commit *) XLogRecGetData(r);
241 ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
242
243 if (!TransactionIdIsValid(parsed.twophase_xid))
244 xid = XLogRecGetXid(r);
245 else
246 xid = parsed.twophase_xid;
247
248 DecodeCommit(ctx, buf, &parsed, xid);
249 break;
250 }
251 case XLOG_XACT_ABORT:
252 case XLOG_XACT_ABORT_PREPARED:
253 {
254 xl_xact_abort *xlrec;
255 xl_xact_parsed_abort parsed;
256 TransactionId xid;
257
258 xlrec = (xl_xact_abort *) XLogRecGetData(r);
259 ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
260
261 if (!TransactionIdIsValid(parsed.twophase_xid))
262 xid = XLogRecGetXid(r);
263 else
264 xid = parsed.twophase_xid;
265
266 DecodeAbort(ctx, buf, &parsed, xid);
267 break;
268 }
269 case XLOG_XACT_ASSIGNMENT:
270 {
271 xl_xact_assignment *xlrec;
272 int i;
273 TransactionId *sub_xid;
274
275 xlrec = (xl_xact_assignment *) XLogRecGetData(r);
276
277 sub_xid = &xlrec->xsub[0];
278
279 for (i = 0; i < xlrec->nsubxacts; i++)
280 {
281 ReorderBufferAssignChild(reorder, xlrec->xtop,
282 *(sub_xid++), buf->origptr);
283 }
284 break;
285 }
286 case XLOG_XACT_PREPARE:
287
288 /*
289 * Currently decoding ignores PREPARE TRANSACTION and will just
290 * decode the transaction when the COMMIT PREPARED is sent or
291 * throw away the transaction's contents when a ROLLBACK PREPARED
292 * is received. In the future we could add code to expose prepared
293 * transactions in the changestream allowing for a kind of
294 * distributed 2PC.
295 */
296 ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
297 break;
298 default:
299 elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
300 }
301}
302
303/*
304 * Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer().
305 */
306static void
307DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
308{
309 SnapBuild *builder = ctx->snapshot_builder;
310 XLogReaderState *r = buf->record;
311 uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
312
313 ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
314
315 switch (info)
316 {
317 case XLOG_RUNNING_XACTS:
318 {
319 xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
320
321 SnapBuildProcessRunningXacts(builder, buf->origptr, running);
322
323 /*
324 * Abort all transactions that we keep track of, that are
325 * older than the record's oldestRunningXid. This is the most
326 * convenient spot for doing so since, in contrast to shutdown
327 * or end-of-recovery checkpoints, we have information about
328 * all running transactions which includes prepared ones,
329 * while shutdown checkpoints just know that no non-prepared
330 * transactions are in progress.
331 */
332 ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
333 }
334 break;
335 case XLOG_STANDBY_LOCK:
336 break;
337 case XLOG_INVALIDATIONS:
338 {
339 xl_invalidations *invalidations =
340 (xl_invalidations *) XLogRecGetData(r);
341
342 if (!ctx->fast_forward)
343 ReorderBufferImmediateInvalidation(ctx->reorder,
344 invalidations->nmsgs,
345 invalidations->msgs);
346 }
347 break;
348 default:
349 elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
350 }
351}
352
353/*
354 * Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer().
355 */
356static void
357DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
358{
359 uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
360 TransactionId xid = XLogRecGetXid(buf->record);
361 SnapBuild *builder = ctx->snapshot_builder;
362
363 ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
364
365 /*
366 * If we don't have snapshot or we are just fast-forwarding, there is no
367 * point in decoding changes.
368 */
369 if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
370 ctx->fast_forward)
371 return;
372
373 switch (info)
374 {
375 case XLOG_HEAP2_MULTI_INSERT:
376 if (!ctx->fast_forward &&
377 SnapBuildProcessChange(builder, xid, buf->origptr))
378 DecodeMultiInsert(ctx, buf);
379 break;
380 case XLOG_HEAP2_NEW_CID:
381 {
382 xl_heap_new_cid *xlrec;
383
384 xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
385 SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
386
387 break;
388 }
389 case XLOG_HEAP2_REWRITE:
390
391 /*
392 * Although these records only exist to serve the needs of logical
393 * decoding, all the work happens as part of crash or archive
394 * recovery, so we don't need to do anything here.
395 */
396 break;
397
398 /*
399 * Everything else here is just low level physical stuff we're not
400 * interested in.
401 */
402 case XLOG_HEAP2_FREEZE_PAGE:
403 case XLOG_HEAP2_CLEAN:
404 case XLOG_HEAP2_CLEANUP_INFO:
405 case XLOG_HEAP2_VISIBLE:
406 case XLOG_HEAP2_LOCK_UPDATED:
407 break;
408 default:
409 elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
410 }
411}
412
413/*
414 * Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer().
415 */
416static void
417DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
418{
419 uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
420 TransactionId xid = XLogRecGetXid(buf->record);
421 SnapBuild *builder = ctx->snapshot_builder;
422
423 ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
424
425 /*
426 * If we don't have snapshot or we are just fast-forwarding, there is no
427 * point in decoding data changes.
428 */
429 if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
430 ctx->fast_forward)
431 return;
432
433 switch (info)
434 {
435 case XLOG_HEAP_INSERT:
436 if (SnapBuildProcessChange(builder, xid, buf->origptr))
437 DecodeInsert(ctx, buf);
438 break;
439
440 /*
441 * Treat HOT update as normal updates. There is no useful
442 * information in the fact that we could make it a HOT update
443 * locally and the WAL layout is compatible.
444 */
445 case XLOG_HEAP_HOT_UPDATE:
446 case XLOG_HEAP_UPDATE:
447 if (SnapBuildProcessChange(builder, xid, buf->origptr))
448 DecodeUpdate(ctx, buf);
449 break;
450
451 case XLOG_HEAP_DELETE:
452 if (SnapBuildProcessChange(builder, xid, buf->origptr))
453 DecodeDelete(ctx, buf);
454 break;
455
456 case XLOG_HEAP_TRUNCATE:
457 if (SnapBuildProcessChange(builder, xid, buf->origptr))
458 DecodeTruncate(ctx, buf);
459 break;
460
461 case XLOG_HEAP_INPLACE:
462
463 /*
464 * Inplace updates are only ever performed on catalog tuples and
465 * can, per definition, not change tuple visibility. Since we
466 * don't decode catalog tuples, we're not interested in the
467 * record's contents.
468 *
469 * In-place updates can be used either by XID-bearing transactions
470 * (e.g. in CREATE INDEX CONCURRENTLY) or by XID-less
471 * transactions (e.g. VACUUM). In the former case, the commit
472 * record will include cache invalidations, so we mark the
473 * transaction as catalog modifying here. Currently that's
474 * redundant because the commit will do that as well, but once we
475 * support decoding in-progress relations, this will be important.
476 */
477 if (!TransactionIdIsValid(xid))
478 break;
479
480 SnapBuildProcessChange(builder, xid, buf->origptr);
481 ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
482 break;
483
484 case XLOG_HEAP_CONFIRM:
485 if (SnapBuildProcessChange(builder, xid, buf->origptr))
486 DecodeSpecConfirm(ctx, buf);
487 break;
488
489 case XLOG_HEAP_LOCK:
490 /* we don't care about row level locks for now */
491 break;
492
493 default:
494 elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
495 break;
496 }
497}
498
499static inline bool
500FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
501{
502 if (ctx->callbacks.filter_by_origin_cb == NULL)
503 return false;
504
505 return filter_by_origin_cb_wrapper(ctx, origin_id);
506}
507
508/*
509 * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
510 */
511static void
512DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
513{
514 SnapBuild *builder = ctx->snapshot_builder;
515 XLogReaderState *r = buf->record;
516 TransactionId xid = XLogRecGetXid(r);
517 uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
518 RepOriginId origin_id = XLogRecGetOrigin(r);
519 Snapshot snapshot;
520 xl_logical_message *message;
521
522 if (info != XLOG_LOGICAL_MESSAGE)
523 elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
524
525 ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
526
527 /*
528 * If we don't have snapshot or we are just fast-forwarding, there is no
529 * point in decoding messages.
530 */
531 if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
532 ctx->fast_forward)
533 return;
534
535 message = (xl_logical_message *) XLogRecGetData(r);
536
537 if (message->dbId != ctx->slot->data.database ||
538 FilterByOrigin(ctx, origin_id))
539 return;
540
541 if (message->transactional &&
542 !SnapBuildProcessChange(builder, xid, buf->origptr))
543 return;
544 else if (!message->transactional &&
545 (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
546 SnapBuildXactNeedsSkip(builder, buf->origptr)))
547 return;
548
549 snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
550 ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
551 message->transactional,
552 message->message, /* first part of message is
553 * prefix */
554 message->message_size,
555 message->message + message->prefix_size);
556}
557
558/*
559 * Consolidated commit record handling between the different form of commit
560 * records.
561 */
562static void
563DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
564 xl_xact_parsed_commit *parsed, TransactionId xid)
565{
566 XLogRecPtr origin_lsn = InvalidXLogRecPtr;
567 TimestampTz commit_time = parsed->xact_time;
568 RepOriginId origin_id = XLogRecGetOrigin(buf->record);
569 int i;
570
571 if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
572 {
573 origin_lsn = parsed->origin_lsn;
574 commit_time = parsed->origin_timestamp;
575 }
576
577 /*
578 * Process invalidation messages, even if we're not interested in the
579 * transaction's contents, since the various caches need to always be
580 * consistent.
581 */
582 if (parsed->nmsgs > 0)
583 {
584 if (!ctx->fast_forward)
585 ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
586 parsed->nmsgs, parsed->msgs);
587 ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
588 }
589
590 SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
591 parsed->nsubxacts, parsed->subxacts);
592
593 /* ----
594 * Check whether we are interested in this specific transaction, and tell
595 * the reorderbuffer to forget the content of the (sub-)transactions
596 * if not.
597 *
598 * There can be several reasons we might not be interested in this
599 * transaction:
600 * 1) We might not be interested in decoding transactions up to this
601 * LSN. This can happen because we previously decoded it and now just
602 * are restarting or if we haven't assembled a consistent snapshot yet.
603 * 2) The transaction happened in another database.
604 * 3) The output plugin is not interested in the origin.
605 * 4) We are doing fast-forwarding
606 *
607 * We can't just use ReorderBufferAbort() here, because we need to execute
608 * the transaction's invalidations. This currently won't be needed if
609 * we're just skipping over the transaction because currently we only do
610 * so during startup, to get to the first transaction the client needs. As
611 * we have reset the catalog caches before starting to read WAL, and we
612 * haven't yet touched any catalogs, there can't be anything to invalidate.
613 * But if we're "forgetting" this commit because it's it happened in
614 * another database, the invalidations might be important, because they
615 * could be for shared catalogs and we might have loaded data into the
616 * relevant syscaches.
617 * ---
618 */
619 if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
620 (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
621 ctx->fast_forward || FilterByOrigin(ctx, origin_id))
622 {
623 for (i = 0; i < parsed->nsubxacts; i++)
624 {
625 ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
626 }
627 ReorderBufferForget(ctx->reorder, xid, buf->origptr);
628
629 return;
630 }
631
632 /* tell the reorderbuffer about the surviving subtransactions */
633 for (i = 0; i < parsed->nsubxacts; i++)
634 {
635 ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
636 buf->origptr, buf->endptr);
637 }
638
639 /* replay actions of all transaction + subtransactions in order */
640 ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
641 commit_time, origin_id, origin_lsn);
642}
643
644/*
645 * Get the data from the various forms of abort records and pass it on to
646 * snapbuild.c and reorderbuffer.c
647 */
648static void
649DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
650 xl_xact_parsed_abort *parsed, TransactionId xid)
651{
652 int i;
653
654 for (i = 0; i < parsed->nsubxacts; i++)
655 {
656 ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
657 buf->record->EndRecPtr);
658 }
659
660 ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
661}
662
663/*
664 * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
665 *
666 * Deletes can contain the new tuple.
667 */
668static void
669DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
670{
671 Size datalen;
672 char *tupledata;
673 Size tuplelen;
674 XLogReaderState *r = buf->record;
675 xl_heap_insert *xlrec;
676 ReorderBufferChange *change;
677 RelFileNode target_node;
678
679 xlrec = (xl_heap_insert *) XLogRecGetData(r);
680
681 /*
682 * Ignore insert records without new tuples (this does happen when
683 * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
684 */
685 if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
686 return;
687
688 /* only interested in our database */
689 XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
690 if (target_node.dbNode != ctx->slot->data.database)
691 return;
692
693 /* output plugin doesn't look for this origin, no need to queue */
694 if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
695 return;
696
697 change = ReorderBufferGetChange(ctx->reorder);
698 if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
699 change->action = REORDER_BUFFER_CHANGE_INSERT;
700 else
701 change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
702 change->origin_id = XLogRecGetOrigin(r);
703
704 memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
705
706 tupledata = XLogRecGetBlockData(r, 0, &datalen);
707 tuplelen = datalen - SizeOfHeapHeader;
708
709 change->data.tp.newtuple =
710 ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
711
712 DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
713
714 change->data.tp.clear_toast_afterwards = true;
715
716 ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
717}
718
719/*
720 * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
721 * in the record, from wal into proper tuplebufs.
722 *
723 * Updates can possibly contain a new tuple and the old primary key.
724 */
725static void
726DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
727{
728 XLogReaderState *r = buf->record;
729 xl_heap_update *xlrec;
730 ReorderBufferChange *change;
731 char *data;
732 RelFileNode target_node;
733
734 xlrec = (xl_heap_update *) XLogRecGetData(r);
735
736 /* only interested in our database */
737 XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
738 if (target_node.dbNode != ctx->slot->data.database)
739 return;
740
741 /* output plugin doesn't look for this origin, no need to queue */
742 if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
743 return;
744
745 change = ReorderBufferGetChange(ctx->reorder);
746 change->action = REORDER_BUFFER_CHANGE_UPDATE;
747 change->origin_id = XLogRecGetOrigin(r);
748 memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
749
750 if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
751 {
752 Size datalen;
753 Size tuplelen;
754
755 data = XLogRecGetBlockData(r, 0, &datalen);
756
757 tuplelen = datalen - SizeOfHeapHeader;
758
759 change->data.tp.newtuple =
760 ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
761
762 DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
763 }
764
765 if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
766 {
767 Size datalen;
768 Size tuplelen;
769
770 /* caution, remaining data in record is not aligned */
771 data = XLogRecGetData(r) + SizeOfHeapUpdate;
772 datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
773 tuplelen = datalen - SizeOfHeapHeader;
774
775 change->data.tp.oldtuple =
776 ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
777
778 DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
779 }
780
781 change->data.tp.clear_toast_afterwards = true;
782
783 ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
784}
785
786/*
787 * Parse XLOG_HEAP_DELETE from wal into proper tuplebufs.
788 *
789 * Deletes can possibly contain the old primary key.
790 */
791static void
792DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
793{
794 XLogReaderState *r = buf->record;
795 xl_heap_delete *xlrec;
796 ReorderBufferChange *change;
797 RelFileNode target_node;
798
799 xlrec = (xl_heap_delete *) XLogRecGetData(r);
800
801 /* only interested in our database */
802 XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
803 if (target_node.dbNode != ctx->slot->data.database)
804 return;
805
806 /*
807 * Super deletions are irrelevant for logical decoding, it's driven by the
808 * confirmation records.
809 */
810 if (xlrec->flags & XLH_DELETE_IS_SUPER)
811 return;
812
813 /* output plugin doesn't look for this origin, no need to queue */
814 if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
815 return;
816
817 change = ReorderBufferGetChange(ctx->reorder);
818 change->action = REORDER_BUFFER_CHANGE_DELETE;
819 change->origin_id = XLogRecGetOrigin(r);
820
821 memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
822
823 /* old primary key stored */
824 if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
825 {
826 Size datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
827 Size tuplelen = datalen - SizeOfHeapHeader;
828
829 Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
830
831 change->data.tp.oldtuple =
832 ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
833
834 DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
835 datalen, change->data.tp.oldtuple);
836 }
837
838 change->data.tp.clear_toast_afterwards = true;
839
840 ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
841}
842
843/*
844 * Parse XLOG_HEAP_TRUNCATE from wal
845 */
846static void
847DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
848{
849 XLogReaderState *r = buf->record;
850 xl_heap_truncate *xlrec;
851 ReorderBufferChange *change;
852
853 xlrec = (xl_heap_truncate *) XLogRecGetData(r);
854
855 /* only interested in our database */
856 if (xlrec->dbId != ctx->slot->data.database)
857 return;
858
859 /* output plugin doesn't look for this origin, no need to queue */
860 if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
861 return;
862
863 change = ReorderBufferGetChange(ctx->reorder);
864 change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
865 change->origin_id = XLogRecGetOrigin(r);
866 if (xlrec->flags & XLH_TRUNCATE_CASCADE)
867 change->data.truncate.cascade = true;
868 if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
869 change->data.truncate.restart_seqs = true;
870 change->data.truncate.nrelids = xlrec->nrelids;
871 change->data.truncate.relids = ReorderBufferGetRelids(ctx->reorder,
872 xlrec->nrelids);
873 memcpy(change->data.truncate.relids, xlrec->relids,
874 xlrec->nrelids * sizeof(Oid));
875 ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
876 buf->origptr, change);
877}
878
879/*
880 * Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs.
881 *
882 * Currently MULTI_INSERT will always contain the full tuples.
883 */
884static void
885DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
886{
887 XLogReaderState *r = buf->record;
888 xl_heap_multi_insert *xlrec;
889 int i;
890 char *data;
891 char *tupledata;
892 Size tuplelen;
893 RelFileNode rnode;
894
895 xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
896
897 /* only interested in our database */
898 XLogRecGetBlockTag(r, 0, &rnode, NULL, NULL);
899 if (rnode.dbNode != ctx->slot->data.database)
900 return;
901
902 /* output plugin doesn't look for this origin, no need to queue */
903 if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
904 return;
905
906 tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
907
908 data = tupledata;
909 for (i = 0; i < xlrec->ntuples; i++)
910 {
911 ReorderBufferChange *change;
912 xl_multi_insert_tuple *xlhdr;
913 int datalen;
914 ReorderBufferTupleBuf *tuple;
915
916 change = ReorderBufferGetChange(ctx->reorder);
917 change->action = REORDER_BUFFER_CHANGE_INSERT;
918 change->origin_id = XLogRecGetOrigin(r);
919
920 memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode));
921
922 /*
923 * CONTAINS_NEW_TUPLE will always be set currently as multi_insert
924 * isn't used for catalogs, but better be future proof.
925 *
926 * We decode the tuple in pretty much the same way as DecodeXLogTuple,
927 * but since the layout is slightly different, we can't use it here.
928 */
929 if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)
930 {
931 HeapTupleHeader header;
932
933 xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
934 data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
935 datalen = xlhdr->datalen;
936
937 change->data.tp.newtuple =
938 ReorderBufferGetTupleBuf(ctx->reorder, datalen);
939
940 tuple = change->data.tp.newtuple;
941 header = tuple->tuple.t_data;
942
943 /* not a disk based tuple */
944 ItemPointerSetInvalid(&tuple->tuple.t_self);
945
946 /*
947 * We can only figure this out after reassembling the
948 * transactions.
949 */
950 tuple->tuple.t_tableOid = InvalidOid;
951
952 tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
953
954 memset(header, 0, SizeofHeapTupleHeader);
955
956 memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader,
957 (char *) data,
958 datalen);
959 data += datalen;
960
961 header->t_infomask = xlhdr->t_infomask;
962 header->t_infomask2 = xlhdr->t_infomask2;
963 header->t_hoff = xlhdr->t_hoff;
964 }
965
966 /*
967 * Reset toast reassembly state only after the last row in the last
968 * xl_multi_insert_tuple record emitted by one heap_multi_insert()
969 * call.
970 */
971 if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
972 (i + 1) == xlrec->ntuples)
973 change->data.tp.clear_toast_afterwards = true;
974 else
975 change->data.tp.clear_toast_afterwards = false;
976
977 ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
978 buf->origptr, change);
979 }
980 Assert(data == tupledata + tuplelen);
981}
982
983/*
984 * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change.
985 *
986 * This is pretty trivial, all the state essentially already setup by the
987 * speculative insertion.
988 */
989static void
990DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
991{
992 XLogReaderState *r = buf->record;
993 ReorderBufferChange *change;
994 RelFileNode target_node;
995
996 /* only interested in our database */
997 XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
998 if (target_node.dbNode != ctx->slot->data.database)
999 return;
1000
1001 /* output plugin doesn't look for this origin, no need to queue */
1002 if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
1003 return;
1004
1005 change = ReorderBufferGetChange(ctx->reorder);
1006 change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
1007 change->origin_id = XLogRecGetOrigin(r);
1008
1009 memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
1010
1011 change->data.tp.clear_toast_afterwards = true;
1012
1013 ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
1014}
1015
1016
1017/*
1018 * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
1019 * (but not by heap_multi_insert) into a tuplebuf.
1020 *
1021 * The size 'len' and the pointer 'data' in the record need to be
1022 * computed outside as they are record specific.
1023 */
1024static void
1025DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
1026{
1027 xl_heap_header xlhdr;
1028 int datalen = len - SizeOfHeapHeader;
1029 HeapTupleHeader header;
1030
1031 Assert(datalen >= 0);
1032
1033 tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
1034 header = tuple->tuple.t_data;
1035
1036 /* not a disk based tuple */
1037 ItemPointerSetInvalid(&tuple->tuple.t_self);
1038
1039 /* we can only figure this out after reassembling the transactions */
1040 tuple->tuple.t_tableOid = InvalidOid;
1041
1042 /* data is not stored aligned, copy to aligned storage */
1043 memcpy((char *) &xlhdr,
1044 data,
1045 SizeOfHeapHeader);
1046
1047 memset(header, 0, SizeofHeapTupleHeader);
1048
1049 memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
1050 data + SizeOfHeapHeader,
1051 datalen);
1052
1053 header->t_infomask = xlhdr.t_infomask;
1054 header->t_infomask2 = xlhdr.t_infomask2;
1055 header->t_hoff = xlhdr.t_hoff;
1056}
1057