1/*-------------------------------------------------------------------------
2 *
3 * snapmgr.c
4 * PostgreSQL snapshot manager
5 *
6 * We keep track of snapshots in two ways: those "registered" by resowner.c,
7 * and the "active snapshot" stack. All snapshots in either of them live in
8 * persistent memory. When a snapshot is no longer in any of these lists
9 * (tracked by separate refcounts on each snapshot), its memory can be freed.
10 *
11 * The FirstXactSnapshot, if any, is treated a bit specially: we increment its
12 * regd_count and list it in RegisteredSnapshots, but this reference is not
13 * tracked by a resource owner. We used to use the TopTransactionResourceOwner
14 * to track this snapshot reference, but that introduces logical circularity
15 * and thus makes it impossible to clean up in a sane fashion. It's better to
16 * handle this reference as an internally-tracked registration, so that this
17 * module is entirely lower-level than ResourceOwners.
18 *
19 * Likewise, any snapshots that have been exported by pg_export_snapshot
20 * have regd_count = 1 and are listed in RegisteredSnapshots, but are not
21 * tracked by any resource owner.
22 *
23 * Likewise, the CatalogSnapshot is listed in RegisteredSnapshots when it
24 * is valid, but is not tracked by any resource owner.
25 *
 * The same is true for historic snapshots used during logical decoding;
 * their lifetime is managed separately (as they live longer than one xact.c
 * transaction).
29 *
30 * These arrangements let us reset MyPgXact->xmin when there are no snapshots
31 * referenced by this transaction, and advance it when the one with oldest
 * Xmin is no longer referenced.  For simplicity, however, only registered
 * snapshots, not active snapshots, participate in tracking which one is oldest;
34 * we don't try to change MyPgXact->xmin except when the active-snapshot
35 * stack is empty.
36 *
37 *
38 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
39 * Portions Copyright (c) 1994, Regents of the University of California
40 *
41 * IDENTIFICATION
42 * src/backend/utils/time/snapmgr.c
43 *
44 *-------------------------------------------------------------------------
45 */
46#include "postgres.h"
47
48#include <sys/stat.h>
49#include <unistd.h>
50
51#include "access/subtrans.h"
52#include "access/transam.h"
53#include "access/xact.h"
54#include "access/xlog.h"
55#include "catalog/catalog.h"
56#include "lib/pairingheap.h"
57#include "miscadmin.h"
58#include "storage/predicate.h"
59#include "storage/proc.h"
60#include "storage/procarray.h"
61#include "storage/sinval.h"
62#include "storage/sinvaladt.h"
63#include "storage/spin.h"
64#include "utils/builtins.h"
65#include "utils/memutils.h"
66#include "utils/rel.h"
67#include "utils/resowner_private.h"
68#include "utils/snapmgr.h"
69#include "utils/syscache.h"
70
71
72/*
73 * GUC parameters
74 */
/*
 * Snapshot age threshold, in minutes; -1 disables the feature.  Only a
 * positive value causes SnapMgrShmemSize() to reserve the per-minute xid map.
 */
int old_snapshot_threshold;
76
77/*
78 * Structure for dealing with old_snapshot_threshold implementation.
79 */
typedef struct OldSnapshotControlData
{
	/*
	 * Variables for old snapshot handling are shared among processes and are
	 * only allowed to move forward.  Each spinlock below protects the fields
	 * named in its own comment.
	 */
	slock_t mutex_current; /* protects current_timestamp */
	TimestampTz current_timestamp; /* latest snapshot timestamp */
	slock_t mutex_latest_xmin; /* protects latest_xmin and next_map_update */
	TransactionId latest_xmin; /* latest snapshot xmin */
	TimestampTz next_map_update; /* latest snapshot valid up to */
	slock_t mutex_threshold; /* protects the threshold_* fields */
	TimestampTz threshold_timestamp; /* earlier snapshot is old */
	TransactionId threshold_xid; /* earlier xid may be gone */

	/*
	 * Keep one xid per minute for old snapshot error handling.
	 *
	 * Use a circular buffer with a head offset, a count of entries currently
	 * used, and a timestamp corresponding to the xid at the head offset.  A
	 * count_used value of zero means that there are no times stored; a
	 * count_used value of OLD_SNAPSHOT_TIME_MAP_ENTRIES means that the buffer
	 * is full and the head must be advanced to add new entries.  Use
	 * timestamps aligned to minute boundaries, since that seems less
	 * surprising than aligning based on the first usage timestamp.  The
	 * latest bucket is effectively stored within latest_xmin.  The circular
	 * buffer is updated when we get a new xmin value that doesn't fall into
	 * the same interval.
	 *
	 * It is OK if the xid for a given time slot is from earlier than
	 * calculated by adding the number of minutes corresponding to the
	 * (possibly wrapped) distance from the head offset to the time of the
	 * head entry, since that just results in the vacuuming of old tuples
	 * being slightly less aggressive.  It would not be OK for it to be off in
	 * the other direction, since it might result in vacuuming tuples that are
	 * still expected to be there.
	 *
	 * Use of an SLRU was considered but not chosen because it is more
	 * heavyweight than is needed for this, and would probably not be any less
	 * code to implement.
	 *
	 * Persistence is not needed.
	 */
	int head_offset; /* subscript of oldest tracked time */
	TimestampTz head_timestamp; /* time corresponding to head xid */
	int count_used; /* how many slots are in use */

	/*
	 * Per-minute xid map.  SnapMgrShmemSize() reserves
	 * OLD_SNAPSHOT_TIME_MAP_ENTRIES elements for it only when
	 * old_snapshot_threshold is positive; otherwise the array has no storage.
	 */
	TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER];
} OldSnapshotControlData;
128
/*
 * This backend's pointer to the OldSnapshotControlData struct; assigned in
 * SnapMgrInit() from the result of ShmemInitStruct().
 */
static volatile OldSnapshotControlData *oldSnapshotControl;
130
131
132/*
133 * CurrentSnapshot points to the only snapshot taken in transaction-snapshot
134 * mode, and to the latest one taken in a read-committed transaction.
135 * SecondarySnapshot is a snapshot that's always up-to-date as of the current
136 * instant, even in transaction-snapshot mode. It should only be used for
137 * special-purpose code (say, RI checking.) CatalogSnapshot points to an
138 * MVCC snapshot intended to be used for catalog scans; we must invalidate it
139 * whenever a system catalog change occurs.
140 *
141 * These SnapshotData structs are static to simplify memory allocation
142 * (see the hack in GetSnapshotData to avoid repeated malloc/free).
143 */
/* Static storage that GetSnapshotData() fills in place for each snapshot */
static SnapshotData CurrentSnapshotData = {SNAPSHOT_MVCC};
static SnapshotData SecondarySnapshotData = {SNAPSHOT_MVCC};
SnapshotData CatalogSnapshotData = {SNAPSHOT_MVCC};

/* Globally visible snapshots initialized as SNAPSHOT_SELF and SNAPSHOT_ANY */
SnapshotData SnapshotSelfData = {SNAPSHOT_SELF};
SnapshotData SnapshotAnyData = {SNAPSHOT_ANY};

/* Pointers to valid snapshots; all of them start out NULL */
static Snapshot CurrentSnapshot = NULL;
static Snapshot SecondarySnapshot = NULL;
static Snapshot CatalogSnapshot = NULL;
static Snapshot HistoricSnapshot = NULL;
155
156/*
157 * These are updated by GetSnapshotData. We initialize them this way
158 * for the convenience of TransactionIdIsInProgress: even in bootstrap
159 * mode, we don't want it to say that BootstrapTransactionId is in progress.
160 *
161 * RecentGlobalXmin and RecentGlobalDataXmin are initialized to
162 * InvalidTransactionId, to ensure that no one tries to use a stale
163 * value. Readers should ensure that it has been set to something else
164 * before using it.
165 */
/* Start at FirstNormalTransactionId for the sake of bootstrap mode */
TransactionId TransactionXmin = FirstNormalTransactionId;
TransactionId RecentXmin = FirstNormalTransactionId;

/* Deliberately invalid until computed, so stale use can be detected */
TransactionId RecentGlobalXmin = InvalidTransactionId;
TransactionId RecentGlobalDataXmin = InvalidTransactionId;

/* (table, ctid) => (cmin, cmax) mapping during timetravel; NULL when unset */
static HTAB *tuplecid_data = NULL;
173
174/*
175 * Elements of the active snapshot stack.
176 *
177 * Each element here accounts for exactly one active_count on SnapshotData.
178 *
179 * NB: the code assumes that elements in this list are in non-increasing
180 * order of as_level; also, the list must be NULL-terminated.
181 */
typedef struct ActiveSnapshotElt
{
	Snapshot as_snap;			/* snapshot this stack entry refers to */
	int as_level;				/* transaction nest level at push time */
	struct ActiveSnapshotElt *as_next;	/* next entry down, NULL at bottom */
} ActiveSnapshotElt;
188
189/* Top of the stack of active snapshots */
/* Top of the active-snapshot stack; NULL when the stack is empty */
static ActiveSnapshotElt *ActiveSnapshot = NULL;

/*
 * Bottom of the stack of active snapshots.  Set by the first push onto an
 * empty stack and reset to NULL when the stack becomes empty again.
 */
static ActiveSnapshotElt *OldestActiveSnapshot = NULL;
194
195/*
196 * Currently registered Snapshots. Ordered in a heap by xmin, so that we can
197 * quickly find the one with lowest xmin, to advance our MyPgXact->xmin.
198 */
/* Heap comparator; defined further down in this file */
static int xmin_cmp(const pairingheap_node *a, const pairingheap_node *b,
					void *arg);

/*
 * Heap of registered snapshots, ordered by xmin_cmp.
 * NOTE(review): the two NULL initializers are presumably the comparator
 * argument and the (empty) root -- confirm against lib/pairingheap.h.
 */
static pairingheap RegisteredSnapshots = {&xmin_cmp, NULL, NULL};
203
204/* first GetTransactionSnapshot call in a transaction? */
/*
 * True once the transaction's first snapshot has been established, either by
 * GetTransactionSnapshot() or by SetTransactionSnapshot().
 */
bool FirstSnapshotSet = false;

/*
 * Remember the serializable transaction snapshot, if any.  We cannot trust
 * FirstSnapshotSet in combination with IsolationUsesXactSnapshot(), because
 * GUC may be reset before us, changing the value of IsolationUsesXactSnapshot.
 *
 * This is only set when the first snapshot is taken (or imported) in
 * transaction-snapshot mode; it then points at the saved copy.
 */
static Snapshot FirstXactSnapshot = NULL;
213
214/* Define pathname of exported-snapshot files */
/* Directory name under which exported-snapshot files are kept */
#define SNAPSHOT_EXPORT_DIR "pg_snapshots"

/* Structure holding info about exported snapshot. */
typedef struct ExportedSnapshot
{
	char *snapfile;				/* NOTE(review): presumably the path of the
								 * exported file -- confirm at the site that
								 * fills this in */
	Snapshot snapshot;			/* the snapshot that was exported */
} ExportedSnapshot;

