1/*-------------------------------------------------------------------------
2 *
3 * commit_ts.c
4 * PostgreSQL commit timestamp manager
5 *
6 * This module is a pg_xact-like system that stores the commit timestamp
7 * for each transaction.
8 *
9 * XLOG interactions: this module generates an XLOG record whenever a new
10 * CommitTs page is initialized to zeroes. Also, one XLOG record is
11 * generated for setting of values when the caller requests it; this allows
12 * us to support values coming from places other than transaction commit.
13 * Other writes of CommitTS come from recording of transaction commit in
14 * xact.c, which generates its own XLOG records for these events and will
15 * re-perform the status update on redo; so we need make no additional XLOG
16 * entry here.
17 *
18 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
19 * Portions Copyright (c) 1994, Regents of the University of California
20 *
21 * src/backend/access/transam/commit_ts.c
22 *
23 *-------------------------------------------------------------------------
24 */
25#include "postgres.h"
26
27#include "access/commit_ts.h"
28#include "access/htup_details.h"
29#include "access/slru.h"
30#include "access/transam.h"
31#include "catalog/pg_type.h"
32#include "funcapi.h"
33#include "miscadmin.h"
34#include "pg_trace.h"
35#include "storage/shmem.h"
36#include "utils/builtins.h"
37#include "utils/snapmgr.h"
38#include "utils/timestamp.h"
39
40/*
41 * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
42 * everywhere else in Postgres.
43 *
44 * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
45 * CommitTs page numbering also wraps around at
46 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
47 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
48 * explicit notice of that fact in this module, except when comparing segment
49 * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
50 */
51
/*
 * Per-transaction record stored in the CommitTs SLRU pages.
 *
 * We need 8+2 bytes per xact.  Note that enlarging this struct might mean
 * the largest possible file name is more than 5 chars long; see
 * SlruScanDirectory.
 *
 * Entries are copied in and out of page buffers with memcpy using
 * SizeOfCommitTimestampEntry, so any trailing padding of this struct is
 * never stored on a page.
 */
typedef struct CommitTimestampEntry
{
	TimestampTz time;			/* commit timestamp; readers in this file
								 * treat 0 as "no timestamp recorded" */
	RepOriginId nodeid;			/* origin node recorded for the transaction */
} CommitTimestampEntry;
62
/*
 * Number of bytes of a CommitTimestampEntry actually stored on a page:
 * everything up to and including nodeid, excluding trailing padding.
 */
#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
									sizeof(RepOriginId))

/* How many entries fit in one BLCKSZ-sized CommitTs page. */
#define COMMIT_TS_XACTS_PER_PAGE \
	(BLCKSZ / SizeOfCommitTimestampEntry)

