| 1 | /*------------------------------------------------------------------------- |
| 2 | * |
| 3 | * commit_ts.c |
| 4 | * PostgreSQL commit timestamp manager |
| 5 | * |
| 6 | * This module is a pg_xact-like system that stores the commit timestamp |
| 7 | * for each transaction. |
| 8 | * |
| 9 | * XLOG interactions: this module generates an XLOG record whenever a new |
| 10 | * CommitTs page is initialized to zeroes. Also, one XLOG record is |
| 11 | * generated for setting of values when the caller requests it; this allows |
| 12 | * us to support values coming from places other than transaction commit. |
| 13 | * Other writes of CommitTS come from recording of transaction commit in |
| 14 | * xact.c, which generates its own XLOG records for these events and will |
| 15 | * re-perform the status update on redo; so we need make no additional XLOG |
| 16 | * entry here. |
| 17 | * |
| 18 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
| 19 | * Portions Copyright (c) 1994, Regents of the University of California |
| 20 | * |
| 21 | * src/backend/access/transam/commit_ts.c |
| 22 | * |
| 23 | *------------------------------------------------------------------------- |
| 24 | */ |
| 25 | #include "postgres.h" |
| 26 | |
| 27 | #include "access/commit_ts.h" |
| 28 | #include "access/htup_details.h" |
| 29 | #include "access/slru.h" |
| 30 | #include "access/transam.h" |
| 31 | #include "catalog/pg_type.h" |
| 32 | #include "funcapi.h" |
| 33 | #include "miscadmin.h" |
| 34 | #include "pg_trace.h" |
| 35 | #include "storage/shmem.h" |
| 36 | #include "utils/builtins.h" |
| 37 | #include "utils/snapmgr.h" |
| 38 | #include "utils/timestamp.h" |
| 39 | |
| 40 | /* |
| 41 | * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used |
| 42 | * everywhere else in Postgres. |
| 43 | * |
| 44 | * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF, |
| 45 | * CommitTs page numbering also wraps around at |
| 46 | * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at |
| 47 | * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no |
| 48 | * explicit notice of that fact in this module, except when comparing segment |
| 49 | * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes). |
| 50 | */ |
| 51 | |
| 52 | /* |
| 53 | * We need 8+2 bytes per xact. Note that enlarging this struct might mean |
| 54 | * the largest possible file name is more than 5 chars long; see |
| 55 | * SlruScanDirectory. |
| 56 | */ |
| 57 | typedef struct CommitTimestampEntry |
| 58 | { |
| 59 | TimestampTz time; |
| 60 | RepOriginId nodeid; |
| 61 | } CommitTimestampEntry; |
| 62 | |
| 63 | #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \ |
| 64 | sizeof(RepOriginId)) |
| 65 | |
| 66 | #define COMMIT_TS_XACTS_PER_PAGE \ |
| 67 | (BLCKSZ / SizeOfCommitTimestampEntry) |
| 68 | |
| 69 | #define TransactionIdToCTsPage(xid) \ |
| 70 | ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE) |
| 71 | #define TransactionIdToCTsEntry(xid) \ |
| 72 | ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE) |
| 73 | |
| 74 | /* |
| 75 | * Link to shared-memory data structures for CommitTs control |
| 76 | */ |
| 77 | static SlruCtlData CommitTsCtlData; |
| 78 | |
| 79 | #define CommitTsCtl (&CommitTsCtlData) |
| 80 | |
| 81 | /* |
| 82 | * We keep a cache of the last value set in shared memory. |
| 83 | * |
| 84 | * This is also good place to keep the activation status. We keep this |
| 85 | * separate from the GUC so that the standby can activate the module if the |
| 86 | * primary has it active independently of the value of the GUC. |
| 87 | * |
| 88 | * This is protected by CommitTsLock. In some places, we use commitTsActive |
| 89 | * without acquiring the lock; where this happens, a comment explains the |
| 90 | * rationale for it. |
| 91 | */ |
| 92 | typedef struct CommitTimestampShared |
| 93 | { |
| 94 | TransactionId xidLastCommit; |
| 95 | CommitTimestampEntry dataLastCommit; |
| 96 | bool commitTsActive; |
| 97 | } CommitTimestampShared; |
| 98 | |
| 99 | CommitTimestampShared *commitTsShared; |
| 100 | |
| 101 | |
/* GUC variable: the configured "track_commit_timestamp" setting */
bool		track_commit_timestamp;
| 104 | |
| 105 | static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, |
| 106 | TransactionId *subxids, TimestampTz ts, |
| 107 | RepOriginId nodeid, int pageno); |
| 108 | static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, |
| 109 | RepOriginId nodeid, int slotno); |
| 110 | static void error_commit_ts_disabled(void); |
| 111 | static int ZeroCommitTsPage(int pageno, bool writeXlog); |
| 112 | static bool CommitTsPagePrecedes(int page1, int page2); |
| 113 | static void ActivateCommitTs(void); |
| 114 | static void DeactivateCommitTs(void); |
| 115 | static void WriteZeroPageXlogRec(int pageno); |
| 116 | static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid); |
| 117 | static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, |
| 118 | TransactionId *subxids, TimestampTz timestamp, |
| 119 | RepOriginId nodeid); |
| 120 | |
| 121 | /* |
| 122 | * TransactionTreeSetCommitTsData |
| 123 | * |
| 124 | * Record the final commit timestamp of transaction entries in the commit log |
| 125 | * for a transaction and its subtransaction tree, as efficiently as possible. |
| 126 | * |
| 127 | * xid is the top level transaction id. |
| 128 | * |
| 129 | * subxids is an array of xids of length nsubxids, representing subtransactions |
| 130 | * in the tree of xid. In various cases nsubxids may be zero. |
| 131 | * The reason why tracking just the parent xid commit timestamp is not enough |
| 132 | * is that the subtrans SLRU does not stay valid across crashes (it's not |
| 133 | * permanent) so we need to keep the information about them here. If the |
| 134 | * subtrans implementation changes in the future, we might want to revisit the |
| 135 | * decision of storing timestamp info for each subxid. |
| 136 | * |
| 137 | * The write_xlog parameter tells us whether to include an XLog record of this |
| 138 | * or not. Normally, this is called from transaction commit routines (both |
| 139 | * normal and prepared) and the information will be stored in the transaction |
| 140 | * commit XLog record, and so they should pass "false" for this. The XLog redo |
| 141 | * code should use "false" here as well. Other callers probably want to pass |
| 142 | * true, so that the given values persist in case of crashes. |
| 143 | */ |
| 144 | void |
| 145 | TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, |
| 146 | TransactionId *subxids, TimestampTz timestamp, |
| 147 | RepOriginId nodeid, bool write_xlog) |
| 148 | { |
| 149 | int i; |
| 150 | TransactionId headxid; |
| 151 | TransactionId newestXact; |
| 152 | |
| 153 | /* |
| 154 | * No-op if the module is not active. |
| 155 | * |
| 156 | * An unlocked read here is fine, because in a standby (the only place |
| 157 | * where the flag can change in flight) this routine is only called by the |
| 158 | * recovery process, which is also the only process which can change the |
| 159 | * flag. |
| 160 | */ |
| 161 | if (!commitTsShared->commitTsActive) |
| 162 | return; |
| 163 | |
| 164 | /* |
| 165 | * Comply with the WAL-before-data rule: if caller specified it wants this |
| 166 | * value to be recorded in WAL, do so before touching the data. |
| 167 | */ |
| 168 | if (write_xlog) |
| 169 | WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid); |
| 170 | |
| 171 | /* |
| 172 | * Figure out the latest Xid in this batch: either the last subxid if |
| 173 | * there's any, otherwise the parent xid. |
| 174 | */ |
| 175 | if (nsubxids > 0) |
| 176 | newestXact = subxids[nsubxids - 1]; |
| 177 | else |
| 178 | newestXact = xid; |
| 179 | |
| 180 | /* |
| 181 | * We split the xids to set the timestamp to in groups belonging to the |
| 182 | * same SLRU page; the first element in each such set is its head. The |
| 183 | * first group has the main XID as the head; subsequent sets use the first |
| 184 | * subxid not on the previous page as head. This way, we only have to |
| 185 | * lock/modify each SLRU page once. |
| 186 | */ |
| 187 | for (i = 0, headxid = xid;;) |
| 188 | { |
| 189 | int pageno = TransactionIdToCTsPage(headxid); |
| 190 | int j; |
| 191 | |
| 192 | for (j = i; j < nsubxids; j++) |
| 193 | { |
| 194 | if (TransactionIdToCTsPage(subxids[j]) != pageno) |
| 195 | break; |
| 196 | } |
| 197 | /* subxids[i..j] are on the same page as the head */ |
| 198 | |
| 199 | SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid, |
| 200 | pageno); |
| 201 | |
| 202 | /* if we wrote out all subxids, we're done. */ |
| 203 | if (j + 1 >= nsubxids) |
| 204 | break; |
| 205 | |
| 206 | /* |
| 207 | * Set the new head and skip over it, as well as over the subxids we |
| 208 | * just wrote. |
| 209 | */ |
| 210 | headxid = subxids[j]; |
| 211 | i += j - i + 1; |
| 212 | } |
| 213 | |
| 214 | /* update the cached value in shared memory */ |
| 215 | LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
| 216 | commitTsShared->xidLastCommit = xid; |
| 217 | commitTsShared->dataLastCommit.time = timestamp; |
| 218 | commitTsShared->dataLastCommit.nodeid = nodeid; |
| 219 | |
| 220 | /* and move forwards our endpoint, if needed */ |
| 221 | if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact)) |
| 222 | ShmemVariableCache->newestCommitTsXid = newestXact; |
| 223 | LWLockRelease(CommitTsLock); |
| 224 | } |
| 225 | |
| 226 | /* |
| 227 | * Record the commit timestamp of transaction entries in the commit log for all |
| 228 | * entries on a single page. Atomic only on this page. |
| 229 | */ |
| 230 | static void |
| 231 | SetXidCommitTsInPage(TransactionId xid, int nsubxids, |
| 232 | TransactionId *subxids, TimestampTz ts, |
| 233 | RepOriginId nodeid, int pageno) |
| 234 | { |
| 235 | int slotno; |
| 236 | int i; |
| 237 | |
| 238 | LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); |
| 239 | |
| 240 | slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid); |
| 241 | |
| 242 | TransactionIdSetCommitTs(xid, ts, nodeid, slotno); |
| 243 | for (i = 0; i < nsubxids; i++) |
| 244 | TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno); |
| 245 | |
| 246 | CommitTsCtl->shared->page_dirty[slotno] = true; |
| 247 | |
| 248 | LWLockRelease(CommitTsControlLock); |
| 249 | } |
| 250 | |
| 251 | /* |
| 252 | * Sets the commit timestamp of a single transaction. |
| 253 | * |
| 254 | * Must be called with CommitTsControlLock held |
| 255 | */ |
| 256 | static void |
| 257 | TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, |
| 258 | RepOriginId nodeid, int slotno) |
| 259 | { |
| 260 | int entryno = TransactionIdToCTsEntry(xid); |
| 261 | CommitTimestampEntry entry; |
| 262 | |
| 263 | Assert(TransactionIdIsNormal(xid)); |
| 264 | |
| 265 | entry.time = ts; |
| 266 | entry.nodeid = nodeid; |
| 267 | |
| 268 | memcpy(CommitTsCtl->shared->page_buffer[slotno] + |
| 269 | SizeOfCommitTimestampEntry * entryno, |
| 270 | &entry, SizeOfCommitTimestampEntry); |
| 271 | } |
| 272 | |
| 273 | /* |
| 274 | * Interrogate the commit timestamp of a transaction. |
| 275 | * |
| 276 | * The return value indicates whether a commit timestamp record was found for |
| 277 | * the given xid. The timestamp value is returned in *ts (which may not be |
| 278 | * null), and the origin node for the Xid is returned in *nodeid, if it's not |
| 279 | * null. |
| 280 | */ |
| 281 | bool |
| 282 | TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, |
| 283 | RepOriginId *nodeid) |
| 284 | { |
| 285 | int pageno = TransactionIdToCTsPage(xid); |
| 286 | int entryno = TransactionIdToCTsEntry(xid); |
| 287 | int slotno; |
| 288 | CommitTimestampEntry entry; |
| 289 | TransactionId oldestCommitTsXid; |
| 290 | TransactionId newestCommitTsXid; |
| 291 | |
| 292 | if (!TransactionIdIsValid(xid)) |
| 293 | ereport(ERROR, |
| 294 | (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| 295 | errmsg("cannot retrieve commit timestamp for transaction %u" , xid))); |
| 296 | else if (!TransactionIdIsNormal(xid)) |
| 297 | { |
| 298 | /* frozen and bootstrap xids are always committed far in the past */ |
| 299 | *ts = 0; |
| 300 | if (nodeid) |
| 301 | *nodeid = 0; |
| 302 | return false; |
| 303 | } |
| 304 | |
| 305 | LWLockAcquire(CommitTsLock, LW_SHARED); |
| 306 | |
| 307 | /* Error if module not enabled */ |
| 308 | if (!commitTsShared->commitTsActive) |
| 309 | error_commit_ts_disabled(); |
| 310 | |
| 311 | /* |
| 312 | * If we're asked for the cached value, return that. Otherwise, fall |
| 313 | * through to read from SLRU. |
| 314 | */ |
| 315 | if (commitTsShared->xidLastCommit == xid) |
| 316 | { |
| 317 | *ts = commitTsShared->dataLastCommit.time; |
| 318 | if (nodeid) |
| 319 | *nodeid = commitTsShared->dataLastCommit.nodeid; |
| 320 | |
| 321 | LWLockRelease(CommitTsLock); |
| 322 | return *ts != 0; |
| 323 | } |
| 324 | |
| 325 | oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid; |
| 326 | newestCommitTsXid = ShmemVariableCache->newestCommitTsXid; |
| 327 | /* neither is invalid, or both are */ |
| 328 | Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid)); |
| 329 | LWLockRelease(CommitTsLock); |
| 330 | |
| 331 | /* |
| 332 | * Return empty if the requested value is outside our valid range. |
| 333 | */ |
| 334 | if (!TransactionIdIsValid(oldestCommitTsXid) || |
| 335 | TransactionIdPrecedes(xid, oldestCommitTsXid) || |
| 336 | TransactionIdPrecedes(newestCommitTsXid, xid)) |
| 337 | { |
| 338 | *ts = 0; |
| 339 | if (nodeid) |
| 340 | *nodeid = InvalidRepOriginId; |
| 341 | return false; |
| 342 | } |
| 343 | |
| 344 | /* lock is acquired by SimpleLruReadPage_ReadOnly */ |
| 345 | slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid); |
| 346 | memcpy(&entry, |
| 347 | CommitTsCtl->shared->page_buffer[slotno] + |
| 348 | SizeOfCommitTimestampEntry * entryno, |
| 349 | SizeOfCommitTimestampEntry); |
| 350 | |
| 351 | *ts = entry.time; |
| 352 | if (nodeid) |
| 353 | *nodeid = entry.nodeid; |
| 354 | |
| 355 | LWLockRelease(CommitTsControlLock); |
| 356 | return *ts != 0; |
| 357 | } |
| 358 | |
| 359 | /* |
| 360 | * Return the Xid of the latest committed transaction. (As far as this module |
| 361 | * is concerned, anyway; it's up to the caller to ensure the value is useful |
| 362 | * for its purposes.) |
| 363 | * |
| 364 | * ts and extra are filled with the corresponding data; they can be passed |
| 365 | * as NULL if not wanted. |
| 366 | */ |
| 367 | TransactionId |
| 368 | GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid) |
| 369 | { |
| 370 | TransactionId xid; |
| 371 | |
| 372 | LWLockAcquire(CommitTsLock, LW_SHARED); |
| 373 | |
| 374 | /* Error if module not enabled */ |
| 375 | if (!commitTsShared->commitTsActive) |
| 376 | error_commit_ts_disabled(); |
| 377 | |
| 378 | xid = commitTsShared->xidLastCommit; |
| 379 | if (ts) |
| 380 | *ts = commitTsShared->dataLastCommit.time; |
| 381 | if (nodeid) |
| 382 | *nodeid = commitTsShared->dataLastCommit.nodeid; |
| 383 | LWLockRelease(CommitTsLock); |
| 384 | |
| 385 | return xid; |
| 386 | } |
| 387 | |
| 388 | static void |
| 389 | error_commit_ts_disabled(void) |
| 390 | { |
| 391 | ereport(ERROR, |
| 392 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| 393 | errmsg("could not get commit timestamp data" ), |
| 394 | RecoveryInProgress() ? |
| 395 | errhint("Make sure the configuration parameter \"%s\" is set on the master server." , |
| 396 | "track_commit_timestamp" ) : |
| 397 | errhint("Make sure the configuration parameter \"%s\" is set." , |
| 398 | "track_commit_timestamp" ))); |
| 399 | } |
| 400 | |
| 401 | /* |
| 402 | * SQL-callable wrapper to obtain commit time of a transaction |
| 403 | */ |
| 404 | Datum |
| 405 | pg_xact_commit_timestamp(PG_FUNCTION_ARGS) |
| 406 | { |
| 407 | TransactionId xid = PG_GETARG_UINT32(0); |
| 408 | TimestampTz ts; |
| 409 | bool found; |
| 410 | |
| 411 | found = TransactionIdGetCommitTsData(xid, &ts, NULL); |
| 412 | |
| 413 | if (!found) |
| 414 | PG_RETURN_NULL(); |
| 415 | |
| 416 | PG_RETURN_TIMESTAMPTZ(ts); |
| 417 | } |
| 418 | |
| 419 | |
| 420 | Datum |
| 421 | pg_last_committed_xact(PG_FUNCTION_ARGS) |
| 422 | { |
| 423 | TransactionId xid; |
| 424 | TimestampTz ts; |
| 425 | Datum values[2]; |
| 426 | bool nulls[2]; |
| 427 | TupleDesc tupdesc; |
| 428 | HeapTuple htup; |
| 429 | |
| 430 | /* and construct a tuple with our data */ |
| 431 | xid = GetLatestCommitTsData(&ts, NULL); |
| 432 | |
| 433 | /* |
| 434 | * Construct a tuple descriptor for the result row. This must match this |
| 435 | * function's pg_proc entry! |
| 436 | */ |
| 437 | tupdesc = CreateTemplateTupleDesc(2); |
| 438 | TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid" , |
| 439 | XIDOID, -1, 0); |
| 440 | TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp" , |
| 441 | TIMESTAMPTZOID, -1, 0); |
| 442 | tupdesc = BlessTupleDesc(tupdesc); |
| 443 | |
| 444 | if (!TransactionIdIsNormal(xid)) |
| 445 | { |
| 446 | memset(nulls, true, sizeof(nulls)); |
| 447 | } |
| 448 | else |
| 449 | { |
| 450 | values[0] = TransactionIdGetDatum(xid); |
| 451 | nulls[0] = false; |
| 452 | |
| 453 | values[1] = TimestampTzGetDatum(ts); |
| 454 | nulls[1] = false; |
| 455 | } |
| 456 | |
| 457 | htup = heap_form_tuple(tupdesc, values, nulls); |
| 458 | |
| 459 | PG_RETURN_DATUM(HeapTupleGetDatum(htup)); |
| 460 | } |
| 461 | |
| 462 | |
| 463 | /* |
| 464 | * Number of shared CommitTS buffers. |
| 465 | * |
| 466 | * We use a very similar logic as for the number of CLOG buffers; see comments |
| 467 | * in CLOGShmemBuffers. |
| 468 | */ |
| 469 | Size |
| 470 | CommitTsShmemBuffers(void) |
| 471 | { |
| 472 | return Min(16, Max(4, NBuffers / 1024)); |
| 473 | } |
| 474 | |
| 475 | /* |
| 476 | * Shared memory sizing for CommitTs |
| 477 | */ |
| 478 | Size |
| 479 | CommitTsShmemSize(void) |
| 480 | { |
| 481 | return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) + |
| 482 | sizeof(CommitTimestampShared); |
| 483 | } |
| 484 | |
| 485 | /* |
| 486 | * Initialize CommitTs at system startup (postmaster start or standalone |
| 487 | * backend) |
| 488 | */ |
| 489 | void |
| 490 | CommitTsShmemInit(void) |
| 491 | { |
| 492 | bool found; |
| 493 | |
| 494 | CommitTsCtl->PagePrecedes = CommitTsPagePrecedes; |
| 495 | SimpleLruInit(CommitTsCtl, "commit_timestamp" , CommitTsShmemBuffers(), 0, |
| 496 | CommitTsControlLock, "pg_commit_ts" , |
| 497 | LWTRANCHE_COMMITTS_BUFFERS); |
| 498 | |
| 499 | commitTsShared = ShmemInitStruct("CommitTs shared" , |
| 500 | sizeof(CommitTimestampShared), |
| 501 | &found); |
| 502 | |
| 503 | if (!IsUnderPostmaster) |
| 504 | { |
| 505 | Assert(!found); |
| 506 | |
| 507 | commitTsShared->xidLastCommit = InvalidTransactionId; |
| 508 | TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time); |
| 509 | commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId; |
| 510 | commitTsShared->commitTsActive = false; |
| 511 | } |
| 512 | else |
| 513 | Assert(found); |
| 514 | } |
| 515 | |
/*
 * This function must be called ONCE on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 *
 * It is currently a no-op.
 */
