1/*-------------------------------------------------------------------------
2 *
3 * reorderbuffer.c
4 * PostgreSQL logical replay/reorder buffer management
5 *
6 *
7 * Copyright (c) 2012-2019, PostgreSQL Global Development Group
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/replication/reorderbuffer.c
12 *
13 * NOTES
14 * This module gets handed individual pieces of transactions in the order
15 * they are written to the WAL and is responsible for reassembling them into
16 * toplevel-transaction-sized pieces. When a transaction is completely
17 * reassembled - signalled by reading the transaction commit record - it
18 * will then call the output plugin (cf. ReorderBufferCommit()) with the
19 * individual changes. The output plugins rely on snapshots built by
20 * snapbuild.c which hands them to us.
21 *
22 * Transactions and subtransactions/savepoints in postgres are not
23 * immediately linked to each other from outside the performing
24 * backend. Only at commit/abort (or special xact_assignment records) are
25 * they linked together. This means that we have to splice together a
26 * toplevel transaction from its subtransactions. To do that efficiently we
27 * build a binary heap indexed by the smallest current lsn of the individual
28 * subtransactions' changestreams. As the individual streams are inherently
29 * ordered by LSN - since that is where we build them from - the transaction
30 * can easily be reassembled by always using the subtransaction with the
31 * smallest current LSN from the heap.
32 *
33 * In order to cope with large transactions - which can be several times as
34 * big as the available memory - this module supports spooling the contents
35 * of large transactions to disk. When the transaction is replayed the
36 * contents of individual (sub-)transactions will be read from disk in
37 * chunks.
38 *
39 * This module also has to deal with reassembling toast records from the
40 * individual chunks stored in WAL. When a new (or initial) version of a
41 * tuple is stored in WAL it will always be preceded by the toast chunks
42 * emitted for the columns stored out of line. Within a single toplevel
43 * transaction there will be no other data carrying records between a row's
44 * toast chunks and the row data itself. See ReorderBufferToast* for
45 * details.
46 *
47 * ReorderBuffer uses two special memory context types - SlabContext for
48 * allocations of fixed-length structures (changes and transactions), and
49 * GenerationContext for the variable-length transaction data (allocated
50 * and freed in groups with similar lifespan).
51 *
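 * As a rough, illustrative sketch (the authoritative call sites live in
 * decode.c and logical.c), a decoding session drives this module roughly
 * like so:
 *
 *    rb = ReorderBufferAllocate();
 *    ReorderBufferQueueChange(rb, xid, lsn, change);    for each data record
 *    ReorderBufferAssignChild(rb, xid, subxid, lsn);    on xact_assignment
 *    ReorderBufferCommit(rb, xid, commit_lsn, end_lsn,
 *                        commit_time, origin_id, origin_lsn);
 *    ReorderBufferFree(rb);
 *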
52 * -------------------------------------------------------------------------
53 */
54#include "postgres.h"
55
56#include <unistd.h>
57#include <sys/stat.h>
58
59#include "access/heapam.h"
60#include "access/rewriteheap.h"
61#include "access/transam.h"
62#include "access/tuptoaster.h"
63#include "access/xact.h"
64#include "access/xlog_internal.h"
65#include "catalog/catalog.h"
66#include "lib/binaryheap.h"
67#include "miscadmin.h"
68#include "pgstat.h"
69#include "replication/logical.h"
70#include "replication/reorderbuffer.h"
71#include "replication/slot.h"
72#include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
73#include "storage/bufmgr.h"
74#include "storage/fd.h"
75#include "storage/sinval.h"
76#include "utils/builtins.h"
77#include "utils/combocid.h"
78#include "utils/memdebug.h"
79#include "utils/memutils.h"
80#include "utils/rel.h"
81#include "utils/relfilenodemap.h"
82
83
84/* entry for a hash table we use to map from xid to our transaction state */
85typedef struct ReorderBufferTXNByIdEnt
86{
87 TransactionId xid;
88 ReorderBufferTXN *txn;
89} ReorderBufferTXNByIdEnt;
90
91/* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
92typedef struct ReorderBufferTupleCidKey
93{
94 RelFileNode relnode;
95 ItemPointerData tid;
96} ReorderBufferTupleCidKey;
97
98typedef struct ReorderBufferTupleCidEnt
99{
100 ReorderBufferTupleCidKey key;
101 CommandId cmin;
102 CommandId cmax;
103 CommandId combocid; /* just for debugging */
104} ReorderBufferTupleCidEnt;
105
106/* k-way in-order change iteration support structures */
107typedef struct ReorderBufferIterTXNEntry
108{
109 XLogRecPtr lsn;
110 ReorderBufferChange *change;
111 ReorderBufferTXN *txn;
112 int fd;
113 XLogSegNo segno;
114} ReorderBufferIterTXNEntry;
115
116typedef struct ReorderBufferIterTXNState
117{
118 binaryheap *heap;
119 Size nr_txns;
120 dlist_head old_change;
121 ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
122} ReorderBufferIterTXNState;
123
124/* toast datastructures */
125typedef struct ReorderBufferToastEnt
126{
127 Oid chunk_id; /* toast_table.chunk_id */
128 int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
129 * have seen */
130 Size num_chunks; /* number of chunks we've already seen */
131 Size size; /* combined size of chunks seen */
132 dlist_head chunks; /* linked list of chunks */
133 struct varlena *reconstructed; /* reconstructed varlena now pointed to in
134 * main tup */
135} ReorderBufferToastEnt;
136
137/* Disk serialization support datastructures */
138typedef struct ReorderBufferDiskChange
139{
140 Size size;
141 ReorderBufferChange change;
142 /* data follows */
143} ReorderBufferDiskChange;
144
145/*
146 * Maximum number of changes kept in memory, per transaction. After that,
147 * changes are spooled to disk.
148 *
149 * The current value should be sufficient to decode an entire transaction
150 * without hitting disk in typical OLTP workloads, while starting to spool to
151 * disk reasonably quickly in other workloads.
152 *
153 * At some point in the future it probably makes sense to have a more elaborate
154 * resource management here, but it's not entirely clear what that would look
155 * like.
156 */
157static const Size max_changes_in_memory = 4096;
158
159/* ---------------------------------------
160 * primary reorderbuffer support routines
161 * ---------------------------------------
162 */
163static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
164static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
165static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
166 TransactionId xid, bool create, bool *is_new,
167 XLogRecPtr lsn, bool create_as_top);
168static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
169 ReorderBufferTXN *subtxn);
170
171static void AssertTXNLsnOrder(ReorderBuffer *rb);
172
173/* ---------------------------------------
174 * support functions for lsn-order iterating over the ->changes of a
175 * transaction and its subtransactions
176 *
177 * used for iteration over the k-way heap merge of a transaction and its
178 * subtransactions
179 * ---------------------------------------
180 */
181static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn);
182static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
183static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
184 ReorderBufferIterTXNState *state);
185static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
186
187/*
188 * ---------------------------------------
189 * Disk serialization support functions
190 * ---------------------------------------
191 */
192static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
193static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
194static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
195 int fd, ReorderBufferChange *change);
196static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
197 int *fd, XLogSegNo *segno);
198static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
199 char *change);
200static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
201static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
202static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
203 TransactionId xid, XLogSegNo segno);
204
205static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
206static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
207 ReorderBufferTXN *txn, CommandId cid);
208
209/* ---------------------------------------
210 * toast reassembly support
211 * ---------------------------------------
212 */
213static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
214static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
215static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
216 Relation relation, ReorderBufferChange *change);
217static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
218 Relation relation, ReorderBufferChange *change);
219
220
221/*
222 * Allocate a new ReorderBuffer and clean out any old serialized state from
223 * prior ReorderBuffer instances for the same slot.
224 */
225ReorderBuffer *
226ReorderBufferAllocate(void)
227{
228 ReorderBuffer *buffer;
229 HASHCTL hash_ctl;
230 MemoryContext new_ctx;
231
232 Assert(MyReplicationSlot != NULL);
233
234 /* allocate memory in its own context, for better accountability */
235 new_ctx = AllocSetContextCreate(CurrentMemoryContext,
236 "ReorderBuffer",
237 ALLOCSET_DEFAULT_SIZES);
238
239 buffer =
240 (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
241
242 memset(&hash_ctl, 0, sizeof(hash_ctl));
243
244 buffer->context = new_ctx;
245
246 buffer->change_context = SlabContextCreate(new_ctx,
247 "Change",
248 SLAB_DEFAULT_BLOCK_SIZE,
249 sizeof(ReorderBufferChange));
250
251 buffer->txn_context = SlabContextCreate(new_ctx,
252 "TXN",
253 SLAB_DEFAULT_BLOCK_SIZE,
254 sizeof(ReorderBufferTXN));
255
256 buffer->tup_context = GenerationContextCreate(new_ctx,
257 "Tuples",
258 SLAB_LARGE_BLOCK_SIZE);
259
260 hash_ctl.keysize = sizeof(TransactionId);
261 hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
262 hash_ctl.hcxt = buffer->context;
263
264 buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
265 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
266
267 buffer->by_txn_last_xid = InvalidTransactionId;
268 buffer->by_txn_last_txn = NULL;
269
270 buffer->outbuf = NULL;
271 buffer->outbufsize = 0;
272
273 buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
274
275 dlist_init(&buffer->toplevel_by_lsn);
276 dlist_init(&buffer->txns_by_base_snapshot_lsn);
277
278 /*
279 * Ensure there's no stale data from prior uses of this slot, in case some
280 * prior exit avoided calling ReorderBufferFree. Failure to do this can
281 * produce duplicated txns, and it's very cheap if there's nothing there.
282 */
283 ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
284
285 return buffer;
286}
287
288/*
289 * Free a ReorderBuffer
290 */
291void
292ReorderBufferFree(ReorderBuffer *rb)
293{
294 MemoryContext context = rb->context;
295
296 /*
297 * We free separately allocated data by entirely scrapping reorderbuffer's
298 * memory context.
299 */
300 MemoryContextDelete(context);
301
302 /* Free disk space used by unconsumed reorder buffers */
303 ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
304}
305
306/*
307 * Get an unused, possibly preallocated, ReorderBufferTXN.
308 */
309static ReorderBufferTXN *
310ReorderBufferGetTXN(ReorderBuffer *rb)
311{
312 ReorderBufferTXN *txn;
313
314 txn = (ReorderBufferTXN *)
315 MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
316
317 memset(txn, 0, sizeof(ReorderBufferTXN));
318
319 dlist_init(&txn->changes);
320 dlist_init(&txn->tuplecids);
321 dlist_init(&txn->subtxns);
322
323 return txn;
324}
325
326/*
327 * Free a ReorderBufferTXN.
328 */
329static void
330ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
331{
332 /* clean the lookup cache if we were cached (quite likely) */
333 if (rb->by_txn_last_xid == txn->xid)
334 {
335 rb->by_txn_last_xid = InvalidTransactionId;
336 rb->by_txn_last_txn = NULL;
337 }
338
339 /* free data that's contained */
340
341 if (txn->tuplecid_hash != NULL)
342 {
343 hash_destroy(txn->tuplecid_hash);
344 txn->tuplecid_hash = NULL;
345 }
346
347 if (txn->invalidations)
348 {
349 pfree(txn->invalidations);
350 txn->invalidations = NULL;
351 }
352
353 pfree(txn);
354}
355
356/*
357 * Get a fresh ReorderBufferChange.
358 */
359ReorderBufferChange *
360ReorderBufferGetChange(ReorderBuffer *rb)
361{
362 ReorderBufferChange *change;
363
364 change = (ReorderBufferChange *)
365 MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
366
367 memset(change, 0, sizeof(ReorderBufferChange));
368 return change;
369}
370
371/*
372 * Free a ReorderBufferChange.
373 */
374void
375ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
376{
377 /* free contained data */
378 switch (change->action)
379 {
380 case REORDER_BUFFER_CHANGE_INSERT:
381 case REORDER_BUFFER_CHANGE_UPDATE:
382 case REORDER_BUFFER_CHANGE_DELETE:
383 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
384 if (change->data.tp.newtuple)
385 {
386 ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
387 change->data.tp.newtuple = NULL;
388 }
389
390 if (change->data.tp.oldtuple)
391 {
392 ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
393 change->data.tp.oldtuple = NULL;
394 }
395 break;
396 case REORDER_BUFFER_CHANGE_MESSAGE:
397 if (change->data.msg.prefix != NULL)
398 pfree(change->data.msg.prefix);
399 change->data.msg.prefix = NULL;
400 if (change->data.msg.message != NULL)
401 pfree(change->data.msg.message);
402 change->data.msg.message = NULL;
403 break;
404 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
405 if (change->data.snapshot)
406 {
407 ReorderBufferFreeSnap(rb, change->data.snapshot);
408 change->data.snapshot = NULL;
409 }
410 break;
411 /* no data in addition to the struct itself */
412 case REORDER_BUFFER_CHANGE_TRUNCATE:
413 if (change->data.truncate.relids != NULL)
414 {
415 ReorderBufferReturnRelids(rb, change->data.truncate.relids);
416 change->data.truncate.relids = NULL;
417 }
418 break;
419 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
420 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
421 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
422 break;
423 }
424
425 pfree(change);
426}
427
428/*
429 * Get a fresh ReorderBufferTupleBuf fitting at least a tuple of size
430 * tuple_len (excluding header overhead).
431 */
432ReorderBufferTupleBuf *
433ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
434{
435 ReorderBufferTupleBuf *tuple;
436 Size alloc_len;
437
438 alloc_len = tuple_len + SizeofHeapTupleHeader;
439
440 tuple = (ReorderBufferTupleBuf *)
441 MemoryContextAlloc(rb->tup_context,
442 sizeof(ReorderBufferTupleBuf) +
443 MAXIMUM_ALIGNOF + alloc_len);
444 tuple->alloc_tuple_size = alloc_len;
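/*
 * ReorderBufferTupleBufData() points t_data at the first MAXALIGNed
 * address past the struct; the MAXIMUM_ALIGNOF slop allocated above is
 * what guarantees the aligned tuple data still fits in the allocation.
 */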
445 tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
446
447 return tuple;
448}
449
450/*
451 * Free a ReorderBufferTupleBuf.
452 */
453void
454ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
455{
456 pfree(tuple);
457}
458
459/*
460 * Get an array for relids of truncated relations.
461 *
462 * We use the global memory context (for the whole reorder buffer), because
463 * none of the existing ones seems like a good match (some are SLAB, so we
464 * can't use those, and tup_context is meant for tuple data, not relids). We
465 * could add yet another context, but it seems like overkill - TRUNCATE is
466 * not a particularly common operation, so it does not seem worth it.
467 */
468Oid *
469ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
470{
471 Oid *relids;
472 Size alloc_len;
473
474 alloc_len = sizeof(Oid) * nrelids;
475
476 relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
477
478 return relids;
479}
480
481/*
482 * Free an array of relids.
483 */
484void
485ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
486{
487 pfree(relids);
488}
489
490/*
491 * Return the ReorderBufferTXN from the given buffer, specified by Xid.
492 * If create is true, and a transaction doesn't already exist, create it
493 * (with the given LSN, and as top transaction if that's specified);
494 * when this happens, is_new is set to true.
495 */
496static ReorderBufferTXN *
497ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
498 bool *is_new, XLogRecPtr lsn, bool create_as_top)
499{
500 ReorderBufferTXN *txn;
501 ReorderBufferTXNByIdEnt *ent;
502 bool found;
503
504 Assert(TransactionIdIsValid(xid));
505
506 /*
507 * Check the one-entry lookup cache first
508 */
509 if (TransactionIdIsValid(rb->by_txn_last_xid) &&
510 rb->by_txn_last_xid == xid)
511 {
512 txn = rb->by_txn_last_txn;
513
514 if (txn != NULL)
515 {
516 /* found it, and it's valid */
517 if (is_new)
518 *is_new = false;
519 return txn;
520 }
521
522 /*
523 * cached as non-existent, and asked not to create? Then nothing else
524 * to do.
525 */
526 if (!create)
527 return NULL;
528 /* otherwise fall through to create it */
529 }
530
531 /*
532 * If the cache wasn't hit, or it yielded a "does-not-exist" entry and we
533 * want to create one, fall through to the full lookup below.
534 */
535
536 /* search the lookup table */
537 ent = (ReorderBufferTXNByIdEnt *)
538 hash_search(rb->by_txn,
539 (void *) &xid,
540 create ? HASH_ENTER : HASH_FIND,
541 &found);
542 if (found)
543 txn = ent->txn;
544 else if (create)
545 {
546 /* initialize the new entry, if creation was requested */
547 Assert(ent != NULL);
548 Assert(lsn != InvalidXLogRecPtr);
549
550 ent->txn = ReorderBufferGetTXN(rb);
551 ent->txn->xid = xid;
552 txn = ent->txn;
553 txn->first_lsn = lsn;
554 txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;
555
556 if (create_as_top)
557 {
558 dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
559 AssertTXNLsnOrder(rb);
560 }
561 }
562 else
563 txn = NULL; /* not found and not asked to create */
564
565 /* update cache */
566 rb->by_txn_last_xid = xid;
567 rb->by_txn_last_txn = txn;
568
569 if (is_new)
570 *is_new = !found;
571
572 Assert(!create || txn != NULL);
573 return txn;
574}
575
576/*
577 * Queue a change into a transaction so it can be replayed upon commit.
578 */
579void
580ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
581 ReorderBufferChange *change)
582{
583 ReorderBufferTXN *txn;
584
585 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
586
587 change->lsn = lsn;
588 Assert(InvalidXLogRecPtr != lsn);
589 dlist_push_tail(&txn->changes, &change->node);
590 txn->nentries++;
591 txn->nentries_mem++;
592
593 ReorderBufferCheckSerializeTXN(rb, txn);
594}
595
596/*
597 * Queue a message into a transaction so it can be processed upon commit.
598 */
599void
600ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
601 Snapshot snapshot, XLogRecPtr lsn,
602 bool transactional, const char *prefix,
603 Size message_size, const char *message)
604{
605 if (transactional)
606 {
607 MemoryContext oldcontext;
608 ReorderBufferChange *change;
609
610 Assert(xid != InvalidTransactionId);
611
612 oldcontext = MemoryContextSwitchTo(rb->context);
613
614 change = ReorderBufferGetChange(rb);
615 change->action = REORDER_BUFFER_CHANGE_MESSAGE;
616 change->data.msg.prefix = pstrdup(prefix);
617 change->data.msg.message_size = message_size;
618 change->data.msg.message = palloc(message_size);
619 memcpy(change->data.msg.message, message, message_size);
620
621 ReorderBufferQueueChange(rb, xid, lsn, change);
622
623 MemoryContextSwitchTo(oldcontext);
624 }
625 else
626 {
627 ReorderBufferTXN *txn = NULL;
628 volatile Snapshot snapshot_now = snapshot;
629
630 if (xid != InvalidTransactionId)
631 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
632
633 /* setup snapshot to allow catalog access */
634 SetupHistoricSnapshot(snapshot_now, NULL);
635 PG_TRY();
636 {
637 rb->message(rb, txn, lsn, false, prefix, message_size, message);
638
639 TeardownHistoricSnapshot(false);
640 }
641 PG_CATCH();
642 {
643 TeardownHistoricSnapshot(true);
644 PG_RE_THROW();
645 }
646 PG_END_TRY();
647 }
648}
649
650/*
651 * AssertTXNLsnOrder
652 * Verify LSN ordering of transaction lists in the reorderbuffer
653 *
654 * Other LSN-related invariants are checked too.
655 *
656 * No-op if assertions are not in use.
657 */
658static void
659AssertTXNLsnOrder(ReorderBuffer *rb)
660{
661#ifdef USE_ASSERT_CHECKING
662 dlist_iter iter;
663 XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
664 XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
665
666 dlist_foreach(iter, &rb->toplevel_by_lsn)
667 {
668 ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
669 iter.cur);
670
671 /* start LSN must be set */
672 Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
673
674 /* If there is an end LSN, it must be higher than start LSN */
675 if (cur_txn->end_lsn != InvalidXLogRecPtr)
676 Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
677
678 /* Current initial LSN must be strictly higher than previous */
679 if (prev_first_lsn != InvalidXLogRecPtr)
680 Assert(prev_first_lsn < cur_txn->first_lsn);
681
682 /* known-as-subtxn txns must not be listed */
683 Assert(!cur_txn->is_known_as_subxact);
684
685 prev_first_lsn = cur_txn->first_lsn;
686 }
687
688 dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
689 {
690 ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
691 base_snapshot_node,
692 iter.cur);
693
694 /* base snapshot (and its LSN) must be set */
695 Assert(cur_txn->base_snapshot != NULL);
696 Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);
697
698 /* current LSN must be strictly higher than previous */
699 if (prev_base_snap_lsn != InvalidXLogRecPtr)
700 Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
701
702 /* known-as-subtxn txns must not be listed */
703 Assert(!cur_txn->is_known_as_subxact);
704
705 prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
706 }
707#endif
708}
709
710/*
711 * ReorderBufferGetOldestTXN
712 * Return oldest transaction in reorderbuffer
713 */
714ReorderBufferTXN *
715ReorderBufferGetOldestTXN(ReorderBuffer *rb)
716{
717 ReorderBufferTXN *txn;
718
719 AssertTXNLsnOrder(rb);
720
721 if (dlist_is_empty(&rb->toplevel_by_lsn))
722 return NULL;
723
724 txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
725
726 Assert(!txn->is_known_as_subxact);
727 Assert(txn->first_lsn != InvalidXLogRecPtr);
728 return txn;
729}
730
731/*
732 * ReorderBufferGetOldestXmin
733 * Return oldest Xmin in reorderbuffer
734 *
735 * Returns oldest possibly running Xid from the point of view of snapshots
736 * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
737 * there are none.
738 *
739 * Since snapshots are assigned monotonically, this equals the Xmin of the
740 * base snapshot with minimal base_snapshot_lsn.
741 */
742TransactionId
743ReorderBufferGetOldestXmin(ReorderBuffer *rb)
744{
745 ReorderBufferTXN *txn;
746
747 AssertTXNLsnOrder(rb);
748
749 if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
750 return InvalidTransactionId;
751
752 txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
753 &rb->txns_by_base_snapshot_lsn);
754 return txn->base_snapshot->xmin;
755}
756
757void
758ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
759{
760 rb->current_restart_decoding_lsn = ptr;
761}
762
763/*
764 * ReorderBufferAssignChild
765 *
766 * Make note that we know that subxid is a subtransaction of xid, seen as of
767 * the given lsn.
768 */
769void
770ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
771 TransactionId subxid, XLogRecPtr lsn)
772{
773 ReorderBufferTXN *txn;
774 ReorderBufferTXN *subtxn;
775 bool new_top;
776 bool new_sub;
777
778 txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
779 subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
780
781 if (new_top && !new_sub)
782 elog(ERROR, "subtransaction logged without previous top-level txn record");
783
784 if (!new_sub)
785 {
786 if (subtxn->is_known_as_subxact)
787 {
788 /* already associated, nothing to do */
789 return;
790 }
791 else
792 {
793 /*
794 * We already saw this transaction, but initially added it to the
795 * list of top-level txns. Now that we know it's not top-level,
796 * remove it from there.
797 */
798 dlist_delete(&subtxn->node);
799 }
800 }
801
802 subtxn->is_known_as_subxact = true;
803 subtxn->toplevel_xid = xid;
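 /* a known subxact never has subxacts of its own associated with it */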
804 Assert(subtxn->nsubtxns == 0);
805
806 /* add to subtransaction list */
807 dlist_push_tail(&txn->subtxns, &subtxn->node);
808 txn->nsubtxns++;
809
810 /* Possibly transfer the subtxn's snapshot to its top-level txn. */
811 ReorderBufferTransferSnapToParent(txn, subtxn);
812
813 /* Verify LSN-ordering invariant */
814 AssertTXNLsnOrder(rb);
815}
816
817/*
818 * ReorderBufferTransferSnapToParent
819 * Transfer base snapshot from subtxn to top-level txn, if needed
820 *
821 * This is done if the top-level txn doesn't have a base snapshot, or if the
822 * subtxn's base snapshot has an earlier LSN than the top-level txn's base
823 * snapshot's LSN. This can happen if there are no changes in the toplevel
824 * txn but there are some in the subtxn, or the first change in subtxn has
825 * earlier LSN than first change in the top-level txn and we learned about
826 * their kinship only now.
827 *
828 * The subtransaction's snapshot is cleared regardless of the transfer
829 * happening, since it's not needed anymore in either case.
830 *
831 * We do this as soon as we become aware of their kinship, to avoid queueing
832 * extra snapshots to txns known-as-subtxns -- only top-level txns will
833 * receive further snapshots.
834 */
835static void
836ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
837 ReorderBufferTXN *subtxn)
838{
839 Assert(subtxn->toplevel_xid == txn->xid);
840
841 if (subtxn->base_snapshot != NULL)
842 {
843 if (txn->base_snapshot == NULL ||
844 subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
845 {
846 /*
847 * If the toplevel transaction already has a base snapshot but
848 * it's newer than the subxact's, purge it.
849 */
850 if (txn->base_snapshot != NULL)
851 {
852 SnapBuildSnapDecRefcount(txn->base_snapshot);
853 dlist_delete(&txn->base_snapshot_node);
854 }
855
856 /*
857 * The snapshot is now the top transaction's; transfer it, and
858 * adjust the list position of the top transaction in the list by
859 * moving it to where the subtransaction is.
860 */
861 txn->base_snapshot = subtxn->base_snapshot;
862 txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
863 dlist_insert_before(&subtxn->base_snapshot_node,
864 &txn->base_snapshot_node);
865
866 /*
867 * The subtransaction doesn't have a snapshot anymore (so it
868 * mustn't be in the list.)
869 */
870 subtxn->base_snapshot = NULL;
871 subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
872 dlist_delete(&subtxn->base_snapshot_node);
873 }
874 else
875 {
876 /* Base snap of toplevel is fine, so subxact's is not needed */
877 SnapBuildSnapDecRefcount(subtxn->base_snapshot);
878 dlist_delete(&subtxn->base_snapshot_node);
879 subtxn->base_snapshot = NULL;
880 subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
881 }
882 }
883}
884
885/*
886 * Associate a subtransaction with its toplevel transaction at commit
887 * time. There may be no further changes added after this.
888 */
889void
890ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
891 TransactionId subxid, XLogRecPtr commit_lsn,
892 XLogRecPtr end_lsn)
893{
894 ReorderBufferTXN *subtxn;
895
896 subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
897 InvalidXLogRecPtr, false);
898
899 /*
900 * No need to do anything if that subtxn didn't contain any changes
901 */
902 if (!subtxn)
903 return;
904
905 subtxn->final_lsn = commit_lsn;
906 subtxn->end_lsn = end_lsn;
907
908 /*
909 * Assign this subxact as a child of the toplevel xact (no-op if already
910 * done.)
911 */
912 ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
913}
914
915
916/*
917 * Support for efficiently iterating over a transaction's and its
918 * subtransactions' changes.
919 *
920 * We do this via a k-way merge between transactions/subtransactions. For that
921 * we model the current heads of the different transactions as a binary heap
922 * so we easily know which (sub-)transaction has the change with the smallest
923 * lsn next.
924 *
925 * We assume the changes in individual transactions are already sorted by LSN.
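 *
 * Typical usage (cf. ReorderBufferCommit() below; illustrative sketch only):
 *
 *    iterstate = ReorderBufferIterTXNInit(rb, txn);
 *    while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
 *        (apply the change)
 *    ReorderBufferIterTXNFinish(rb, iterstate);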
926 */
927
928/*
929 * Binary heap comparison function.
930 */
931static int
932ReorderBufferIterCompare(Datum a, Datum b, void *arg)
933{
934 ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
935 XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
936 XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
937
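 /*
 * The comparison is deliberately inverted: binaryheap keeps the "largest"
 * element at the top, so reporting the smaller LSN as larger makes
 * binaryheap_first() hand back the entry with the smallest LSN.
 */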
938 if (pos_a < pos_b)
939 return 1;
940 else if (pos_a == pos_b)
941 return 0;
942 return -1;
943}
944
945/*
946 * Allocate & initialize an iterator which iterates in lsn order over a
947 * transaction and all its subtransactions.
948 */
949static ReorderBufferIterTXNState *
950ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
951{
952 Size nr_txns = 0;
953 ReorderBufferIterTXNState *state;
954 dlist_iter cur_txn_i;
955 int32 off;
956
957 /*
958 * Calculate the size of our heap: one element for every transaction that
959 * contains changes. (Besides the transactions already in the reorder
960 * buffer, we count the one we were directly passed.)
961 */
962 if (txn->nentries > 0)
963 nr_txns++;
964
965 dlist_foreach(cur_txn_i, &txn->subtxns)
966 {
967 ReorderBufferTXN *cur_txn;
968
969 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
970
971 if (cur_txn->nentries > 0)
972 nr_txns++;
973 }
974
975 /*
976 * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
977 * need to allocate/build a heap then.
978 */
979
980 /* allocate iteration state */
981 state = (ReorderBufferIterTXNState *)
982 MemoryContextAllocZero(rb->context,
983 sizeof(ReorderBufferIterTXNState) +
984 sizeof(ReorderBufferIterTXNEntry) * nr_txns);
985
986 state->nr_txns = nr_txns;
987 dlist_init(&state->old_change);
988
989 for (off = 0; off < state->nr_txns; off++)
990 {
991 state->entries[off].fd = -1;
992 state->entries[off].segno = 0;
993 }
994
995 /* allocate heap */
996 state->heap = binaryheap_allocate(state->nr_txns,
997 ReorderBufferIterCompare,
998 state);
999
1000 /*
1001 * Now insert items into the binary heap, in an unordered fashion. (We
1002 * will run a heap assembly step at the end; this is more efficient.)
1003 */
1004
1005 off = 0;
1006
1007 /* add toplevel transaction if it contains changes */
1008 if (txn->nentries > 0)
1009 {
1010 ReorderBufferChange *cur_change;
1011
1012 if (txn->serialized)
1013 {
1014 /* serialize remaining changes */
1015 ReorderBufferSerializeTXN(rb, txn);
1016 ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
1017 &state->entries[off].segno);
1018 }
1019
1020 cur_change = dlist_head_element(ReorderBufferChange, node,
1021 &txn->changes);
1022
1023 state->entries[off].lsn = cur_change->lsn;
1024 state->entries[off].change = cur_change;
1025 state->entries[off].txn = txn;
1026
1027 binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1028 }
1029
1030 /* add subtransactions if they contain changes */
1031 dlist_foreach(cur_txn_i, &txn->subtxns)
1032 {
1033 ReorderBufferTXN *cur_txn;
1034
1035 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1036
1037 if (cur_txn->nentries > 0)
1038 {
1039 ReorderBufferChange *cur_change;
1040
1041 if (cur_txn->serialized)
1042 {
1043 /* serialize remaining changes */
1044 ReorderBufferSerializeTXN(rb, cur_txn);
1045 ReorderBufferRestoreChanges(rb, cur_txn,
1046 &state->entries[off].fd,
1047 &state->entries[off].segno);
1048 }
1049 cur_change = dlist_head_element(ReorderBufferChange, node,
1050 &cur_txn->changes);
1051
1052 state->entries[off].lsn = cur_change->lsn;
1053 state->entries[off].change = cur_change;
1054 state->entries[off].txn = cur_txn;
1055
1056 binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1057 }
1058 }
1059
1060 /* assemble a valid binary heap */
1061 binaryheap_build(state->heap);
1062
1063 return state;
1064}
1065
1066/*
1067 * Return the next change when iterating over a transaction and its
1068 * subtransactions.
1069 *
1070 * Returns NULL when no further changes exist.
1071 */
1072static ReorderBufferChange *
1073ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
1074{
1075 ReorderBufferChange *change;
1076 ReorderBufferIterTXNEntry *entry;
1077 int32 off;
1078
1079 /* nothing there anymore */
1080 if (state->heap->bh_size == 0)
1081 return NULL;
1082
1083 off = DatumGetInt32(binaryheap_first(state->heap));
1084 entry = &state->entries[off];
1085
1086 /* free memory we might have "leaked" in the previous *Next call */
1087 if (!dlist_is_empty(&state->old_change))
1088 {
1089 change = dlist_container(ReorderBufferChange, node,
1090 dlist_pop_head_node(&state->old_change));
1091 ReorderBufferReturnChange(rb, change);
1092 Assert(dlist_is_empty(&state->old_change));
1093 }
1094
1095 change = entry->change;
1096
1097 /*
1098 * update heap with information about which transaction has the next
1099 * relevant change in LSN order
1100 */
1101
1102 /* there are in-memory changes */
1103 if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1104 {
1105 dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1106 ReorderBufferChange *next_change =
1107 dlist_container(ReorderBufferChange, node, next);
1108
1109 /* txn stays the same */
1110 state->entries[off].lsn = next_change->lsn;
1111 state->entries[off].change = next_change;
1112
1113 binaryheap_replace_first(state->heap, Int32GetDatum(off));
1114 return change;
1115 }
1116
1117 /* try to load changes from disk */
1118 if (entry->txn->nentries != entry->txn->nentries_mem)
1119 {
1120 /*
1121 * Ugly: restoring changes will reuse *Change records, thus delete the
1122 * current one from the per-tx list and only free it in the next call.
1123 */
1124 dlist_delete(&change->node);
1125 dlist_push_tail(&state->old_change, &change->node);
1126
1127 if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
1128 &state->entries[off].segno))
1129 {
1130 /* successfully restored changes from disk */
1131 ReorderBufferChange *next_change =
1132 dlist_head_element(ReorderBufferChange, node,
1133 &entry->txn->changes);
1134
1135 elog(DEBUG2, "restored %u/%u changes from disk",
1136 (uint32) entry->txn->nentries_mem,
1137 (uint32) entry->txn->nentries);
1138
1139 Assert(entry->txn->nentries_mem);
1140 /* txn stays the same */
1141 state->entries[off].lsn = next_change->lsn;
1142 state->entries[off].change = next_change;
1143 binaryheap_replace_first(state->heap, Int32GetDatum(off));
1144
1145 return change;
1146 }
1147 }
1148
1149 /* ok, no changes there anymore, remove */
1150 binaryheap_remove_first(state->heap);
1151
1152 return change;
1153}
1154
1155/*
1156 * Deallocate the iterator
1157 */
1158static void
1159ReorderBufferIterTXNFinish(ReorderBuffer *rb,
1160 ReorderBufferIterTXNState *state)
1161{
1162 int32 off;
1163
1164 for (off = 0; off < state->nr_txns; off++)
1165 {
1166 if (state->entries[off].fd != -1)
1167 CloseTransientFile(state->entries[off].fd);
1168 }
1169
1170 /* free memory we might have "leaked" in the last *Next call */
1171 if (!dlist_is_empty(&state->old_change))
1172 {
1173 ReorderBufferChange *change;
1174
1175 change = dlist_container(ReorderBufferChange, node,
1176 dlist_pop_head_node(&state->old_change));
1177 ReorderBufferReturnChange(rb, change);
1178 Assert(dlist_is_empty(&state->old_change));
1179 }
1180
1181 binaryheap_free(state->heap);
1182 pfree(state);
1183}
1184
1185/*
1186 * Cleanup the contents of a transaction, usually after the transaction
1187 * committed or aborted.
1188 */
1189static void
1190ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1191{
1192 bool found;
1193 dlist_mutable_iter iter;
1194
1195 /* cleanup subtransactions & their changes */
1196 dlist_foreach_modify(iter, &txn->subtxns)
1197 {
1198 ReorderBufferTXN *subtxn;
1199
1200 subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1201
1202 /*
1203 * Subtransactions are always associated with the toplevel TXN, even if
1204 * they originally were happening inside another subtxn, so we won't
1205 * ever recurse more than one level deep here.
1206 */
1207 Assert(subtxn->is_known_as_subxact);
1208 Assert(subtxn->nsubtxns == 0);
1209
1210 ReorderBufferCleanupTXN(rb, subtxn);
1211 }
1212
1213 /* cleanup changes in the toplevel txn */
1214 dlist_foreach_modify(iter, &txn->changes)
1215 {
1216 ReorderBufferChange *change;
1217
1218 change = dlist_container(ReorderBufferChange, node, iter.cur);
1219
1220 ReorderBufferReturnChange(rb, change);
1221 }
1222
1223 /*
1224 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1225 * They are always stored in the toplevel transaction.
1226 */
1227 dlist_foreach_modify(iter, &txn->tuplecids)
1228 {
1229 ReorderBufferChange *change;
1230
1231 change = dlist_container(ReorderBufferChange, node, iter.cur);
1232 Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1233 ReorderBufferReturnChange(rb, change);
1234 }
1235
1236 /*
1237 * Cleanup the base snapshot, if set.
1238 */
1239 if (txn->base_snapshot != NULL)
1240 {
1241 SnapBuildSnapDecRefcount(txn->base_snapshot);
1242 dlist_delete(&txn->base_snapshot_node);
1243 }
1244
1245 /*
1246 * Remove TXN from its containing list.
1247 *
1248 * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1249 * parent's list of known subxacts; this leaves the parent's nsubxacts
1250 * count too high, but we don't care. Otherwise, we are deleting the TXN
1251 * from the LSN-ordered list of toplevel TXNs.
1252 */
1253 dlist_delete(&txn->node);
1254
1255 /* now remove reference from buffer */
1256 hash_search(rb->by_txn,
1257 (void *) &txn->xid,
1258 HASH_REMOVE,
1259 &found);
1260 Assert(found);
1261
1262 /* remove entries spilled to disk */
1263 if (txn->serialized)
1264 ReorderBufferRestoreCleanup(rb, txn);
1265
1266 /* deallocate */
1267 ReorderBufferReturnTXN(rb, txn);
1268}
1269
1270/*
1271 * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1272 * HeapTupleSatisfiesHistoricMVCC.
1273 */
1274static void
1275ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
1276{
1277 dlist_iter iter;
1278 HASHCTL hash_ctl;
1279
1280 if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1281 return;
1282
1283 memset(&hash_ctl, 0, sizeof(hash_ctl));
1284
1285 hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1286 hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1287 hash_ctl.hcxt = rb->context;
1288
1289 /*
1290 * create the hash with the exact number of to-be-stored tuplecids from
1291 * the start
1292 */
1293 txn->tuplecid_hash =
1294 hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1295 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1296
1297 dlist_foreach(iter, &txn->tuplecids)
1298 {
1299 ReorderBufferTupleCidKey key;
1300 ReorderBufferTupleCidEnt *ent;
1301 bool found;
1302 ReorderBufferChange *change;
1303
1304 change = dlist_container(ReorderBufferChange, node, iter.cur);
1305
1306 Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1307
1308 /* be careful about padding */
1309 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1310
1311 key.relnode = change->data.tuplecid.node;
1312
1313 ItemPointerCopy(&change->data.tuplecid.tid,
1314 &key.tid);
1315
1316 ent = (ReorderBufferTupleCidEnt *)
1317 hash_search(txn->tuplecid_hash,
1318 (void *) &key,
1319 HASH_ENTER,
1320 &found);
1321 if (!found)
1322 {
1323 ent->cmin = change->data.tuplecid.cmin;
1324 ent->cmax = change->data.tuplecid.cmax;
1325 ent->combocid = change->data.tuplecid.combocid;
1326 }
1327 else
1328 {
1329 /*
1330 * Maybe we already saw this tuple before in this transaction, but
1331 * if so it must have the same cmin.
1332 */
1333 Assert(ent->cmin == change->data.tuplecid.cmin);
1334
1335 /*
1336 * cmax may be initially invalid, but once set it can only grow,
1337 * and never become invalid again.
1338 */
1339 Assert((ent->cmax == InvalidCommandId) ||
1340 ((change->data.tuplecid.cmax != InvalidCommandId) &&
1341 (change->data.tuplecid.cmax > ent->cmax)));
1342 ent->cmax = change->data.tuplecid.cmax;
1343 }
1344 }
1345}
1346
1347/*
1348 * Copy a provided snapshot so we can modify it privately. This is needed so
1349 * that catalog modifying transactions can look into intermediate catalog
1350 * states.
1351 */
1352static Snapshot
1353ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
1354 ReorderBufferTXN *txn, CommandId cid)
1355{
1356 Snapshot snap;
1357 dlist_iter iter;
1358 int i = 0;
1359 Size size;
1360
1361 size = sizeof(SnapshotData) +
1362 sizeof(TransactionId) * orig_snap->xcnt +
1363 sizeof(TransactionId) * (txn->nsubtxns + 1);
1364
1365 snap = MemoryContextAllocZero(rb->context, size);
1366 memcpy(snap, orig_snap, sizeof(SnapshotData));
1367
1368 snap->copied = true;
1369 snap->active_count = 1; /* mark as active so nobody frees it */
1370 snap->regd_count = 0;
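 /*
 * The xip and subxip arrays live in the same allocation, immediately
 * following the SnapshotData struct; see the size computation above.
 */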
1371 snap->xip = (TransactionId *) (snap + 1);
1372
1373 memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1374
1375 /*
1376 * snap->subxip contains all txids that belong to our transaction which we
1377 * need to check via cmin/cmax. That's why we store the toplevel
1378 * transaction in there as well.
1379 */
1380 snap->subxip = snap->xip + snap->xcnt;
1381 snap->subxip[i++] = txn->xid;
1382
1383 /*
1384 * nsubxcnt isn't decreased when subtransactions abort, so count manually.
1385 * Since it's an upper boundary it is safe to use it for the allocation
1386 * above.
1387 */
1388 snap->subxcnt = 1;
1389
1390 dlist_foreach(iter, &txn->subtxns)
1391 {
1392 ReorderBufferTXN *sub_txn;
1393
1394 sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1395 snap->subxip[i++] = sub_txn->xid;
1396 snap->subxcnt++;
1397 }
1398
1399 /* sort so we can bsearch() later */
1400 qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1401
1402 /* store the specified current CommandId */
1403 snap->curcid = cid;
1404
1405 return snap;
1406}
1407
1408/*
1409 * Free a previously ReorderBufferCopySnap'ed snapshot
1410 */
1411static void
1412ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
1413{
1414 if (snap->copied)
1415 pfree(snap);
1416 else
1417 SnapBuildSnapDecRefcount(snap);
1418}
1419
1420/*
1421 * Perform the replay of a transaction and its non-aborted subtransactions.
1422 *
1423 * Subtransactions previously have to be processed by
1424 * ReorderBufferCommitChild(), even if previously assigned to the toplevel
1425 * transaction with ReorderBufferAssignChild.
1426 *
1427 * We currently can only decode a transaction's contents when its commit
1428 * record is read because that's the only place where we know about cache
1429 * invalidations. Thus, once a toplevel commit is read, we iterate over the top
1430 * and subtransactions (using a k-way merge) and replay the changes in lsn
1431 * order.
1432 */
1433void
1434ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
1435 XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
1436 TimestampTz commit_time,
1437 RepOriginId origin_id, XLogRecPtr origin_lsn)
1438{
1439 ReorderBufferTXN *txn;
1440 volatile Snapshot snapshot_now;
1441 volatile CommandId command_id = FirstCommandId;
1442 bool using_subtxn;
1443 ReorderBufferIterTXNState *volatile iterstate = NULL;
1444
1445 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1446 false);
1447
1448 /* unknown transaction, nothing to replay */
1449 if (txn == NULL)
1450 return;
1451
1452 txn->final_lsn = commit_lsn;
1453 txn->end_lsn = end_lsn;
1454 txn->commit_time = commit_time;
1455 txn->origin_id = origin_id;
1456 txn->origin_lsn = origin_lsn;
1457
1458 /*
1459 * If this transaction has no snapshot, it didn't make any changes to the
1460 * database, so there's nothing to decode. Note that
1461 * ReorderBufferCommitChild will have transferred any snapshots from
1462 * subtransactions if there were any.
1463 */
1464 if (txn->base_snapshot == NULL)
1465 {
1466 Assert(txn->ninvalidations == 0);
1467 ReorderBufferCleanupTXN(rb, txn);
1468 return;
1469 }
1470
1471 snapshot_now = txn->base_snapshot;
1472
1473 /* build data to be able to lookup the CommandIds of catalog tuples */
1474 ReorderBufferBuildTupleCidHash(rb, txn);
1475
1476 /* setup the initial snapshot */
1477 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1478
1479 /*
1480 * Decoding needs access to syscaches et al., which in turn use
1481 * heavyweight locks and such. Thus we need to have enough state around to
1482 * keep track of those. The easiest way is to simply use a transaction
1483 * internally. That also allows us to easily enforce that nothing writes
1484 * to the database by checking for xid assignments.
1485 *
1486 * When we're called via the SQL SRF there's already a transaction
1487 * started, so start an explicit subtransaction there.
1488 */
1489 using_subtxn = IsTransactionOrTransactionBlock();
1490
1491 PG_TRY();
1492 {
1493 ReorderBufferChange *change;
1494 ReorderBufferChange *specinsert = NULL;
1495
1496 if (using_subtxn)
1497 BeginInternalSubTransaction("replay");
1498 else
1499 StartTransactionCommand();
1500
1501 rb->begin(rb, txn);
1502
1503 iterstate = ReorderBufferIterTXNInit(rb, txn);
1504 while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1505 {
1506 Relation relation = NULL;
1507 Oid reloid;
1508
1509 switch (change->action)
1510 {
1511 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
1512
1513 /*
1514 * Confirmation for speculative insertion arrived. Simply
1515 * use as a normal record. It'll be cleaned up at the end
1516 * of INSERT processing.
1517 */
1518 if (specinsert == NULL)
1519 elog(ERROR, "invalid ordering of speculative insertion changes");
1520 Assert(specinsert->data.tp.oldtuple == NULL);
1521 change = specinsert;
1522 change->action = REORDER_BUFFER_CHANGE_INSERT;
1523
1524 /* intentionally fall through */
1525 case REORDER_BUFFER_CHANGE_INSERT:
1526 case REORDER_BUFFER_CHANGE_UPDATE:
1527 case REORDER_BUFFER_CHANGE_DELETE:
1528 Assert(snapshot_now);
1529
1530 reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1531 change->data.tp.relnode.relNode);
1532
1533 /*
1534 * Mapped catalog tuple without data, emitted while
1535 * catalog table was in the process of being rewritten. We
1536 * can fail to look up the relfilenode, because the
1537 * relmapper has no "historic" view, in contrast to the
1538 * normal catalog during decoding. Thus repeated
1539 * rewrites can cause a lookup failure. That's OK because
1540 * we do not decode catalog changes anyway. Normally such
1541 * tuples would be skipped over below, but we can't
1542 * identify whether the table should be logically logged
1543 * without mapping the relfilenode to the oid.
1544 */
1545 if (reloid == InvalidOid &&
1546 change->data.tp.newtuple == NULL &&
1547 change->data.tp.oldtuple == NULL)
1548 goto change_done;
1549 else if (reloid == InvalidOid)
1550 elog(ERROR, "could not map filenode \"%s\" to relation OID",
1551 relpathperm(change->data.tp.relnode,
1552 MAIN_FORKNUM));
1553
1554 relation = RelationIdGetRelation(reloid);
1555
1556 if (!RelationIsValid(relation))
1557 elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1558 reloid,
1559 relpathperm(change->data.tp.relnode,
1560 MAIN_FORKNUM));
1561
1562 if (!RelationIsLogicallyLogged(relation))
1563 goto change_done;
1564
1565 /*
1566 * Ignore temporary heaps created during DDL unless the
1567 * plugin has asked for them.
1568 */
1569 if (relation->rd_rel->relrewrite && !rb->output_rewrites)
1570 goto change_done;
1571
1572 /*
1573 * For now ignore sequence changes entirely. Most of the
1574 * time they don't log changes using records we
1575 * understand, so it doesn't make sense to handle the few
1576 * cases we do.
1577 */
1578 if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1579 goto change_done;
1580
1581 /* user-triggered change */
1582 if (!IsToastRelation(relation))
1583 {
1584 ReorderBufferToastReplace(rb, txn, relation, change);
1585 rb->apply_change(rb, txn, relation, change);
1586
1587 /*
1588 * Only clear reassembled toast chunks if we're sure
1589 * they're not required anymore. The creator of the
1590 * tuple tells us.
1591 */
1592 if (change->data.tp.clear_toast_afterwards)
1593 ReorderBufferToastReset(rb, txn);
1594 }
1595 /* we're not interested in toast deletions */
1596 else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1597 {
1598 /*
1599 * Need to reassemble the full toasted Datum in
1600 * memory, to ensure the chunks don't get reused till
1601 * we're done; remove it from the list of this
1602 * transaction's changes. Otherwise it will get
1603 * freed/reused while restoring spooled data from
1604 * disk.
1605 */
1606 Assert(change->data.tp.newtuple != NULL);
1607
1608 dlist_delete(&change->node);
1609 ReorderBufferToastAppendChunk(rb, txn, relation,
1610 change);
1611 }
1612
1613 change_done:
1614
1615 /*
1616 * Either speculative insertion was confirmed, or it was
1617 * unsuccessful and the record isn't needed anymore.
1618 */
1619 if (specinsert != NULL)
1620 {
1621 ReorderBufferReturnChange(rb, specinsert);
1622 specinsert = NULL;
1623 }
1624
1625 if (relation != NULL)
1626 {
1627 RelationClose(relation);
1628 relation = NULL;
1629 }
1630 break;
1631
1632 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
1633
1634 /*
1635 * Speculative insertions are dealt with by delaying the
1636 * processing of the insert until the confirmation record
1637 * arrives. For that we simply unlink the record from the
1638 * chain, so it does not get freed/reused while restoring
1639 * spooled data from disk.
1640 *
1641 * This is safe in the face of concurrent catalog changes
1642 * because the relevant relation can't be changed between
1643 * speculative insertion and confirmation due to
1644 * CheckTableNotInUse() and locking.
1645 */
1646
1647 /* clear out a pending (and thus failed) speculation */
1648 if (specinsert != NULL)
1649 {
1650 ReorderBufferReturnChange(rb, specinsert);
1651 specinsert = NULL;
1652 }
1653
1654 /* and memorize the pending insertion */
1655 dlist_delete(&change->node);
1656 specinsert = change;
1657 break;
1658
1659 case REORDER_BUFFER_CHANGE_TRUNCATE:
1660 {
1661 int i;
1662 int nrelids = change->data.truncate.nrelids;
1663 int nrelations = 0;
1664 Relation *relations;
1665
1666 relations = palloc0(nrelids * sizeof(Relation));
1667 for (i = 0; i < nrelids; i++)
1668 {
1669 Oid relid = change->data.truncate.relids[i];
1670 Relation relation;
1671
1672 relation = RelationIdGetRelation(relid);
1673
1674 if (!RelationIsValid(relation))
1675 elog(ERROR, "could not open relation with OID %u", relid);
1676
1677 if (!RelationIsLogicallyLogged(relation))
1678 continue;
1679
1680 relations[nrelations++] = relation;
1681 }
1682
1683 rb->apply_truncate(rb, txn, nrelations, relations, change);
1684
1685 for (i = 0; i < nrelations; i++)
1686 RelationClose(relations[i]);
1687
1688 break;
1689 }
1690
1691 case REORDER_BUFFER_CHANGE_MESSAGE:
1692 rb->message(rb, txn, change->lsn, true,
1693 change->data.msg.prefix,
1694 change->data.msg.message_size,
1695 change->data.msg.message);
1696 break;
1697
1698 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
1699 /* get rid of the old */
1700 TeardownHistoricSnapshot(false);
1701
1702 if (snapshot_now->copied)
1703 {
1704 ReorderBufferFreeSnap(rb, snapshot_now);
1705 snapshot_now =
1706 ReorderBufferCopySnap(rb, change->data.snapshot,
1707 txn, command_id);
1708 }
1709
1710 /*
1711 * Restored from disk, need to be careful not to double
1712 * free. We could introduce refcounting for that, but for
1713 * now this seems infrequent enough not to care.
1714 */
1715 else if (change->data.snapshot->copied)
1716 {
1717 snapshot_now =
1718 ReorderBufferCopySnap(rb, change->data.snapshot,
1719 txn, command_id);
1720 }
1721 else
1722 {
1723 snapshot_now = change->data.snapshot;
1724 }
1725
1726
1727 /* and continue with the new one */
1728 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1729 break;
1730
1731 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
1732 Assert(change->data.command_id != InvalidCommandId);
1733
1734 if (command_id < change->data.command_id)
1735 {
1736 command_id = change->data.command_id;
1737
1738 if (!snapshot_now->copied)
1739 {
1740 /* we don't use the global one anymore */
1741 snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1742 txn, command_id);
1743 }
1744
1745 snapshot_now->curcid = command_id;
1746
1747 TeardownHistoricSnapshot(false);
1748 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1749
1750 /*
1751 * Every time the CommandId is incremented, we could
1752 * see new catalog contents, so execute all
1753 * invalidations.
1754 */
1755 ReorderBufferExecuteInvalidations(rb, txn);
1756 }
1757
1758 break;
1759
1760 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
1761 elog(ERROR, "tuplecid value in changequeue");
1762 break;
1763 }
1764 }
1765
1766 /*
1767 * If there's a speculative insertion remaining, just clean it up; it
1768 * can't have been successful, otherwise we'd have gotten a confirmation
1769 * record.
1770 */
1771 if (specinsert)
1772 {
1773 ReorderBufferReturnChange(rb, specinsert);
1774 specinsert = NULL;
1775 }
1776
1777 /* clean up the iterator */
1778 ReorderBufferIterTXNFinish(rb, iterstate);
1779 iterstate = NULL;
1780
1781 /* call commit callback */
1782 rb->commit(rb, txn, commit_lsn);
1783
1784 /* this is just a sanity check against bad output plugin behaviour */
1785 if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
1786 elog(ERROR, "output plugin used XID %u",
1787 GetCurrentTransactionId());
1788
1789 /* cleanup */
1790 TeardownHistoricSnapshot(false);
1791
1792 /*
1793 * Aborting the current (sub-)transaction as a whole has the right
1794 * semantics. We want all locks acquired in here to be released, not
1795 * reassigned to the parent, and we do not want any database access to
1796 * have persistent effects.
1797 */
1798 AbortCurrentTransaction();
1799
1800 /* make sure there's no cache pollution */
1801 ReorderBufferExecuteInvalidations(rb, txn);
1802
1803 if (using_subtxn)
1804 RollbackAndReleaseCurrentSubTransaction();
1805
1806 if (snapshot_now->copied)
1807 ReorderBufferFreeSnap(rb, snapshot_now);
1808
1809 /* remove potential on-disk data, and deallocate */
1810 ReorderBufferCleanupTXN(rb, txn);
1811 }
1812 PG_CATCH();
1813 {
1814 /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1815 if (iterstate)
1816 ReorderBufferIterTXNFinish(rb, iterstate);
1817
1818 TeardownHistoricSnapshot(true);
1819
1820 /*
1821 * Force cache invalidation to happen outside of a valid transaction
1822 * to prevent catalog access as we just caught an error.
1823 */
1824 AbortCurrentTransaction();
1825
1826 /* make sure there's no cache pollution */
1827 ReorderBufferExecuteInvalidations(rb, txn);
1828
1829 if (using_subtxn)
1830 RollbackAndReleaseCurrentSubTransaction();
1831
1832 if (snapshot_now->copied)
1833 ReorderBufferFreeSnap(rb, snapshot_now);
1834
1835 /* remove potential on-disk data, and deallocate */
1836 ReorderBufferCleanupTXN(rb, txn);
1837
1838 PG_RE_THROW();
1839 }
1840 PG_END_TRY();
1841}
1842
1843/*
1844 * Abort a transaction that possibly has previous changes. Needs to be first
1845 * called for subtransactions and then for the toplevel xid.
1846 *
1847 * NB: Transactions handled here have to have actively aborted (i.e. have
1848 * produced an abort record). Implicitly aborted transactions are handled via
1849 * ReorderBufferAbortOld(); transactions we're just not interested in, but
1850 * which have committed are handled in ReorderBufferForget().
1851 *
1852 * This function purges this transaction and its contents from memory and
1853 * disk.
1854 */
1855void
1856ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1857{
1858 ReorderBufferTXN *txn;
1859
1860 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1861 false);
1862
1863 /* unknown, nothing to remove */
1864 if (txn == NULL)
1865 return;
1866
1867 /* cosmetic... */
1868 txn->final_lsn = lsn;
1869
1870 /* remove potential on-disk data, and deallocate */
1871 ReorderBufferCleanupTXN(rb, txn);
1872}
1873
1874/*
1875 * Abort all transactions that aren't actually running anymore because the
1876 * server restarted.
1877 *
1878 * NB: These really have to be transactions that have aborted due to a server
1879 * crash/immediate restart, as we don't deal with invalidations here.
1880 */
1881void
1882ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
1883{
1884 dlist_mutable_iter it;
1885
1886 /*
1887 * Iterate through all (potential) toplevel TXNs and abort all that are
1888 * older than what possibly can be running. Once we've found the first one
1889 * that is alive we stop; there might be some that acquired an xid earlier
1890 * but started writing later, but that's unlikely and they will be cleaned
1891 * up in a later call to this function.
1892 */
1893 dlist_foreach_modify(it, &rb->toplevel_by_lsn)
1894 {
1895 ReorderBufferTXN *txn;
1896
1897 txn = dlist_container(ReorderBufferTXN, node, it.cur);
1898
1899 if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1900 {
1901 /*
1902 * We set final_lsn on a transaction when we decode its commit or
1903 * abort record, but we never see those records for crashed
1904 * transactions. To ensure cleanup of these transactions, set
1905 * final_lsn to that of their last change; this causes
1906 * ReorderBufferRestoreCleanup to do the right thing.
1907 */
1908 if (txn->serialized && txn->final_lsn == 0)
1909 {
1910 ReorderBufferChange *last =
1911 dlist_tail_element(ReorderBufferChange, node, &txn->changes);
1912
1913 txn->final_lsn = last->lsn;
1914 }
1915
1916 elog(DEBUG2, "aborting old transaction %u", txn->xid);
1917
1918 /* remove potential on-disk data, and deallocate this tx */
1919 ReorderBufferCleanupTXN(rb, txn);
1920 }
1921 else
1922 return;
1923 }
1924}
1925
1926/*
1927 * Forget the contents of a transaction if we aren't interested in its
1928 * contents. Needs to be first called for subtransactions and then for the
1929 * toplevel xid.
1930 *
1931 * This is significantly different from ReorderBufferAbort() because
1932 * transactions that have committed need to be treated differently from aborted
1933 * ones since they may have modified the catalog.
1934 *
1935 * Note that this is only allowed to be called at the moment a transaction
1936 * commit has just been read, not earlier; otherwise later records referring
1937 * to this xid might re-create the transaction incompletely.
1938 */
1939void
1940ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
1941{
1942 ReorderBufferTXN *txn;
1943
1944 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1945 false);
1946
1947 /* unknown, nothing to forget */
1948 if (txn == NULL)
1949 return;
1950
1951 /* cosmetic... */
1952 txn->final_lsn = lsn;
1953
1954 /*
1955 * Process cache invalidation messages if there are any. Even if we're not
1956 * interested in the transaction's contents, it could have manipulated the
1957 * catalog and we need to update the caches accordingly.
1958 */
1959 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1960 ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
1961 txn->invalidations);
1962 else
1963 Assert(txn->ninvalidations == 0);
1964
1965 /* remove potential on-disk data, and deallocate */
1966 ReorderBufferCleanupTXN(rb, txn);
1967}
1968
1969/*
1970 * Execute invalidations happening outside the context of a decoded
1971 * transaction. That currently happens either for xid-less commits
1972 * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
1973 * transactions (via ReorderBufferForget()).
1974 */
1975void
1976ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
1977 SharedInvalidationMessage *invalidations)
1978{
1979 bool use_subtxn = IsTransactionOrTransactionBlock();
1980 int i;
1981
1982 if (use_subtxn)
1983 BeginInternalSubTransaction("replay");
1984
1985 /*
1986 * Force invalidations to happen outside of a valid transaction - that way
1987 * entries will just be marked as invalid without accessing the catalog.
1988 * That's advantageous because we don't need to set up the full state
1989 * necessary for catalog access.
1990 */
1991 if (use_subtxn)
1992 AbortCurrentTransaction();
1993
1994 for (i = 0; i < ninvalidations; i++)
1995 LocalExecuteInvalidationMessage(&invalidations[i]);
1996
1997 if (use_subtxn)
1998 RollbackAndReleaseCurrentSubTransaction();
1999}
2000
2001/*
2002 * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
2003 * least once for every xid in XLogRecord->xl_xid (other places in records
2004 * may, but do not have to be passed through here).
2005 *
2006 * The reorderbuffer keeps some data structures about transactions in LSN
2007 * order, for efficiency. To do that it has to know when transactions are
2008 * first seen in the WAL. As many types of records are not actually
2009 * interesting for logical decoding, they do not necessarily pass through here.
2010 */
2011void
2012ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
2013{
2014 /* many records won't have an xid assigned, centralize check here */
2015 if (xid != InvalidTransactionId)
2016 ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2017}
2018
2019/*
2020 * Add a new snapshot to this transaction that may only be used after lsn 'lsn'
2021 * because the previous snapshot doesn't describe the catalog correctly for
2022 * following rows.
2023 */
2024void
2025ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
2026 XLogRecPtr lsn, Snapshot snap)
2027{
2028 ReorderBufferChange *change = ReorderBufferGetChange(rb);
2029
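 /* Wrap the snapshot in an internal change so it is replayed at this LSN. */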
2030 change->data.snapshot = snap;
2031 change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
2032
2033 ReorderBufferQueueChange(rb, xid, lsn, change);
2034}
2035
2036/*
2037 * Set up the transaction's base snapshot.
2038 *
2039 * If we know that xid is a subtransaction, set the base snapshot on the
2040 * top-level transaction instead.
2041 */
2042void
2043ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
2044 XLogRecPtr lsn, Snapshot snap)
2045{
2046 ReorderBufferTXN *txn;
2047 bool is_new;
2048
2049 AssertArg(snap != NULL);
2050
2051 /*
2052 * Fetch the transaction to operate on. If we know it's a subtransaction,
2053 * operate on its top-level transaction instead.
2054 */
2055 txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
2056 if (txn->is_known_as_subxact)
2057 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2058 NULL, InvalidXLogRecPtr, false);
2059 Assert(txn->base_snapshot == NULL);
2060
2061 txn->base_snapshot = snap;
2062 txn->base_snapshot_lsn = lsn;
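 /*
 * Base snapshots are assigned in increasing LSN order as WAL is read, so
 * appending keeps txns_by_base_snapshot_lsn sorted; AssertTXNLsnOrder()
 * below checks that.
 */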
2063 dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
2064
2065 AssertTXNLsnOrder(rb);
2066}
2067
2068/*
2069 * Access the catalog with this CommandId at this point in the changestream.
2070 *
2071 * May only be called for command ids > 1
2072 */
2073void
2074ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
2075 XLogRecPtr lsn, CommandId cid)
2076{
2077 ReorderBufferChange *change = ReorderBufferGetChange(rb);
2078
2079 change->data.command_id = cid;
2080 change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
2081
2082 ReorderBufferQueueChange(rb, xid, lsn, change);
2083}
2084
2085
2086/*
2087 * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
2088 */
2089void
2090ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
2091 XLogRecPtr lsn, RelFileNode node,
2092 ItemPointerData tid, CommandId cmin,
2093 CommandId cmax, CommandId combocid)
2094{
2095 ReorderBufferChange *change = ReorderBufferGetChange(rb);
2096 ReorderBufferTXN *txn;
2097
2098 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2099
2100 change->data.tuplecid.node = node;
2101 change->data.tuplecid.tid = tid;
2102 change->data.tuplecid.cmin = cmin;
2103 change->data.tuplecid.cmax = cmax;
2104 change->data.tuplecid.combocid = combocid;
2105 change->lsn = lsn;
2106 change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
2107
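 /* tuplecid changes are kept on their own list, not in the main change stream */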
2108 dlist_push_tail(&txn->tuplecids, &change->node);
2109 txn->ntuplecids++;
2110}
2111
2112/*
2113 * Set up the invalidations of the toplevel transaction.
2114 *
2115 * This needs to be done before ReorderBufferCommit is called!
2116 */
2117void
2118ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
2119 XLogRecPtr lsn, Size nmsgs,
2120 SharedInvalidationMessage *msgs)
2121{
2122 ReorderBufferTXN *txn;
2123
2124 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2125
2126 if (txn->ninvalidations != 0)
2127 elog(ERROR, "only ever add one set of invalidations");
2128
2129 Assert(nmsgs > 0);
2130
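 /*
 * Copy the messages into the reorder buffer's memory context so they
 * survive until the transaction is replayed or cleaned up.
 */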
2131 txn->ninvalidations = nmsgs;
2132 txn->invalidations = (SharedInvalidationMessage *)
2133 MemoryContextAlloc(rb->context,
2134 sizeof(SharedInvalidationMessage) * nmsgs);
2135 memcpy(txn->invalidations, msgs,
2136 sizeof(SharedInvalidationMessage) * nmsgs);
2137}
2138
2139/*
2140 * Apply all invalidations we know. Possibly we only need parts at this point
2141 * in the changestream but we don't know which those are.
2142 */
2143static void
2144ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
2145{
2146 int i;
2147
2148 for (i = 0; i < txn->ninvalidations; i++)
2149 LocalExecuteInvalidationMessage(&txn->invalidations[i]);
2150}
2151
2152/*
2153 * Mark a transaction as containing catalog changes
2154 */
2155void
2156ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
2157 XLogRecPtr lsn)
2158{
2159 ReorderBufferTXN *txn;
2160
2161 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2162
2163 txn->has_catalog_changes = true;
2164}
2165
2166/*
2167 * Query whether a transaction is already *known* to contain catalog
2168 * changes. This can be wrong until directly before the commit!
2169 */
2170bool
2171ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
2172{
2173 ReorderBufferTXN *txn;
2174
2175 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2176 false);
2177 if (txn == NULL)
2178 return false;
2179
2180 return txn->has_catalog_changes;
2181}
2182
2183/*
2184 * ReorderBufferXidHasBaseSnapshot
2185 * Have we already set the base snapshot for the given txn/subtxn?
2186 */
2187bool
2188ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
2189{
2190 ReorderBufferTXN *txn;
2191
2192 txn = ReorderBufferTXNByXid(rb, xid, false,
2193 NULL, InvalidXLogRecPtr, false);
2194
2195 /* transaction isn't known yet, ergo no snapshot */
2196 if (txn == NULL)
2197 return false;
2198
2199 /* a known subtxn? operate on top-level txn instead */
2200 if (txn->is_known_as_subxact)
2201 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2202 NULL, InvalidXLogRecPtr, false);
2203
2204 return txn->base_snapshot != NULL;
2205}
2206
2207
2208/*
2209 * ---------------------------------------
2210 * Disk serialization support
2211 * ---------------------------------------
2212 */
2213
2214/*
2215 * Ensure the IO buffer is >= sz.
2216 */
2217static void
2218ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
2219{
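 /*
 * The buffer is only ever grown, never shrunk; it is reused both for
 * serializing changes and for reading them back from disk.
 */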
2220 if (!rb->outbufsize)
2221 {
2222 rb->outbuf = MemoryContextAlloc(rb->context, sz);
2223 rb->outbufsize = sz;
2224 }
2225 else if (rb->outbufsize < sz)
2226 {
2227 rb->outbuf = repalloc(rb->outbuf, sz);
2228 rb->outbufsize = sz;
2229 }
2230}
2231
2232/*
2233 * Check whether the transaction tx should spill its data to disk.
2234 */
2235static void
2236ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2237{
2238 /*
2239 * TODO: improve accounting so we can cheaply take subtransactions into
2240 * account here.
2241 */
2242 if (txn->nentries_mem >= max_changes_in_memory)
2243 {
2244 ReorderBufferSerializeTXN(rb, txn);
2245 Assert(txn->nentries_mem == 0);
2246 }
2247}
2248
2249/*
2250 * Spill data of a large transaction (and its subtransactions) to disk.
2251 */
2252static void
2253ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
2254{
2255 dlist_iter subtxn_i;
2256 dlist_mutable_iter change_i;
2257 int fd = -1;
2258 XLogSegNo curOpenSegNo = 0;
2259 Size spilled = 0;
2260
2261 elog(DEBUG2, "spill %u changes in XID %u to disk",
2262 (uint32) txn->nentries_mem, txn->xid);
2263
2264 /* do the same to all child TXs */
2265 dlist_foreach(subtxn_i, &txn->subtxns)
2266 {
2267 ReorderBufferTXN *subtxn;
2268
2269 subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2270 ReorderBufferSerializeTXN(rb, subtxn);
2271 }
2272
2273 /* serialize changestream */
2274 dlist_foreach_modify(change_i, &txn->changes)
2275 {
2276 ReorderBufferChange *change;
2277
2278 change = dlist_container(ReorderBufferChange, node, change_i.cur);
2279
2280 /*
2281 * Store the change in the segment file to which it belongs by start lsn;
2282 * don't split a change over multiple segment files, though.
2283 */
2284 if (fd == -1 ||
2285 !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
2286 {
2287 char path[MAXPGPATH];
2288
2289 if (fd != -1)
2290 CloseTransientFile(fd);
2291
2292 XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
2293
2294 /*
2295 * No need to care about TLIs here, only used during a single run,
2296 * so each LSN only maps to a specific WAL record.
2297 */
2298 ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2299 curOpenSegNo);
2300
2301 /* open segment, create it if necessary */
2302 fd = OpenTransientFile(path,
2303 O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
2304
2305 if (fd < 0)
2306 ereport(ERROR,
2307 (errcode_for_file_access(),
2308 errmsg("could not open file \"%s\": %m", path)));
2309 }
2310
2311 ReorderBufferSerializeChange(rb, txn, fd, change);
2312 dlist_delete(&change->node);
2313 ReorderBufferReturnChange(rb, change);
2314
2315 spilled++;
2316 }
2317
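 /* all in-memory changes of this transaction should now be on disk */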
2318 Assert(spilled == txn->nentries_mem);
2319 Assert(dlist_is_empty(&txn->changes));
2320 txn->nentries_mem = 0;
2321 txn->serialized = true;
2322
2323 if (fd != -1)
2324 CloseTransientFile(fd);
2325}
2326
2327/*
2328 * Serialize individual change to disk.
2329 */
2330static void
2331ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2332 int fd, ReorderBufferChange *change)
2333{
2334 ReorderBufferDiskChange *ondisk;
2335 Size sz = sizeof(ReorderBufferDiskChange);
2336
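 /*
 * The on-disk format is a ReorderBufferDiskChange header (embedding the
 * ReorderBufferChange and the total record size) followed by
 * action-specific variable-length data.
 */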
2337 ReorderBufferSerializeReserve(rb, sz);
2338
2339 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2340 memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2341
2342 switch (change->action)
2343 {
2344 /* fall through these, they're all similar enough */
2345 case REORDER_BUFFER_CHANGE_INSERT:
2346 case REORDER_BUFFER_CHANGE_UPDATE:
2347 case REORDER_BUFFER_CHANGE_DELETE:
2348 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2349 {
2350 char *data;
2351 ReorderBufferTupleBuf *oldtup,
2352 *newtup;
2353 Size oldlen = 0;
2354 Size newlen = 0;
2355
2356 oldtup = change->data.tp.oldtuple;
2357 newtup = change->data.tp.newtuple;
2358
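 /* account for each present tuple: a HeapTupleData header plus the tuple contents */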
2359 if (oldtup)
2360 {
2361 sz += sizeof(HeapTupleData);
2362 oldlen = oldtup->tuple.t_len;
2363 sz += oldlen;
2364 }
2365
2366 if (newtup)
2367 {
2368 sz += sizeof(HeapTupleData);
2369 newlen = newtup->tuple.t_len;
2370 sz += newlen;
2371 }
2372
2373 /* make sure we have enough space */
2374 ReorderBufferSerializeReserve(rb, sz);
2375
2376 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2377 /* might have been reallocated above */
2378 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2379
2380 if (oldlen)
2381 {
2382 memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2383 data += sizeof(HeapTupleData);
2384
2385 memcpy(data, oldtup->tuple.t_data, oldlen);
2386 data += oldlen;
2387 }
2388
2389 if (newlen)
2390 {
2391 memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2392 data += sizeof(HeapTupleData);
2393
2394 memcpy(data, newtup->tuple.t_data, newlen);
2395 data += newlen;
2396 }
2397 break;
2398 }
2399 case REORDER_BUFFER_CHANGE_MESSAGE:
2400 {
2401 char *data;
2402 Size prefix_size = strlen(change->data.msg.prefix) + 1;
2403
2404 sz += prefix_size + change->data.msg.message_size +
2405 sizeof(Size) + sizeof(Size);
2406 ReorderBufferSerializeReserve(rb, sz);
2407
2408 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2409
2410 /* might have been reallocated above */
2411 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2412
2413 /* write the prefix including the size */
2414 memcpy(data, &prefix_size, sizeof(Size));
2415 data += sizeof(Size);
2416 memcpy(data, change->data.msg.prefix,
2417 prefix_size);
2418 data += prefix_size;
2419
2420 /* write the message including the size */
2421 memcpy(data, &change->data.msg.message_size, sizeof(Size));
2422 data += sizeof(Size);
2423 memcpy(data, change->data.msg.message,
2424 change->data.msg.message_size);
2425 data += change->data.msg.message_size;
2426
2427 break;
2428 }
2429 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2430 {
2431 Snapshot snap;
2432 char *data;
2433
2434 snap = change->data.snapshot;
2435
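 /*
 * The snapshot is stored as the SnapshotData struct followed by its
 * xip and subxip arrays.
 */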
2436 sz += sizeof(SnapshotData) +
2437 sizeof(TransactionId) * snap->xcnt +
2438 sizeof(TransactionId) * snap->subxcnt;
2440
2441 /* make sure we have enough space */
2442 ReorderBufferSerializeReserve(rb, sz);
2443 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2444 /* might have been reallocated above */
2445 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2446
2447 memcpy(data, snap, sizeof(SnapshotData));
2448 data += sizeof(SnapshotData);
2449
2450 if (snap->xcnt)
2451 {
2452 memcpy(data, snap->xip,
2453 sizeof(TransactionId) * snap->xcnt);
2454 data += sizeof(TransactionId) * snap->xcnt;
2455 }
2456
2457 if (snap->subxcnt)
2458 {
2459 memcpy(data, snap->subxip,
2460 sizeof(TransactionId) * snap->subxcnt);
2461 data += sizeof(TransactionId) * snap->subxcnt;
2462 }
2463 break;
2464 }
2465 case REORDER_BUFFER_CHANGE_TRUNCATE:
2466 {
2467 Size size;
2468 char *data;
2469
2470 /* account for the OIDs of truncated relations */
2471 size = sizeof(Oid) * change->data.truncate.nrelids;
2472 sz += size;
2473
2474 /* make sure we have enough space */
2475 ReorderBufferSerializeReserve(rb, sz);
2476
2477 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2478 /* might have been reallocated above */
2479 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2480
2481 memcpy(data, change->data.truncate.relids, size);
2482 data += size;
2483
2484 break;
2485 }
2486 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2487 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2488 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2489 /* ReorderBufferChange contains everything important */
2490 break;
2491 }
2492
2493 ondisk->size = sz;
2494
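 /* clear errno first; a short write() does not necessarily set it */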
2495 errno = 0;
2496 pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
2497 if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2498 {
2499 int save_errno = errno;
2500
2501 CloseTransientFile(fd);
2502
2503 /* if write didn't set errno, assume problem is no disk space */
2504 errno = save_errno ? save_errno : ENOSPC;
2505 ereport(ERROR,
2506 (errcode_for_file_access(),
2507 errmsg("could not write to data file for XID %u: %m",
2508 txn->xid)));
2509 }
2510 pgstat_report_wait_end();
2511
2512 Assert(ondisk->change.action == change->action);
2513}
2514
2515/*
2516 * Restore a number of changes spilled to disk back into memory.
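 * The caller-provided *fd and *segno act as a read cursor into the spill
 * files and are advanced across repeated calls.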
2517 */
2518static Size
2519ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
2520 int *fd, XLogSegNo *segno)
2521{
2522 Size restored = 0;
2523 XLogSegNo last_segno;
2524 dlist_mutable_iter cleanup_iter;
2525
2526 Assert(txn->first_lsn != InvalidXLogRecPtr);
2527 Assert(txn->final_lsn != InvalidXLogRecPtr);
2528
2529 /* free current entries, so we have memory for more */
2530 dlist_foreach_modify(cleanup_iter, &txn->changes)
2531 {
2532 ReorderBufferChange *cleanup =
2533 dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2534
2535 dlist_delete(&cleanup->node);
2536 ReorderBufferReturnChange(rb, cleanup);
2537 }
2538 txn->nentries_mem = 0;
2539 Assert(dlist_is_empty(&txn->changes));
2540
2541 XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
2542
2543 while (restored < max_changes_in_memory && *segno <= last_segno)
2544 {
2545 int readBytes;
2546 ReorderBufferDiskChange *ondisk;
2547
2548 if (*fd == -1)
2549 {
2550 char path[MAXPGPATH];
2551
2552 /* first time in */
2553 if (*segno == 0)
2554 XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
2555
2556 Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2557
2558 /*
2559 * No need to care about TLIs here, only used during a single run,
2560 * so each LSN only maps to a specific WAL record.
2561 */
2562 ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2563 *segno);
2564
2565 *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
2566 if (*fd < 0 && errno == ENOENT)
2567 {
2568 *fd = -1;
2569 (*segno)++;
2570 continue;
2571 }
2572 else if (*fd < 0)
2573 ereport(ERROR,
2574 (errcode_for_file_access(),
2575 errmsg("could not open file \"%s\": %m",
2576 path)));
2577 }
2578
2579 /*
2580 * Read the statically sized part of a change which has information
2581 * about the total size. If we couldn't read a record, we're at the
2582 * end of this file.
2583 */
2584 ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
2585 pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2586 readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2587 pgstat_report_wait_end();
2588
2589 /* eof */
2590 if (readBytes == 0)
2591 {
2592 CloseTransientFile(*fd);
2593 *fd = -1;
2594 (*segno)++;
2595 continue;
2596 }
2597 else if (readBytes < 0)
2598 ereport(ERROR,
2599 (errcode_for_file_access(),
2600 errmsg("could not read from reorderbuffer spill file: %m")));
2601 else if (readBytes != sizeof(ReorderBufferDiskChange))
2602 ereport(ERROR,
2603 (errcode_for_file_access(),
2604 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2605 readBytes,
2606 (uint32) sizeof(ReorderBufferDiskChange))));
2607
2608 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2609
2610 ReorderBufferSerializeReserve(rb,
2611 sizeof(ReorderBufferDiskChange) + ondisk->size);
2612 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2613
2614 pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
2615 readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2616 ondisk->size - sizeof(ReorderBufferDiskChange));
2617 pgstat_report_wait_end();
2618
2619 if (readBytes < 0)
2620 ereport(ERROR,
2621 (errcode_for_file_access(),
2622 errmsg("could not read from reorderbuffer spill file: %m")));
2623 else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2624 ereport(ERROR,
2625 (errcode_for_file_access(),
2626 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2627 readBytes,
2628 (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2629
2630 /*
2631 * ok, read a full change from disk, now restore it into proper
2632 * in-memory format
2633 */
2634 ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2635 restored++;
2636 }
2637
2638 return restored;
2639}
2640
2641/*
2642 * Convert change from its on-disk format to in-memory format and queue it onto
2643 * the TXN's ->changes list.
2644 *
2645 * Note: although "data" is declared char*, at entry it points to a
2646 * maxalign'd buffer, making it safe in most of this function to assume
2647 * that the pointed-to data is suitably aligned for direct access.
2648 */
2649static void
2650ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2651 char *data)
2652{
2653 ReorderBufferDiskChange *ondisk;
2654 ReorderBufferChange *change;
2655
2656 ondisk = (ReorderBufferDiskChange *) data;
2657
2658 change = ReorderBufferGetChange(rb);
2659
2660 /* copy static part */
2661 memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2662
2663 data += sizeof(ReorderBufferDiskChange);
2664
2665 /* restore individual stuff */
2666 switch (change->action)
2667 {
2668 /* fall through these, they're all similar enough */
2669 case REORDER_BUFFER_CHANGE_INSERT:
2670 case REORDER_BUFFER_CHANGE_UPDATE:
2671 case REORDER_BUFFER_CHANGE_DELETE:
2672 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2673 if (change->data.tp.oldtuple)
2674 {
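 /* 'data' is still maxaligned here, so accessing t_len directly is safe */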
2675 uint32 tuplelen = ((HeapTuple) data)->t_len;
2676
2677 change->data.tp.oldtuple =
2678 ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2679
2680 /* restore ->tuple */
2681 memcpy(&change->data.tp.oldtuple->tuple, data,
2682 sizeof(HeapTupleData));
2683 data += sizeof(HeapTupleData);
2684
2685 /* reset t_data pointer into the new tuplebuf */
2686 change->data.tp.oldtuple->tuple.t_data =
2687 ReorderBufferTupleBufData(change->data.tp.oldtuple);
2688
2689 /* restore tuple data itself */
2690 memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2691 data += tuplelen;
2692 }
2693
2694 if (change->data.tp.newtuple)
2695 {
2696 /* here, data might not be suitably aligned! */
2697 uint32 tuplelen;
2698
2699 memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2700 sizeof(uint32));
2701
2702 change->data.tp.newtuple =
2703 ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
2704
2705 /* restore ->tuple */
2706 memcpy(&change->data.tp.newtuple->tuple, data,
2707 sizeof(HeapTupleData));
2708 data += sizeof(HeapTupleData);
2709
2710 /* reset t_data pointer into the new tuplebuf */
2711 change->data.tp.newtuple->tuple.t_data =
2712 ReorderBufferTupleBufData(change->data.tp.newtuple);
2713
2714 /* restore tuple data itself */
2715 memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2716 data += tuplelen;
2717 }
2718
2719 break;
2720 case REORDER_BUFFER_CHANGE_MESSAGE:
2721 {
2722 Size prefix_size;
2723
2724 /* read prefix */
2725 memcpy(&prefix_size, data, sizeof(Size));
2726 data += sizeof(Size);
2727 change->data.msg.prefix = MemoryContextAlloc(rb->context,
2728 prefix_size);
2729 memcpy(change->data.msg.prefix, data, prefix_size);
2730 Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2731 data += prefix_size;
2732
2733 /* read the message */
2734 memcpy(&change->data.msg.message_size, data, sizeof(Size));
2735 data += sizeof(Size);
2736 change->data.msg.message = MemoryContextAlloc(rb->context,
2737 change->data.msg.message_size);
2738 memcpy(change->data.msg.message, data,
2739 change->data.msg.message_size);
2740 data += change->data.msg.message_size;
2741
2742 break;
2743 }
2744 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2745 {
2746 Snapshot oldsnap;
2747 Snapshot newsnap;
2748 Size size;
2749
2750 oldsnap = (Snapshot) data;
2751
2752 size = sizeof(SnapshotData) +
2753 sizeof(TransactionId) * oldsnap->xcnt +
2754 sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2755
2756 change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2757
2758 newsnap = change->data.snapshot;
2759
2760 memcpy(newsnap, data, size);
2761 newsnap->xip = (TransactionId *)
2762 (((char *) newsnap) + sizeof(SnapshotData));
2763 newsnap->subxip = newsnap->xip + newsnap->xcnt;
2764 newsnap->copied = true;
2765 break;
2766 }
2768 case REORDER_BUFFER_CHANGE_TRUNCATE:
2769 {
2770 Oid *relids;
2771
2772 relids = ReorderBufferGetRelids(rb,
2773 change->data.truncate.nrelids);
2774 memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
2775 change->data.truncate.relids = relids;
2776
2777 break;
2778 }
 /* the base struct contains all the data, easy peasy */
2779 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2780 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2781 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2782 break;
2783 }
2784
2785 dlist_push_tail(&txn->changes, &change->node);
2786 txn->nentries_mem++;
2787}
2788
2789/*
2790 * Remove all on-disk data stored for the passed-in transaction.
2791 */
2792static void
2793ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
2794{
2795 XLogSegNo first;
2796 XLogSegNo cur;
2797 XLogSegNo last;
2798
2799 Assert(txn->first_lsn != InvalidXLogRecPtr);
2800 Assert(txn->final_lsn != InvalidXLogRecPtr);
2801
2802 XLByteToSeg(txn->first_lsn, first, wal_segment_size);
2803 XLByteToSeg(txn->final_lsn, last, wal_segment_size);
2804
2805 /* iterate over all possible filenames, and delete them */
2806 for (cur = first; cur <= last; cur++)
2807 {
2808 char path[MAXPGPATH];
2809
2810 ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
2811 if (unlink(path) != 0 && errno != ENOENT)
2812 ereport(ERROR,
2813 (errcode_for_file_access(),
2814 errmsg("could not remove file \"%s\": %m", path)));
2815 }
2816}
2817
2818/*
2819 * Remove any leftover serialized reorder buffers from a slot directory after a
2820 * prior crash or decoding session exit.
2821 */
2822static void
2823ReorderBufferCleanupSerializedTXNs(const char *slotname)
2824{
2825 DIR *spill_dir;
2826 struct dirent *spill_de;
2827 struct stat statbuf;
2828 char path[MAXPGPATH * 2 + 12];
2829
2830 sprintf(path, "pg_replslot/%s", slotname);
2831
2832 /* we're only handling directories here, skip if it's not ours */
2833 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2834 return;
2835
2836 spill_dir = AllocateDir(path);
2837 while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
2838 {
2839 /* only look at names that can be ours */
2840 if (strncmp(spill_de->d_name, "xid", 3) == 0)
2841 {
2842 snprintf(path, sizeof(path),
2843 "pg_replslot/%s/%s", slotname,
2844 spill_de->d_name);
2845
2846 if (unlink(path) != 0)
2847 ereport(ERROR,
2848 (errcode_for_file_access(),
2849 errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
2850 path, slotname)));
2851 }
2852 }
2853 FreeDir(spill_dir);
2854}
2855
2856/*
2857 * Given a replication slot, transaction ID and segment number, fill 'path'
2858 * with the name of the corresponding spill file; 'path' is a caller-owned
2859 * buffer of size at least MAXPGPATH.
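 * The resulting names look like pg_replslot/<slot>/xid-<xid>-lsn-<hi>-<lo>.spill,
 * where hi/lo are the two hex halves of the segment's starting LSN.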
2860 */
2861static void
2862ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
2863 XLogSegNo segno)
2864{
2865 XLogRecPtr recptr;
2866
2867 XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
2868
2869 snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
2870 NameStr(slot->data.name),
2871 xid,
2872 (uint32) (recptr >> 32), (uint32) recptr);
2873}
2874
2875/*
2876 * Delete all data spilled to disk after we've restarted/crashed. It will be
2877 * recreated when the respective slots are reused.
2878 */
2879void
2880StartupReorderBuffer(void)
2881{
2882 DIR *logical_dir;
2883 struct dirent *logical_de;
2884
2885 logical_dir = AllocateDir("pg_replslot");
2886 while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2887 {
2888 if (strcmp(logical_de->d_name, ".") == 0 ||
2889 strcmp(logical_de->d_name, "..") == 0)
2890 continue;
2891
2892 /* if it cannot be a slot, skip the directory */
2893 if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2894 continue;
2895
2896 /*
2897 * ok, has to be a surviving logical slot, iterate and delete
2898 * everything starting with xid-*
2899 */
2900 ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
2901 }
2902 FreeDir(logical_dir);
2903}
2904
2905/* ---------------------------------------
2906 * toast reassembly support
2907 * ---------------------------------------
2908 */
2909
2910/*
2911 * Initialize per tuple toast reconstruction support.
2912 */
2913static void
2914ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
2915{
2916 HASHCTL hash_ctl;
2917
2918 Assert(txn->toast_hash == NULL);
2919
2920 memset(&hash_ctl, 0, sizeof(hash_ctl));
2921 hash_ctl.keysize = sizeof(Oid);
2922 hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2923 hash_ctl.hcxt = rb->context;
2924 txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2925 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
2926}
2927
2928/*
2929 * Per toast-chunk handling for toast reconstruction
2930 *
2931 * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
2932 * toasted Datum comes along.
2933 */
2934static void
2935ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
2936 Relation relation, ReorderBufferChange *change)
2937{
2938 ReorderBufferToastEnt *ent;
2939 ReorderBufferTupleBuf *newtup;
2940 bool found;
2941 int32 chunksize;
2942 bool isnull;
2943 Pointer chunk;
2944 TupleDesc desc = RelationGetDescr(relation);
2945 Oid chunk_id;
2946 int32 chunk_seq;
2947
2948 if (txn->toast_hash == NULL)
2949 ReorderBufferToastInitHash(rb, txn);
2950
2951 Assert(IsToastRelation(relation));
2952
2953 newtup = change->data.tp.newtuple;
2954 chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2955 Assert(!isnull);
2956 chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2957 Assert(!isnull);
2958
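 /* find or create the entry for this toast value, keyed by its chunk_id */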
2959 ent = (ReorderBufferToastEnt *)
2960 hash_search(txn->toast_hash,
2961 (void *) &chunk_id,
2962 HASH_ENTER,
2963 &found);
2964
2965 if (!found)
2966 {
2967 Assert(ent->chunk_id == chunk_id);
2968 ent->num_chunks = 0;
2969 ent->last_chunk_seq = 0;
2970 ent->size = 0;
2971 ent->reconstructed = NULL;
2972 dlist_init(&ent->chunks);
2973
2974 if (chunk_seq != 0)
2975 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2976 chunk_seq, chunk_id);
2977 }
2978 else if (found && chunk_seq != ent->last_chunk_seq + 1)
2979 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2980 chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2981
2982 chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
2983 Assert(!isnull);
2984
2985 /* calculate size so we can allocate the right size at once later */
2986 if (!VARATT_IS_EXTENDED(chunk))
2987 chunksize = VARSIZE(chunk) - VARHDRSZ;
2988 else if (VARATT_IS_SHORT(chunk))
2989 /* could happen due to heap_form_tuple doing its thing */
2990 chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2991 else
2992 elog(ERROR, "unexpected type of toast chunk");
2993
2994 ent->size += chunksize;
2995 ent->last_chunk_seq = chunk_seq;
2996 ent->num_chunks++;
2997 dlist_push_tail(&ent->chunks, &change->node);
2998}
2999
3000/*
3001 * Rejigger change->newtuple to point to in-memory toast tuples instead of
3002 * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
3003 *
3004 * We cannot replace unchanged toast tuples though, so those will still point
3005 * to on-disk toast data.
3006 */
3007static void
3008ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
3009 Relation relation, ReorderBufferChange *change)
3010{
3011 TupleDesc desc;
3012 int natt;
3013 Datum *attrs;
3014 bool *isnull;
3015 bool *free;
3016 HeapTuple tmphtup;
3017 Relation toast_rel;
3018 TupleDesc toast_desc;
3019 MemoryContext oldcontext;
3020 ReorderBufferTupleBuf *newtup;
3021
3022 /* no toast tuples changed */
3023 if (txn->toast_hash == NULL)
3024 return;
3025
3026 oldcontext = MemoryContextSwitchTo(rb->context);
3027
3028 /* we should only have toast tuples in an INSERT or UPDATE */
3029 Assert(change->data.tp.newtuple);
3030
3031 desc = RelationGetDescr(relation);
3032
3033 toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
3034 if (!RelationIsValid(toast_rel))
3035 elog(ERROR, "could not open relation with OID %u",
3036 relation->rd_rel->reltoastrelid);
3037
3038 toast_desc = RelationGetDescr(toast_rel);
3039
3040 /* should we allocate from stack instead? */
3041 attrs = palloc0(sizeof(Datum) * desc->natts);
3042 isnull = palloc0(sizeof(bool) * desc->natts);
3043 free = palloc0(sizeof(bool) * desc->natts);
3044
3045 newtup = change->data.tp.newtuple;
3046
3047 heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
3048
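 /*
 * Walk all attributes and, for each changed externally-stored datum,
 * substitute an indirect pointer to the value reconstructed from the
 * collected chunks.
 */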
3049 for (natt = 0; natt < desc->natts; natt++)
3050 {
3051 Form_pg_attribute attr = TupleDescAttr(desc, natt);
3052 ReorderBufferToastEnt *ent;
3053 struct varlena *varlena;
3054
3055 /* va_rawsize is the size of the original datum -- including header */
3056 struct varatt_external toast_pointer;
3057 struct varatt_indirect redirect_pointer;
3058 struct varlena *new_datum = NULL;
3059 struct varlena *reconstructed;
3060 dlist_iter it;
3061 Size data_done = 0;
3062
3063 /* system columns aren't toasted */
3064 if (attr->attnum < 0)
3065 continue;
3066
3067 if (attr->attisdropped)
3068 continue;
3069
3070 /* not a varlena datatype */
3071 if (attr->attlen != -1)
3072 continue;
3073
3074 /* no data */
3075 if (isnull[natt])
3076 continue;
3077
3078 /* ok, we know we have a toast datum */
3079 varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
3080
3081 /* no need to do anything if the tuple isn't external */
3082 if (!VARATT_IS_EXTERNAL(varlena))
3083 continue;
3084
3085 VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
3086
3087 /*
3088 * Check whether the toast tuple changed, replace if so.
3089 */
3090 ent = (ReorderBufferToastEnt *)
3091 hash_search(txn->toast_hash,
3092 (void *) &toast_pointer.va_valueid,
3093 HASH_FIND,
3094 NULL);
3095 if (ent == NULL)
3096 continue;
3097
3098 new_datum =
3099 (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
3100
3101 free[natt] = true;
3102
3103 reconstructed = palloc0(toast_pointer.va_rawsize);
3104
3105 ent->reconstructed = reconstructed;
3106
3107 /* stitch toast tuple back together from its parts */
3108 dlist_foreach(it, &ent->chunks)
3109 {
3110 bool isnull;
3111 ReorderBufferChange *cchange;
3112 ReorderBufferTupleBuf *ctup;
3113 Pointer chunk;
3114
3115 cchange = dlist_container(ReorderBufferChange, node, it.cur);
3116 ctup = cchange->data.tp.newtuple;
3117 chunk = DatumGetPointer(
3118 fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
3119
3120 Assert(!isnull);
3121 Assert(!VARATT_IS_EXTERNAL(chunk));
3122 Assert(!VARATT_IS_SHORT(chunk));
3123
3124 memcpy(VARDATA(reconstructed) + data_done,
3125 VARDATA(chunk),
3126 VARSIZE(chunk) - VARHDRSZ);
3127 data_done += VARSIZE(chunk) - VARHDRSZ;
3128 }
3129 Assert(data_done == toast_pointer.va_extsize);
3130
3131 /* make sure it's marked as compressed or not */
3132 if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
3133 SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
3134 else
3135 SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
3136
3137 memset(&redirect_pointer, 0, sizeof(redirect_pointer));
3138 redirect_pointer.pointer = reconstructed;
3139
3140 SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
3141 memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
3142 sizeof(redirect_pointer));
3143
3144 attrs[natt] = PointerGetDatum(new_datum);
3145 }
3146
3147 /*
3148 * Build tuple in separate memory & copy tuple back into the tuplebuf
3149 * passed to the output plugin. We can't directly heap_fill_tuple() into
3150 * the tuplebuf because attrs[] will point back into the current content.
3151 */
3152 tmphtup = heap_form_tuple(desc, attrs, isnull);
3153 Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
3154 Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
3155
3156 memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
3157 newtup->tuple.t_len = tmphtup->t_len;
3158
3159 /*
3160 * free resources we won't need any further; more persistent stuff will be
3161 * freed in ReorderBufferToastReset().
3162 */
3163 RelationClose(toast_rel);
3164 pfree(tmphtup);
3165 for (natt = 0; natt < desc->natts; natt++)
3166 {
3167 if (free[natt])
3168 pfree(DatumGetPointer(attrs[natt]));
3169 }
3170 pfree(attrs);
3171 pfree(free);
3172 pfree(isnull);
3173
3174 MemoryContextSwitchTo(oldcontext);
3175}
3176
3177/*
3178 * Free all resources allocated for toast reconstruction.
3179 */
3180static void
3181ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
3182{
3183 HASH_SEQ_STATUS hstat;
3184 ReorderBufferToastEnt *ent;
3185
3186 if (txn->toast_hash == NULL)
3187 return;
3188
3189 /* sequentially walk over the hash and free everything */
3190 hash_seq_init(&hstat, txn->toast_hash);
3191 while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
3192 {
3193 dlist_mutable_iter it;
3194
3195 if (ent->reconstructed != NULL)
3196 pfree(ent->reconstructed);
3197
3198 dlist_foreach_modify(it, &ent->chunks)
3199 {
3200 ReorderBufferChange *change =
3201 dlist_container(ReorderBufferChange, node, it.cur);
3202
3203 dlist_delete(&change->node);
3204 ReorderBufferReturnChange(rb, change);
3205 }
3206 }
3207
3208 hash_destroy(txn->toast_hash);
3209 txn->toast_hash = NULL;
3210}
3211
3212
3213/* ---------------------------------------
3214 * Visibility support for logical decoding
3215 *
3216 *
3217 * Lookup actual cmin/cmax values when using decoding snapshot. We can't
3218 * always rely on stored cmin/cmax values because of two scenarios:
3219 *
3220 * * A tuple got changed multiple times during a single transaction and thus
3221 * has got a combocid. Combocids are only valid for the duration of a
3222 * single transaction.
3223 * * A tuple with a cmin but no cmax (and thus no combocid) got
3224 * deleted/updated in a transaction other than the one which created it,
3225 * which is the one we are looking at right now. As only one of cmin,
3226 * cmax or combocid is actually stored in the heap, we don't have access
3227 * to the value we need anymore.
3228 *
3229 * To resolve those problems we have a per-transaction hash of (cmin,
3230 * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
3231 * (cmin, cmax) values. That also takes care of combocids by simply
3232 * not caring about them at all. As we have the real cmin/cmax values
3233 * combocids aren't interesting.
3234 *
3235 * As we only care about catalog tuples here the overhead of this
3236 * hashtable should be acceptable.
3237 *
3238 * Heap rewrites complicate this a bit, check rewriteheap.c for
3239 * details.
3240 * -------------------------------------------------------------------------
3241 */
3242
3243/* struct for qsort()ing mapping files by lsn somewhat efficiently */
3244typedef struct RewriteMappingFile
3245{
3246 XLogRecPtr lsn;
3247 char fname[MAXPGPATH];
3248} RewriteMappingFile;
3249
3250#ifdef NOT_USED
3251static void
3252DisplayMapping(HTAB *tuplecid_data)
3253{
3254 HASH_SEQ_STATUS hstat;
3255 ReorderBufferTupleCidEnt *ent;
3256
3257 hash_seq_init(&hstat, tuplecid_data);
3258 while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
3259 {
3260 elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
3261 ent->key.relnode.dbNode,
3262 ent->key.relnode.spcNode,
3263 ent->key.relnode.relNode,
3264 ItemPointerGetBlockNumber(&ent->key.tid),
3265 ItemPointerGetOffsetNumber(&ent->key.tid),
3266 ent->cmin,
3267 ent->cmax
3268 );
3269 }
3270}
3271#endif
3272
3273/*
3274 * Apply a single mapping file to tuplecid_data.
3275 *
3276 * The mapping file has to have been verified to be a) committed b) for our
3277 * transaction c) applied in LSN order.
3278 */
3279static void
3280ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
3281{
3282 char path[MAXPGPATH];
3283 int fd;
3284 int readBytes;
3285 LogicalRewriteMappingData map;
3286
3287 sprintf(path, "pg_logical/mappings/%s", fname);
3288 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
3289 if (fd < 0)
3290 ereport(ERROR,
3291 (errcode_for_file_access(),
3292 errmsg("could not open file \"%s\": %m", path)));
3293
3294 while (true)
3295 {
3296 ReorderBufferTupleCidKey key;
3297 ReorderBufferTupleCidEnt *ent;
3298 ReorderBufferTupleCidEnt *new_ent;
3299 bool found;
3300
3301 /* be careful about padding */
3302 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3303
3304 /* read all mappings till the end of the file */
3305 pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
3306 readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3307 pgstat_report_wait_end();
3308
3309 if (readBytes < 0)
3310 ereport(ERROR,
3311 (errcode_for_file_access(),
3312 errmsg("could not read file \"%s\": %m",
3313 path)));
3314 else if (readBytes == 0) /* EOF */
3315 break;
3316 else if (readBytes != sizeof(LogicalRewriteMappingData))
3317 ereport(ERROR,
3318 (errcode_for_file_access(),
3319 errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3320 path, readBytes,
3321 (int32) sizeof(LogicalRewriteMappingData))));
3322
3323 key.relnode = map.old_node;
3324 ItemPointerCopy(&map.old_tid,
3325 &key.tid);
3326
3327
3328 ent = (ReorderBufferTupleCidEnt *)
3329 hash_search(tuplecid_data,
3330 (void *) &key,
3331 HASH_FIND,
3332 NULL);
3333
3334 /* no existing mapping, no need to update */
3335 if (!ent)
3336 continue;
3337
3338 key.relnode = map.new_node;
3339 ItemPointerCopy(&map.new_tid,
3340 &key.tid);
3341
3342 new_ent = (ReorderBufferTupleCidEnt *)
3343 hash_search(tuplecid_data,
3344 (void *) &key,
3345 HASH_ENTER,
3346 &found);
3347
3348 if (found)
3349 {
3350 /*
3351 * Make sure the existing mapping makes sense. We sometimes update
3352 * old records that did not yet have a cmax (e.g. pg_class' own
3353 * entry while rewriting it) during rewrites, so allow that.
3354 */
3355 Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3356 Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3357 }
3358 else
3359 {
3360 /* update mapping */
3361 new_ent->cmin = ent->cmin;
3362 new_ent->cmax = ent->cmax;
3363 new_ent->combocid = ent->combocid;
3364 }
3365 }
3366
3367 if (CloseTransientFile(fd))
3368 ereport(ERROR,
3369 (errcode_for_file_access(),
3370 errmsg("could not close file \"%s\": %m", path)));
3371}
3372
3373
3374/*
3375 * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
3376 */
3377static bool
3378TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
3379{
3380 return bsearch(&xid, xip, num,
3381 sizeof(TransactionId), xidComparator) != NULL;
3382}
3383
3384/*
3385 * qsort() comparator for sorting RewriteMappingFiles in LSN order.
3386 */
3387static int
3388file_sort_by_lsn(const void *a_p, const void *b_p)
3389{
3390 RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
3391 RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
3392
3393 if (a->lsn < b->lsn)
3394 return -1;
3395 else if (a->lsn > b->lsn)
3396 return 1;
3397 return 0;
3398}
3399
3400/*
3401 * Apply any existing logical remapping files that are targeted at our
3402 * transaction for 'relid'.
3403 */
3404static void
3405UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
3406{
3407 DIR *mapping_dir;
3408 struct dirent *mapping_de;
3409 List *files = NIL;
3410 ListCell *file;
3411 RewriteMappingFile **files_a;
3412 size_t off;
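 /* shared catalogs are mapped with InvalidOid as the database OID */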
3413 Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
3414
3415 mapping_dir = AllocateDir("pg_logical/mappings");
3416 while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
3417 {
3418 Oid f_dboid;
3419 Oid f_relid;
3420 TransactionId f_mapped_xid;
3421 TransactionId f_create_xid;
3422 XLogRecPtr f_lsn;
3423 uint32 f_hi,
3424 f_lo;
3425 RewriteMappingFile *f;
3426
3427 if (strcmp(mapping_de->d_name, ".") == 0 ||
3428 strcmp(mapping_de->d_name, "..") == 0)
3429 continue;
3430
3431 /* Ignore files that aren't ours */
3432 if (strncmp(mapping_de->d_name, "map-", 4) != 0)
3433 continue;
3434
3435 if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
3436 &f_dboid, &f_relid, &f_hi, &f_lo,
3437 &f_mapped_xid, &f_create_xid) != 6)
3438 elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
3439
3440 f_lsn = ((uint64) f_hi) << 32 | f_lo;
3441
3442 /* mapping for another database */
3443 if (f_dboid != dboid)
3444 continue;
3445
3446 /* mapping for another relation */
3447 if (f_relid != relid)
3448 continue;
3449
3450 /* did the creating transaction abort? */
3451 if (!TransactionIdDidCommit(f_create_xid))
3452 continue;
3453
3454 /* not for our transaction */
3455 if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
3456 continue;
3457
3458 /* ok, relevant, queue for apply */
3459 f = palloc(sizeof(RewriteMappingFile));
3460 f->lsn = f_lsn;
3461 strcpy(f->fname, mapping_de->d_name);
3462 files = lappend(files, f);
3463 }
3464 FreeDir(mapping_dir);
3465
3466 /* build array we can easily sort */
3467 files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
3468 off = 0;
3469 foreach(file, files)
3470 {
3471 files_a[off++] = lfirst(file);
3472 }
3473
3474 /* sort files so we apply them in LSN order */
3475 qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
3476 file_sort_by_lsn);
3477
3478 for (off = 0; off < list_length(files); off++)
3479 {
3480 RewriteMappingFile *f = files_a[off];
3481
3482 elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
3483 snapshot->subxip[0]);
3484 ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
3485 pfree(f);
3486 }
3487}
3488
3489/*
3490 * Look up cmin/cmax of a tuple, during logical decoding where we can't rely on
3491 * combocids.
3492 */
3493bool
3494ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
3495 Snapshot snapshot,
3496 HeapTuple htup, Buffer buffer,
3497 CommandId *cmin, CommandId *cmax)
3498{
3499 ReorderBufferTupleCidKey key;
3500 ReorderBufferTupleCidEnt *ent;
3501 ForkNumber forkno;
3502 BlockNumber blockno;
3503 bool updated_mapping = false;
3504
3505 /* be careful about padding */
3506 memset(&key, 0, sizeof(key));
3507
3508 Assert(!BufferIsLocal(buffer));
3509
3510 /*
3511 * get relfilenode from the buffer, no convenient way to access it other
3512 * than that.
3513 */
3514 BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3515
3516 /* tuples can only be in the main fork */
3517 Assert(forkno == MAIN_FORKNUM);
3518 Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3519
3520 ItemPointerCopy(&htup->t_self,
3521 &key.tid);
3522
3523restart:
3524 ent = (ReorderBufferTupleCidEnt *)
3525 hash_search(tuplecid_data,
3526 (void *) &key,
3527 HASH_FIND,
3528 NULL);
3529
3530 /*
3531 * failed to find a mapping, check whether the table was rewritten and
3532 * apply mapping if so, but only do that once - there can be no new
3533 * mappings while we are in here since we have to hold a lock on the
3534 * relation.
3535 */
3536 if (ent == NULL && !updated_mapping)
3537 {
3538 UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3539 /* now check again, but don't update the mapping another time */
3540 updated_mapping = true;
3541 goto restart;
3542 }
3543 else if (ent == NULL)
3544 return false;
3545
3546 if (cmin)
3547 *cmin = ent->cmin;
3548 if (cmax)
3549 *cmax = ent->cmax;
3550 return true;
3551}
3552