1/*-------------------------------------------------------------------------
2 *
3 * snapbuild.c
4 *
5 * Infrastructure for building historic catalog snapshots based on contents
6 * of the WAL, for the purpose of decoding heapam.c style values in the
7 * WAL.
8 *
9 * NOTES:
10 *
11 * We build snapshots which can *only* be used to read catalog contents and we
12 * do so by reading and interpreting the WAL stream. The aim is to build a
13 * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14 * at the time the XLogRecord was generated.
15 *
16 * To build the snapshots we reuse the infrastructure built for Hot
17 * Standby. The in-memory snapshots we build look different than HS' because
18 * we have different needs. To successfully decode data from the WAL we only
19 * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20 * tables since the data we decode is wholly contained in the WAL
21 * records. Also, our snapshots need to be different in comparison to normal
22 * MVCC ones because in contrast to those we cannot fully rely on the clog and
23 * pg_subtrans for information about committed transactions because they might
24 * commit in the future from the POV of the WAL entry we're currently
25 * decoding. This definition has the advantage that we only need to prevent
 * removal of catalog rows, while normal tables' rows can still be
27 * removed. This is achieved by using the replication slot mechanism.
28 *
29 * As the percentage of transactions modifying the catalog normally is fairly
 * small in comparison to ones only manipulating user data, we keep track of
31 * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
32 * track of all running transactions like it's done in a normal snapshot. Note
33 * that we're generally only looking at transactions that have acquired an
 * xid. That is, we keep a list of transactions between snapshot->(xmin, xmax)
35 * that we consider committed, everything else is considered aborted/in
36 * progress. That also allows us not to care about subtransactions before they
37 * have committed which means this module, in contrast to HS, doesn't have to
38 * care about suboverflowed subtransactions and similar.
39 *
40 * One complexity of doing this is that to e.g. handle mixed DDL/DML
41 * transactions we need Snapshots that see intermediate versions of the
42 * catalog in a transaction. During normal operation this is achieved by using
43 * CommandIds/cmin/cmax. The problem with that however is that for space
44 * efficiency reasons only one value of that is stored
45 * (cf. combocid.c). Since ComboCids are only available in memory we log
46 * additional information which allows us to get the original (cmin, cmax)
 * pair during visibility checks. Check the comment in reorderbuffer.c above
48 * ResolveCminCmaxDuringDecoding() for details.
49 *
50 * To facilitate all this we need our own visibility routine, as the normal
 * ones are optimized for different use cases.
52 *
53 * To replace the normal catalog snapshots with decoding ones use the
54 * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
55 *
56 *
57 *
 * The snapbuild machinery starts up in several stages, as illustrated
59 * by the following graph describing the SnapBuild->state transitions:
60 *
61 * +-------------------------+
62 * +----| START |-------------+
63 * | +-------------------------+ |
64 * | | |
65 * | | |
66 * | running_xacts #1 |
67 * | | |
68 * | | |
69 * | v |
70 * | +-------------------------+ v
71 * | | BUILDING_SNAPSHOT |------------>|
72 * | +-------------------------+ |
73 * | | |
74 * | | |
75 * | running_xacts #2, xacts from #1 finished |
76 * | | |
77 * | | |
78 * | v |
79 * | +-------------------------+ v
80 * | | FULL_SNAPSHOT |------------>|
81 * | +-------------------------+ |
82 * | | |
83 * running_xacts | saved snapshot
84 * with zero xacts | at running_xacts's lsn
85 * | | |
86 * | running_xacts with xacts from #2 finished |
87 * | | |
88 * | v |
89 * | +-------------------------+ |
90 * +--->|SNAPBUILD_CONSISTENT |<------------+
91 * +-------------------------+
92 *
93 * Initially the machinery is in the START stage. When an xl_running_xacts
94 * record is read that is sufficiently new (above the safe xmin horizon),
95 * there's a state transition. If there were no running xacts when the
96 * running_xacts record was generated, we'll directly go into CONSISTENT
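 * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Once all
 * transactions that were running at that point have finished (as shown by a
 * later running_xacts record), we switch to FULL_SNAPSHOT. Having a full
 * snapshot means that all transactions that start henceforth can be decoded
 * in their entirety, but transactions that started previously can't. In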
100 * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
101 * running transactions have committed or aborted.
102 *
103 * Only transactions that commit after CONSISTENT state has been reached will
104 * be replayed, even though they might have started while still in
105 * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
 * changes have been exported, but all the following ones will be. That point
107 * is a convenient point to initialize replication from, which is why we
108 * export a snapshot at that point, which *can* be used to read normal data.
109 *
110 * Copyright (c) 2012-2019, PostgreSQL Global Development Group
111 *
112 * IDENTIFICATION
113 * src/backend/replication/snapbuild.c
114 *
115 *-------------------------------------------------------------------------
116 */
117
118#include "postgres.h"
119
120#include <sys/stat.h>
121#include <unistd.h>
122
123#include "miscadmin.h"
124
125#include "access/heapam_xlog.h"
126#include "access/transam.h"
127#include "access/xact.h"
128
129#include "pgstat.h"
130
131#include "replication/logical.h"
132#include "replication/reorderbuffer.h"
133#include "replication/snapbuild.h"
134
135#include "utils/builtins.h"
136#include "utils/memutils.h"
137#include "utils/snapshot.h"
138#include "utils/snapmgr.h"
139
140#include "storage/block.h" /* debugging output */
141#include "storage/fd.h"
142#include "storage/lmgr.h"
143#include "storage/proc.h"
144#include "storage/procarray.h"
145#include "storage/standby.h"
146
/*
 * This struct contains the current state of the snapshot building
 * machinery. Besides a forward declaration in the header, it is not exposed
 * to the public, so we can easily change its contents.
 *
 * NOTE(review): the was_running comment below states that this struct is
 * stored on-disk (see SnapBuildSerialize()/SnapBuildRestore()), so members
 * can only be reordered or resized across major versions -- confirm against
 * the serialization code before touching the layout.
 */
struct SnapBuild
{
	/* how far are we along building our first full snapshot */
	SnapBuildState state;

	/* private memory context used to allocate memory for this module. */
	MemoryContext context;

	/* all transactions < this have committed/aborted */
	TransactionId xmin;

	/* all transactions >= this are uncommitted */
	TransactionId xmax;

	/*
	 * Don't replay commits from an LSN < this LSN. This can be set externally
	 * but it will also be advanced (never retreat) from within snapbuild.c.
	 */
	XLogRecPtr	start_decoding_at;

	/*
	 * Don't start decoding WAL until the "xl_running_xacts" information
	 * indicates there are no running xids with an xid smaller than this.
	 */
	TransactionId initial_xmin_horizon;

	/*
	 * Indicates if we are building full snapshot or just catalog one. When
	 * set, commits of all transactions (not only catalog-modifying ones) are
	 * tracked until CONSISTENT is reached.
	 */
	bool		building_full_snapshot;

	/*
	 * Snapshot that's valid to see the catalog state seen at this moment.
	 * Built lazily and refcounted; the builder itself holds one reference
	 * while this is non-NULL.
	 */
	Snapshot	snapshot;

	/*
	 * LSN of the last location we are sure a snapshot has been serialized to.
	 */
	XLogRecPtr	last_serialized_snapshot;

	/*
	 * The reorderbuffer we need to update with usable snapshots et al.
	 */
	ReorderBuffer *reorder;

	/*
	 * Outdated: This struct isn't used for its original purpose anymore, but
	 * can't be removed / changed in a minor version, because it's stored
	 * on-disk.
	 */
	struct
	{
		/*
		 * NB: This field is misused, until a major version can break on-disk
		 * compatibility. See SnapBuildNextPhaseAt() /
		 * SnapBuildStartNextPhaseAt().
		 */
		TransactionId was_xmin;
		TransactionId was_xmax;

		size_t		was_xcnt;	/* number of used xip entries */
		size_t		was_xcnt_space; /* allocated size of xip */
		TransactionId *was_xip; /* running xacts array, xidComparator-sorted */
	}			was_running;

	/*
	 * Array of transactions which could have catalog changes that committed
	 * between xmin and xmax.
	 */
	struct
	{
		/* number of committed transactions */
		size_t		xcnt;

		/* available space for committed transactions */
		size_t		xcnt_space;

		/*
		 * Until we reach a CONSISTENT state, we record commits of all
		 * transactions, not just the catalog changing ones. Record when that
		 * changes so we know we cannot export a snapshot safely anymore.
		 */
		bool		includes_all_transactions;

		/*
		 * Array of committed transactions that have modified the catalog.
		 *
		 * As this array is frequently modified we do *not* keep it in
		 * xidComparator order. Instead we sort the array when building &
		 * distributing a snapshot.
		 *
		 * TODO: It's unclear whether that reasoning has much merit. Every
		 * time we add something here after becoming consistent will also
		 * require distributing a snapshot. Storing them sorted would
		 * potentially also make it easier to purge (but more complicated wrt
		 * wraparound?). Should be improved if sorting while building the
		 * snapshot shows up in profiles.
		 */
		TransactionId *xip;
	}			committed;
};
252
/*
 * Starting a transaction -- which we need to do while exporting a snapshot --
 * removes knowledge about the previously used resowner, so we save it here.
 */
static ResourceOwner SavedResourceOwnerDuringExport = NULL;

/*
 * True from SnapBuildExportSnapshot() until SnapBuildClearExportedSnapshot();
 * tracked separately because the saved resource owner may itself be NULL.
 */
static bool ExportInProgress = false;

/* ->committed manipulation */
static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);

/* snapshot building/manipulation/distribution functions */
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);

static void SnapBuildFreeSnapshot(Snapshot snap);

static void SnapBuildSnapIncRefcount(Snapshot snap);

static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);

/* xlog reading helper functions for SnapBuildProcessRecord */
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);

/* serialization functions */
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
279
280/*
281 * Return TransactionId after which the next phase of initial snapshot
282 * building will happen.
283 */
284static inline TransactionId
285SnapBuildNextPhaseAt(SnapBuild *builder)
286{
287 /*
288 * For backward compatibility reasons this has to be stored in the wrongly
289 * named field. Will be fixed in next major version.
290 */
291 return builder->was_running.was_xmax;
292}
293
294/*
295 * Set TransactionId after which the next phase of initial snapshot building
296 * will happen.
297 */
298static inline void
299SnapBuildStartNextPhaseAt(SnapBuild *builder, TransactionId at)
300{
301 /*
302 * For backward compatibility reasons this has to be stored in the wrongly
303 * named field. Will be fixed in next major version.
304 */
305 builder->was_running.was_xmax = at;
306}
307
308/*
309 * Allocate a new snapshot builder.
310 *
311 * xmin_horizon is the xid >= which we can be sure no catalog rows have been
312 * removed, start_lsn is the LSN >= we want to replay commits.
313 */
314SnapBuild *
315AllocateSnapshotBuilder(ReorderBuffer *reorder,
316 TransactionId xmin_horizon,
317 XLogRecPtr start_lsn,
318 bool need_full_snapshot)
319{
320 MemoryContext context;
321 MemoryContext oldcontext;
322 SnapBuild *builder;
323
324 /* allocate memory in own context, to have better accountability */
325 context = AllocSetContextCreate(CurrentMemoryContext,
326 "snapshot builder context",
327 ALLOCSET_DEFAULT_SIZES);
328 oldcontext = MemoryContextSwitchTo(context);
329
330 builder = palloc0(sizeof(SnapBuild));
331
332 builder->state = SNAPBUILD_START;
333 builder->context = context;
334 builder->reorder = reorder;
335 /* Other struct members initialized by zeroing via palloc0 above */
336
337 builder->committed.xcnt = 0;
338 builder->committed.xcnt_space = 128; /* arbitrary number */
339 builder->committed.xip =
340 palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
341 builder->committed.includes_all_transactions = true;
342
343 builder->initial_xmin_horizon = xmin_horizon;
344 builder->start_decoding_at = start_lsn;
345 builder->building_full_snapshot = need_full_snapshot;
346
347 MemoryContextSwitchTo(oldcontext);
348
349 return builder;
350}
351
352/*
353 * Free a snapshot builder.
354 */
355void
356FreeSnapshotBuilder(SnapBuild *builder)
357{
358 MemoryContext context = builder->context;
359
360 /* free snapshot explicitly, that contains some error checking */
361 if (builder->snapshot != NULL)
362 {
363 SnapBuildSnapDecRefcount(builder->snapshot);
364 builder->snapshot = NULL;
365 }
366
367 /* other resources are deallocated via memory context reset */
368 MemoryContextDelete(context);
369}
370
371/*
372 * Free an unreferenced snapshot that has previously been built by us.
373 */
374static void
375SnapBuildFreeSnapshot(Snapshot snap)
376{
377 /* make sure we don't get passed an external snapshot */
378 Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
379
380 /* make sure nobody modified our snapshot */
381 Assert(snap->curcid == FirstCommandId);
382 Assert(!snap->suboverflowed);
383 Assert(!snap->takenDuringRecovery);
384 Assert(snap->regd_count == 0);
385
386 /* slightly more likely, so it's checked even without c-asserts */
387 if (snap->copied)
388 elog(ERROR, "cannot free a copied snapshot");
389
390 if (snap->active_count)
391 elog(ERROR, "cannot free an active snapshot");
392
393 pfree(snap);
394}
395
396/*
397 * In which state of snapshot building are we?
398 */
399SnapBuildState
400SnapBuildCurrentState(SnapBuild *builder)
401{
402 return builder->state;
403}
404
405/*
406 * Should the contents of transaction ending at 'ptr' be decoded?
407 */
408bool
409SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
410{
411 return ptr < builder->start_decoding_at;
412}
413
414/*
415 * Increase refcount of a snapshot.
416 *
417 * This is used when handing out a snapshot to some external resource or when
418 * adding a Snapshot as builder->snapshot.
419 */
420static void
421SnapBuildSnapIncRefcount(Snapshot snap)
422{
423 snap->active_count++;
424}
425
426/*
427 * Decrease refcount of a snapshot and free if the refcount reaches zero.
428 *
429 * Externally visible, so that external resources that have been handed an
430 * IncRef'ed Snapshot can adjust its refcount easily.
431 */
432void
433SnapBuildSnapDecRefcount(Snapshot snap)
434{
435 /* make sure we don't get passed an external snapshot */
436 Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
437
438 /* make sure nobody modified our snapshot */
439 Assert(snap->curcid == FirstCommandId);
440 Assert(!snap->suboverflowed);
441 Assert(!snap->takenDuringRecovery);
442
443 Assert(snap->regd_count == 0);
444
445 Assert(snap->active_count > 0);
446
447 /* slightly more likely, so it's checked even without casserts */
448 if (snap->copied)
449 elog(ERROR, "cannot free a copied snapshot");
450
451 snap->active_count--;
452 if (snap->active_count == 0)
453 SnapBuildFreeSnapshot(snap);
454}
455
456/*
457 * Build a new snapshot, based on currently committed catalog-modifying
458 * transactions.
459 *
460 * In-progress transactions with catalog access are *not* allowed to modify
461 * these snapshots; they have to copy them and fill in appropriate ->curcid
462 * and ->subxip/subxcnt values.
463 */
464static Snapshot
465SnapBuildBuildSnapshot(SnapBuild *builder)
466{
467 Snapshot snapshot;
468 Size ssize;
469
470 Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
471
472 ssize = sizeof(SnapshotData)
473 + sizeof(TransactionId) * builder->committed.xcnt
474 + sizeof(TransactionId) * 1 /* toplevel xid */ ;
475
476 snapshot = MemoryContextAllocZero(builder->context, ssize);
477
478 snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
479
480 /*
481 * We misuse the original meaning of SnapshotData's xip and subxip fields
482 * to make the more fitting for our needs.
483 *
484 * In the 'xip' array we store transactions that have to be treated as
485 * committed. Since we will only ever look at tuples from transactions
486 * that have modified the catalog it's more efficient to store those few
487 * that exist between xmin and xmax (frequently there are none).
488 *
489 * Snapshots that are used in transactions that have modified the catalog
490 * also use the 'subxip' array to store their toplevel xid and all the
491 * subtransaction xids so we can recognize when we need to treat rows as
492 * visible that are not in xip but still need to be visible. Subxip only
493 * gets filled when the transaction is copied into the context of a
494 * catalog modifying transaction since we otherwise share a snapshot
495 * between transactions. As long as a txn hasn't modified the catalog it
496 * doesn't need to treat any uncommitted rows as visible, so there is no
497 * need for those xids.
498 *
499 * Both arrays are qsort'ed so that we can use bsearch() on them.
500 */
501 Assert(TransactionIdIsNormal(builder->xmin));
502 Assert(TransactionIdIsNormal(builder->xmax));
503
504 snapshot->xmin = builder->xmin;
505 snapshot->xmax = builder->xmax;
506
507 /* store all transactions to be treated as committed by this snapshot */
508 snapshot->xip =
509 (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
510 snapshot->xcnt = builder->committed.xcnt;
511 memcpy(snapshot->xip,
512 builder->committed.xip,
513 builder->committed.xcnt * sizeof(TransactionId));
514
515 /* sort so we can bsearch() */
516 qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
517
518 /*
519 * Initially, subxip is empty, i.e. it's a snapshot to be used by
520 * transactions that don't modify the catalog. Will be filled by
521 * ReorderBufferCopySnap() if necessary.
522 */
523 snapshot->subxcnt = 0;
524 snapshot->subxip = NULL;
525
526 snapshot->suboverflowed = false;
527 snapshot->takenDuringRecovery = false;
528 snapshot->copied = false;
529 snapshot->curcid = FirstCommandId;
530 snapshot->active_count = 0;
531 snapshot->regd_count = 0;
532
533 return snapshot;
534}
535
536/*
537 * Build the initial slot snapshot and convert it to a normal snapshot that
538 * is understood by HeapTupleSatisfiesMVCC.
539 *
540 * The snapshot will be usable directly in current transaction or exported
541 * for loading in different transaction.
542 */
543Snapshot
544SnapBuildInitialSnapshot(SnapBuild *builder)
545{
546 Snapshot snap;
547 TransactionId xid;
548 TransactionId *newxip;
549 int newxcnt = 0;
550
551 Assert(!FirstSnapshotSet);
552 Assert(XactIsoLevel == XACT_REPEATABLE_READ);
553
554 if (builder->state != SNAPBUILD_CONSISTENT)
555 elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
556
557 if (!builder->committed.includes_all_transactions)
558 elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
559
560 /* so we don't overwrite the existing value */
561 if (TransactionIdIsValid(MyPgXact->xmin))
562 elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid");
563
564 snap = SnapBuildBuildSnapshot(builder);
565
566 /*
567 * We know that snap->xmin is alive, enforced by the logical xmin
568 * mechanism. Due to that we can do this without locks, we're only
569 * changing our own value.
570 */
571#ifdef USE_ASSERT_CHECKING
572 {
573 TransactionId safeXid;
574
575 LWLockAcquire(ProcArrayLock, LW_SHARED);
576 safeXid = GetOldestSafeDecodingTransactionId(false);
577 LWLockRelease(ProcArrayLock);
578
579 Assert(TransactionIdPrecedesOrEquals(safeXid, snap->xmin));
580 }
581#endif
582
583 MyPgXact->xmin = snap->xmin;
584
585 /* allocate in transaction context */
586 newxip = (TransactionId *)
587 palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
588
589 /*
590 * snapbuild.c builds transactions in an "inverted" manner, which means it
591 * stores committed transactions in ->xip, not ones in progress. Build a
592 * classical snapshot by marking all non-committed transactions as
593 * in-progress. This can be expensive.
594 */
595 for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
596 {
597 void *test;
598
599 /*
600 * Check whether transaction committed using the decoding snapshot
601 * meaning of ->xip.
602 */
603 test = bsearch(&xid, snap->xip, snap->xcnt,
604 sizeof(TransactionId), xidComparator);
605
606 if (test == NULL)
607 {
608 if (newxcnt >= GetMaxSnapshotXidCount())
609 ereport(ERROR,
610 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
611 errmsg("initial slot snapshot too large")));
612
613 newxip[newxcnt++] = xid;
614 }
615
616 TransactionIdAdvance(xid);
617 }
618
619 /* adjust remaining snapshot fields as needed */
620 snap->snapshot_type = SNAPSHOT_MVCC;
621 snap->xcnt = newxcnt;
622 snap->xip = newxip;
623
624 return snap;
625}
626
627/*
628 * Export a snapshot so it can be set in another session with SET TRANSACTION
629 * SNAPSHOT.
630 *
631 * For that we need to start a transaction in the current backend as the
632 * importing side checks whether the source transaction is still open to make
633 * sure the xmin horizon hasn't advanced since then.
634 */
635const char *
636SnapBuildExportSnapshot(SnapBuild *builder)
637{
638 Snapshot snap;
639 char *snapname;
640
641 if (IsTransactionOrTransactionBlock())
642 elog(ERROR, "cannot export a snapshot from within a transaction");
643
644 if (SavedResourceOwnerDuringExport)
645 elog(ERROR, "can only export one snapshot at a time");
646
647 SavedResourceOwnerDuringExport = CurrentResourceOwner;
648 ExportInProgress = true;
649
650 StartTransactionCommand();
651
652 /* There doesn't seem to a nice API to set these */
653 XactIsoLevel = XACT_REPEATABLE_READ;
654 XactReadOnly = true;
655
656 snap = SnapBuildInitialSnapshot(builder);
657
658 /*
659 * now that we've built a plain snapshot, make it active and use the
660 * normal mechanisms for exporting it
661 */
662 snapname = ExportSnapshot(snap);
663
664 ereport(LOG,
665 (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
666 "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
667 snap->xcnt,
668 snapname, snap->xcnt)));
669 return snapname;
670}
671
672/*
673 * Ensure there is a snapshot and if not build one for current transaction.
674 */
675Snapshot
676SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
677{
678 Assert(builder->state == SNAPBUILD_CONSISTENT);
679
680 /* only build a new snapshot if we don't have a prebuilt one */
681 if (builder->snapshot == NULL)
682 {
683 builder->snapshot = SnapBuildBuildSnapshot(builder);
684 /* increase refcount for the snapshot builder */
685 SnapBuildSnapIncRefcount(builder->snapshot);
686 }
687
688 return builder->snapshot;
689}
690
691/*
692 * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
693 * any. Aborts the previously started transaction and resets the resource
694 * owner back to its original value.
695 */
696void
697SnapBuildClearExportedSnapshot(void)
698{
699 /* nothing exported, that is the usual case */
700 if (!ExportInProgress)
701 return;
702
703 if (!IsTransactionState())
704 elog(ERROR, "clearing exported snapshot in wrong transaction state");
705
706 /* make sure nothing could have ever happened */
707 AbortCurrentTransaction();
708
709 CurrentResourceOwner = SavedResourceOwnerDuringExport;
710 SavedResourceOwnerDuringExport = NULL;
711 ExportInProgress = false;
712}
713
714/*
715 * Handle the effects of a single heap change, appropriate to the current state
716 * of the snapshot builder and returns whether changes made at (xid, lsn) can
717 * be decoded.
718 */
719bool
720SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
721{
722 /*
723 * We can't handle data in transactions if we haven't built a snapshot
724 * yet, so don't store them.
725 */
726 if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
727 return false;
728
729 /*
730 * No point in keeping track of changes in transactions that we don't have
731 * enough information about to decode. This means that they started before
732 * we got into the SNAPBUILD_FULL_SNAPSHOT state.
733 */
734 if (builder->state < SNAPBUILD_CONSISTENT &&
735 TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder)))
736 return false;
737
738 /*
739 * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
740 * be needed to decode the change we're currently processing.
741 */
742 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
743 {
744 /* only build a new snapshot if we don't have a prebuilt one */
745 if (builder->snapshot == NULL)
746 {
747 builder->snapshot = SnapBuildBuildSnapshot(builder);
748 /* increase refcount for the snapshot builder */
749 SnapBuildSnapIncRefcount(builder->snapshot);
750 }
751
752 /*
753 * Increase refcount for the transaction we're handing the snapshot
754 * out to.
755 */
756 SnapBuildSnapIncRefcount(builder->snapshot);
757 ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
758 builder->snapshot);
759 }
760
761 return true;
762}
763
764/*
765 * Do CommandId/ComboCid handling after reading an xl_heap_new_cid record.
766 * This implies that a transaction has done some form of write to system
767 * catalogs.
768 */
769void
770SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
771 XLogRecPtr lsn, xl_heap_new_cid *xlrec)
772{
773 CommandId cid;
774
775 /*
776 * we only log new_cid's if a catalog tuple was modified, so mark the
777 * transaction as containing catalog modifications
778 */
779 ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
780
781 ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
782 xlrec->target_node, xlrec->target_tid,
783 xlrec->cmin, xlrec->cmax,
784 xlrec->combocid);
785
786 /* figure out new command id */
787 if (xlrec->cmin != InvalidCommandId &&
788 xlrec->cmax != InvalidCommandId)
789 cid = Max(xlrec->cmin, xlrec->cmax);
790 else if (xlrec->cmax != InvalidCommandId)
791 cid = xlrec->cmax;
792 else if (xlrec->cmin != InvalidCommandId)
793 cid = xlrec->cmin;
794 else
795 {
796 cid = InvalidCommandId; /* silence compiler */
797 elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
798 }
799
800 ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
801}
802
803/*
804 * Add a new Snapshot to all transactions we're decoding that currently are
805 * in-progress so they can see new catalog contents made by the transaction
806 * that just committed. This is necessary because those in-progress
807 * transactions will use the new catalog's contents from here on (at the very
808 * least everything they do needs to be compatible with newer catalog
809 * contents).
810 */
811static void
812SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
813{
814 dlist_iter txn_i;
815 ReorderBufferTXN *txn;
816
817 /*
818 * Iterate through all toplevel transactions. This can include
819 * subtransactions which we just don't yet know to be that, but that's
820 * fine, they will just get an unnecessary snapshot queued.
821 */
822 dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
823 {
824 txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
825
826 Assert(TransactionIdIsValid(txn->xid));
827
828 /*
829 * If we don't have a base snapshot yet, there are no changes in this
830 * transaction which in turn implies we don't yet need a snapshot at
831 * all. We'll add a snapshot when the first change gets queued.
832 *
833 * NB: This works correctly even for subtransactions because
834 * ReorderBufferAssignChild() takes care to transfer the base snapshot
835 * to the top-level transaction, and while iterating the changequeue
836 * we'll get the change from the subtxn.
837 */
838 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
839 continue;
840
841 elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
842 txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
843
844 /*
845 * increase the snapshot's refcount for the transaction we are handing
846 * it out to
847 */
848 SnapBuildSnapIncRefcount(builder->snapshot);
849 ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
850 builder->snapshot);
851 }
852}
853
854/*
855 * Keep track of a new catalog changing transaction that has committed.
856 */
857static void
858SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
859{
860 Assert(TransactionIdIsValid(xid));
861
862 if (builder->committed.xcnt == builder->committed.xcnt_space)
863 {
864 builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
865
866 elog(DEBUG1, "increasing space for committed transactions to %u",
867 (uint32) builder->committed.xcnt_space);
868
869 builder->committed.xip = repalloc(builder->committed.xip,
870 builder->committed.xcnt_space * sizeof(TransactionId));
871 }
872
873 /*
874 * TODO: It might make sense to keep the array sorted here instead of
875 * doing it every time we build a new snapshot. On the other hand this
876 * gets called repeatedly when a transaction with subtransactions commits.
877 */
878 builder->committed.xip[builder->committed.xcnt++] = xid;
879}
880
881/*
882 * Remove knowledge about transactions we treat as committed that are smaller
883 * than ->xmin. Those won't ever get checked via the ->committed array but via
884 * the clog machinery, so we don't need to waste memory on them.
885 */
886static void
887SnapBuildPurgeCommittedTxn(SnapBuild *builder)
888{
889 int off;
890 TransactionId *workspace;
891 int surviving_xids = 0;
892
893 /* not ready yet */
894 if (!TransactionIdIsNormal(builder->xmin))
895 return;
896
897 /* TODO: Neater algorithm than just copying and iterating? */
898 workspace =
899 MemoryContextAlloc(builder->context,
900 builder->committed.xcnt * sizeof(TransactionId));
901
902 /* copy xids that still are interesting to workspace */
903 for (off = 0; off < builder->committed.xcnt; off++)
904 {
905 if (NormalTransactionIdPrecedes(builder->committed.xip[off],
906 builder->xmin))
907 ; /* remove */
908 else
909 workspace[surviving_xids++] = builder->committed.xip[off];
910 }
911
912 /* copy workspace back to persistent state */
913 memcpy(builder->committed.xip, workspace,
914 surviving_xids * sizeof(TransactionId));
915
916 elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
917 (uint32) builder->committed.xcnt, (uint32) surviving_xids,
918 builder->xmin, builder->xmax);
919 builder->committed.xcnt = surviving_xids;
920
921 pfree(workspace);
922}
923
924/*
925 * Handle everything that needs to be done when a transaction commits
926 */
927void
928SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
929 int nsubxacts, TransactionId *subxacts)
930{
931 int nxact;
932
933 bool needs_snapshot = false;
934 bool needs_timetravel = false;
935 bool sub_needs_timetravel = false;
936
937 TransactionId xmax = xid;
938
939 /*
940 * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
941 * will they be part of a snapshot. So we don't need to record anything.
942 */
943 if (builder->state == SNAPBUILD_START ||
944 (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
945 TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder))))
946 {
947 /* ensure that only commits after this are getting replayed */
948 if (builder->start_decoding_at <= lsn)
949 builder->start_decoding_at = lsn + 1;
950 return;
951 }
952
953 if (builder->state < SNAPBUILD_CONSISTENT)
954 {
955 /* ensure that only commits after this are getting replayed */
956 if (builder->start_decoding_at <= lsn)
957 builder->start_decoding_at = lsn + 1;
958
959 /*
960 * If building an exportable snapshot, force xid to be tracked, even
961 * if the transaction didn't modify the catalog.
962 */
963 if (builder->building_full_snapshot)
964 {
965 needs_timetravel = true;
966 }
967 }
968
969 for (nxact = 0; nxact < nsubxacts; nxact++)
970 {
971 TransactionId subxid = subxacts[nxact];
972
973 /*
974 * Add subtransaction to base snapshot if catalog modifying, we don't
975 * distinguish to toplevel transactions there.
976 */
977 if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
978 {
979 sub_needs_timetravel = true;
980 needs_snapshot = true;
981
982 elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
983 xid, subxid);
984
985 SnapBuildAddCommittedTxn(builder, subxid);
986
987 if (NormalTransactionIdFollows(subxid, xmax))
988 xmax = subxid;
989 }
990
991 /*
992 * If we're forcing timetravel we also need visibility information
993 * about subtransaction, so keep track of subtransaction's state, even
994 * if not catalog modifying. Don't need to distribute a snapshot in
995 * that case.
996 */
997 else if (needs_timetravel)
998 {
999 SnapBuildAddCommittedTxn(builder, subxid);
1000 if (NormalTransactionIdFollows(subxid, xmax))
1001 xmax = subxid;
1002 }
1003 }
1004
1005 /* if top-level modified catalog, it'll need a snapshot */
1006 if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1007 {
1008 elog(DEBUG2, "found top level transaction %u, with catalog changes",
1009 xid);
1010 needs_snapshot = true;
1011 needs_timetravel = true;
1012 SnapBuildAddCommittedTxn(builder, xid);
1013 }
1014 else if (sub_needs_timetravel)
1015 {
1016 /* track toplevel txn as well, subxact alone isn't meaningful */
1017 SnapBuildAddCommittedTxn(builder, xid);
1018 }
1019 else if (needs_timetravel)
1020 {
1021 elog(DEBUG2, "forced transaction %u to do timetravel", xid);
1022
1023 SnapBuildAddCommittedTxn(builder, xid);
1024 }
1025
1026 if (!needs_timetravel)
1027 {
1028 /* record that we cannot export a general snapshot anymore */
1029 builder->committed.includes_all_transactions = false;
1030 }
1031
1032 Assert(!needs_snapshot || needs_timetravel);
1033
1034 /*
1035 * Adjust xmax of the snapshot builder, we only do that for committed,
1036 * catalog modifying, transactions, everything else isn't interesting for
1037 * us since we'll never look at the respective rows.
1038 */
1039 if (needs_timetravel &&
1040 (!TransactionIdIsValid(builder->xmax) ||
1041 TransactionIdFollowsOrEquals(xmax, builder->xmax)))
1042 {
1043 builder->xmax = xmax;
1044 TransactionIdAdvance(builder->xmax);
1045 }
1046
1047 /* if there's any reason to build a historic snapshot, do so now */
1048 if (needs_snapshot)
1049 {
1050 /*
1051 * If we haven't built a complete snapshot yet there's no need to hand
1052 * it out, it wouldn't (and couldn't) be used anyway.
1053 */
1054 if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
1055 return;
1056
1057 /*
1058 * Decrease the snapshot builder's refcount of the old snapshot, note
1059 * that it still will be used if it has been handed out to the
1060 * reorderbuffer earlier.
1061 */
1062 if (builder->snapshot)
1063 SnapBuildSnapDecRefcount(builder->snapshot);
1064
1065 builder->snapshot = SnapBuildBuildSnapshot(builder);
1066
1067 /* we might need to execute invalidations, add snapshot */
1068 if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1069 {
1070 SnapBuildSnapIncRefcount(builder->snapshot);
1071 ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1072 builder->snapshot);
1073 }
1074
1075 /* refcount of the snapshot builder for the new snapshot */
1076 SnapBuildSnapIncRefcount(builder->snapshot);
1077
1078 /* add a new catalog snapshot to all currently running transactions */
1079 SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
1080 }
1081}
1082
1083
1084/* -----------------------------------
1085 * Snapshot building functions dealing with xlog records
1086 * -----------------------------------
1087 */
1088
1089/*
1090 * Process a running xacts record, and use its information to first build a
1091 * historic snapshot and later to release resources that aren't needed
1092 * anymore.
1093 */
1094void
1095SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1096{
1097 ReorderBufferTXN *txn;
1098 TransactionId xmin;
1099
1100 /*
1101 * If we're not consistent yet, inspect the record to see whether it
1102 * allows to get closer to being consistent. If we are consistent, dump
1103 * our snapshot so others or we, after a restart, can use it.
1104 */
1105 if (builder->state < SNAPBUILD_CONSISTENT)
1106 {
1107 /* returns false if there's no point in performing cleanup just yet */
1108 if (!SnapBuildFindSnapshot(builder, lsn, running))
1109 return;
1110 }
1111 else
1112 SnapBuildSerialize(builder, lsn);
1113
1114 /*
1115 * Update range of interesting xids based on the running xacts
1116 * information. We don't increase ->xmax using it, because once we are in
1117 * a consistent state we can do that ourselves and much more efficiently
1118 * so, because we only need to do it for catalog transactions since we
1119 * only ever look at those.
1120 *
1121 * NB: We only increase xmax when a catalog modifying transaction commits
1122 * (see SnapBuildCommitTxn). Because of this, xmax can be lower than
1123 * xmin, which looks odd but is correct and actually more efficient, since
1124 * we hit fast paths in heapam_visibility.c.
1125 */
1126 builder->xmin = running->oldestRunningXid;
1127
1128 /* Remove transactions we don't need to keep track off anymore */
1129 SnapBuildPurgeCommittedTxn(builder);
1130
1131 /*
1132 * Advance the xmin limit for the current replication slot, to allow
1133 * vacuum to clean up the tuples this slot has been protecting.
1134 *
1135 * The reorderbuffer might have an xmin among the currently running
1136 * snapshots; use it if so. If not, we need only consider the snapshots
1137 * we'll produce later, which can't be less than the oldest running xid in
1138 * the record we're reading now.
1139 */
1140 xmin = ReorderBufferGetOldestXmin(builder->reorder);
1141 if (xmin == InvalidTransactionId)
1142 xmin = running->oldestRunningXid;
1143 elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
1144 builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
1145 LogicalIncreaseXminForSlot(lsn, xmin);
1146
1147 /*
1148 * Also tell the slot where we can restart decoding from. We don't want to
1149 * do that after every commit because changing that implies an fsync of
1150 * the logical slot's state file, so we only do it every time we see a
1151 * running xacts record.
1152 *
1153 * Do so by looking for the oldest in progress transaction (determined by
1154 * the first LSN of any of its relevant records). Every transaction
1155 * remembers the last location we stored the snapshot to disk before its
1156 * beginning. That point is where we can restart from.
1157 */
1158
1159 /*
1160 * Can't know about a serialized snapshot's location if we're not
1161 * consistent.
1162 */
1163 if (builder->state < SNAPBUILD_CONSISTENT)
1164 return;
1165
1166 txn = ReorderBufferGetOldestTXN(builder->reorder);
1167
1168 /*
1169 * oldest ongoing txn might have started when we didn't yet serialize
1170 * anything because we hadn't reached a consistent state yet.
1171 */
1172 if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
1173 LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
1174
1175 /*
1176 * No in-progress transaction, can reuse the last serialized snapshot if
1177 * we have one.
1178 */
1179 else if (txn == NULL &&
1180 builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
1181 builder->last_serialized_snapshot != InvalidXLogRecPtr)
1182 LogicalIncreaseRestartDecodingForSlot(lsn,
1183 builder->last_serialized_snapshot);
1184}
1185
1186
1187/*
1188 * Build the start of a snapshot that's capable of decoding the catalog.
1189 *
1190 * Helper function for SnapBuildProcessRunningXacts() while we're not yet
1191 * consistent.
1192 *
1193 * Returns true if there is a point in performing internal maintenance/cleanup
1194 * using the xl_running_xacts record.
1195 */
1196static bool
1197SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1198{
1199 /* ---
1200 * Build catalog decoding snapshot incrementally using information about
1201 * the currently running transactions. There are several ways to do that:
1202 *
1203 * a) There were no running transactions when the xl_running_xacts record
1204 * was inserted, jump to CONSISTENT immediately. We might find such a
1205 * state while waiting on c)'s sub-states.
1206 *
1207 * b) This (in a previous run) or another decoding slot serialized a
1208 * snapshot to disk that we can use. Can't use this method for the
1209 * initial snapshot when slot is being created and needs full snapshot
1210 * for export or direct use, as that snapshot will only contain catalog
1211 * modifying transactions.
1212 *
1213 * c) First incrementally build a snapshot for catalog tuples
1214 * (BUILDING_SNAPSHOT), that requires all, already in-progress,
1215 * transactions to finish. Every transaction starting after that
1216 * (FULL_SNAPSHOT state), has enough information to be decoded. But
1217 * for older running transactions no viable snapshot exists yet, so
1218 * CONSISTENT will only be reached once all of those have finished.
1219 * ---
1220 */
1221
1222 /*
1223 * xl_running_xact record is older than what we can use, we might not have
1224 * all necessary catalog rows anymore.
1225 */
1226 if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
1227 NormalTransactionIdPrecedes(running->oldestRunningXid,
1228 builder->initial_xmin_horizon))
1229 {
1230 ereport(DEBUG1,
1231 (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
1232 (uint32) (lsn >> 32), (uint32) lsn),
1233 errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
1234 builder->initial_xmin_horizon, running->oldestRunningXid)));
1235
1236
1237 SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
1238
1239 return true;
1240 }
1241
1242 /*
1243 * a) No transaction were running, we can jump to consistent.
1244 *
1245 * This is not affected by races around xl_running_xacts, because we can
1246 * miss transaction commits, but currently not transactions starting.
1247 *
1248 * NB: We might have already started to incrementally assemble a snapshot,
1249 * so we need to be careful to deal with that.
1250 */
1251 if (running->oldestRunningXid == running->nextXid)
1252 {
1253 if (builder->start_decoding_at == InvalidXLogRecPtr ||
1254 builder->start_decoding_at <= lsn)
1255 /* can decode everything after this */
1256 builder->start_decoding_at = lsn + 1;
1257
1258 /* As no transactions were running xmin/xmax can be trivially set. */
1259 builder->xmin = running->nextXid; /* < are finished */
1260 builder->xmax = running->nextXid; /* >= are running */
1261
1262 /* so we can safely use the faster comparisons */
1263 Assert(TransactionIdIsNormal(builder->xmin));
1264 Assert(TransactionIdIsNormal(builder->xmax));
1265
1266 builder->state = SNAPBUILD_CONSISTENT;
1267 SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
1268
1269 ereport(LOG,
1270 (errmsg("logical decoding found consistent point at %X/%X",
1271 (uint32) (lsn >> 32), (uint32) lsn),
1272 errdetail("There are no running transactions.")));
1273
1274 return false;
1275 }
1276 /* b) valid on disk state and not building full snapshot */
1277 else if (!builder->building_full_snapshot &&
1278 SnapBuildRestore(builder, lsn))
1279 {
1280 /* there won't be any state to cleanup */
1281 return false;
1282 }
1283
1284 /*
1285 * c) transition from START to BUILDING_SNAPSHOT.
1286 *
1287 * In START state, and a xl_running_xacts record with running xacts is
1288 * encountered. In that case, switch to BUILDING_SNAPSHOT state, and
1289 * record xl_running_xacts->nextXid. Once all running xacts have finished
1290 * (i.e. they're all >= nextXid), we have a complete catalog snapshot. It
1291 * might look that we could use xl_running_xact's ->xids information to
1292 * get there quicker, but that is problematic because transactions marked
1293 * as running, might already have inserted their commit record - it's
1294 * infeasible to change that with locking.
1295 */
1296 else if (builder->state == SNAPBUILD_START)
1297 {
1298 builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
1299 SnapBuildStartNextPhaseAt(builder, running->nextXid);
1300
1301 /*
1302 * Start with an xmin/xmax that's correct for future, when all the
1303 * currently running transactions have finished. We'll update both
1304 * while waiting for the pending transactions to finish.
1305 */
1306 builder->xmin = running->nextXid; /* < are finished */
1307 builder->xmax = running->nextXid; /* >= are running */
1308
1309 /* so we can safely use the faster comparisons */
1310 Assert(TransactionIdIsNormal(builder->xmin));
1311 Assert(TransactionIdIsNormal(builder->xmax));
1312
1313 ereport(LOG,
1314 (errmsg("logical decoding found initial starting point at %X/%X",
1315 (uint32) (lsn >> 32), (uint32) lsn),
1316 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1317 running->xcnt, running->nextXid)));
1318
1319 SnapBuildWaitSnapshot(running, running->nextXid);
1320 }
1321
1322 /*
1323 * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
1324 *
1325 * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
1326 * is >= than nextXid from when we switched to BUILDING_SNAPSHOT. This
1327 * means all transactions starting afterwards have enough information to
1328 * be decoded. Switch to FULL_SNAPSHOT.
1329 */
1330 else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
1331 TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder),
1332 running->oldestRunningXid))
1333 {
1334 builder->state = SNAPBUILD_FULL_SNAPSHOT;
1335 SnapBuildStartNextPhaseAt(builder, running->nextXid);
1336
1337 ereport(LOG,
1338 (errmsg("logical decoding found initial consistent point at %X/%X",
1339 (uint32) (lsn >> 32), (uint32) lsn),
1340 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1341 running->xcnt, running->nextXid)));
1342
1343 SnapBuildWaitSnapshot(running, running->nextXid);
1344 }
1345
1346 /*
1347 * c) transition from FULL_SNAPSHOT to CONSISTENT.
1348 *
1349 * In FULL_SNAPSHOT state (see d) ), and this xl_running_xacts'
1350 * oldestRunningXid is >= than nextXid from when we switched to
1351 * FULL_SNAPSHOT. This means all transactions that are currently in
1352 * progress have a catalog snapshot, and all their changes have been
1353 * collected. Switch to CONSISTENT.
1354 */
1355 else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
1356 TransactionIdPrecedesOrEquals(SnapBuildNextPhaseAt(builder),
1357 running->oldestRunningXid))
1358 {
1359 builder->state = SNAPBUILD_CONSISTENT;
1360 SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
1361
1362 ereport(LOG,
1363 (errmsg("logical decoding found consistent point at %X/%X",
1364 (uint32) (lsn >> 32), (uint32) lsn),
1365 errdetail("There are no old transactions anymore.")));
1366 }
1367
1368 /*
1369 * We already started to track running xacts and need to wait for all
1370 * in-progress ones to finish. We fall through to the normal processing of
1371 * records so incremental cleanup can be performed.
1372 */
1373 return true;
1374
1375}
1376
1377/* ---
1378 * Iterate through xids in record, wait for all older than the cutoff to
1379 * finish. Then, if possible, log a new xl_running_xacts record.
1380 *
1381 * This isn't required for the correctness of decoding, but to:
1382 * a) allow isolationtester to notice that we're currently waiting for
1383 * something.
1384 * b) log a new xl_running_xacts record where it'd be helpful, without having
1385 * to write for bgwriter or checkpointer.
1386 * ---
1387 */
1388static void
1389SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
1390{
1391 int off;
1392
1393 for (off = 0; off < running->xcnt; off++)
1394 {
1395 TransactionId xid = running->xids[off];
1396
1397 /*
1398 * Upper layers should prevent that we ever need to wait on ourselves.
1399 * Check anyway, since failing to do so would either result in an
1400 * endless wait or an Assert() failure.
1401 */
1402 if (TransactionIdIsCurrentTransactionId(xid))
1403 elog(ERROR, "waiting for ourselves");
1404
1405 if (TransactionIdFollows(xid, cutoff))
1406 continue;
1407
1408 XactLockTableWait(xid, NULL, NULL, XLTW_None);
1409 }
1410
1411 /*
1412 * All transactions we needed to finish finished - try to ensure there is
1413 * another xl_running_xacts record in a timely manner, without having to
1414 * write for bgwriter or checkpointer to log one. During recovery we
1415 * can't enforce that, so we'll have to wait.
1416 */
1417 if (!RecoveryInProgress())
1418 {
1419 LogStandbySnapshot();
1420 }
1421}
1422
1423/* -----------------------------------
1424 * Snapshot serialization support
1425 * -----------------------------------
1426 */
1427
1428/*
1429 * We store current state of struct SnapBuild on disk in the following manner:
1430 *
1431 * struct SnapBuildOnDisk;
1432 * TransactionId * running.xcnt_space;
1433 * TransactionId * committed.xcnt; (*not xcnt_space*)
1434 *
1435 */
1436typedef struct SnapBuildOnDisk
1437{
1438 /* first part of this struct needs to be version independent */
1439
1440 /* data not covered by checksum */
1441 uint32 magic;
1442 pg_crc32c checksum;
1443
1444 /* data covered by checksum */
1445
1446 /* version, in case we want to support pg_upgrade */
1447 uint32 version;
1448 /* how large is the on disk data, excluding the constant sized part */
1449 uint32 length;
1450
1451 /* version dependent part */
1452 SnapBuild builder;
1453
1454 /* variable amount of TransactionIds follows */
1455} SnapBuildOnDisk;
1456
1457#define SnapBuildOnDiskConstantSize \
1458 offsetof(SnapBuildOnDisk, builder)
1459#define SnapBuildOnDiskNotChecksummedSize \
1460 offsetof(SnapBuildOnDisk, version)
1461
1462#define SNAPBUILD_MAGIC 0x51A1E001
1463#define SNAPBUILD_VERSION 2
1464
1465/*
1466 * Store/Load a snapshot from disk, depending on the snapshot builder's state.
1467 *
1468 * Supposed to be used by external (i.e. not snapbuild.c) code that just read
1469 * a record that's a potential location for a serialized snapshot.
1470 */
1471void
1472SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
1473{
1474 if (builder->state < SNAPBUILD_CONSISTENT)
1475 SnapBuildRestore(builder, lsn);
1476 else
1477 SnapBuildSerialize(builder, lsn);
1478}
1479
1480/*
1481 * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
1482 * been done by another decoding process.
1483 */
1484static void
1485SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
1486{
1487 Size needed_length;
1488 SnapBuildOnDisk *ondisk;
1489 char *ondisk_c;
1490 int fd;
1491 char tmppath[MAXPGPATH];
1492 char path[MAXPGPATH];
1493 int ret;
1494 struct stat stat_buf;
1495 Size sz;
1496
1497 Assert(lsn != InvalidXLogRecPtr);
1498 Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
1499 builder->last_serialized_snapshot <= lsn);
1500
1501 /*
1502 * no point in serializing if we cannot continue to work immediately after
1503 * restoring the snapshot
1504 */
1505 if (builder->state < SNAPBUILD_CONSISTENT)
1506 return;
1507
1508 /*
1509 * We identify snapshots by the LSN they are valid for. We don't need to
1510 * include timelines in the name as each LSN maps to exactly one timeline
1511 * unless the user used pg_resetwal or similar. If a user did so, there's
1512 * no hope continuing to decode anyway.
1513 */
1514 sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1515 (uint32) (lsn >> 32), (uint32) lsn);
1516
1517 /*
1518 * first check whether some other backend already has written the snapshot
1519 * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1520 * as a valid state. Everything else is an unexpected error.
1521 */
1522 ret = stat(path, &stat_buf);
1523
1524 if (ret != 0 && errno != ENOENT)
1525 ereport(ERROR,
1526 (errcode_for_file_access(),
1527 errmsg("could not stat file \"%s\": %m", path)));
1528
1529 else if (ret == 0)
1530 {
1531 /*
1532 * somebody else has already serialized to this point, don't overwrite
1533 * but remember location, so we don't need to read old data again.
1534 *
1535 * To be sure it has been synced to disk after the rename() from the
1536 * tempfile filename to the real filename, we just repeat the fsync.
1537 * That ought to be cheap because in most scenarios it should already
1538 * be safely on disk.
1539 */
1540 fsync_fname(path, false);
1541 fsync_fname("pg_logical/snapshots", true);
1542
1543 builder->last_serialized_snapshot = lsn;
1544 goto out;
1545 }
1546
1547 /*
1548 * there is an obvious race condition here between the time we stat(2) the
1549 * file and us writing the file. But we rename the file into place
1550 * atomically and all files created need to contain the same data anyway,
1551 * so this is perfectly fine, although a bit of a resource waste. Locking
1552 * seems like pointless complication.
1553 */
1554 elog(DEBUG1, "serializing snapshot to %s", path);
1555
1556 /* to make sure only we will write to this tempfile, include pid */
1557 sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%u.tmp",
1558 (uint32) (lsn >> 32), (uint32) lsn, MyProcPid);
1559
1560 /*
1561 * Unlink temporary file if it already exists, needs to have been before a
1562 * crash/error since we won't enter this function twice from within a
1563 * single decoding slot/backend and the temporary file contains the pid of
1564 * the current process.
1565 */
1566 if (unlink(tmppath) != 0 && errno != ENOENT)
1567 ereport(ERROR,
1568 (errcode_for_file_access(),
1569 errmsg("could not remove file \"%s\": %m", tmppath)));
1570
1571 needed_length = sizeof(SnapBuildOnDisk) +
1572 sizeof(TransactionId) * builder->committed.xcnt;
1573
1574 ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
1575 ondisk = (SnapBuildOnDisk *) ondisk_c;
1576 ondisk->magic = SNAPBUILD_MAGIC;
1577 ondisk->version = SNAPBUILD_VERSION;
1578 ondisk->length = needed_length;
1579 INIT_CRC32C(ondisk->checksum);
1580 COMP_CRC32C(ondisk->checksum,
1581 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1582 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1583 ondisk_c += sizeof(SnapBuildOnDisk);
1584
1585 memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1586 /* NULL-ify memory-only data */
1587 ondisk->builder.context = NULL;
1588 ondisk->builder.snapshot = NULL;
1589 ondisk->builder.reorder = NULL;
1590 ondisk->builder.committed.xip = NULL;
1591
1592 COMP_CRC32C(ondisk->checksum,
1593 &ondisk->builder,
1594 sizeof(SnapBuild));
1595
1596 /* there shouldn't be any running xacts */
1597 Assert(builder->was_running.was_xcnt == 0);
1598
1599 /* copy committed xacts */
1600 sz = sizeof(TransactionId) * builder->committed.xcnt;
1601 memcpy(ondisk_c, builder->committed.xip, sz);
1602 COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1603 ondisk_c += sz;
1604
1605 FIN_CRC32C(ondisk->checksum);
1606
1607 /* we have valid data now, open tempfile and write it there */
1608 fd = OpenTransientFile(tmppath,
1609 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1610 if (fd < 0)
1611 ereport(ERROR,
1612 (errcode_for_file_access(),
1613 errmsg("could not open file \"%s\": %m", tmppath)));
1614
1615 errno = 0;
1616 pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
1617 if ((write(fd, ondisk, needed_length)) != needed_length)
1618 {
1619 int save_errno = errno;
1620
1621 CloseTransientFile(fd);
1622
1623 /* if write didn't set errno, assume problem is no disk space */
1624 errno = save_errno ? save_errno : ENOSPC;
1625 ereport(ERROR,
1626 (errcode_for_file_access(),
1627 errmsg("could not write to file \"%s\": %m", tmppath)));
1628 }
1629 pgstat_report_wait_end();
1630
1631 /*
1632 * fsync the file before renaming so that even if we crash after this we
1633 * have either a fully valid file or nothing.
1634 *
1635 * It's safe to just ERROR on fsync() here because we'll retry the whole
1636 * operation including the writes.
1637 *
1638 * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1639 * some noticeable overhead since it's performed synchronously during
1640 * decoding?
1641 */
1642 pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
1643 if (pg_fsync(fd) != 0)
1644 {
1645 int save_errno = errno;
1646
1647 CloseTransientFile(fd);
1648 errno = save_errno;
1649 ereport(ERROR,
1650 (errcode_for_file_access(),
1651 errmsg("could not fsync file \"%s\": %m", tmppath)));
1652 }
1653 pgstat_report_wait_end();
1654
1655 if (CloseTransientFile(fd))
1656 ereport(ERROR,
1657 (errcode_for_file_access(),
1658 errmsg("could not close file \"%s\": %m", tmppath)));
1659
1660 fsync_fname("pg_logical/snapshots", true);
1661
1662 /*
1663 * We may overwrite the work from some other backend, but that's ok, our
1664 * snapshot is valid as well, we'll just have done some superfluous work.
1665 */
1666 if (rename(tmppath, path) != 0)
1667 {
1668 ereport(ERROR,
1669 (errcode_for_file_access(),
1670 errmsg("could not rename file \"%s\" to \"%s\": %m",
1671 tmppath, path)));
1672 }
1673
1674 /* make sure we persist */
1675 fsync_fname(path, false);
1676 fsync_fname("pg_logical/snapshots", true);
1677
1678 /*
1679 * Now there's no way we can loose the dumped state anymore, remember this
1680 * as a serialization point.
1681 */
1682 builder->last_serialized_snapshot = lsn;
1683
1684out:
1685 ReorderBufferSetRestartPoint(builder->reorder,
1686 builder->last_serialized_snapshot);
1687}
1688
1689/*
1690 * Restore a snapshot into 'builder' if previously one has been stored at the
1691 * location indicated by 'lsn'. Returns true if successful, false otherwise.
1692 */
1693static bool
1694SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
1695{
1696 SnapBuildOnDisk ondisk;
1697 int fd;
1698 char path[MAXPGPATH];
1699 Size sz;
1700 int readBytes;
1701 pg_crc32c checksum;
1702
1703 /* no point in loading a snapshot if we're already there */
1704 if (builder->state == SNAPBUILD_CONSISTENT)
1705 return false;
1706
1707 sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1708 (uint32) (lsn >> 32), (uint32) lsn);
1709
1710 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1711
1712 if (fd < 0 && errno == ENOENT)
1713 return false;
1714 else if (fd < 0)
1715 ereport(ERROR,
1716 (errcode_for_file_access(),
1717 errmsg("could not open file \"%s\": %m", path)));
1718
1719 /* ----
1720 * Make sure the snapshot had been stored safely to disk, that's normally
1721 * cheap.
1722 * Note that we do not need PANIC here, nobody will be able to use the
1723 * slot without fsyncing, and saving it won't succeed without an fsync()
1724 * either...
1725 * ----
1726 */
1727 fsync_fname(path, false);
1728 fsync_fname("pg_logical/snapshots", true);
1729
1730
1731 /* read statically sized portion of snapshot */
1732 pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1733 readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
1734 pgstat_report_wait_end();
1735 if (readBytes != SnapBuildOnDiskConstantSize)
1736 {
1737 int save_errno = errno;
1738
1739 CloseTransientFile(fd);
1740
1741 if (readBytes < 0)
1742 {
1743 errno = save_errno;
1744 ereport(ERROR,
1745 (errcode_for_file_access(),
1746 errmsg("could not read file \"%s\": %m", path)));
1747 }
1748 else
1749 ereport(ERROR,
1750 (errcode(ERRCODE_DATA_CORRUPTED),
1751 errmsg("could not read file \"%s\": read %d of %zu",
1752 path, readBytes,
1753 (Size) SnapBuildOnDiskConstantSize)));
1754 }
1755
1756 if (ondisk.magic != SNAPBUILD_MAGIC)
1757 ereport(ERROR,
1758 (errcode(ERRCODE_DATA_CORRUPTED),
1759 errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1760 path, ondisk.magic, SNAPBUILD_MAGIC)));
1761
1762 if (ondisk.version != SNAPBUILD_VERSION)
1763 ereport(ERROR,
1764 (errcode(ERRCODE_DATA_CORRUPTED),
1765 errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1766 path, ondisk.version, SNAPBUILD_VERSION)));
1767
1768 INIT_CRC32C(checksum);
1769 COMP_CRC32C(checksum,
1770 ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
1771 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1772
1773 /* read SnapBuild */
1774 pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1775 readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
1776 pgstat_report_wait_end();
1777 if (readBytes != sizeof(SnapBuild))
1778 {
1779 int save_errno = errno;
1780
1781 CloseTransientFile(fd);
1782
1783 if (readBytes < 0)
1784 {
1785 errno = save_errno;
1786 ereport(ERROR,
1787 (errcode_for_file_access(),
1788 errmsg("could not read file \"%s\": %m", path)));
1789 }
1790 else
1791 ereport(ERROR,
1792 (errcode(ERRCODE_DATA_CORRUPTED),
1793 errmsg("could not read file \"%s\": read %d of %zu",
1794 path, readBytes, sizeof(SnapBuild))));
1795 }
1796 COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
1797
1798 /* restore running xacts (dead, but kept for backward compat) */
1799 sz = sizeof(TransactionId) * ondisk.builder.was_running.was_xcnt_space;
1800 ondisk.builder.was_running.was_xip =
1801 MemoryContextAllocZero(builder->context, sz);
1802 pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1803 readBytes = read(fd, ondisk.builder.was_running.was_xip, sz);
1804 pgstat_report_wait_end();
1805 if (readBytes != sz)
1806 {
1807 int save_errno = errno;
1808
1809 CloseTransientFile(fd);
1810
1811 if (readBytes < 0)
1812 {
1813 errno = save_errno;
1814 ereport(ERROR,
1815 (errcode_for_file_access(),
1816 errmsg("could not read file \"%s\": %m", path)));
1817 }
1818 else
1819 ereport(ERROR,
1820 (errcode(ERRCODE_DATA_CORRUPTED),
1821 errmsg("could not read file \"%s\": read %d of %zu",
1822 path, readBytes, sz)));
1823 }
1824 COMP_CRC32C(checksum, ondisk.builder.was_running.was_xip, sz);
1825
1826 /* restore committed xacts information */
1827 sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
1828 ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
1829 pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1830 readBytes = read(fd, ondisk.builder.committed.xip, sz);
1831 pgstat_report_wait_end();
1832 if (readBytes != sz)
1833 {
1834 int save_errno = errno;
1835
1836 CloseTransientFile(fd);
1837
1838 if (readBytes < 0)
1839 {
1840 errno = save_errno;
1841 ereport(ERROR,
1842 (errcode_for_file_access(),
1843 errmsg("could not read file \"%s\": %m", path)));
1844 }
1845 else
1846 ereport(ERROR,
1847 (errcode(ERRCODE_DATA_CORRUPTED),
1848 errmsg("could not read file \"%s\": read %d of %zu",
1849 path, readBytes, sz)));
1850 }
1851 COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
1852
1853 if (CloseTransientFile(fd))
1854 ereport(ERROR,
1855 (errcode_for_file_access(),
1856 errmsg("could not close file \"%s\": %m", path)));
1857
1858 FIN_CRC32C(checksum);
1859
1860 /* verify checksum of what we've read */
1861 if (!EQ_CRC32C(checksum, ondisk.checksum))
1862 ereport(ERROR,
1863 (errcode(ERRCODE_DATA_CORRUPTED),
1864 errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1865 path, checksum, ondisk.checksum)));
1866
1867 /*
1868 * ok, we now have a sensible snapshot here, figure out if it has more
1869 * information than we have.
1870 */
1871
1872 /*
1873 * We are only interested in consistent snapshots for now, comparing
1874 * whether one incomplete snapshot is more "advanced" seems to be
1875 * unnecessarily complex.
1876 */
1877 if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1878 goto snapshot_not_interesting;
1879
1880 /*
1881 * Don't use a snapshot that requires an xmin that we cannot guarantee to
1882 * be available.
1883 */
1884 if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
1885 goto snapshot_not_interesting;
1886
1887
1888 /* ok, we think the snapshot is sensible, copy over everything important */
1889 builder->xmin = ondisk.builder.xmin;
1890 builder->xmax = ondisk.builder.xmax;
1891 builder->state = ondisk.builder.state;
1892
1893 builder->committed.xcnt = ondisk.builder.committed.xcnt;
1894 /* We only allocated/stored xcnt, not xcnt_space xids ! */
1895 /* don't overwrite preallocated xip, if we don't have anything here */
1896 if (builder->committed.xcnt > 0)
1897 {
1898 pfree(builder->committed.xip);
1899 builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1900 builder->committed.xip = ondisk.builder.committed.xip;
1901 }
1902 ondisk.builder.committed.xip = NULL;
1903
1904 /* our snapshot is not interesting anymore, build a new one */
1905 if (builder->snapshot != NULL)
1906 {
1907 SnapBuildSnapDecRefcount(builder->snapshot);
1908 }
1909 builder->snapshot = SnapBuildBuildSnapshot(builder);
1910 SnapBuildSnapIncRefcount(builder->snapshot);
1911
1912 ReorderBufferSetRestartPoint(builder->reorder, lsn);
1913
1914 Assert(builder->state == SNAPBUILD_CONSISTENT);
1915
1916 ereport(LOG,
1917 (errmsg("logical decoding found consistent point at %X/%X",
1918 (uint32) (lsn >> 32), (uint32) lsn),
1919 errdetail("Logical decoding will begin using saved snapshot.")));
1920 return true;
1921
1922snapshot_not_interesting:
1923 if (ondisk.builder.committed.xip != NULL)
1924 pfree(ondisk.builder.committed.xip);
1925 return false;
1926}
1927
1928/*
1929 * Remove all serialized snapshots that are not required anymore because no
1930 * slot can need them. This doesn't actually have to run during a checkpoint,
1931 * but it's a convenient point to schedule this.
1932 *
1933 * NB: We run this during checkpoints even if logical decoding is disabled so
1934 * we cleanup old slots at some point after it got disabled.
1935 */
1936void
1937CheckPointSnapBuild(void)
1938{
1939 XLogRecPtr cutoff;
1940 XLogRecPtr redo;
1941 DIR *snap_dir;
1942 struct dirent *snap_de;
1943 char path[MAXPGPATH + 21];
1944
1945 /*
1946 * We start off with a minimum of the last redo pointer. No new
1947 * replication slot will start before that, so that's a safe upper bound
1948 * for removal.
1949 */
1950 redo = GetRedoRecPtr();
1951
1952 /* now check for the restart ptrs from existing slots */
1953 cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1954
1955 /* don't start earlier than the restart lsn */
1956 if (redo < cutoff)
1957 cutoff = redo;
1958
1959 snap_dir = AllocateDir("pg_logical/snapshots");
1960 while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
1961 {
1962 uint32 hi;
1963 uint32 lo;
1964 XLogRecPtr lsn;
1965 struct stat statbuf;
1966
1967 if (strcmp(snap_de->d_name, ".") == 0 ||
1968 strcmp(snap_de->d_name, "..") == 0)
1969 continue;
1970
1971 snprintf(path, sizeof(path), "pg_logical/snapshots/%s", snap_de->d_name);
1972
1973 if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
1974 {
1975 elog(DEBUG1, "only regular files expected: %s", path);
1976 continue;
1977 }
1978
1979 /*
1980 * temporary filenames from SnapBuildSerialize() include the LSN and
1981 * everything but are postfixed by .$pid.tmp. We can just remove them
1982 * the same as other files because there can be none that are
1983 * currently being written that are older than cutoff.
1984 *
1985 * We just log a message if a file doesn't fit the pattern, it's
1986 * probably some editors lock/state file or similar...
1987 */
1988 if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
1989 {
1990 ereport(LOG,
1991 (errmsg("could not parse file name \"%s\"", path)));
1992 continue;
1993 }
1994
1995 lsn = ((uint64) hi) << 32 | lo;
1996
1997 /* check whether we still need it */
1998 if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
1999 {
2000 elog(DEBUG1, "removing snapbuild snapshot %s", path);
2001
2002 /*
2003 * It's not particularly harmful, though strange, if we can't
2004 * remove the file here. Don't prevent the checkpoint from
2005 * completing, that'd be a cure worse than the disease.
2006 */
2007 if (unlink(path) < 0)
2008 {
2009 ereport(LOG,
2010 (errcode_for_file_access(),
2011 errmsg("could not remove file \"%s\": %m",
2012 path)));
2013 continue;
2014 }
2015 }
2016 }
2017 FreeDir(snap_dir);
2018}
2019