/* Page number holding the entry for the given xid. */
#define TransactionIdToCTsPage(xid) \
	((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
/* Index of the given xid's entry within its page. */
#define TransactionIdToCTsEntry(xid)	\
	((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
73
/*
 * Link to shared-memory data structures for CommitTs control.
 *
 * CommitTsCtlData is this process's SLRU control struct; it is set up by
 * SimpleLruInit in CommitTsShmemInit.  CommitTsCtl is the pointer form that
 * the rest of this file passes to the SLRU routines.
 */
static SlruCtlData CommitTsCtlData;

#define CommitTsCtl (&CommitTsCtlData)
80
/*
 * We keep a cache of the last value set in shared memory.
 *
 * This is also good place to keep the activation status.  We keep this
 * separate from the GUC so that the standby can activate the module if the
 * primary has it active independently of the value of the GUC.
 *
 * This is protected by CommitTsLock.  In some places, we use commitTsActive
 * without acquiring the lock; where this happens, a comment explains the
 * rationale for it.
 */
typedef struct CommitTimestampShared
{
	TransactionId xidLastCommit;	/* top-level xid of the most recent call
									 * to TransactionTreeSetCommitTsData, or
									 * InvalidTransactionId if none/reset */
	CommitTimestampEntry dataLastCommit;	/* timestamp and origin recorded
											 * for xidLastCommit */
	bool		commitTsActive; /* is the module currently active? */
} CommitTimestampShared;

/* Pointer to the shared struct above; assigned in CommitTsShmemInit. */
CommitTimestampShared *commitTsShared;
100
101
/*
 * GUC variable.  Consulted in CompleteCommitTsInitialization to decide
 * whether the module ends up activated or deactivated; the run-time
 * activation state itself lives in commitTsShared->commitTsActive.
 */
bool		track_commit_timestamp;
104
/* Forward declarations of file-local routines. */

/* writing timestamps into SLRU pages */
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
								 TransactionId *subxids, TimestampTz ts,
								 RepOriginId nodeid, int pageno);
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
									 RepOriginId nodeid, int slotno);
/* error reporting, page management, activation state */
static void error_commit_ts_disabled(void);
static int	ZeroCommitTsPage(int pageno, bool writeXlog);
static bool CommitTsPagePrecedes(int page1, int page2);
static void ActivateCommitTs(void);
static void DeactivateCommitTs(void);
/* XLOG record emission */
static void WriteZeroPageXlogRec(int pageno);
static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
									 TransactionId *subxids, TimestampTz timestamp,
									 RepOriginId nodeid);
120
121/*
122 * TransactionTreeSetCommitTsData
123 *
124 * Record the final commit timestamp of transaction entries in the commit log
125 * for a transaction and its subtransaction tree, as efficiently as possible.
126 *
127 * xid is the top level transaction id.
128 *
129 * subxids is an array of xids of length nsubxids, representing subtransactions
130 * in the tree of xid. In various cases nsubxids may be zero.
131 * The reason why tracking just the parent xid commit timestamp is not enough
132 * is that the subtrans SLRU does not stay valid across crashes (it's not
133 * permanent) so we need to keep the information about them here. If the
134 * subtrans implementation changes in the future, we might want to revisit the
135 * decision of storing timestamp info for each subxid.
136 *
137 * The write_xlog parameter tells us whether to include an XLog record of this
138 * or not. Normally, this is called from transaction commit routines (both
139 * normal and prepared) and the information will be stored in the transaction
140 * commit XLog record, and so they should pass "false" for this. The XLog redo
141 * code should use "false" here as well. Other callers probably want to pass
142 * true, so that the given values persist in case of crashes.
143 */
144void
145TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
146 TransactionId *subxids, TimestampTz timestamp,
147 RepOriginId nodeid, bool write_xlog)
148{
149 int i;
150 TransactionId headxid;
151 TransactionId newestXact;
152
153 /*
154 * No-op if the module is not active.
155 *
156 * An unlocked read here is fine, because in a standby (the only place
157 * where the flag can change in flight) this routine is only called by the
158 * recovery process, which is also the only process which can change the
159 * flag.
160 */
161 if (!commitTsShared->commitTsActive)
162 return;
163
164 /*
165 * Comply with the WAL-before-data rule: if caller specified it wants this
166 * value to be recorded in WAL, do so before touching the data.
167 */
168 if (write_xlog)
169 WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
170
171 /*
172 * Figure out the latest Xid in this batch: either the last subxid if
173 * there's any, otherwise the parent xid.
174 */
175 if (nsubxids > 0)
176 newestXact = subxids[nsubxids - 1];
177 else
178 newestXact = xid;
179
180 /*
181 * We split the xids to set the timestamp to in groups belonging to the
182 * same SLRU page; the first element in each such set is its head. The
183 * first group has the main XID as the head; subsequent sets use the first
184 * subxid not on the previous page as head. This way, we only have to
185 * lock/modify each SLRU page once.
186 */
187 for (i = 0, headxid = xid;;)
188 {
189 int pageno = TransactionIdToCTsPage(headxid);
190 int j;
191
192 for (j = i; j < nsubxids; j++)
193 {
194 if (TransactionIdToCTsPage(subxids[j]) != pageno)
195 break;
196 }
197 /* subxids[i..j] are on the same page as the head */
198
199 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
200 pageno);
201
202 /* if we wrote out all subxids, we're done. */
203 if (j + 1 >= nsubxids)
204 break;
205
206 /*
207 * Set the new head and skip over it, as well as over the subxids we
208 * just wrote.
209 */
210 headxid = subxids[j];
211 i += j - i + 1;
212 }
213
214 /* update the cached value in shared memory */
215 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
216 commitTsShared->xidLastCommit = xid;
217 commitTsShared->dataLastCommit.time = timestamp;
218 commitTsShared->dataLastCommit.nodeid = nodeid;
219
220 /* and move forwards our endpoint, if needed */
221 if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact))
222 ShmemVariableCache->newestCommitTsXid = newestXact;
223 LWLockRelease(CommitTsLock);
224}
225
226/*
227 * Record the commit timestamp of transaction entries in the commit log for all
228 * entries on a single page. Atomic only on this page.
229 */
230static void
231SetXidCommitTsInPage(TransactionId xid, int nsubxids,
232 TransactionId *subxids, TimestampTz ts,
233 RepOriginId nodeid, int pageno)
234{
235 int slotno;
236 int i;
237
238 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
239
240 slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
241
242 TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
243 for (i = 0; i < nsubxids; i++)
244 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
245
246 CommitTsCtl->shared->page_dirty[slotno] = true;
247
248 LWLockRelease(CommitTsControlLock);
249}
250
251/*
252 * Sets the commit timestamp of a single transaction.
253 *
254 * Must be called with CommitTsControlLock held
255 */
256static void
257TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
258 RepOriginId nodeid, int slotno)
259{
260 int entryno = TransactionIdToCTsEntry(xid);
261 CommitTimestampEntry entry;
262
263 Assert(TransactionIdIsNormal(xid));
264
265 entry.time = ts;
266 entry.nodeid = nodeid;
267
268 memcpy(CommitTsCtl->shared->page_buffer[slotno] +
269 SizeOfCommitTimestampEntry * entryno,
270 &entry, SizeOfCommitTimestampEntry);
271}
272
273/*
274 * Interrogate the commit timestamp of a transaction.
275 *
276 * The return value indicates whether a commit timestamp record was found for
277 * the given xid. The timestamp value is returned in *ts (which may not be
278 * null), and the origin node for the Xid is returned in *nodeid, if it's not
279 * null.
280 */
281bool
282TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
283 RepOriginId *nodeid)
284{
285 int pageno = TransactionIdToCTsPage(xid);
286 int entryno = TransactionIdToCTsEntry(xid);
287 int slotno;
288 CommitTimestampEntry entry;
289 TransactionId oldestCommitTsXid;
290 TransactionId newestCommitTsXid;
291
292 if (!TransactionIdIsValid(xid))
293 ereport(ERROR,
294 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
295 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
296 else if (!TransactionIdIsNormal(xid))
297 {
298 /* frozen and bootstrap xids are always committed far in the past */
299 *ts = 0;
300 if (nodeid)
301 *nodeid = 0;
302 return false;
303 }
304
305 LWLockAcquire(CommitTsLock, LW_SHARED);
306
307 /* Error if module not enabled */
308 if (!commitTsShared->commitTsActive)
309 error_commit_ts_disabled();
310
311 /*
312 * If we're asked for the cached value, return that. Otherwise, fall
313 * through to read from SLRU.
314 */
315 if (commitTsShared->xidLastCommit == xid)
316 {
317 *ts = commitTsShared->dataLastCommit.time;
318 if (nodeid)
319 *nodeid = commitTsShared->dataLastCommit.nodeid;
320
321 LWLockRelease(CommitTsLock);
322 return *ts != 0;
323 }
324
325 oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
326 newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
327 /* neither is invalid, or both are */
328 Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
329 LWLockRelease(CommitTsLock);
330
331 /*
332 * Return empty if the requested value is outside our valid range.
333 */
334 if (!TransactionIdIsValid(oldestCommitTsXid) ||
335 TransactionIdPrecedes(xid, oldestCommitTsXid) ||
336 TransactionIdPrecedes(newestCommitTsXid, xid))
337 {
338 *ts = 0;
339 if (nodeid)
340 *nodeid = InvalidRepOriginId;
341 return false;
342 }
343
344 /* lock is acquired by SimpleLruReadPage_ReadOnly */
345 slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
346 memcpy(&entry,
347 CommitTsCtl->shared->page_buffer[slotno] +
348 SizeOfCommitTimestampEntry * entryno,
349 SizeOfCommitTimestampEntry);
350
351 *ts = entry.time;
352 if (nodeid)
353 *nodeid = entry.nodeid;
354
355 LWLockRelease(CommitTsControlLock);
356 return *ts != 0;
357}
358
359/*
360 * Return the Xid of the latest committed transaction. (As far as this module
361 * is concerned, anyway; it's up to the caller to ensure the value is useful
362 * for its purposes.)
363 *
364 * ts and extra are filled with the corresponding data; they can be passed
365 * as NULL if not wanted.
366 */
367TransactionId
368GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
369{
370 TransactionId xid;
371
372 LWLockAcquire(CommitTsLock, LW_SHARED);
373
374 /* Error if module not enabled */
375 if (!commitTsShared->commitTsActive)
376 error_commit_ts_disabled();
377
378 xid = commitTsShared->xidLastCommit;
379 if (ts)
380 *ts = commitTsShared->dataLastCommit.time;
381 if (nodeid)
382 *nodeid = commitTsShared->dataLastCommit.nodeid;
383 LWLockRelease(CommitTsLock);
384
385 return xid;
386}
387
388static void
389error_commit_ts_disabled(void)
390{
391 ereport(ERROR,
392 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
393 errmsg("could not get commit timestamp data"),
394 RecoveryInProgress() ?
395 errhint("Make sure the configuration parameter \"%s\" is set on the master server.",
396 "track_commit_timestamp") :
397 errhint("Make sure the configuration parameter \"%s\" is set.",
398 "track_commit_timestamp")));
399}
400
401/*
402 * SQL-callable wrapper to obtain commit time of a transaction
403 */
404Datum
405pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
406{
407 TransactionId xid = PG_GETARG_UINT32(0);
408 TimestampTz ts;
409 bool found;
410
411 found = TransactionIdGetCommitTsData(xid, &ts, NULL);
412
413 if (!found)
414 PG_RETURN_NULL();
415
416 PG_RETURN_TIMESTAMPTZ(ts);
417}
418
419
420Datum
421pg_last_committed_xact(PG_FUNCTION_ARGS)
422{
423 TransactionId xid;
424 TimestampTz ts;
425 Datum values[2];
426 bool nulls[2];
427 TupleDesc tupdesc;
428 HeapTuple htup;
429
430 /* and construct a tuple with our data */
431 xid = GetLatestCommitTsData(&ts, NULL);
432
433 /*
434 * Construct a tuple descriptor for the result row. This must match this
435 * function's pg_proc entry!
436 */
437 tupdesc = CreateTemplateTupleDesc(2);
438 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
439 XIDOID, -1, 0);
440 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
441 TIMESTAMPTZOID, -1, 0);
442 tupdesc = BlessTupleDesc(tupdesc);
443
444 if (!TransactionIdIsNormal(xid))
445 {
446 memset(nulls, true, sizeof(nulls));
447 }
448 else
449 {
450 values[0] = TransactionIdGetDatum(xid);
451 nulls[0] = false;
452
453 values[1] = TimestampTzGetDatum(ts);
454 nulls[1] = false;
455 }
456
457 htup = heap_form_tuple(tupdesc, values, nulls);
458
459 PG_RETURN_DATUM(HeapTupleGetDatum(htup));
460}
461
462
463/*
464 * Number of shared CommitTS buffers.
465 *
466 * We use a very similar logic as for the number of CLOG buffers; see comments
467 * in CLOGShmemBuffers.
468 */
469Size
470CommitTsShmemBuffers(void)
471{
472 return Min(16, Max(4, NBuffers / 1024));
473}
474
475/*
476 * Shared memory sizing for CommitTs
477 */
478Size
479CommitTsShmemSize(void)
480{
481 return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
482 sizeof(CommitTimestampShared);
483}
484
485/*
486 * Initialize CommitTs at system startup (postmaster start or standalone
487 * backend)
488 */
489void
490CommitTsShmemInit(void)
491{
492 bool found;
493
494 CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
495 SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
496 CommitTsControlLock, "pg_commit_ts",
497 LWTRANCHE_COMMITTS_BUFFERS);
498
499 commitTsShared = ShmemInitStruct("CommitTs shared",
500 sizeof(CommitTimestampShared),
501 &found);
502
503 if (!IsUnderPostmaster)
504 {
505 Assert(!found);
506
507 commitTsShared->xidLastCommit = InvalidTransactionId;
508 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
509 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
510 commitTsShared->commitTsActive = false;
511 }
512 else
513 Assert(found);
514}
515
/*
 * Bootstrap-time hook; must be called ONCE on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 */