/* Current xact's exported snapshots (a list of ExportedSnapshot structs) */
static List *exportedSnapshots = NIL;
226
227/* Prototypes for local functions */
/* NOTE(review): judging by the name, rounds ts to a minute boundary */
static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts);
/* Makes a copy of a snapshot in TopTransactionContext, refcounts zeroed */
static Snapshot CopySnapshot(Snapshot snapshot);
/* Releases a copied snapshot whose refcounts have both dropped to zero */
static void FreeSnapshot(Snapshot snapshot);
/* Called after a snapshot reference is dropped; see its definition */
static void SnapshotResetXmin(void);
232
233/*
234 * Snapshot fields to be serialized.
235 *
236 * Only these fields need to be sent to the cooperating backend; the
237 * remaining ones can (and must) be set by the receiver upon restore.
238 */
/*
 * NOTE(review): the field names mirror the SnapshotData members used
 * elsewhere in this file, so each is presumably copied one-for-one --
 * confirm against the serialize/restore routines.
 */
typedef struct SerializedSnapshotData
{
	TransactionId xmin;
	TransactionId xmax;
	uint32 xcnt;				/* count paired with the xip array */
	int32 subxcnt;				/* count paired with the subxip array */
	bool suboverflowed;
	bool takenDuringRecovery;
	CommandId curcid;
	TimestampTz whenTaken;
	XLogRecPtr lsn;
} SerializedSnapshotData;
251
252Size
253SnapMgrShmemSize(void)
254{
255 Size size;
256
257 size = offsetof(OldSnapshotControlData, xid_by_minute);
258 if (old_snapshot_threshold > 0)
259 size = add_size(size, mul_size(sizeof(TransactionId),
260 OLD_SNAPSHOT_TIME_MAP_ENTRIES));
261
262 return size;
263}
264
265/*
266 * Initialize for managing old snapshot detection.
267 */
268void
269SnapMgrInit(void)
270{
271 bool found;
272
273 /*
274 * Create or attach to the OldSnapshotControlData structure.
275 */
276 oldSnapshotControl = (volatile OldSnapshotControlData *)
277 ShmemInitStruct("OldSnapshotControlData",
278 SnapMgrShmemSize(), &found);
279
280 if (!found)
281 {
282 SpinLockInit(&oldSnapshotControl->mutex_current);
283 oldSnapshotControl->current_timestamp = 0;
284 SpinLockInit(&oldSnapshotControl->mutex_latest_xmin);
285 oldSnapshotControl->latest_xmin = InvalidTransactionId;
286 oldSnapshotControl->next_map_update = 0;
287 SpinLockInit(&oldSnapshotControl->mutex_threshold);
288 oldSnapshotControl->threshold_timestamp = 0;
289 oldSnapshotControl->threshold_xid = InvalidTransactionId;
290 oldSnapshotControl->head_offset = 0;
291 oldSnapshotControl->head_timestamp = 0;
292 oldSnapshotControl->count_used = 0;
293 }
294}
295
296/*
297 * GetTransactionSnapshot
298 * Get the appropriate snapshot for a new query in a transaction.
299 *
300 * Note that the return value may point at static storage that will be modified
301 * by future calls and by CommandCounterIncrement(). Callers should call
302 * RegisterSnapshot or PushActiveSnapshot on the returned snap if it is to be
303 * used very long.
304 */
305Snapshot
306GetTransactionSnapshot(void)
307{
308 /*
309 * Return historic snapshot if doing logical decoding. We'll never need a
310 * non-historic transaction snapshot in this (sub-)transaction, so there's
311 * no need to be careful to set one up for later calls to
312 * GetTransactionSnapshot().
313 */
314 if (HistoricSnapshotActive())
315 {
316 Assert(!FirstSnapshotSet);
317 return HistoricSnapshot;
318 }
319
320 /* First call in transaction? */
321 if (!FirstSnapshotSet)
322 {
323 /*
324 * Don't allow catalog snapshot to be older than xact snapshot. Must
325 * do this first to allow the empty-heap Assert to succeed.
326 */
327 InvalidateCatalogSnapshot();
328
329 Assert(pairingheap_is_empty(&RegisteredSnapshots));
330 Assert(FirstXactSnapshot == NULL);
331
332 if (IsInParallelMode())
333 elog(ERROR,
334 "cannot take query snapshot during a parallel operation");
335
336 /*
337 * In transaction-snapshot mode, the first snapshot must live until
338 * end of xact regardless of what the caller does with it, so we must
339 * make a copy of it rather than returning CurrentSnapshotData
340 * directly. Furthermore, if we're running in serializable mode,
341 * predicate.c needs to wrap the snapshot fetch in its own processing.
342 */
343 if (IsolationUsesXactSnapshot())
344 {
345 /* First, create the snapshot in CurrentSnapshotData */
346 if (IsolationIsSerializable())
347 CurrentSnapshot = GetSerializableTransactionSnapshot(&CurrentSnapshotData);
348 else
349 CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
350 /* Make a saved copy */
351 CurrentSnapshot = CopySnapshot(CurrentSnapshot);
352 FirstXactSnapshot = CurrentSnapshot;
353 /* Mark it as "registered" in FirstXactSnapshot */
354 FirstXactSnapshot->regd_count++;
355 pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
356 }
357 else
358 CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
359
360 FirstSnapshotSet = true;
361 return CurrentSnapshot;
362 }
363
364 if (IsolationUsesXactSnapshot())
365 return CurrentSnapshot;
366
367 /* Don't allow catalog snapshot to be older than xact snapshot. */
368 InvalidateCatalogSnapshot();
369
370 CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
371
372 return CurrentSnapshot;
373}
374
375/*
376 * GetLatestSnapshot
377 * Get a snapshot that is up-to-date as of the current instant,
378 * even if we are executing in transaction-snapshot mode.
379 */
380Snapshot
381GetLatestSnapshot(void)
382{
383 /*
384 * We might be able to relax this, but nothing that could otherwise work
385 * needs it.
386 */
387 if (IsInParallelMode())
388 elog(ERROR,
389 "cannot update SecondarySnapshot during a parallel operation");
390
391 /*
392 * So far there are no cases requiring support for GetLatestSnapshot()
393 * during logical decoding, but it wouldn't be hard to add if required.
394 */
395 Assert(!HistoricSnapshotActive());
396
397 /* If first call in transaction, go ahead and set the xact snapshot */
398 if (!FirstSnapshotSet)
399 return GetTransactionSnapshot();
400
401 SecondarySnapshot = GetSnapshotData(&SecondarySnapshotData);
402
403 return SecondarySnapshot;
404}
405
406/*
407 * GetOldestSnapshot
408 *
409 * Get the transaction's oldest known snapshot, as judged by the LSN.
410 * Will return NULL if there are no active or registered snapshots.
411 */
412Snapshot
413GetOldestSnapshot(void)
414{
415 Snapshot OldestRegisteredSnapshot = NULL;
416 XLogRecPtr RegisteredLSN = InvalidXLogRecPtr;
417
418 if (!pairingheap_is_empty(&RegisteredSnapshots))
419 {
420 OldestRegisteredSnapshot = pairingheap_container(SnapshotData, ph_node,
421 pairingheap_first(&RegisteredSnapshots));
422 RegisteredLSN = OldestRegisteredSnapshot->lsn;
423 }
424
425 if (OldestActiveSnapshot != NULL)
426 {
427 XLogRecPtr ActiveLSN = OldestActiveSnapshot->as_snap->lsn;
428
429 if (XLogRecPtrIsInvalid(RegisteredLSN) || RegisteredLSN > ActiveLSN)
430 return OldestActiveSnapshot->as_snap;
431 }
432
433 return OldestRegisteredSnapshot;
434}
435
436/*
437 * GetCatalogSnapshot
438 * Get a snapshot that is sufficiently up-to-date for scan of the
439 * system catalog with the specified OID.
440 */
441Snapshot
442GetCatalogSnapshot(Oid relid)
443{
444 /*
445 * Return historic snapshot while we're doing logical decoding, so we can
446 * see the appropriate state of the catalog.
447 *
448 * This is the primary reason for needing to reset the system caches after
449 * finishing decoding.
450 */
451 if (HistoricSnapshotActive())
452 return HistoricSnapshot;
453
454 return GetNonHistoricCatalogSnapshot(relid);
455}
456
457/*
458 * GetNonHistoricCatalogSnapshot
459 * Get a snapshot that is sufficiently up-to-date for scan of the system
460 * catalog with the specified OID, even while historic snapshots are set
461 * up.
462 */
463Snapshot
464GetNonHistoricCatalogSnapshot(Oid relid)
465{
466 /*
467 * If the caller is trying to scan a relation that has no syscache, no
468 * catcache invalidations will be sent when it is updated. For a few key
469 * relations, snapshot invalidations are sent instead. If we're trying to
470 * scan a relation for which neither catcache nor snapshot invalidations
471 * are sent, we must refresh the snapshot every time.
472 */
473 if (CatalogSnapshot &&
474 !RelationInvalidatesSnapshotsOnly(relid) &&
475 !RelationHasSysCache(relid))
476 InvalidateCatalogSnapshot();
477
478 if (CatalogSnapshot == NULL)
479 {
480 /* Get new snapshot. */
481 CatalogSnapshot = GetSnapshotData(&CatalogSnapshotData);
482
483 /*
484 * Make sure the catalog snapshot will be accounted for in decisions
485 * about advancing PGXACT->xmin. We could apply RegisterSnapshot, but
486 * that would result in making a physical copy, which is overkill; and
487 * it would also create a dependency on some resource owner, which we
488 * do not want for reasons explained at the head of this file. Instead
489 * just shove the CatalogSnapshot into the pairing heap manually. This
490 * has to be reversed in InvalidateCatalogSnapshot, of course.
491 *
492 * NB: it had better be impossible for this to throw error, since the
493 * CatalogSnapshot pointer is already valid.
494 */
495 pairingheap_add(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
496 }
497
498 return CatalogSnapshot;
499}
500
501/*
502 * InvalidateCatalogSnapshot
503 * Mark the current catalog snapshot, if any, as invalid
504 *
505 * We could change this API to allow the caller to provide more fine-grained
506 * invalidation details, so that a change to relation A wouldn't prevent us
507 * from using our cached snapshot to scan relation B, but so far there's no
508 * evidence that the CPU cycles we spent tracking such fine details would be
509 * well-spent.
510 */
511void
512InvalidateCatalogSnapshot(void)
513{
514 if (CatalogSnapshot)
515 {
516 pairingheap_remove(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
517 CatalogSnapshot = NULL;
518 SnapshotResetXmin();
519 }
520}
521
522/*
523 * InvalidateCatalogSnapshotConditionally
524 * Drop catalog snapshot if it's the only one we have
525 *
526 * This is called when we are about to wait for client input, so we don't
527 * want to continue holding the catalog snapshot if it might mean that the
528 * global xmin horizon can't advance. However, if there are other snapshots
529 * still active or registered, the catalog snapshot isn't likely to be the
530 * oldest one, so we might as well keep it.
531 */
532void
533InvalidateCatalogSnapshotConditionally(void)
534{
535 if (CatalogSnapshot &&
536 ActiveSnapshot == NULL &&
537 pairingheap_is_singular(&RegisteredSnapshots))
538 InvalidateCatalogSnapshot();
539}
540
541/*
542 * SnapshotSetCommandId
543 * Propagate CommandCounterIncrement into the static snapshots, if set
544 */
545void
546SnapshotSetCommandId(CommandId curcid)
547{
548 if (!FirstSnapshotSet)
549 return;
550
551 if (CurrentSnapshot)
552 CurrentSnapshot->curcid = curcid;
553 if (SecondarySnapshot)
554 SecondarySnapshot->curcid = curcid;
555 /* Should we do the same with CatalogSnapshot? */
556}
557
558/*
559 * SetTransactionSnapshot
560 * Set the transaction's snapshot from an imported MVCC snapshot.
561 *
562 * Note that this is very closely tied to GetTransactionSnapshot --- it
563 * must take care of all the same considerations as the first-snapshot case
564 * in GetTransactionSnapshot.
565 */
566static void
567SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
568 int sourcepid, PGPROC *sourceproc)
569{
570 /* Caller should have checked this already */
571 Assert(!FirstSnapshotSet);
572
573 /* Better do this to ensure following Assert succeeds. */
574 InvalidateCatalogSnapshot();
575
576 Assert(pairingheap_is_empty(&RegisteredSnapshots));
577 Assert(FirstXactSnapshot == NULL);
578 Assert(!HistoricSnapshotActive());
579
580 /*
581 * Even though we are not going to use the snapshot it computes, we must
582 * call GetSnapshotData, for two reasons: (1) to be sure that
583 * CurrentSnapshotData's XID arrays have been allocated, and (2) to update
584 * RecentXmin and RecentGlobalXmin. (We could alternatively include those
585 * two variables in exported snapshot files, but it seems better to have
586 * snapshot importers compute reasonably up-to-date values for them.)
587 */
588 CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
589
590 /*
591 * Now copy appropriate fields from the source snapshot.
592 */
593 CurrentSnapshot->xmin = sourcesnap->xmin;
594 CurrentSnapshot->xmax = sourcesnap->xmax;
595 CurrentSnapshot->xcnt = sourcesnap->xcnt;
596 Assert(sourcesnap->xcnt <= GetMaxSnapshotXidCount());
597 memcpy(CurrentSnapshot->xip, sourcesnap->xip,
598 sourcesnap->xcnt * sizeof(TransactionId));
599 CurrentSnapshot->subxcnt = sourcesnap->subxcnt;
600 Assert(sourcesnap->subxcnt <= GetMaxSnapshotSubxidCount());
601 memcpy(CurrentSnapshot->subxip, sourcesnap->subxip,
602 sourcesnap->subxcnt * sizeof(TransactionId));
603 CurrentSnapshot->suboverflowed = sourcesnap->suboverflowed;
604 CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery;
605 /* NB: curcid should NOT be copied, it's a local matter */
606
607 /*
608 * Now we have to fix what GetSnapshotData did with MyPgXact->xmin and
609 * TransactionXmin. There is a race condition: to make sure we are not
610 * causing the global xmin to go backwards, we have to test that the
611 * source transaction is still running, and that has to be done
612 * atomically. So let procarray.c do it.
613 *
614 * Note: in serializable mode, predicate.c will do this a second time. It
615 * doesn't seem worth contorting the logic here to avoid two calls,
616 * especially since it's not clear that predicate.c *must* do this.
617 */
618 if (sourceproc != NULL)
619 {
620 if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc))
621 ereport(ERROR,
622 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
623 errmsg("could not import the requested snapshot"),
624 errdetail("The source transaction is not running anymore.")));
625 }
626 else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcevxid))
627 ereport(ERROR,
628 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
629 errmsg("could not import the requested snapshot"),
630 errdetail("The source process with PID %d is not running anymore.",
631 sourcepid)));
632
633 /*
634 * In transaction-snapshot mode, the first snapshot must live until end of
635 * xact, so we must make a copy of it. Furthermore, if we're running in
636 * serializable mode, predicate.c needs to do its own processing.
637 */
638 if (IsolationUsesXactSnapshot())
639 {
640 if (IsolationIsSerializable())
641 SetSerializableTransactionSnapshot(CurrentSnapshot, sourcevxid,
642 sourcepid);
643 /* Make a saved copy */
644 CurrentSnapshot = CopySnapshot(CurrentSnapshot);
645 FirstXactSnapshot = CurrentSnapshot;
646 /* Mark it as "registered" in FirstXactSnapshot */
647 FirstXactSnapshot->regd_count++;
648 pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
649 }
650
651 FirstSnapshotSet = true;
652}
653
654/*
655 * CopySnapshot
656 * Copy the given snapshot.
657 *
658 * The copy is palloc'd in TopTransactionContext and has initial refcounts set
659 * to 0. The returned snapshot has the copied flag set.
660 */
661static Snapshot
662CopySnapshot(Snapshot snapshot)
663{
664 Snapshot newsnap;
665 Size subxipoff;
666 Size size;
667
668 Assert(snapshot != InvalidSnapshot);
669
670 /* We allocate any XID arrays needed in the same palloc block. */
671 size = subxipoff = sizeof(SnapshotData) +
672 snapshot->xcnt * sizeof(TransactionId);
673 if (snapshot->subxcnt > 0)
674 size += snapshot->subxcnt * sizeof(TransactionId);
675
676 newsnap = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
677 memcpy(newsnap, snapshot, sizeof(SnapshotData));
678
679 newsnap->regd_count = 0;
680 newsnap->active_count = 0;
681 newsnap->copied = true;
682
683 /* setup XID array */
684 if (snapshot->xcnt > 0)
685 {
686 newsnap->xip = (TransactionId *) (newsnap + 1);
687 memcpy(newsnap->xip, snapshot->xip,
688 snapshot->xcnt * sizeof(TransactionId));
689 }
690 else
691 newsnap->xip = NULL;
692
693 /*
694 * Setup subXID array. Don't bother to copy it if it had overflowed,
695 * though, because it's not used anywhere in that case. Except if it's a
696 * snapshot taken during recovery; all the top-level XIDs are in subxip as
697 * well in that case, so we mustn't lose them.
698 */
699 if (snapshot->subxcnt > 0 &&
700 (!snapshot->suboverflowed || snapshot->takenDuringRecovery))
701 {
702 newsnap->subxip = (TransactionId *) ((char *) newsnap + subxipoff);
703 memcpy(newsnap->subxip, snapshot->subxip,
704 snapshot->subxcnt * sizeof(TransactionId));
705 }
706 else
707 newsnap->subxip = NULL;
708
709 return newsnap;
710}
711
712/*
713 * FreeSnapshot
714 * Free the memory associated with a snapshot.
715 */
716static void
717FreeSnapshot(Snapshot snapshot)
718{
719 Assert(snapshot->regd_count == 0);
720 Assert(snapshot->active_count == 0);
721 Assert(snapshot->copied);
722
723 pfree(snapshot);
724}
725
726/*
727 * PushActiveSnapshot
728 * Set the given snapshot as the current active snapshot
729 *
730 * If the passed snapshot is a statically-allocated one, or it is possibly
731 * subject to a future command counter update, create a new long-lived copy
732 * with active refcount=1. Otherwise, only increment the refcount.
733 */
734void
735PushActiveSnapshot(Snapshot snap)
736{
737 ActiveSnapshotElt *newactive;
738
739 Assert(snap != InvalidSnapshot);
740
741 newactive = MemoryContextAlloc(TopTransactionContext, sizeof(ActiveSnapshotElt));
742
743 /*
744 * Checking SecondarySnapshot is probably useless here, but it seems
745 * better to be sure.
746 */
747 if (snap == CurrentSnapshot || snap == SecondarySnapshot || !snap->copied)
748 newactive->as_snap = CopySnapshot(snap);
749 else
750 newactive->as_snap = snap;
751
752 newactive->as_next = ActiveSnapshot;
753 newactive->as_level = GetCurrentTransactionNestLevel();
754
755 newactive->as_snap->active_count++;
756
757 ActiveSnapshot = newactive;
758 if (OldestActiveSnapshot == NULL)
759 OldestActiveSnapshot = ActiveSnapshot;
760}
761
762/*
763 * PushCopiedSnapshot
764 * As above, except forcibly copy the presented snapshot.
765 *
766 * This should be used when the ActiveSnapshot has to be modifiable, for
767 * example if the caller intends to call UpdateActiveSnapshotCommandId.
768 * The new snapshot will be released when popped from the stack.
769 */
770void
771PushCopiedSnapshot(Snapshot snapshot)
772{
773 PushActiveSnapshot(CopySnapshot(snapshot));
774}
775
776/*
777 * UpdateActiveSnapshotCommandId
778 *
779 * Update the current CID of the active snapshot. This can only be applied
780 * to a snapshot that is not referenced elsewhere.
781 */
782void
783UpdateActiveSnapshotCommandId(void)
784{
785 CommandId save_curcid,
786 curcid;
787
788 Assert(ActiveSnapshot != NULL);
789 Assert(ActiveSnapshot->as_snap->active_count == 1);
790 Assert(ActiveSnapshot->as_snap->regd_count == 0);
791
792 /*
793 * Don't allow modification of the active snapshot during parallel
794 * operation. We share the snapshot to worker backends at the beginning
795 * of parallel operation, so any change to the snapshot can lead to
796 * inconsistencies. We have other defenses against
797 * CommandCounterIncrement, but there are a few places that call this
798 * directly, so we put an additional guard here.
799 */
800 save_curcid = ActiveSnapshot->as_snap->curcid;
801 curcid = GetCurrentCommandId(false);
802 if (IsInParallelMode() && save_curcid != curcid)
803 elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation");
804 ActiveSnapshot->as_snap->curcid = curcid;
805}
806
807/*
808 * PopActiveSnapshot
809 *
810 * Remove the topmost snapshot from the active snapshot stack, decrementing the
811 * reference count, and free it if this was the last reference.
812 */
813void
814PopActiveSnapshot(void)
815{
816 ActiveSnapshotElt *newstack;
817
818 newstack = ActiveSnapshot->as_next;
819
820 Assert(ActiveSnapshot->as_snap->active_count > 0);
821
822 ActiveSnapshot->as_snap->active_count--;
823
824 if (ActiveSnapshot->as_snap->active_count == 0 &&
825 ActiveSnapshot->as_snap->regd_count == 0)
826 FreeSnapshot(ActiveSnapshot->as_snap);
827
828 pfree(ActiveSnapshot);
829 ActiveSnapshot = newstack;
830 if (ActiveSnapshot == NULL)
831 OldestActiveSnapshot = NULL;
832
833 SnapshotResetXmin();
834}
835
836/*
837 * GetActiveSnapshot
838 * Return the topmost snapshot in the Active stack.
839 */
840Snapshot
841GetActiveSnapshot(void)
842{
843 Assert(ActiveSnapshot != NULL);
844
845 return ActiveSnapshot->as_snap;
846}
847
848/*
849 * ActiveSnapshotSet
850 * Return whether there is at least one snapshot in the Active stack
851 */
852bool
853ActiveSnapshotSet(void)
854{
855 return ActiveSnapshot != NULL;
856}
857
858/*
859 * RegisterSnapshot
860 * Register a snapshot as being in use by the current resource owner
861 *
862 * If InvalidSnapshot is passed, it is not registered.
863 */
864Snapshot
865RegisterSnapshot(Snapshot snapshot)
866{
867 if (snapshot == InvalidSnapshot)
868 return InvalidSnapshot;
869
870 return RegisterSnapshotOnOwner(snapshot, CurrentResourceOwner);
871}
872
873/*
874 * RegisterSnapshotOnOwner
875 * As above, but use the specified resource owner
876 */
877Snapshot
878RegisterSnapshotOnOwner(Snapshot snapshot, ResourceOwner owner)
879{
880 Snapshot snap;
881
882 if (snapshot == InvalidSnapshot)
883 return InvalidSnapshot;
884
885 /* Static snapshot? Create a persistent copy */
886 snap = snapshot->copied ? snapshot : CopySnapshot(snapshot);
887
888 /* and tell resowner.c about it */
889 ResourceOwnerEnlargeSnapshots(owner);
890 snap->regd_count++;
891 ResourceOwnerRememberSnapshot(owner, snap);
892
893 if (snap->regd_count == 1)
894 pairingheap_add(&RegisteredSnapshots, &snap->ph_node);
895
896 return snap;
897}
898
899/*
900 * UnregisterSnapshot
901 *
902 * Decrement the reference count of a snapshot, remove the corresponding
903 * reference from CurrentResourceOwner, and free the snapshot if no more
904 * references remain.
905 */
906void
907UnregisterSnapshot(Snapshot snapshot)
908{
909 if (snapshot == NULL)
910 return;
911
912 UnregisterSnapshotFromOwner(snapshot, CurrentResourceOwner);
913}
914
915/*
916 * UnregisterSnapshotFromOwner
917 * As above, but use the specified resource owner
918 */
919void
920UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner)
921{
922 if (snapshot == NULL)
923 return;
924
925 Assert(snapshot->regd_count > 0);
926 Assert(!pairingheap_is_empty(&RegisteredSnapshots));
927
928 ResourceOwnerForgetSnapshot(owner, snapshot);
929
930 snapshot->regd_count--;
931 if (snapshot->regd_count == 0)
932 pairingheap_remove(&RegisteredSnapshots, &snapshot->ph_node);
933
934 if (snapshot->regd_count == 0 && snapshot->active_count == 0)
935 {
936 FreeSnapshot(snapshot);
937 SnapshotResetXmin();
938 }
939}
940
941/*
942 * Comparison function for RegisteredSnapshots heap. Snapshots are ordered
943 * by xmin, so that the snapshot with smallest xmin is at the top.
944 */
945static int
946xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
947{
948 const SnapshotData *asnap = pairingheap_const_container(SnapshotData, ph_node, a);
949 const SnapshotData *bsnap = pairingheap_const_container(SnapshotData, ph_node, b);
950
951 if (TransactionIdPrecedes(asnap->xmin, bsnap->xmin))
952 return 1;
953 else if (TransactionIdFollows(asnap->xmin, bsnap->xmin))
954 return -1;
955 else
956 return 0;
957}
958
959/*
960 * Get current RecentGlobalXmin value, as a FullTransactionId.
961 */
962FullTransactionId
963GetFullRecentGlobalXmin(void)
964{
965 FullTransactionId nextxid_full;
966 uint32 nextxid_epoch;
967 TransactionId nextxid_xid;
968 uint32 epoch;
969
970 Assert(TransactionIdIsNormal(RecentGlobalXmin));
971
972 /*
973 * Compute the epoch from the next XID's epoch. This relies on the fact
974 * that RecentGlobalXmin must be within the 2 billion XID horizon from the
975 * next XID.
976 */
977 nextxid_full = ReadNextFullTransactionId();
978 nextxid_epoch = EpochFromFullTransactionId(nextxid_full);
979 nextxid_xid = XidFromFullTransactionId(nextxid_full);
980
981 if (RecentGlobalXmin > nextxid_xid)
982 epoch = nextxid_epoch - 1;
983 else
984 epoch = nextxid_epoch;
985
986 return FullTransactionIdFromEpochAndXid(epoch, RecentGlobalXmin);
987}
988
989/*
990 * SnapshotResetXmin
991 *
992 * If there are no more snapshots, we can reset our PGXACT->xmin to InvalidXid.
993 * Note we can do this without locking because we assume that storing an Xid
994 * is atomic.
995 *
996 * Even if there are some remaining snapshots, we may be able to advance our
997 * PGXACT->xmin to some degree. This typically happens when a portal is
998 * dropped. For efficiency, we only consider recomputing PGXACT->xmin when
999 * the active snapshot stack is empty; this allows us not to need to track
1000 * which active snapshot is oldest.
1001 *
1002 * Note: it's tempting to use GetOldestSnapshot() here so that we can include
1003 * active snapshots in the calculation. However, that compares by LSN not
1004 * xmin so it's not entirely clear that it's the same thing. Also, we'd be
1005 * critically dependent on the assumption that the bottommost active snapshot
1006 * stack entry has the oldest xmin. (Current uses of GetOldestSnapshot() are
1007 * not actually critical, but this would be.)
1008 */
1009static void
1010SnapshotResetXmin(void)
1011{
1012 Snapshot minSnapshot;
1013
1014 if (ActiveSnapshot != NULL)
1015 return;
1016
1017 if (pairingheap_is_empty(&RegisteredSnapshots))
1018 {
1019 MyPgXact->xmin = InvalidTransactionId;
1020 return;
1021 }
1022
1023 minSnapshot = pairingheap_container(SnapshotData, ph_node,
1024 pairingheap_first(&RegisteredSnapshots));
1025
1026 if (TransactionIdPrecedes(MyPgXact->xmin, minSnapshot->xmin))
1027 MyPgXact->xmin = minSnapshot->xmin;
1028}
1029
1030/*
1031 * AtSubCommit_Snapshot
1032 */
1033void
1034AtSubCommit_Snapshot(int level)
1035{
1036 ActiveSnapshotElt *active;
1037
1038 /*
1039 * Relabel the active snapshots set in this subtransaction as though they
1040 * are owned by the parent subxact.
1041 */
1042 for (active = ActiveSnapshot; active != NULL; active = active->as_next)
1043 {
1044 if (active->as_level < level)
1045 break;
1046 active->as_level = level - 1;
1047 }
1048}
1049
1050/*
1051 * AtSubAbort_Snapshot
1052 * Clean up snapshots after a subtransaction abort
1053 */
1054void
1055AtSubAbort_Snapshot(int level)
1056{
1057 /* Forget the active snapshots set by this subtransaction */
1058 while (ActiveSnapshot && ActiveSnapshot->as_level >= level)
1059 {
1060 ActiveSnapshotElt *next;
1061
1062 next = ActiveSnapshot->as_next;
1063
1064 /*
1065 * Decrement the snapshot's active count. If it's still registered or
1066 * marked as active by an outer subtransaction, we can't free it yet.
1067 */
1068 Assert(ActiveSnapshot->as_snap->active_count >= 1);
1069 ActiveSnapshot->as_snap->active_count -= 1;
1070
1071 if (ActiveSnapshot->as_snap->active_count == 0 &&
1072 ActiveSnapshot->as_snap->regd_count == 0)
1073 FreeSnapshot(ActiveSnapshot->as_snap);
1074
1075 /* and free the stack element */
1076 pfree(ActiveSnapshot);
1077
1078 ActiveSnapshot = next;
1079 if (ActiveSnapshot == NULL)
1080 OldestActiveSnapshot = NULL;
1081 }
1082
1083 SnapshotResetXmin();
1084}
1085
1086/*
1087 * AtEOXact_Snapshot
1088 * Snapshot manager's cleanup function for end of transaction
1089 */
1090void
1091AtEOXact_Snapshot(bool isCommit, bool resetXmin)
1092{
1093 /*
1094 * In transaction-snapshot mode we must release our privately-managed
1095 * reference to the transaction snapshot. We must remove it from
1096 * RegisteredSnapshots to keep the check below happy. But we don't bother
1097 * to do FreeSnapshot, for two reasons: the memory will go away with
1098 * TopTransactionContext anyway, and if someone has left the snapshot
1099 * stacked as active, we don't want the code below to be chasing through a
1100 * dangling pointer.
1101 */
1102 if (FirstXactSnapshot != NULL)
1103 {
1104 Assert(FirstXactSnapshot->regd_count > 0);
1105 Assert(!pairingheap_is_empty(&RegisteredSnapshots));
1106 pairingheap_remove(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
1107 }
1108 FirstXactSnapshot = NULL;
1109
1110 /*
1111 * If we exported any snapshots, clean them up.
1112 */
1113 if (exportedSnapshots != NIL)
1114 {
1115 ListCell *lc;
1116
1117 /*
1118 * Get rid of the files. Unlink failure is only a WARNING because (1)
1119 * it's too late to abort the transaction, and (2) leaving a leaked
1120 * file around has little real consequence anyway.
1121 *
1122 * We also need to remove the snapshots from RegisteredSnapshots to
1123 * prevent a warning below.
1124 *
1125 * As with the FirstXactSnapshot, we don't need to free resources of
1126 * the snapshot iself as it will go away with the memory context.
1127 */
1128 foreach(lc, exportedSnapshots)
1129 {
1130 ExportedSnapshot *esnap = (ExportedSnapshot *) lfirst(lc);
1131
1132 if (unlink(esnap->snapfile))
1133 elog(WARNING, "could not unlink file \"%s\": %m",
1134 esnap->snapfile);
1135
1136 pairingheap_remove(&RegisteredSnapshots,
1137 &esnap->snapshot->ph_node);
1138 }
1139
1140 exportedSnapshots = NIL;
1141 }
1142
1143 /* Drop catalog snapshot if any */
1144 InvalidateCatalogSnapshot();
1145
1146 /* On commit, complain about leftover snapshots */
1147 if (isCommit)
1148 {
1149 ActiveSnapshotElt *active;
1150
1151 if (!pairingheap_is_empty(&RegisteredSnapshots))
1152 elog(WARNING, "registered snapshots seem to remain after cleanup");
1153
1154 /* complain about unpopped active snapshots */
1155 for (active = ActiveSnapshot; active != NULL; active = active->as_next)
1156 elog(WARNING, "snapshot %p still active", active);
1157 }
1158
1159 /*
1160 * And reset our state. We don't need to free the memory explicitly --
1161 * it'll go away with TopTransactionContext.
1162 */
1163 ActiveSnapshot = NULL;
1164 OldestActiveSnapshot = NULL;
1165 pairingheap_reset(&RegisteredSnapshots);
1166
1167 CurrentSnapshot = NULL;
1168 SecondarySnapshot = NULL;
1169
1170 FirstSnapshotSet = false;
1171
1172 /*
1173 * During normal commit processing, we call ProcArrayEndTransaction() to
1174 * reset the PgXact->xmin. That call happens prior to the call to
1175 * AtEOXact_Snapshot(), so we need not touch xmin here at all.
1176 */
1177 if (resetXmin)
1178 SnapshotResetXmin();
1179
1180 Assert(resetXmin || MyPgXact->xmin == 0);
1181}
1182
1183
1184/*
1185 * ExportSnapshot
1186 * Export the snapshot to a file so that other backends can import it.
1187 * Returns the token (the file name) that can be used to import this
1188 * snapshot.
1189 */
1190char *
1191ExportSnapshot(Snapshot snapshot)
1192{
1193 TransactionId topXid;
1194 TransactionId *children;
1195 ExportedSnapshot *esnap;
1196 int nchildren;
1197 int addTopXid;
1198 StringInfoData buf;
1199 FILE *f;
1200 int i;
1201 MemoryContext oldcxt;
1202 char path[MAXPGPATH];
1203 char pathtmp[MAXPGPATH];
1204
1205 /*
1206 * It's tempting to call RequireTransactionBlock here, since it's not very
1207 * useful to export a snapshot that will disappear immediately afterwards.
1208 * However, we haven't got enough information to do that, since we don't
1209 * know if we're at top level or not. For example, we could be inside a
1210 * plpgsql function that is going to fire off other transactions via
1211 * dblink. Rather than disallow perfectly legitimate usages, don't make a
1212 * check.
1213 *
1214 * Also note that we don't make any restriction on the transaction's
1215 * isolation level; however, importers must check the level if they are
1216 * serializable.
1217 */
1218
1219 /*
1220 * Get our transaction ID if there is one, to include in the snapshot.
1221 */
1222 topXid = GetTopTransactionIdIfAny();
1223
1224 /*
1225 * We cannot export a snapshot from a subtransaction because there's no
1226 * easy way for importers to verify that the same subtransaction is still
1227 * running.
1228 */
1229 if (IsSubTransaction())
1230 ereport(ERROR,
1231 (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1232 errmsg("cannot export a snapshot from a subtransaction")));
1233
1234 /*
1235 * We do however allow previous committed subtransactions to exist.
1236 * Importers of the snapshot must see them as still running, so get their
1237 * XIDs to add them to the snapshot.
1238 */
1239 nchildren = xactGetCommittedChildren(&children);
1240
1241 /*
1242 * Generate file path for the snapshot. We start numbering of snapshots
1243 * inside the transaction from 1.
1244 */
1245 snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%08X-%d",
1246 MyProc->backendId, MyProc->lxid, list_length(exportedSnapshots) + 1);
1247
1248 /*
1249 * Copy the snapshot into TopTransactionContext, add it to the
1250 * exportedSnapshots list, and mark it pseudo-registered. We do this to
1251 * ensure that the snapshot's xmin is honored for the rest of the
1252 * transaction.
1253 */
1254 snapshot = CopySnapshot(snapshot);
1255
1256 oldcxt = MemoryContextSwitchTo(TopTransactionContext);
1257 esnap = (ExportedSnapshot *) palloc(sizeof(ExportedSnapshot));
1258 esnap->snapfile = pstrdup(path);
1259 esnap->snapshot = snapshot;
1260 exportedSnapshots = lappend(exportedSnapshots, esnap);
1261 MemoryContextSwitchTo(oldcxt);
1262
1263 snapshot->regd_count++;
1264 pairingheap_add(&RegisteredSnapshots, &snapshot->ph_node);
1265
1266 /*
1267 * Fill buf with a text serialization of the snapshot, plus identification
1268 * data about this transaction. The format expected by ImportSnapshot is
1269 * pretty rigid: each line must be fieldname:value.
1270 */
1271 initStringInfo(&buf);
1272
1273 appendStringInfo(&buf, "vxid:%d/%u\n", MyProc->backendId, MyProc->lxid);
1274 appendStringInfo(&buf, "pid:%d\n", MyProcPid);
1275 appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId);
1276 appendStringInfo(&buf, "iso:%d\n", XactIsoLevel);
1277 appendStringInfo(&buf, "ro:%d\n", XactReadOnly);
1278
1279 appendStringInfo(&buf, "xmin:%u\n", snapshot->xmin);
1280 appendStringInfo(&buf, "xmax:%u\n", snapshot->xmax);
1281
1282 /*
1283 * We must include our own top transaction ID in the top-xid data, since
1284 * by definition we will still be running when the importing transaction
1285 * adopts the snapshot, but GetSnapshotData never includes our own XID in
1286 * the snapshot. (There must, therefore, be enough room to add it.)
1287 *
1288 * However, it could be that our topXid is after the xmax, in which case
1289 * we shouldn't include it because xip[] members are expected to be before
1290 * xmax. (We need not make the same check for subxip[] members, see
1291 * snapshot.h.)
1292 */
1293 addTopXid = (TransactionIdIsValid(topXid) &&
1294 TransactionIdPrecedes(topXid, snapshot->xmax)) ? 1 : 0;
1295 appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid);
1296 for (i = 0; i < snapshot->xcnt; i++)
1297 appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]);
1298 if (addTopXid)
1299 appendStringInfo(&buf, "xip:%u\n", topXid);
1300
1301 /*
1302 * Similarly, we add our subcommitted child XIDs to the subxid data. Here,
1303 * we have to cope with possible overflow.
1304 */
1305 if (snapshot->suboverflowed ||
1306 snapshot->subxcnt + nchildren > GetMaxSnapshotSubxidCount())
1307 appendStringInfoString(&buf, "sof:1\n");
1308 else
1309 {
1310 appendStringInfoString(&buf, "sof:0\n");
1311 appendStringInfo(&buf, "sxcnt:%d\n", snapshot->subxcnt + nchildren);
1312 for (i = 0; i < snapshot->subxcnt; i++)
1313 appendStringInfo(&buf, "sxp:%u\n", snapshot->subxip[i]);
1314 for (i = 0; i < nchildren; i++)
1315 appendStringInfo(&buf, "sxp:%u\n", children[i]);
1316 }
1317 appendStringInfo(&buf, "rec:%u\n", snapshot->takenDuringRecovery);
1318
1319 /*
1320 * Now write the text representation into a file. We first write to a
1321 * ".tmp" filename, and rename to final filename if no error. This
1322 * ensures that no other backend can read an incomplete file
1323 * (ImportSnapshot won't allow it because of its valid-characters check).
1324 */
1325 snprintf(pathtmp, sizeof(pathtmp), "%s.tmp", path);
1326 if (!(f = AllocateFile(pathtmp, PG_BINARY_W)))
1327 ereport(ERROR,
1328 (errcode_for_file_access(),
1329 errmsg("could not create file \"%s\": %m", pathtmp)));
1330
1331 if (fwrite(buf.data, buf.len, 1, f) != 1)
1332 ereport(ERROR,
1333 (errcode_for_file_access(),
1334 errmsg("could not write to file \"%s\": %m", pathtmp)));
1335
1336 /* no fsync() since file need not survive a system crash */
1337
1338 if (FreeFile(f))
1339 ereport(ERROR,
1340 (errcode_for_file_access(),
1341 errmsg("could not write to file \"%s\": %m", pathtmp)));
1342
1343 /*
1344 * Now that we have written everything into a .tmp file, rename the file
1345 * to remove the .tmp suffix.
1346 */
1347 if (rename(pathtmp, path) < 0)
1348 ereport(ERROR,
1349 (errcode_for_file_access(),
1350 errmsg("could not rename file \"%s\" to \"%s\": %m",
1351 pathtmp, path)));
1352
1353 /*
1354 * The basename of the file is what we return from pg_export_snapshot().
1355 * It's already in path in a textual format and we know that the path
1356 * starts with SNAPSHOT_EXPORT_DIR. Skip over the prefix and the slash
1357 * and pstrdup it so as not to return the address of a local variable.
1358 */
1359 return pstrdup(path + strlen(SNAPSHOT_EXPORT_DIR) + 1);
1360}
1361
1362/*
1363 * pg_export_snapshot
1364 * SQL-callable wrapper for ExportSnapshot.
1365 */
1366Datum
1367pg_export_snapshot(PG_FUNCTION_ARGS)
1368{
1369 char *snapshotName;
1370
1371 snapshotName = ExportSnapshot(GetActiveSnapshot());
1372 PG_RETURN_TEXT_P(cstring_to_text(snapshotName));
1373}
1374
1375
1376/*
1377 * Parsing subroutines for ImportSnapshot: parse a line with the given
1378 * prefix followed by a value, and advance *s to the next line. The
1379 * filename is provided for use in error messages.
1380 */
1381static int
1382parseIntFromText(const char *prefix, char **s, const char *filename)
1383{
1384 char *ptr = *s;
1385 int prefixlen = strlen(prefix);
1386 int val;
1387
1388 if (strncmp(ptr, prefix, prefixlen) != 0)
1389 ereport(ERROR,
1390 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1391 errmsg("invalid snapshot data in file \"%s\"", filename)));
1392 ptr += prefixlen;
1393 if (sscanf(ptr, "%d", &val) != 1)
1394 ereport(ERROR,
1395 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1396 errmsg("invalid snapshot data in file \"%s\"", filename)));
1397 ptr = strchr(ptr, '\n');
1398 if (!ptr)
1399 ereport(ERROR,
1400 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1401 errmsg("invalid snapshot data in file \"%s\"", filename)));
1402 *s = ptr + 1;
1403 return val;
1404}
1405
1406static TransactionId
1407parseXidFromText(const char *prefix, char **s, const char *filename)
1408{
1409 char *ptr = *s;
1410 int prefixlen = strlen(prefix);
1411 TransactionId val;
1412
1413 if (strncmp(ptr, prefix, prefixlen) != 0)
1414 ereport(ERROR,
1415 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1416 errmsg("invalid snapshot data in file \"%s\"", filename)));
1417 ptr += prefixlen;
1418 if (sscanf(ptr, "%u", &val) != 1)
1419 ereport(ERROR,
1420 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1421 errmsg("invalid snapshot data in file \"%s\"", filename)));
1422 ptr = strchr(ptr, '\n');
1423 if (!ptr)
1424 ereport(ERROR,
1425 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1426 errmsg("invalid snapshot data in file \"%s\"", filename)));
1427 *s = ptr + 1;
1428 return val;
1429}
1430
1431static void
1432parseVxidFromText(const char *prefix, char **s, const char *filename,
1433 VirtualTransactionId *vxid)
1434{
1435 char *ptr = *s;
1436 int prefixlen = strlen(prefix);
1437
1438 if (strncmp(ptr, prefix, prefixlen) != 0)
1439 ereport(ERROR,
1440 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1441 errmsg("invalid snapshot data in file \"%s\"", filename)));
1442 ptr += prefixlen;
1443 if (sscanf(ptr, "%d/%u", &vxid->backendId, &vxid->localTransactionId) != 2)
1444 ereport(ERROR,
1445 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1446 errmsg("invalid snapshot data in file \"%s\"", filename)));
1447 ptr = strchr(ptr, '\n');
1448 if (!ptr)
1449 ereport(ERROR,
1450 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1451 errmsg("invalid snapshot data in file \"%s\"", filename)));
1452 *s = ptr + 1;
1453}
1454
1455/*
1456 * ImportSnapshot
1457 * Import a previously exported snapshot. The argument should be a
1458 * filename in SNAPSHOT_EXPORT_DIR. Load the snapshot from that file.
1459 * This is called by "SET TRANSACTION SNAPSHOT 'foo'".
1460 */
1461void
1462ImportSnapshot(const char *idstr)
1463{
1464 char path[MAXPGPATH];
1465 FILE *f;
1466 struct stat stat_buf;
1467 char *filebuf;
1468 int xcnt;
1469 int i;
1470 VirtualTransactionId src_vxid;
1471 int src_pid;
1472 Oid src_dbid;
1473 int src_isolevel;
1474 bool src_readonly;
1475 SnapshotData snapshot;
1476
1477 /*
1478 * Must be at top level of a fresh transaction. Note in particular that
1479 * we check we haven't acquired an XID --- if we have, it's conceivable
1480 * that the snapshot would show it as not running, making for very screwy
1481 * behavior.
1482 */
1483 if (FirstSnapshotSet ||
1484 GetTopTransactionIdIfAny() != InvalidTransactionId ||
1485 IsSubTransaction())
1486 ereport(ERROR,
1487 (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
1488 errmsg("SET TRANSACTION SNAPSHOT must be called before any query")));
1489
1490 /*
1491 * If we are in read committed mode then the next query would execute with
1492 * a new snapshot thus making this function call quite useless.
1493 */
1494 if (!IsolationUsesXactSnapshot())
1495 ereport(ERROR,
1496 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1497 errmsg("a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ")));
1498
1499 /*
1500 * Verify the identifier: only 0-9, A-F and hyphens are allowed. We do
1501 * this mainly to prevent reading arbitrary files.
1502 */
1503 if (strspn(idstr, "0123456789ABCDEF-") != strlen(idstr))
1504 ereport(ERROR,
1505 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1506 errmsg("invalid snapshot identifier: \"%s\"", idstr)));
1507
1508 /* OK, read the file */
1509 snprintf(path, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", idstr);
1510
1511 f = AllocateFile(path, PG_BINARY_R);
1512 if (!f)
1513 ereport(ERROR,
1514 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1515 errmsg("invalid snapshot identifier: \"%s\"", idstr)));
1516
1517 /* get the size of the file so that we know how much memory we need */
1518 if (fstat(fileno(f), &stat_buf))
1519 elog(ERROR, "could not stat file \"%s\": %m", path);
1520
1521 /* and read the file into a palloc'd string */
1522 filebuf = (char *) palloc(stat_buf.st_size + 1);
1523 if (fread(filebuf, stat_buf.st_size, 1, f) != 1)
1524 elog(ERROR, "could not read file \"%s\": %m", path);
1525
1526 filebuf[stat_buf.st_size] = '\0';
1527
1528 FreeFile(f);
1529
1530 /*
1531 * Construct a snapshot struct by parsing the file content.
1532 */
1533 memset(&snapshot, 0, sizeof(snapshot));
1534
1535 parseVxidFromText("vxid:", &filebuf, path, &src_vxid);
1536 src_pid = parseIntFromText("pid:", &filebuf, path);
1537 /* we abuse parseXidFromText a bit here ... */
1538 src_dbid = parseXidFromText("dbid:", &filebuf, path);
1539 src_isolevel = parseIntFromText("iso:", &filebuf, path);
1540 src_readonly = parseIntFromText("ro:", &filebuf, path);
1541
1542 snapshot.snapshot_type = SNAPSHOT_MVCC;
1543
1544 snapshot.xmin = parseXidFromText("xmin:", &filebuf, path);
1545 snapshot.xmax = parseXidFromText("xmax:", &filebuf, path);
1546
1547 snapshot.xcnt = xcnt = parseIntFromText("xcnt:", &filebuf, path);
1548
1549 /* sanity-check the xid count before palloc */
1550 if (xcnt < 0 || xcnt > GetMaxSnapshotXidCount())
1551 ereport(ERROR,
1552 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1553 errmsg("invalid snapshot data in file \"%s\"", path)));
1554
1555 snapshot.xip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
1556 for (i = 0; i < xcnt; i++)
1557 snapshot.xip[i] = parseXidFromText("xip:", &filebuf, path);
1558
1559 snapshot.suboverflowed = parseIntFromText("sof:", &filebuf, path);
1560
1561 if (!snapshot.suboverflowed)
1562 {
1563 snapshot.subxcnt = xcnt = parseIntFromText("sxcnt:", &filebuf, path);
1564
1565 /* sanity-check the xid count before palloc */
1566 if (xcnt < 0 || xcnt > GetMaxSnapshotSubxidCount())
1567 ereport(ERROR,
1568 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1569 errmsg("invalid snapshot data in file \"%s\"", path)));
1570
1571 snapshot.subxip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
1572 for (i = 0; i < xcnt; i++)
1573 snapshot.subxip[i] = parseXidFromText("sxp:", &filebuf, path);
1574 }
1575 else
1576 {
1577 snapshot.subxcnt = 0;
1578 snapshot.subxip = NULL;
1579 }
1580
1581 snapshot.takenDuringRecovery = parseIntFromText("rec:", &filebuf, path);
1582
1583 /*
1584 * Do some additional sanity checking, just to protect ourselves. We
1585 * don't trouble to check the array elements, just the most critical
1586 * fields.
1587 */
1588 if (!VirtualTransactionIdIsValid(src_vxid) ||
1589 !OidIsValid(src_dbid) ||
1590 !TransactionIdIsNormal(snapshot.xmin) ||
1591 !TransactionIdIsNormal(snapshot.xmax))
1592 ereport(ERROR,
1593 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1594 errmsg("invalid snapshot data in file \"%s\"", path)));
1595
1596 /*
1597 * If we're serializable, the source transaction must be too, otherwise
1598 * predicate.c has problems (SxactGlobalXmin could go backwards). Also, a
1599 * non-read-only transaction can't adopt a snapshot from a read-only
1600 * transaction, as predicate.c handles the cases very differently.
1601 */
1602 if (IsolationIsSerializable())
1603 {
1604 if (src_isolevel != XACT_SERIALIZABLE)
1605 ereport(ERROR,
1606 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1607 errmsg("a serializable transaction cannot import a snapshot from a non-serializable transaction")));
1608 if (src_readonly && !XactReadOnly)
1609 ereport(ERROR,
1610 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1611 errmsg("a non-read-only serializable transaction cannot import a snapshot from a read-only transaction")));
1612 }
1613
1614 /*
1615 * We cannot import a snapshot that was taken in a different database,
1616 * because vacuum calculates OldestXmin on a per-database basis; so the
1617 * source transaction's xmin doesn't protect us from data loss. This
1618 * restriction could be removed if the source transaction were to mark its
1619 * xmin as being globally applicable. But that would require some
1620 * additional syntax, since that has to be known when the snapshot is
1621 * initially taken. (See pgsql-hackers discussion of 2011-10-21.)
1622 */
1623 if (src_dbid != MyDatabaseId)
1624 ereport(ERROR,
1625 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1626 errmsg("cannot import a snapshot from a different database")));
1627
1628 /* OK, install the snapshot */
1629 SetTransactionSnapshot(&snapshot, &src_vxid, src_pid, NULL);
1630}
1631
1632/*
1633 * XactHasExportedSnapshots
1634 * Test whether current transaction has exported any snapshots.
1635 */
1636bool
1637XactHasExportedSnapshots(void)
1638{
1639 return (exportedSnapshots != NIL);
1640}
1641
1642/*
1643 * DeleteAllExportedSnapshotFiles
1644 * Clean up any files that have been left behind by a crashed backend
1645 * that had exported snapshots before it died.
1646 *
1647 * This should be called during database startup or crash recovery.
1648 */
1649void
1650DeleteAllExportedSnapshotFiles(void)
1651{
1652 char buf[MAXPGPATH + sizeof(SNAPSHOT_EXPORT_DIR)];
1653 DIR *s_dir;
1654 struct dirent *s_de;
1655
1656 /*
1657 * Problems in reading the directory, or unlinking files, are reported at
1658 * LOG level. Since we're running in the startup process, ERROR level
1659 * would prevent database start, and it's not important enough for that.
1660 */
1661 s_dir = AllocateDir(SNAPSHOT_EXPORT_DIR);
1662
1663 while ((s_de = ReadDirExtended(s_dir, SNAPSHOT_EXPORT_DIR, LOG)) != NULL)
1664 {
1665 if (strcmp(s_de->d_name, ".") == 0 ||
1666 strcmp(s_de->d_name, "..") == 0)
1667 continue;
1668
1669 snprintf(buf, sizeof(buf), SNAPSHOT_EXPORT_DIR "/%s", s_de->d_name);
1670
1671 if (unlink(buf) != 0)
1672 ereport(LOG,
1673 (errcode_for_file_access(),
1674 errmsg("could not remove file \"%s\": %m", buf)));
1675 }
1676
1677 FreeDir(s_dir);
1678}
1679
1680/*
1681 * ThereAreNoPriorRegisteredSnapshots
1682 * Is the registered snapshot count less than or equal to one?
1683 *
1684 * Don't use this to settle important decisions. While zero registrations and
1685 * no ActiveSnapshot would confirm a certain idleness, the system makes no
1686 * guarantees about the significance of one registered snapshot.
1687 */
1688bool
1689ThereAreNoPriorRegisteredSnapshots(void)
1690{
1691 if (pairingheap_is_empty(&RegisteredSnapshots) ||
1692 pairingheap_is_singular(&RegisteredSnapshots))
1693 return true;
1694
1695 return false;
1696}
1697
1698
1699/*
1700 * Return a timestamp that is exactly on a minute boundary.
1701 *
1702 * If the argument is already aligned, return that value, otherwise move to
1703 * the next minute boundary following the given time.
1704 */
1705static TimestampTz
1706AlignTimestampToMinuteBoundary(TimestampTz ts)
1707{
1708 TimestampTz retval = ts + (USECS_PER_MINUTE - 1);
1709
1710 return retval - (retval % USECS_PER_MINUTE);
1711}
1712
1713/*
1714 * Get current timestamp for snapshots
1715 *
1716 * This is basically GetCurrentTimestamp(), but with a guarantee that
1717 * the result never moves backward.
1718 */
1719TimestampTz
1720GetSnapshotCurrentTimestamp(void)
1721{
1722 TimestampTz now = GetCurrentTimestamp();
1723
1724 /*
1725 * Don't let time move backward; if it hasn't advanced, use the old value.
1726 */
1727 SpinLockAcquire(&oldSnapshotControl->mutex_current);
1728 if (now <= oldSnapshotControl->current_timestamp)
1729 now = oldSnapshotControl->current_timestamp;
1730 else
1731 oldSnapshotControl->current_timestamp = now;
1732 SpinLockRelease(&oldSnapshotControl->mutex_current);
1733
1734 return now;
1735}
1736
1737/*
1738 * Get timestamp through which vacuum may have processed based on last stored
1739 * value for threshold_timestamp.
1740 *
1741 * XXX: So far, we never trust that a 64-bit value can be read atomically; if
1742 * that ever changes, we could get rid of the spinlock here.
1743 */
1744TimestampTz
1745GetOldSnapshotThresholdTimestamp(void)
1746{
1747 TimestampTz threshold_timestamp;
1748
1749 SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1750 threshold_timestamp = oldSnapshotControl->threshold_timestamp;
1751 SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1752
1753 return threshold_timestamp;
1754}
1755
1756static void
1757SetOldSnapshotThresholdTimestamp(TimestampTz ts, TransactionId xlimit)
1758{
1759 SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1760 oldSnapshotControl->threshold_timestamp = ts;
1761 oldSnapshotControl->threshold_xid = xlimit;
1762 SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1763}
1764
1765/*
1766 * TransactionIdLimitedForOldSnapshots
1767 *
1768 * Apply old snapshot limit, if any. This is intended to be called for page
1769 * pruning and table vacuuming, to allow old_snapshot_threshold to override
1770 * the normal global xmin value. Actual testing for snapshot too old will be
1771 * based on whether a snapshot timestamp is prior to the threshold timestamp
1772 * set in this function.
1773 */
1774TransactionId
1775TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
1776 Relation relation)
1777{
1778 if (TransactionIdIsNormal(recentXmin)
1779 && old_snapshot_threshold >= 0
1780 && RelationAllowsEarlyPruning(relation))
1781 {
1782 TimestampTz ts = GetSnapshotCurrentTimestamp();
1783 TransactionId xlimit = recentXmin;
1784 TransactionId latest_xmin;
1785 TimestampTz update_ts;
1786 bool same_ts_as_threshold = false;
1787
1788 SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
1789 latest_xmin = oldSnapshotControl->latest_xmin;
1790 update_ts = oldSnapshotControl->next_map_update;
1791 SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
1792
1793 /*
1794 * Zero threshold always overrides to latest xmin, if valid. Without
1795 * some heuristic it will find its own snapshot too old on, for
1796 * example, a simple UPDATE -- which would make it useless for most
1797 * testing, but there is no principled way to ensure that it doesn't
1798 * fail in this way. Use a five-second delay to try to get useful
1799 * testing behavior, but this may need adjustment.
1800 */
1801 if (old_snapshot_threshold == 0)
1802 {
1803 if (TransactionIdPrecedes(latest_xmin, MyPgXact->xmin)
1804 && TransactionIdFollows(latest_xmin, xlimit))
1805 xlimit = latest_xmin;
1806
1807 ts -= 5 * USECS_PER_SEC;
1808 SetOldSnapshotThresholdTimestamp(ts, xlimit);
1809
1810 return xlimit;
1811 }
1812
1813 ts = AlignTimestampToMinuteBoundary(ts)
1814 - (old_snapshot_threshold * USECS_PER_MINUTE);
1815
1816 /* Check for fast exit without LW locking. */
1817 SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
1818 if (ts == oldSnapshotControl->threshold_timestamp)
1819 {
1820 xlimit = oldSnapshotControl->threshold_xid;
1821 same_ts_as_threshold = true;
1822 }
1823 SpinLockRelease(&oldSnapshotControl->mutex_threshold);
1824
1825 if (!same_ts_as_threshold)
1826 {
1827 if (ts == update_ts)
1828 {
1829 xlimit = latest_xmin;
1830 if (NormalTransactionIdFollows(xlimit, recentXmin))
1831 SetOldSnapshotThresholdTimestamp(ts, xlimit);
1832 }
1833 else
1834 {
1835 LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);
1836
1837 if (oldSnapshotControl->count_used > 0
1838 && ts >= oldSnapshotControl->head_timestamp)
1839 {
1840 int offset;
1841
1842 offset = ((ts - oldSnapshotControl->head_timestamp)
1843 / USECS_PER_MINUTE);
1844 if (offset > oldSnapshotControl->count_used - 1)
1845 offset = oldSnapshotControl->count_used - 1;
1846 offset = (oldSnapshotControl->head_offset + offset)
1847 % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
1848 xlimit = oldSnapshotControl->xid_by_minute[offset];
1849
1850 if (NormalTransactionIdFollows(xlimit, recentXmin))
1851 SetOldSnapshotThresholdTimestamp(ts, xlimit);
1852 }
1853
1854 LWLockRelease(OldSnapshotTimeMapLock);
1855 }
1856 }
1857
1858 /*
1859 * Failsafe protection against vacuuming work of active transaction.
1860 *
1861 * This is not an assertion because we avoid the spinlock for
1862 * performance, leaving open the possibility that xlimit could advance
1863 * and be more current; but it seems prudent to apply this limit. It
1864 * might make pruning a tiny bit less aggressive than it could be, but
1865 * protects against data loss bugs.
1866 */
1867 if (TransactionIdIsNormal(latest_xmin)
1868 && TransactionIdPrecedes(latest_xmin, xlimit))
1869 xlimit = latest_xmin;
1870
1871 if (NormalTransactionIdFollows(xlimit, recentXmin))
1872 return xlimit;
1873 }
1874
1875 return recentXmin;
1876}
1877
1878/*
1879 * Take care of the circular buffer that maps time to xid.
1880 */
1881void
1882MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
1883{
1884 TimestampTz ts;
1885 TransactionId latest_xmin;
1886 TimestampTz update_ts;
1887 bool map_update_required = false;
1888
1889 /* Never call this function when old snapshot checking is disabled. */
1890 Assert(old_snapshot_threshold >= 0);
1891
1892 ts = AlignTimestampToMinuteBoundary(whenTaken);
1893
1894 /*
1895 * Keep track of the latest xmin seen by any process. Update mapping with
1896 * a new value when we have crossed a bucket boundary.
1897 */
1898 SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
1899 latest_xmin = oldSnapshotControl->latest_xmin;
1900 update_ts = oldSnapshotControl->next_map_update;
1901 if (ts > update_ts)
1902 {
1903 oldSnapshotControl->next_map_update = ts;
1904 map_update_required = true;
1905 }
1906 if (TransactionIdFollows(xmin, latest_xmin))
1907 oldSnapshotControl->latest_xmin = xmin;
1908 SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
1909
1910 /* We only needed to update the most recent xmin value. */
1911 if (!map_update_required)
1912 return;
1913
1914 /* No further tracking needed for 0 (used for testing). */
1915 if (old_snapshot_threshold == 0)
1916 return;
1917
1918 /*
1919 * We don't want to do something stupid with unusual values, but we don't
1920 * want to litter the log with warnings or break otherwise normal
1921 * processing for this feature; so if something seems unreasonable, just
1922 * log at DEBUG level and return without doing anything.
1923 */
1924 if (whenTaken < 0)
1925 {
1926 elog(DEBUG1,
1927 "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
1928 (long) whenTaken);
1929 return;
1930 }
1931 if (!TransactionIdIsNormal(xmin))
1932 {
1933 elog(DEBUG1,
1934 "MaintainOldSnapshotTimeMapping called with xmin = %lu",
1935 (unsigned long) xmin);
1936 return;
1937 }
1938
1939 LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE);
1940
1941 Assert(oldSnapshotControl->head_offset >= 0);
1942 Assert(oldSnapshotControl->head_offset < OLD_SNAPSHOT_TIME_MAP_ENTRIES);
1943 Assert((oldSnapshotControl->head_timestamp % USECS_PER_MINUTE) == 0);
1944 Assert(oldSnapshotControl->count_used >= 0);
1945 Assert(oldSnapshotControl->count_used <= OLD_SNAPSHOT_TIME_MAP_ENTRIES);
1946
1947 if (oldSnapshotControl->count_used == 0)
1948 {
1949 /* set up first entry for empty mapping */
1950 oldSnapshotControl->head_offset = 0;
1951 oldSnapshotControl->head_timestamp = ts;
1952 oldSnapshotControl->count_used = 1;
1953 oldSnapshotControl->xid_by_minute[0] = xmin;
1954 }
1955 else if (ts < oldSnapshotControl->head_timestamp)
1956 {
1957 /* old ts; log it at DEBUG */
1958 LWLockRelease(OldSnapshotTimeMapLock);
1959 elog(DEBUG1,
1960 "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
1961 (long) whenTaken);
1962 return;
1963 }
1964 else if (ts <= (oldSnapshotControl->head_timestamp +
1965 ((oldSnapshotControl->count_used - 1)
1966 * USECS_PER_MINUTE)))
1967 {
1968 /* existing mapping; advance xid if possible */
1969 int bucket = (oldSnapshotControl->head_offset
1970 + ((ts - oldSnapshotControl->head_timestamp)
1971 / USECS_PER_MINUTE))
1972 % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
1973
1974 if (TransactionIdPrecedes(oldSnapshotControl->xid_by_minute[bucket], xmin))
1975 oldSnapshotControl->xid_by_minute[bucket] = xmin;
1976 }
1977 else
1978 {
1979 /* We need a new bucket, but it might not be the very next one. */
1980 int advance = ((ts - oldSnapshotControl->head_timestamp)
1981 / USECS_PER_MINUTE);
1982
1983 oldSnapshotControl->head_timestamp = ts;
1984
1985 if (advance >= OLD_SNAPSHOT_TIME_MAP_ENTRIES)
1986 {
1987 /* Advance is so far that all old data is junk; start over. */
1988 oldSnapshotControl->head_offset = 0;
1989 oldSnapshotControl->count_used = 1;
1990 oldSnapshotControl->xid_by_minute[0] = xmin;
1991 }
1992 else
1993 {
1994 /* Store the new value in one or more buckets. */
1995 int i;
1996
1997 for (i = 0; i < advance; i++)
1998 {
1999 if (oldSnapshotControl->count_used == OLD_SNAPSHOT_TIME_MAP_ENTRIES)
2000 {
2001 /* Map full and new value replaces old head. */
2002 int old_head = oldSnapshotControl->head_offset;
2003
2004 if (old_head == (OLD_SNAPSHOT_TIME_MAP_ENTRIES - 1))
2005 oldSnapshotControl->head_offset = 0;
2006 else
2007 oldSnapshotControl->head_offset = old_head + 1;
2008 oldSnapshotControl->xid_by_minute[old_head] = xmin;
2009 }
2010 else
2011 {
2012 /* Extend map to unused entry. */
2013 int new_tail = (oldSnapshotControl->head_offset
2014 + oldSnapshotControl->count_used)
2015 % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
2016
2017 oldSnapshotControl->count_used++;
2018 oldSnapshotControl->xid_by_minute[new_tail] = xmin;
2019 }
2020 }
2021 }
2022 }
2023
2024 LWLockRelease(OldSnapshotTimeMapLock);
2025}
2026
2027
2028/*
2029 * Setup a snapshot that replaces normal catalog snapshots that allows catalog
2030 * access to behave just like it did at a certain point in the past.
2031 *
2032 * Needed for logical decoding.
2033 */
2034void
2035SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
2036{
2037 Assert(historic_snapshot != NULL);
2038
2039 /* setup the timetravel snapshot */
2040 HistoricSnapshot = historic_snapshot;
2041
2042 /* setup (cmin, cmax) lookup hash */
2043 tuplecid_data = tuplecids;
2044}
2045
2046
2047/*
2048 * Make catalog snapshots behave normally again.
2049 */
2050void
2051TeardownHistoricSnapshot(bool is_error)
2052{
2053 HistoricSnapshot = NULL;
2054 tuplecid_data = NULL;
2055}
2056
2057bool
2058HistoricSnapshotActive(void)
2059{
2060 return HistoricSnapshot != NULL;
2061}
2062
2063HTAB *
2064HistoricSnapshotGetTupleCids(void)
2065{
2066 Assert(HistoricSnapshotActive());
2067 return tuplecid_data;
2068}
2069
2070/*
2071 * EstimateSnapshotSpace
2072 * Returns the size needed to store the given snapshot.
2073 *
2074 * We are exporting only required fields from the Snapshot, stored in
2075 * SerializedSnapshotData.
2076 */
2077Size
2078EstimateSnapshotSpace(Snapshot snap)
2079{
2080 Size size;
2081
2082 Assert(snap != InvalidSnapshot);
2083 Assert(snap->snapshot_type == SNAPSHOT_MVCC);
2084
2085 /* We allocate any XID arrays needed in the same palloc block. */
2086 size = add_size(sizeof(SerializedSnapshotData),
2087 mul_size(snap->xcnt, sizeof(TransactionId)));
2088 if (snap->subxcnt > 0 &&
2089 (!snap->suboverflowed || snap->takenDuringRecovery))
2090 size = add_size(size,
2091 mul_size(snap->subxcnt, sizeof(TransactionId)));
2092
2093 return size;
2094}
2095
2096/*
2097 * SerializeSnapshot
2098 * Dumps the serialized snapshot (extracted from given snapshot) onto the
2099 * memory location at start_address.
2100 */
2101void
2102SerializeSnapshot(Snapshot snapshot, char *start_address)
2103{
2104 SerializedSnapshotData serialized_snapshot;
2105
2106 Assert(snapshot->subxcnt >= 0);
2107
2108 /* Copy all required fields */
2109 serialized_snapshot.xmin = snapshot->xmin;
2110 serialized_snapshot.xmax = snapshot->xmax;
2111 serialized_snapshot.xcnt = snapshot->xcnt;
2112 serialized_snapshot.subxcnt = snapshot->subxcnt;
2113 serialized_snapshot.suboverflowed = snapshot->suboverflowed;
2114 serialized_snapshot.takenDuringRecovery = snapshot->takenDuringRecovery;
2115 serialized_snapshot.curcid = snapshot->curcid;
2116 serialized_snapshot.whenTaken = snapshot->whenTaken;
2117 serialized_snapshot.lsn = snapshot->lsn;
2118
2119 /*
2120 * Ignore the SubXID array if it has overflowed, unless the snapshot was
2121 * taken during recovery - in that case, top-level XIDs are in subxip as
2122 * well, and we mustn't lose them.
2123 */
2124 if (serialized_snapshot.suboverflowed && !snapshot->takenDuringRecovery)
2125 serialized_snapshot.subxcnt = 0;
2126
2127 /* Copy struct to possibly-unaligned buffer */
2128 memcpy(start_address,
2129 &serialized_snapshot, sizeof(SerializedSnapshotData));
2130
2131 /* Copy XID array */
2132 if (snapshot->xcnt > 0)
2133 memcpy((TransactionId *) (start_address +
2134 sizeof(SerializedSnapshotData)),
2135 snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
2136
2137 /*
2138 * Copy SubXID array. Don't bother to copy it if it had overflowed,
2139 * though, because it's not used anywhere in that case. Except if it's a
2140 * snapshot taken during recovery; all the top-level XIDs are in subxip as
2141 * well in that case, so we mustn't lose them.
2142 */
2143 if (serialized_snapshot.subxcnt > 0)
2144 {
2145 Size subxipoff = sizeof(SerializedSnapshotData) +
2146 snapshot->xcnt * sizeof(TransactionId);
2147
2148 memcpy((TransactionId *) (start_address + subxipoff),
2149 snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId));
2150 }
2151}
2152
2153/*
2154 * RestoreSnapshot
2155 * Restore a serialized snapshot from the specified address.
2156 *
2157 * The copy is palloc'd in TopTransactionContext and has initial refcounts set
2158 * to 0. The returned snapshot has the copied flag set.
2159 */
2160Snapshot
2161RestoreSnapshot(char *start_address)
2162{
2163 SerializedSnapshotData serialized_snapshot;
2164 Size size;
2165 Snapshot snapshot;
2166 TransactionId *serialized_xids;
2167
2168 memcpy(&serialized_snapshot, start_address,
2169 sizeof(SerializedSnapshotData));
2170 serialized_xids = (TransactionId *)
2171 (start_address + sizeof(SerializedSnapshotData));
2172
2173 /* We allocate any XID arrays needed in the same palloc block. */
2174 size = sizeof(SnapshotData)
2175 + serialized_snapshot.xcnt * sizeof(TransactionId)
2176 + serialized_snapshot.subxcnt * sizeof(TransactionId);
2177
2178 /* Copy all required fields */
2179 snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
2180 snapshot->snapshot_type = SNAPSHOT_MVCC;
2181 snapshot->xmin = serialized_snapshot.xmin;
2182 snapshot->xmax = serialized_snapshot.xmax;
2183 snapshot->xip = NULL;
2184 snapshot->xcnt = serialized_snapshot.xcnt;
2185 snapshot->subxip = NULL;
2186 snapshot->subxcnt = serialized_snapshot.subxcnt;
2187 snapshot->suboverflowed = serialized_snapshot.suboverflowed;
2188 snapshot->takenDuringRecovery = serialized_snapshot.takenDuringRecovery;
2189 snapshot->curcid = serialized_snapshot.curcid;
2190 snapshot->whenTaken = serialized_snapshot.whenTaken;
2191 snapshot->lsn = serialized_snapshot.lsn;
2192
2193 /* Copy XIDs, if present. */
2194 if (serialized_snapshot.xcnt > 0)
2195 {
2196 snapshot->xip = (TransactionId *) (snapshot + 1);
2197 memcpy(snapshot->xip, serialized_xids,
2198 serialized_snapshot.xcnt * sizeof(TransactionId));
2199 }
2200
2201 /* Copy SubXIDs, if present. */
2202 if (serialized_snapshot.subxcnt > 0)
2203 {
2204 snapshot->subxip = ((TransactionId *) (snapshot + 1)) +
2205 serialized_snapshot.xcnt;
2206 memcpy(snapshot->subxip, serialized_xids + serialized_snapshot.xcnt,
2207 serialized_snapshot.subxcnt * sizeof(TransactionId));
2208 }
2209
2210 /* Set the copied flag so that the caller will set refcounts correctly. */
2211 snapshot->regd_count = 0;
2212 snapshot->active_count = 0;
2213 snapshot->copied = true;
2214
2215 return snapshot;
2216}
2217
2218/*
2219 * Install a restored snapshot as the transaction snapshot.
2220 *
2221 * The second argument is of type void * so that snapmgr.h need not include
2222 * the declaration for PGPROC.
2223 */
2224void
2225RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
2226{
2227 SetTransactionSnapshot(snapshot, NULL, InvalidPid, master_pgproc);
2228}
2229
2230/*
2231 * XidInMVCCSnapshot
2232 * Is the given XID still-in-progress according to the snapshot?
2233 *
2234 * Note: GetSnapshotData never stores either top xid or subxids of our own
2235 * backend into a snapshot, so these xids will not be reported as "running"
2236 * by this function. This is OK for current uses, because we always check
2237 * TransactionIdIsCurrentTransactionId first, except when it's known the
2238 * XID could not be ours anyway.
2239 */
2240bool
2241XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
2242{
2243 uint32 i;
2244
2245 /*
2246 * Make a quick range check to eliminate most XIDs without looking at the
2247 * xip arrays. Note that this is OK even if we convert a subxact XID to
2248 * its parent below, because a subxact with XID < xmin has surely also got
2249 * a parent with XID < xmin, while one with XID >= xmax must belong to a
2250 * parent that was not yet committed at the time of this snapshot.
2251 */
2252
2253 /* Any xid < xmin is not in-progress */
2254 if (TransactionIdPrecedes(xid, snapshot->xmin))
2255 return false;
2256 /* Any xid >= xmax is in-progress */
2257 if (TransactionIdFollowsOrEquals(xid, snapshot->xmax))
2258 return true;
2259
2260 /*
2261 * Snapshot information is stored slightly differently in snapshots taken
2262 * during recovery.
2263 */
2264 if (!snapshot->takenDuringRecovery)
2265 {
2266 /*
2267 * If the snapshot contains full subxact data, the fastest way to
2268 * check things is just to compare the given XID against both subxact
2269 * XIDs and top-level XIDs. If the snapshot overflowed, we have to
2270 * use pg_subtrans to convert a subxact XID to its parent XID, but
2271 * then we need only look at top-level XIDs not subxacts.
2272 */
2273 if (!snapshot->suboverflowed)
2274 {
2275 /* we have full data, so search subxip */
2276 int32 j;
2277
2278 for (j = 0; j < snapshot->subxcnt; j++)
2279 {
2280 if (TransactionIdEquals(xid, snapshot->subxip[j]))
2281 return true;
2282 }
2283
2284 /* not there, fall through to search xip[] */
2285 }
2286 else
2287 {
2288 /*
2289 * Snapshot overflowed, so convert xid to top-level. This is safe
2290 * because we eliminated too-old XIDs above.
2291 */
2292 xid = SubTransGetTopmostTransaction(xid);
2293
2294 /*
2295 * If xid was indeed a subxact, we might now have an xid < xmin,
2296 * so recheck to avoid an array scan. No point in rechecking
2297 * xmax.
2298 */
2299 if (TransactionIdPrecedes(xid, snapshot->xmin))
2300 return false;
2301 }
2302
2303 for (i = 0; i < snapshot->xcnt; i++)
2304 {
2305 if (TransactionIdEquals(xid, snapshot->xip[i]))
2306 return true;
2307 }
2308 }
2309 else
2310 {
2311 int32 j;
2312
2313 /*
2314 * In recovery we store all xids in the subxact array because it is by
2315 * far the bigger array, and we mostly don't know which xids are
2316 * top-level and which are subxacts. The xip array is empty.
2317 *
2318 * We start by searching subtrans, if we overflowed.
2319 */
2320 if (snapshot->suboverflowed)
2321 {
2322 /*
2323 * Snapshot overflowed, so convert xid to top-level. This is safe
2324 * because we eliminated too-old XIDs above.
2325 */
2326 xid = SubTransGetTopmostTransaction(xid);
2327
2328 /*
2329 * If xid was indeed a subxact, we might now have an xid < xmin,
2330 * so recheck to avoid an array scan. No point in rechecking
2331 * xmax.
2332 */
2333 if (TransactionIdPrecedes(xid, snapshot->xmin))
2334 return false;
2335 }
2336
2337 /*
2338 * We now have either a top-level xid higher than xmin or an
2339 * indeterminate xid. We don't know whether it's top level or subxact
2340 * but it doesn't matter. If it's present, the xid is visible.
2341 */
2342 for (j = 0; j < snapshot->subxcnt; j++)
2343 {
2344 if (TransactionIdEquals(xid, snapshot->subxip[j]))
2345 return true;
2346 }
2347 }
2348
2349 return false;
2350}
2351