void
BootStrapCommitTs(void)
{
	/*
	 * Unlike most other SLRU modules there is nothing to do here at present:
	 * segments are created when the server is started with this module
	 * enabled.  See ActivateCommitTs.
	 */
}
| 531 | |
| 532 | /* |
| 533 | * Initialize (or reinitialize) a page of CommitTs to zeroes. |
| 534 | * If writeXlog is true, also emit an XLOG record saying we did this. |
| 535 | * |
| 536 | * The page is not actually written, just set up in shared memory. |
| 537 | * The slot number of the new page is returned. |
| 538 | * |
| 539 | * Control lock must be held at entry, and will be held at exit. |
| 540 | */ |
| 541 | static int |
| 542 | ZeroCommitTsPage(int pageno, bool writeXlog) |
| 543 | { |
| 544 | int slotno; |
| 545 | |
| 546 | slotno = SimpleLruZeroPage(CommitTsCtl, pageno); |
| 547 | |
| 548 | if (writeXlog) |
| 549 | WriteZeroPageXlogRec(pageno); |
| 550 | |
| 551 | return slotno; |
| 552 | } |
| 553 | |
/*
 * This must be called ONCE during postmaster or standalone-backend startup,
 * after StartupXLOG has initialized ShmemVariableCache->nextFullXid.
 *
 * It simply activates the module; see ActivateCommitTs.
 */