void
BootStrapCommitTs(void)
{
	/*
	 * Intentionally empty.  Unlike most other SLRU modules, CommitTs creates
	 * its segments when the server is started with the module enabled; see
	 * ActivateCommitTs.
	 */
}
531
532/*
533 * Initialize (or reinitialize) a page of CommitTs to zeroes.
534 * If writeXlog is true, also emit an XLOG record saying we did this.
535 *
536 * The page is not actually written, just set up in shared memory.
537 * The slot number of the new page is returned.
538 *
539 * Control lock must be held at entry, and will be held at exit.
540 */
541static int
542ZeroCommitTsPage(int pageno, bool writeXlog)
543{
544 int slotno;
545
546 slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
547
548 if (writeXlog)
549 WriteZeroPageXlogRec(pageno);
550
551 return slotno;
552}
553
/*
 * Startup hook: simply activates the module.
 *
 * This must be called ONCE during postmaster or standalone-backend startup,
 * after StartupXLOG has initialized ShmemVariableCache->nextFullXid (which
 * ActivateCommitTs reads).
 */
void
StartupCommitTs(void)
{
	ActivateCommitTs();
}
563
564/*
565 * This must be called ONCE during postmaster or standalone-backend startup,
566 * after recovery has finished.
567 */
568void
569CompleteCommitTsInitialization(void)
570{
571 /*
572 * If the feature is not enabled, turn it off for good. This also removes
573 * any leftover data.
574 *
575 * Conversely, we activate the module if the feature is enabled. This is
576 * necessary for primary and standby as the activation depends on the
577 * control file contents at the beginning of recovery or when a
578 * XLOG_PARAMETER_CHANGE is replayed.
579 */
580 if (!track_commit_timestamp)
581 DeactivateCommitTs();
582 else
583 ActivateCommitTs();
584}
585
586/*
587 * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
588 * XLog record during recovery.
589 */
590void
591CommitTsParameterChange(bool newvalue, bool oldvalue)
592{
593 /*
594 * If the commit_ts module is disabled in this server and we get word from
595 * the master server that it is enabled there, activate it so that we can
596 * replay future WAL records involving it; also mark it as active on
597 * pg_control. If the old value was already set, we already did this, so
598 * don't do anything.
599 *
600 * If the module is disabled in the master, disable it here too, unless
601 * the module is enabled locally.
602 *
603 * Note this only runs in the recovery process, so an unlocked read is
604 * fine.
605 */
606 if (newvalue)
607 {
608 if (!commitTsShared->commitTsActive)
609 ActivateCommitTs();
610 }
611 else if (commitTsShared->commitTsActive)
612 DeactivateCommitTs();
613}
614
615/*
616 * Activate this module whenever necessary.
617 * This must happen during postmaster or standalone-backend startup,
618 * or during WAL replay anytime the track_commit_timestamp setting is
619 * changed in the master.
620 *
621 * The reason why this SLRU needs separate activation/deactivation functions is
622 * that it can be enabled/disabled during start and the activation/deactivation
623 * on master is propagated to standby via replay. Other SLRUs don't have this
624 * property and they can be just initialized during normal startup.
625 *
626 * This is in charge of creating the currently active segment, if it's not
627 * already there. The reason for this is that the server might have been
628 * running with this module disabled for a while and thus might have skipped
629 * the normal creation point.
630 */
631static void
632ActivateCommitTs(void)
633{
634 TransactionId xid;
635 int pageno;
636
637 /* If we've done this already, there's nothing to do */
638 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
639 if (commitTsShared->commitTsActive)
640 {
641 LWLockRelease(CommitTsLock);
642 return;
643 }
644 LWLockRelease(CommitTsLock);
645
646 xid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
647 pageno = TransactionIdToCTsPage(xid);
648
649 /*
650 * Re-Initialize our idea of the latest page number.
651 */
652 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
653 CommitTsCtl->shared->latest_page_number = pageno;
654 LWLockRelease(CommitTsControlLock);
655
656 /*
657 * If CommitTs is enabled, but it wasn't in the previous server run, we
658 * need to set the oldest and newest values to the next Xid; that way, we
659 * will not try to read data that might not have been set.
660 *
661 * XXX does this have a problem if a server is started with commitTs
662 * enabled, then started with commitTs disabled, then restarted with it
663 * enabled again? It doesn't look like it does, because there should be a
664 * checkpoint that sets the value to InvalidTransactionId at end of
665 * recovery; and so any chance of injecting new transactions without
666 * CommitTs values would occur after the oldestCommitTsXid has been set to
667 * Invalid temporarily.
668 */
669 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
670 if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
671 {
672 ShmemVariableCache->oldestCommitTsXid =
673 ShmemVariableCache->newestCommitTsXid = ReadNewTransactionId();
674 }
675 LWLockRelease(CommitTsLock);
676
677 /* Create the current segment file, if necessary */
678 if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
679 {
680 int slotno;
681
682 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
683 slotno = ZeroCommitTsPage(pageno, false);
684 SimpleLruWritePage(CommitTsCtl, slotno);
685 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
686 LWLockRelease(CommitTsControlLock);
687 }
688
689 /* Change the activation status in shared memory. */
690 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
691 commitTsShared->commitTsActive = true;
692 LWLockRelease(CommitTsLock);
693}
694
695/*
696 * Deactivate this module.
697 *
698 * This must be called when the track_commit_timestamp parameter is turned off.
699 * This happens during postmaster or standalone-backend startup, or during WAL
700 * replay.
701 *
702 * Resets CommitTs into invalid state to make sure we don't hand back
703 * possibly-invalid data; also removes segments of old data.
704 */
705static void
706DeactivateCommitTs(void)
707{
708 /*
709 * Cleanup the status in the shared memory.
710 *
711 * We reset everything in the commitTsShared record to prevent user from
712 * getting confusing data about last committed transaction on the standby
713 * when the module was activated repeatedly on the primary.
714 */
715 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
716
717 commitTsShared->commitTsActive = false;
718 commitTsShared->xidLastCommit = InvalidTransactionId;
719 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
720 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
721
722 ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId;
723 ShmemVariableCache->newestCommitTsXid = InvalidTransactionId;
724
725 LWLockRelease(CommitTsLock);
726
727 /*
728 * Remove *all* files. This is necessary so that there are no leftover
729 * files; in the case where this feature is later enabled after running
730 * with it disabled for some time there may be a gap in the file sequence.
731 * (We can probably tolerate out-of-sequence files, as they are going to
732 * be overwritten anyway when we wrap around, but it seems better to be
733 * tidy.)
734 */
735 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
736 (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
737 LWLockRelease(CommitTsControlLock);
738}
739
740/*
741 * This must be called ONCE during postmaster or standalone-backend shutdown
742 */
743void
744ShutdownCommitTs(void)
745{
746 /* Flush dirty CommitTs pages to disk */
747 SimpleLruFlush(CommitTsCtl, false);
748
749 /*
750 * fsync pg_commit_ts to ensure that any files flushed previously are
751 * durably on disk.
752 */
753 fsync_fname("pg_commit_ts", true);
754}
755
756/*
757 * Perform a checkpoint --- either during shutdown, or on-the-fly
758 */
759void
760CheckPointCommitTs(void)
761{
762 /* Flush dirty CommitTs pages to disk */
763 SimpleLruFlush(CommitTsCtl, true);
764
765 /*
766 * fsync pg_commit_ts to ensure that any files flushed previously are
767 * durably on disk.
768 */
769 fsync_fname("pg_commit_ts", true);
770}
771
772/*
773 * Make sure that CommitTs has room for a newly-allocated XID.
774 *
775 * NB: this is called while holding XidGenLock. We want it to be very fast
776 * most of the time; even when it's not so fast, no actual I/O need happen
777 * unless we're forced to write out a dirty CommitTs or xlog page to make room
778 * in shared memory.
779 *
780 * NB: the current implementation relies on track_commit_timestamp being
781 * PGC_POSTMASTER.
782 */
783void
784ExtendCommitTs(TransactionId newestXact)
785{
786 int pageno;
787
788 /*
789 * Nothing to do if module not enabled. Note we do an unlocked read of
790 * the flag here, which is okay because this routine is only called from
791 * GetNewTransactionId, which is never called in a standby.
792 */
793 Assert(!InRecovery);
794 if (!commitTsShared->commitTsActive)
795 return;
796
797 /*
798 * No work except at first XID of a page. But beware: just after
799 * wraparound, the first XID of page zero is FirstNormalTransactionId.
800 */
801 if (TransactionIdToCTsEntry(newestXact) != 0 &&
802 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
803 return;
804
805 pageno = TransactionIdToCTsPage(newestXact);
806
807 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
808
809 /* Zero the page and make an XLOG entry about it */
810 ZeroCommitTsPage(pageno, !InRecovery);
811
812 LWLockRelease(CommitTsControlLock);
813}
814
815/*
816 * Remove all CommitTs segments before the one holding the passed
817 * transaction ID.
818 *
819 * Note that we don't need to flush XLOG here.
820 */
821void
822TruncateCommitTs(TransactionId oldestXact)
823{
824 int cutoffPage;
825
826 /*
827 * The cutoff point is the start of the segment containing oldestXact. We
828 * pass the *page* containing oldestXact to SimpleLruTruncate.
829 */
830 cutoffPage = TransactionIdToCTsPage(oldestXact);
831
832 /* Check to see if there's any files that could be removed */
833 if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
834 &cutoffPage))
835 return; /* nothing to remove */
836
837 /* Write XLOG record */
838 WriteTruncateXlogRec(cutoffPage, oldestXact);
839
840 /* Now we can remove the old CommitTs segment(s) */
841 SimpleLruTruncate(CommitTsCtl, cutoffPage);
842}
843
844/*
845 * Set the limit values between which commit TS can be consulted.
846 */
847void
848SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
849{
850 /*
851 * Be careful not to overwrite values that are either further into the
852 * "future" or signal a disabled committs.
853 */
854 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
855 if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId)
856 {
857 if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
858 ShmemVariableCache->oldestCommitTsXid = oldestXact;
859 if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid))
860 ShmemVariableCache->newestCommitTsXid = newestXact;
861 }
862 else
863 {
864 Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId);
865 ShmemVariableCache->oldestCommitTsXid = oldestXact;
866 ShmemVariableCache->newestCommitTsXid = newestXact;
867 }
868 LWLockRelease(CommitTsLock);
869}
870
871/*
872 * Move forwards the oldest commitTS value that can be consulted
873 */
874void
875AdvanceOldestCommitTsXid(TransactionId oldestXact)
876{
877 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
878 if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId &&
879 TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
880 ShmemVariableCache->oldestCommitTsXid = oldestXact;
881 LWLockRelease(CommitTsLock);
882}
883
884
885/*
886 * Decide which of two commitTS page numbers is "older" for truncation
887 * purposes.
888 *
889 * We need to use comparison of TransactionIds here in order to do the right
890 * thing with wraparound XID arithmetic. However, if we are asked about
891 * page number zero, we don't want to hand InvalidTransactionId to
892 * TransactionIdPrecedes: it'll get weird about permanent xact IDs. So,
893 * offset both xids by FirstNormalTransactionId to avoid that.
894 */
895static bool
896CommitTsPagePrecedes(int page1, int page2)
897{
898 TransactionId xid1;
899 TransactionId xid2;
900
901 xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
902 xid1 += FirstNormalTransactionId;
903 xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
904 xid2 += FirstNormalTransactionId;
905
906 return TransactionIdPrecedes(xid1, xid2);
907}
908
909
910/*
911 * Write a ZEROPAGE xlog record
912 */
913static void
914WriteZeroPageXlogRec(int pageno)
915{
916 XLogBeginInsert();
917 XLogRegisterData((char *) (&pageno), sizeof(int));
918 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
919}
920
921/*
922 * Write a TRUNCATE xlog record
923 */
924static void
925WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
926{
927 xl_commit_ts_truncate xlrec;
928
929 xlrec.pageno = pageno;
930 xlrec.oldestXid = oldestXid;
931
932 XLogBeginInsert();
933 XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
934 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
935}
936
937/*
938 * Write a SETTS xlog record
939 */
940static void
941WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
942 TransactionId *subxids, TimestampTz timestamp,
943 RepOriginId nodeid)
944{
945 xl_commit_ts_set record;
946
947 record.timestamp = timestamp;
948 record.nodeid = nodeid;
949 record.mainxid = mainxid;
950
951 XLogBeginInsert();
952 XLogRegisterData((char *) &record,
953 offsetof(xl_commit_ts_set, mainxid) +
954 sizeof(TransactionId));
955 XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
956 XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
957}
958
959/*
960 * CommitTS resource manager's routines
961 */
962void
963commit_ts_redo(XLogReaderState *record)
964{
965 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
966
967 /* Backup blocks are not used in commit_ts records */
968 Assert(!XLogRecHasAnyBlockRefs(record));
969
970 if (info == COMMIT_TS_ZEROPAGE)
971 {
972 int pageno;
973 int slotno;
974
975 memcpy(&pageno, XLogRecGetData(record), sizeof(int));
976
977 LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
978
979 slotno = ZeroCommitTsPage(pageno, false);
980 SimpleLruWritePage(CommitTsCtl, slotno);
981 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
982
983 LWLockRelease(CommitTsControlLock);
984 }
985 else if (info == COMMIT_TS_TRUNCATE)
986 {
987 xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
988
989 AdvanceOldestCommitTsXid(trunc->oldestXid);
990
991 /*
992 * During XLOG replay, latest_page_number isn't set up yet; insert a
993 * suitable value to bypass the sanity test in SimpleLruTruncate.
994 */
995 CommitTsCtl->shared->latest_page_number = trunc->pageno;
996
997 SimpleLruTruncate(CommitTsCtl, trunc->pageno);
998 }
999 else if (info == COMMIT_TS_SETTS)
1000 {
1001 xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
1002 int nsubxids;
1003 TransactionId *subxids;
1004
1005 nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
1006 sizeof(TransactionId));
1007 if (nsubxids > 0)
1008 {
1009 subxids = palloc(sizeof(TransactionId) * nsubxids);
1010 memcpy(subxids,
1011 XLogRecGetData(record) + SizeOfCommitTsSet,
1012 sizeof(TransactionId) * nsubxids);
1013 }
1014 else
1015 subxids = NULL;
1016
1017 TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
1018 setts->timestamp, setts->nodeid, true);
1019 if (subxids)
1020 pfree(subxids);
1021 }
1022 else
1023 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1024}
1025