void
StartupCommitTs(void)
{
	ActivateCommitTs();
}
| 563 | |
| 564 | /* |
| 565 | * This must be called ONCE during postmaster or standalone-backend startup, |
| 566 | * after recovery has finished. |
| 567 | */ |
| 568 | void |
| 569 | CompleteCommitTsInitialization(void) |
| 570 | { |
| 571 | /* |
| 572 | * If the feature is not enabled, turn it off for good. This also removes |
| 573 | * any leftover data. |
| 574 | * |
| 575 | * Conversely, we activate the module if the feature is enabled. This is |
| 576 | * necessary for primary and standby as the activation depends on the |
| 577 | * control file contents at the beginning of recovery or when a |
| 578 | * XLOG_PARAMETER_CHANGE is replayed. |
| 579 | */ |
| 580 | if (!track_commit_timestamp) |
| 581 | DeactivateCommitTs(); |
| 582 | else |
| 583 | ActivateCommitTs(); |
| 584 | } |
| 585 | |
| 586 | /* |
| 587 | * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE |
| 588 | * XLog record during recovery. |
| 589 | */ |
| 590 | void |
| 591 | CommitTsParameterChange(bool newvalue, bool oldvalue) |
| 592 | { |
| 593 | /* |
| 594 | * If the commit_ts module is disabled in this server and we get word from |
| 595 | * the master server that it is enabled there, activate it so that we can |
| 596 | * replay future WAL records involving it; also mark it as active on |
| 597 | * pg_control. If the old value was already set, we already did this, so |
| 598 | * don't do anything. |
| 599 | * |
| 600 | * If the module is disabled in the master, disable it here too, unless |
| 601 | * the module is enabled locally. |
| 602 | * |
| 603 | * Note this only runs in the recovery process, so an unlocked read is |
| 604 | * fine. |
| 605 | */ |
| 606 | if (newvalue) |
| 607 | { |
| 608 | if (!commitTsShared->commitTsActive) |
| 609 | ActivateCommitTs(); |
| 610 | } |
| 611 | else if (commitTsShared->commitTsActive) |
| 612 | DeactivateCommitTs(); |
| 613 | } |
| 614 | |
| 615 | /* |
| 616 | * Activate this module whenever necessary. |
| 617 | * This must happen during postmaster or standalone-backend startup, |
| 618 | * or during WAL replay anytime the track_commit_timestamp setting is |
| 619 | * changed in the master. |
| 620 | * |
| 621 | * The reason why this SLRU needs separate activation/deactivation functions is |
| 622 | * that it can be enabled/disabled during start and the activation/deactivation |
| 623 | * on master is propagated to standby via replay. Other SLRUs don't have this |
| 624 | * property and they can be just initialized during normal startup. |
| 625 | * |
| 626 | * This is in charge of creating the currently active segment, if it's not |
| 627 | * already there. The reason for this is that the server might have been |
| 628 | * running with this module disabled for a while and thus might have skipped |
| 629 | * the normal creation point. |
| 630 | */ |
| 631 | static void |
| 632 | ActivateCommitTs(void) |
| 633 | { |
| 634 | TransactionId xid; |
| 635 | int pageno; |
| 636 | |
| 637 | /* If we've done this already, there's nothing to do */ |
| 638 | LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
| 639 | if (commitTsShared->commitTsActive) |
| 640 | { |
| 641 | LWLockRelease(CommitTsLock); |
| 642 | return; |
| 643 | } |
| 644 | LWLockRelease(CommitTsLock); |
| 645 | |
| 646 | xid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid); |
| 647 | pageno = TransactionIdToCTsPage(xid); |
| 648 | |
| 649 | /* |
| 650 | * Re-Initialize our idea of the latest page number. |
| 651 | */ |
| 652 | LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); |
| 653 | CommitTsCtl->shared->latest_page_number = pageno; |
| 654 | LWLockRelease(CommitTsControlLock); |
| 655 | |
| 656 | /* |
| 657 | * If CommitTs is enabled, but it wasn't in the previous server run, we |
| 658 | * need to set the oldest and newest values to the next Xid; that way, we |
| 659 | * will not try to read data that might not have been set. |
| 660 | * |
| 661 | * XXX does this have a problem if a server is started with commitTs |
| 662 | * enabled, then started with commitTs disabled, then restarted with it |
| 663 | * enabled again? It doesn't look like it does, because there should be a |
| 664 | * checkpoint that sets the value to InvalidTransactionId at end of |
| 665 | * recovery; and so any chance of injecting new transactions without |
| 666 | * CommitTs values would occur after the oldestCommitTsXid has been set to |
| 667 | * Invalid temporarily. |
| 668 | */ |
| 669 | LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
| 670 | if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId) |
| 671 | { |
| 672 | ShmemVariableCache->oldestCommitTsXid = |
| 673 | ShmemVariableCache->newestCommitTsXid = ReadNewTransactionId(); |
| 674 | } |
| 675 | LWLockRelease(CommitTsLock); |
| 676 | |
| 677 | /* Create the current segment file, if necessary */ |
| 678 | if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno)) |
| 679 | { |
| 680 | int slotno; |
| 681 | |
| 682 | LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); |
| 683 | slotno = ZeroCommitTsPage(pageno, false); |
| 684 | SimpleLruWritePage(CommitTsCtl, slotno); |
| 685 | Assert(!CommitTsCtl->shared->page_dirty[slotno]); |
| 686 | LWLockRelease(CommitTsControlLock); |
| 687 | } |
| 688 | |
| 689 | /* Change the activation status in shared memory. */ |
| 690 | LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
| 691 | commitTsShared->commitTsActive = true; |
| 692 | LWLockRelease(CommitTsLock); |
| 693 | } |
| 694 | |
| 695 | /* |
| 696 | * Deactivate this module. |
| 697 | * |
| 698 | * This must be called when the track_commit_timestamp parameter is turned off. |
| 699 | * This happens during postmaster or standalone-backend startup, or during WAL |
| 700 | * replay. |
| 701 | * |
| 702 | * Resets CommitTs into invalid state to make sure we don't hand back |
| 703 | * possibly-invalid data; also removes segments of old data. |
| 704 | */ |
| 705 | static void |
| 706 | DeactivateCommitTs(void) |
| 707 | { |
| 708 | /* |
| 709 | * Cleanup the status in the shared memory. |
| 710 | * |
| 711 | * We reset everything in the commitTsShared record to prevent user from |
| 712 | * getting confusing data about last committed transaction on the standby |
| 713 | * when the module was activated repeatedly on the primary. |
| 714 | */ |
| 715 | LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
| 716 | |
| 717 | commitTsShared->commitTsActive = false; |
| 718 | commitTsShared->xidLastCommit = InvalidTransactionId; |
| 719 | TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time); |
| 720 | commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId; |
| 721 | |
| 722 | ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId; |
| 723 | ShmemVariableCache->newestCommitTsXid = InvalidTransactionId; |
| 724 | |
| 725 | LWLockRelease(CommitTsLock); |
| 726 | |
| 727 | /* |
| 728 | * Remove *all* files. This is necessary so that there are no leftover |
| 729 | * files; in the case where this feature is later enabled after running |
| 730 | * with it disabled for some time there may be a gap in the file sequence. |
| 731 | * (We can probably tolerate out-of-sequence files, as they are going to |
| 732 | * be overwritten anyway when we wrap around, but it seems better to be |
| 733 | * tidy.) |
| 734 | */ |
| 735 | LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); |
| 736 | (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL); |
| 737 | LWLockRelease(CommitTsControlLock); |
| 738 | } |
| 739 | |
| 740 | /* |
| 741 | * This must be called ONCE during postmaster or standalone-backend shutdown |
| 742 | */ |
| 743 | void |
| 744 | ShutdownCommitTs(void) |
| 745 | { |
| 746 | /* Flush dirty CommitTs pages to disk */ |
| 747 | SimpleLruFlush(CommitTsCtl, false); |
| 748 | |
| 749 | /* |
| 750 | * fsync pg_commit_ts to ensure that any files flushed previously are |
| 751 | * durably on disk. |
| 752 | */ |
| 753 | fsync_fname("pg_commit_ts" , true); |
| 754 | } |
| 755 | |
| 756 | /* |
| 757 | * Perform a checkpoint --- either during shutdown, or on-the-fly |
| 758 | */ |
| 759 | void |
| 760 | CheckPointCommitTs(void) |
| 761 | { |
| 762 | /* Flush dirty CommitTs pages to disk */ |
| 763 | SimpleLruFlush(CommitTsCtl, true); |
| 764 | |
| 765 | /* |
| 766 | * fsync pg_commit_ts to ensure that any files flushed previously are |
| 767 | * durably on disk. |
| 768 | */ |
| 769 | fsync_fname("pg_commit_ts" , true); |
| 770 | } |
| 771 | |
| 772 | /* |
| 773 | * Make sure that CommitTs has room for a newly-allocated XID. |
| 774 | * |
| 775 | * NB: this is called while holding XidGenLock. We want it to be very fast |
| 776 | * most of the time; even when it's not so fast, no actual I/O need happen |
| 777 | * unless we're forced to write out a dirty CommitTs or xlog page to make room |
| 778 | * in shared memory. |
| 779 | * |
| 780 | * NB: the current implementation relies on track_commit_timestamp being |
| 781 | * PGC_POSTMASTER. |
| 782 | */ |
| 783 | void |
| 784 | ExtendCommitTs(TransactionId newestXact) |
| 785 | { |
| 786 | int pageno; |
| 787 | |
| 788 | /* |
| 789 | * Nothing to do if module not enabled. Note we do an unlocked read of |
| 790 | * the flag here, which is okay because this routine is only called from |
| 791 | * GetNewTransactionId, which is never called in a standby. |
| 792 | */ |
| 793 | Assert(!InRecovery); |
| 794 | if (!commitTsShared->commitTsActive) |
| 795 | return; |
| 796 | |
| 797 | /* |
| 798 | * No work except at first XID of a page. But beware: just after |
| 799 | * wraparound, the first XID of page zero is FirstNormalTransactionId. |
| 800 | */ |
| 801 | if (TransactionIdToCTsEntry(newestXact) != 0 && |
| 802 | !TransactionIdEquals(newestXact, FirstNormalTransactionId)) |
| 803 | return; |
| 804 | |
| 805 | pageno = TransactionIdToCTsPage(newestXact); |
| 806 | |
| 807 | LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); |
| 808 | |
| 809 | /* Zero the page and make an XLOG entry about it */ |
| 810 | ZeroCommitTsPage(pageno, !InRecovery); |
| 811 | |
| 812 | LWLockRelease(CommitTsControlLock); |
| 813 | } |
| 814 | |
| 815 | /* |
| 816 | * Remove all CommitTs segments before the one holding the passed |
| 817 | * transaction ID. |
| 818 | * |
| 819 | * Note that we don't need to flush XLOG here. |
| 820 | */ |
| 821 | void |
| 822 | TruncateCommitTs(TransactionId oldestXact) |
| 823 | { |
| 824 | int cutoffPage; |
| 825 | |
| 826 | /* |
| 827 | * The cutoff point is the start of the segment containing oldestXact. We |
| 828 | * pass the *page* containing oldestXact to SimpleLruTruncate. |
| 829 | */ |
| 830 | cutoffPage = TransactionIdToCTsPage(oldestXact); |
| 831 | |
| 832 | /* Check to see if there's any files that could be removed */ |
| 833 | if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence, |
| 834 | &cutoffPage)) |
| 835 | return; /* nothing to remove */ |
| 836 | |
| 837 | /* Write XLOG record */ |
| 838 | WriteTruncateXlogRec(cutoffPage, oldestXact); |
| 839 | |
| 840 | /* Now we can remove the old CommitTs segment(s) */ |
| 841 | SimpleLruTruncate(CommitTsCtl, cutoffPage); |
| 842 | } |
| 843 | |
| 844 | /* |
| 845 | * Set the limit values between which commit TS can be consulted. |
| 846 | */ |
| 847 | void |
| 848 | SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact) |
| 849 | { |
| 850 | /* |
| 851 | * Be careful not to overwrite values that are either further into the |
| 852 | * "future" or signal a disabled committs. |
| 853 | */ |
| 854 | LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
| 855 | if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId) |
| 856 | { |
| 857 | if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact)) |
| 858 | ShmemVariableCache->oldestCommitTsXid = oldestXact; |
| 859 | if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid)) |
| 860 | ShmemVariableCache->newestCommitTsXid = newestXact; |
| 861 | } |
| 862 | else |
| 863 | { |
| 864 | Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId); |
| 865 | ShmemVariableCache->oldestCommitTsXid = oldestXact; |
| 866 | ShmemVariableCache->newestCommitTsXid = newestXact; |
| 867 | } |
| 868 | LWLockRelease(CommitTsLock); |
| 869 | } |
| 870 | |
| 871 | /* |
| 872 | * Move forwards the oldest commitTS value that can be consulted |
| 873 | */ |
| 874 | void |
| 875 | AdvanceOldestCommitTsXid(TransactionId oldestXact) |
| 876 | { |
| 877 | LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
| 878 | if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId && |
| 879 | TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact)) |
| 880 | ShmemVariableCache->oldestCommitTsXid = oldestXact; |
| 881 | LWLockRelease(CommitTsLock); |
| 882 | } |
| 883 | |
| 884 | |
| 885 | /* |
| 886 | * Decide which of two commitTS page numbers is "older" for truncation |
| 887 | * purposes. |
| 888 | * |
| 889 | * We need to use comparison of TransactionIds here in order to do the right |
| 890 | * thing with wraparound XID arithmetic. However, if we are asked about |
| 891 | * page number zero, we don't want to hand InvalidTransactionId to |
| 892 | * TransactionIdPrecedes: it'll get weird about permanent xact IDs. So, |
| 893 | * offset both xids by FirstNormalTransactionId to avoid that. |
| 894 | */ |
| 895 | static bool |
| 896 | CommitTsPagePrecedes(int page1, int page2) |
| 897 | { |
| 898 | TransactionId xid1; |
| 899 | TransactionId xid2; |
| 900 | |
| 901 | xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE; |
| 902 | xid1 += FirstNormalTransactionId; |
| 903 | xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE; |
| 904 | xid2 += FirstNormalTransactionId; |
| 905 | |
| 906 | return TransactionIdPrecedes(xid1, xid2); |
| 907 | } |
| 908 | |
| 909 | |
| 910 | /* |
| 911 | * Write a ZEROPAGE xlog record |
| 912 | */ |
| 913 | static void |
| 914 | WriteZeroPageXlogRec(int pageno) |
| 915 | { |
| 916 | XLogBeginInsert(); |
| 917 | XLogRegisterData((char *) (&pageno), sizeof(int)); |
| 918 | (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE); |
| 919 | } |
| 920 | |
| 921 | /* |
| 922 | * Write a TRUNCATE xlog record |
| 923 | */ |
| 924 | static void |
| 925 | WriteTruncateXlogRec(int pageno, TransactionId oldestXid) |
| 926 | { |
| 927 | xl_commit_ts_truncate xlrec; |
| 928 | |
| 929 | xlrec.pageno = pageno; |
| 930 | xlrec.oldestXid = oldestXid; |
| 931 | |
| 932 | XLogBeginInsert(); |
| 933 | XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate); |
| 934 | (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE); |
| 935 | } |
| 936 | |
| 937 | /* |
| 938 | * Write a SETTS xlog record |
| 939 | */ |
| 940 | static void |
| 941 | WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, |
| 942 | TransactionId *subxids, TimestampTz timestamp, |
| 943 | RepOriginId nodeid) |
| 944 | { |
| 945 | xl_commit_ts_set record; |
| 946 | |
| 947 | record.timestamp = timestamp; |
| 948 | record.nodeid = nodeid; |
| 949 | record.mainxid = mainxid; |
| 950 | |
| 951 | XLogBeginInsert(); |
| 952 | XLogRegisterData((char *) &record, |
| 953 | offsetof(xl_commit_ts_set, mainxid) + |
| 954 | sizeof(TransactionId)); |
| 955 | XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId)); |
| 956 | XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS); |
| 957 | } |
| 958 | |
| 959 | /* |
| 960 | * CommitTS resource manager's routines |
| 961 | */ |
| 962 | void |
| 963 | commit_ts_redo(XLogReaderState *record) |
| 964 | { |
| 965 | uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; |
| 966 | |
| 967 | /* Backup blocks are not used in commit_ts records */ |
| 968 | Assert(!XLogRecHasAnyBlockRefs(record)); |
| 969 | |
| 970 | if (info == COMMIT_TS_ZEROPAGE) |
| 971 | { |
| 972 | int pageno; |
| 973 | int slotno; |
| 974 | |
| 975 | memcpy(&pageno, XLogRecGetData(record), sizeof(int)); |
| 976 | |
| 977 | LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); |
| 978 | |
| 979 | slotno = ZeroCommitTsPage(pageno, false); |
| 980 | SimpleLruWritePage(CommitTsCtl, slotno); |
| 981 | Assert(!CommitTsCtl->shared->page_dirty[slotno]); |
| 982 | |
| 983 | LWLockRelease(CommitTsControlLock); |
| 984 | } |
| 985 | else if (info == COMMIT_TS_TRUNCATE) |
| 986 | { |
| 987 | xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record); |
| 988 | |
| 989 | AdvanceOldestCommitTsXid(trunc->oldestXid); |
| 990 | |
| 991 | /* |
| 992 | * During XLOG replay, latest_page_number isn't set up yet; insert a |
| 993 | * suitable value to bypass the sanity test in SimpleLruTruncate. |
| 994 | */ |
| 995 | CommitTsCtl->shared->latest_page_number = trunc->pageno; |
| 996 | |
| 997 | SimpleLruTruncate(CommitTsCtl, trunc->pageno); |
| 998 | } |
| 999 | else if (info == COMMIT_TS_SETTS) |
| 1000 | { |
| 1001 | xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record); |
| 1002 | int nsubxids; |
| 1003 | TransactionId *subxids; |
| 1004 | |
| 1005 | nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) / |
| 1006 | sizeof(TransactionId)); |
| 1007 | if (nsubxids > 0) |
| 1008 | { |
| 1009 | subxids = palloc(sizeof(TransactionId) * nsubxids); |
| 1010 | memcpy(subxids, |
| 1011 | XLogRecGetData(record) + SizeOfCommitTsSet, |
| 1012 | sizeof(TransactionId) * nsubxids); |
| 1013 | } |
| 1014 | else |
| 1015 | subxids = NULL; |
| 1016 | |
| 1017 | TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids, |
| 1018 | setts->timestamp, setts->nodeid, true); |
| 1019 | if (subxids) |
| 1020 | pfree(subxids); |
| 1021 | } |
| 1022 | else |
| 1023 | elog(PANIC, "commit_ts_redo: unknown op code %u" , info); |
| 1024 | } |
| 1025 | |