1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * commit_ts.c |
4 | * PostgreSQL commit timestamp manager |
5 | * |
6 | * This module is a pg_xact-like system that stores the commit timestamp |
7 | * for each transaction. |
8 | * |
9 | * XLOG interactions: this module generates an XLOG record whenever a new |
10 | * CommitTs page is initialized to zeroes. Also, one XLOG record is |
11 | * generated for setting of values when the caller requests it; this allows |
12 | * us to support values coming from places other than transaction commit. |
13 | * Other writes of CommitTS come from recording of transaction commit in |
14 | * xact.c, which generates its own XLOG records for these events and will |
15 | * re-perform the status update on redo; so we need make no additional XLOG |
16 | * entry here. |
17 | * |
18 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
19 | * Portions Copyright (c) 1994, Regents of the University of California |
20 | * |
21 | * src/backend/access/transam/commit_ts.c |
22 | * |
23 | *------------------------------------------------------------------------- |
24 | */ |
25 | #include "postgres.h" |
26 | |
27 | #include "access/commit_ts.h" |
28 | #include "access/htup_details.h" |
29 | #include "access/slru.h" |
30 | #include "access/transam.h" |
31 | #include "catalog/pg_type.h" |
32 | #include "funcapi.h" |
33 | #include "miscadmin.h" |
34 | #include "pg_trace.h" |
35 | #include "storage/shmem.h" |
36 | #include "utils/builtins.h" |
37 | #include "utils/snapmgr.h" |
38 | #include "utils/timestamp.h" |
39 | |
40 | /* |
41 | * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used |
42 | * everywhere else in Postgres. |
43 | * |
44 | * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF, |
45 | * CommitTs page numbering also wraps around at |
46 | * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at |
47 | * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no |
48 | * explicit notice of that fact in this module, except when comparing segment |
49 | * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes). |
50 | */ |
51 | |
52 | /* |
53 | * We need 8+2 bytes per xact. Note that enlarging this struct might mean |
54 | * the largest possible file name is more than 5 chars long; see |
55 | * SlruScanDirectory. |
56 | */ |
typedef struct CommitTimestampEntry
{
	TimestampTz time;			/* commit timestamp stored for the xact */
	RepOriginId nodeid;			/* origin node id stored alongside it */
} CommitTimestampEntry;

/*
 * Number of bytes one entry occupies on a page: everything up to and
 * including nodeid, leaving out any padding the compiler adds after it.
 */
#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
									sizeof(RepOriginId))

/* How many entries fit in one BLCKSZ-sized page */
#define COMMIT_TS_XACTS_PER_PAGE \
	(BLCKSZ / SizeOfCommitTimestampEntry)

/* Page number on which the entry for xid lives */
#define TransactionIdToCTsPage(xid) \
	((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
/* Index of xid's entry within that page */
#define TransactionIdToCTsEntry(xid)	\
	((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
73 | |
74 | /* |
75 | * Link to shared-memory data structures for CommitTs control |
76 | */ |
/* File-local SLRU control struct; the rest of this file uses CommitTsCtl */
static SlruCtlData CommitTsCtlData;

#define CommitTsCtl (&CommitTsCtlData)
80 | |
81 | /* |
82 | * We keep a cache of the last value set in shared memory. |
83 | * |
 * This is also a good place to keep the activation status.  We keep this
85 | * separate from the GUC so that the standby can activate the module if the |
86 | * primary has it active independently of the value of the GUC. |
87 | * |
88 | * This is protected by CommitTsLock. In some places, we use commitTsActive |
89 | * without acquiring the lock; where this happens, a comment explains the |
90 | * rationale for it. |
91 | */ |
typedef struct CommitTimestampShared
{
	/* top-level xid of the latest TransactionTreeSetCommitTsData() call */
	TransactionId xidLastCommit;
	/* timestamp and origin node id recorded by that same call */
	CommitTimestampEntry dataLastCommit;
	/* set by ActivateCommitTs(), cleared by DeactivateCommitTs() */
	bool		commitTsActive;
} CommitTimestampShared;

/* Points at the shared struct; assigned in CommitTsShmemInit() */
CommitTimestampShared *commitTsShared;
100 | |
101 | |
/*
 * GUC variable.  CompleteCommitTsInitialization() consults it once recovery
 * has finished to decide whether the module ends up active or inactive.
 */
bool		track_commit_timestamp;
104 | |
/* Forward declarations of routines private to this file */
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
								 TransactionId *subxids, TimestampTz ts,
								 RepOriginId nodeid, int pageno);
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
									 RepOriginId nodeid, int slotno);
static void error_commit_ts_disabled(void);
static int	ZeroCommitTsPage(int pageno, bool writeXlog);
static bool CommitTsPagePrecedes(int page1, int page2);
static void ActivateCommitTs(void);
static void DeactivateCommitTs(void);
/* WAL record writers */
static void WriteZeroPageXlogRec(int pageno);
static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
									 TransactionId *subxids, TimestampTz timestamp,
									 RepOriginId nodeid);
120 | |
121 | /* |
122 | * TransactionTreeSetCommitTsData |
123 | * |
124 | * Record the final commit timestamp of transaction entries in the commit log |
125 | * for a transaction and its subtransaction tree, as efficiently as possible. |
126 | * |
127 | * xid is the top level transaction id. |
128 | * |
129 | * subxids is an array of xids of length nsubxids, representing subtransactions |
130 | * in the tree of xid. In various cases nsubxids may be zero. |
131 | * The reason why tracking just the parent xid commit timestamp is not enough |
132 | * is that the subtrans SLRU does not stay valid across crashes (it's not |
133 | * permanent) so we need to keep the information about them here. If the |
134 | * subtrans implementation changes in the future, we might want to revisit the |
135 | * decision of storing timestamp info for each subxid. |
136 | * |
137 | * The write_xlog parameter tells us whether to include an XLog record of this |
138 | * or not. Normally, this is called from transaction commit routines (both |
139 | * normal and prepared) and the information will be stored in the transaction |
140 | * commit XLog record, and so they should pass "false" for this. The XLog redo |
141 | * code should use "false" here as well. Other callers probably want to pass |
142 | * true, so that the given values persist in case of crashes. |
143 | */ |
144 | void |
145 | TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, |
146 | TransactionId *subxids, TimestampTz timestamp, |
147 | RepOriginId nodeid, bool write_xlog) |
148 | { |
149 | int i; |
150 | TransactionId headxid; |
151 | TransactionId newestXact; |
152 | |
153 | /* |
154 | * No-op if the module is not active. |
155 | * |
156 | * An unlocked read here is fine, because in a standby (the only place |
157 | * where the flag can change in flight) this routine is only called by the |
158 | * recovery process, which is also the only process which can change the |
159 | * flag. |
160 | */ |
161 | if (!commitTsShared->commitTsActive) |
162 | return; |
163 | |
164 | /* |
165 | * Comply with the WAL-before-data rule: if caller specified it wants this |
166 | * value to be recorded in WAL, do so before touching the data. |
167 | */ |
168 | if (write_xlog) |
169 | WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid); |
170 | |
171 | /* |
172 | * Figure out the latest Xid in this batch: either the last subxid if |
173 | * there's any, otherwise the parent xid. |
174 | */ |
175 | if (nsubxids > 0) |
176 | newestXact = subxids[nsubxids - 1]; |
177 | else |
178 | newestXact = xid; |
179 | |
180 | /* |
181 | * We split the xids to set the timestamp to in groups belonging to the |
182 | * same SLRU page; the first element in each such set is its head. The |
183 | * first group has the main XID as the head; subsequent sets use the first |
184 | * subxid not on the previous page as head. This way, we only have to |
185 | * lock/modify each SLRU page once. |
186 | */ |
187 | for (i = 0, headxid = xid;;) |
188 | { |
189 | int pageno = TransactionIdToCTsPage(headxid); |
190 | int j; |
191 | |
192 | for (j = i; j < nsubxids; j++) |
193 | { |
194 | if (TransactionIdToCTsPage(subxids[j]) != pageno) |
195 | break; |
196 | } |
197 | /* subxids[i..j] are on the same page as the head */ |
198 | |
199 | SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid, |
200 | pageno); |
201 | |
202 | /* if we wrote out all subxids, we're done. */ |
203 | if (j + 1 >= nsubxids) |
204 | break; |
205 | |
206 | /* |
207 | * Set the new head and skip over it, as well as over the subxids we |
208 | * just wrote. |
209 | */ |
210 | headxid = subxids[j]; |
211 | i += j - i + 1; |
212 | } |
213 | |
214 | /* update the cached value in shared memory */ |
215 | LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
216 | commitTsShared->xidLastCommit = xid; |
217 | commitTsShared->dataLastCommit.time = timestamp; |
218 | commitTsShared->dataLastCommit.nodeid = nodeid; |
219 | |
220 | /* and move forwards our endpoint, if needed */ |
221 | if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact)) |
222 | ShmemVariableCache->newestCommitTsXid = newestXact; |
223 | LWLockRelease(CommitTsLock); |
224 | } |
225 | |
226 | /* |
227 | * Record the commit timestamp of transaction entries in the commit log for all |
228 | * entries on a single page. Atomic only on this page. |
229 | */ |
230 | static void |
231 | SetXidCommitTsInPage(TransactionId xid, int nsubxids, |
232 | TransactionId *subxids, TimestampTz ts, |
233 | RepOriginId nodeid, int pageno) |
234 | { |
235 | int slotno; |
236 | int i; |
237 | |
238 | LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); |
239 | |
240 | slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid); |
241 | |
242 | TransactionIdSetCommitTs(xid, ts, nodeid, slotno); |
243 | for (i = 0; i < nsubxids; i++) |
244 | TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno); |
245 | |
246 | CommitTsCtl->shared->page_dirty[slotno] = true; |
247 | |
248 | LWLockRelease(CommitTsControlLock); |
249 | } |
250 | |
251 | /* |
252 | * Sets the commit timestamp of a single transaction. |
253 | * |
254 | * Must be called with CommitTsControlLock held |
255 | */ |
256 | static void |
257 | TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, |
258 | RepOriginId nodeid, int slotno) |
259 | { |
260 | int entryno = TransactionIdToCTsEntry(xid); |
261 | CommitTimestampEntry entry; |
262 | |
263 | Assert(TransactionIdIsNormal(xid)); |
264 | |
265 | entry.time = ts; |
266 | entry.nodeid = nodeid; |
267 | |
268 | memcpy(CommitTsCtl->shared->page_buffer[slotno] + |
269 | SizeOfCommitTimestampEntry * entryno, |
270 | &entry, SizeOfCommitTimestampEntry); |
271 | } |
272 | |
273 | /* |
274 | * Interrogate the commit timestamp of a transaction. |
275 | * |
276 | * The return value indicates whether a commit timestamp record was found for |
277 | * the given xid. The timestamp value is returned in *ts (which may not be |
278 | * null), and the origin node for the Xid is returned in *nodeid, if it's not |
279 | * null. |
280 | */ |
281 | bool |
282 | TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, |
283 | RepOriginId *nodeid) |
284 | { |
285 | int pageno = TransactionIdToCTsPage(xid); |
286 | int entryno = TransactionIdToCTsEntry(xid); |
287 | int slotno; |
288 | CommitTimestampEntry entry; |
289 | TransactionId oldestCommitTsXid; |
290 | TransactionId newestCommitTsXid; |
291 | |
292 | if (!TransactionIdIsValid(xid)) |
293 | ereport(ERROR, |
294 | (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
295 | errmsg("cannot retrieve commit timestamp for transaction %u" , xid))); |
296 | else if (!TransactionIdIsNormal(xid)) |
297 | { |
298 | /* frozen and bootstrap xids are always committed far in the past */ |
299 | *ts = 0; |
300 | if (nodeid) |
301 | *nodeid = 0; |
302 | return false; |
303 | } |
304 | |
305 | LWLockAcquire(CommitTsLock, LW_SHARED); |
306 | |
307 | /* Error if module not enabled */ |
308 | if (!commitTsShared->commitTsActive) |
309 | error_commit_ts_disabled(); |
310 | |
311 | /* |
312 | * If we're asked for the cached value, return that. Otherwise, fall |
313 | * through to read from SLRU. |
314 | */ |
315 | if (commitTsShared->xidLastCommit == xid) |
316 | { |
317 | *ts = commitTsShared->dataLastCommit.time; |
318 | if (nodeid) |
319 | *nodeid = commitTsShared->dataLastCommit.nodeid; |
320 | |
321 | LWLockRelease(CommitTsLock); |
322 | return *ts != 0; |
323 | } |
324 | |
325 | oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid; |
326 | newestCommitTsXid = ShmemVariableCache->newestCommitTsXid; |
327 | /* neither is invalid, or both are */ |
328 | Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid)); |
329 | LWLockRelease(CommitTsLock); |
330 | |
331 | /* |
332 | * Return empty if the requested value is outside our valid range. |
333 | */ |
334 | if (!TransactionIdIsValid(oldestCommitTsXid) || |
335 | TransactionIdPrecedes(xid, oldestCommitTsXid) || |
336 | TransactionIdPrecedes(newestCommitTsXid, xid)) |
337 | { |
338 | *ts = 0; |
339 | if (nodeid) |
340 | *nodeid = InvalidRepOriginId; |
341 | return false; |
342 | } |
343 | |
344 | /* lock is acquired by SimpleLruReadPage_ReadOnly */ |
345 | slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid); |
346 | memcpy(&entry, |
347 | CommitTsCtl->shared->page_buffer[slotno] + |
348 | SizeOfCommitTimestampEntry * entryno, |
349 | SizeOfCommitTimestampEntry); |
350 | |
351 | *ts = entry.time; |
352 | if (nodeid) |
353 | *nodeid = entry.nodeid; |
354 | |
355 | LWLockRelease(CommitTsControlLock); |
356 | return *ts != 0; |
357 | } |
358 | |
359 | /* |
360 | * Return the Xid of the latest committed transaction. (As far as this module |
361 | * is concerned, anyway; it's up to the caller to ensure the value is useful |
362 | * for its purposes.) |
363 | * |
364 | * ts and extra are filled with the corresponding data; they can be passed |
365 | * as NULL if not wanted. |
366 | */ |
367 | TransactionId |
368 | GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid) |
369 | { |
370 | TransactionId xid; |
371 | |
372 | LWLockAcquire(CommitTsLock, LW_SHARED); |
373 | |
374 | /* Error if module not enabled */ |
375 | if (!commitTsShared->commitTsActive) |
376 | error_commit_ts_disabled(); |
377 | |
378 | xid = commitTsShared->xidLastCommit; |
379 | if (ts) |
380 | *ts = commitTsShared->dataLastCommit.time; |
381 | if (nodeid) |
382 | *nodeid = commitTsShared->dataLastCommit.nodeid; |
383 | LWLockRelease(CommitTsLock); |
384 | |
385 | return xid; |
386 | } |
387 | |
388 | static void |
389 | error_commit_ts_disabled(void) |
390 | { |
391 | ereport(ERROR, |
392 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
393 | errmsg("could not get commit timestamp data" ), |
394 | RecoveryInProgress() ? |
395 | errhint("Make sure the configuration parameter \"%s\" is set on the master server." , |
396 | "track_commit_timestamp" ) : |
397 | errhint("Make sure the configuration parameter \"%s\" is set." , |
398 | "track_commit_timestamp" ))); |
399 | } |
400 | |
401 | /* |
402 | * SQL-callable wrapper to obtain commit time of a transaction |
403 | */ |
404 | Datum |
405 | pg_xact_commit_timestamp(PG_FUNCTION_ARGS) |
406 | { |
407 | TransactionId xid = PG_GETARG_UINT32(0); |
408 | TimestampTz ts; |
409 | bool found; |
410 | |
411 | found = TransactionIdGetCommitTsData(xid, &ts, NULL); |
412 | |
413 | if (!found) |
414 | PG_RETURN_NULL(); |
415 | |
416 | PG_RETURN_TIMESTAMPTZ(ts); |
417 | } |
418 | |
419 | |
420 | Datum |
421 | pg_last_committed_xact(PG_FUNCTION_ARGS) |
422 | { |
423 | TransactionId xid; |
424 | TimestampTz ts; |
425 | Datum values[2]; |
426 | bool nulls[2]; |
427 | TupleDesc tupdesc; |
428 | HeapTuple htup; |
429 | |
430 | /* and construct a tuple with our data */ |
431 | xid = GetLatestCommitTsData(&ts, NULL); |
432 | |
433 | /* |
434 | * Construct a tuple descriptor for the result row. This must match this |
435 | * function's pg_proc entry! |
436 | */ |
437 | tupdesc = CreateTemplateTupleDesc(2); |
438 | TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid" , |
439 | XIDOID, -1, 0); |
440 | TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp" , |
441 | TIMESTAMPTZOID, -1, 0); |
442 | tupdesc = BlessTupleDesc(tupdesc); |
443 | |
444 | if (!TransactionIdIsNormal(xid)) |
445 | { |
446 | memset(nulls, true, sizeof(nulls)); |
447 | } |
448 | else |
449 | { |
450 | values[0] = TransactionIdGetDatum(xid); |
451 | nulls[0] = false; |
452 | |
453 | values[1] = TimestampTzGetDatum(ts); |
454 | nulls[1] = false; |
455 | } |
456 | |
457 | htup = heap_form_tuple(tupdesc, values, nulls); |
458 | |
459 | PG_RETURN_DATUM(HeapTupleGetDatum(htup)); |
460 | } |
461 | |
462 | |
463 | /* |
464 | * Number of shared CommitTS buffers. |
465 | * |
466 | * We use a very similar logic as for the number of CLOG buffers; see comments |
467 | * in CLOGShmemBuffers. |
468 | */ |
469 | Size |
470 | CommitTsShmemBuffers(void) |
471 | { |
472 | return Min(16, Max(4, NBuffers / 1024)); |
473 | } |
474 | |
475 | /* |
476 | * Shared memory sizing for CommitTs |
477 | */ |
478 | Size |
479 | CommitTsShmemSize(void) |
480 | { |
481 | return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) + |
482 | sizeof(CommitTimestampShared); |
483 | } |
484 | |
485 | /* |
486 | * Initialize CommitTs at system startup (postmaster start or standalone |
487 | * backend) |
488 | */ |
489 | void |
490 | CommitTsShmemInit(void) |
491 | { |
492 | bool found; |
493 | |
494 | CommitTsCtl->PagePrecedes = CommitTsPagePrecedes; |
495 | SimpleLruInit(CommitTsCtl, "commit_timestamp" , CommitTsShmemBuffers(), 0, |
496 | CommitTsControlLock, "pg_commit_ts" , |
497 | LWTRANCHE_COMMITTS_BUFFERS); |
498 | |
499 | commitTsShared = ShmemInitStruct("CommitTs shared" , |
500 | sizeof(CommitTimestampShared), |
501 | &found); |
502 | |
503 | if (!IsUnderPostmaster) |
504 | { |
505 | Assert(!found); |
506 | |
507 | commitTsShared->xidLastCommit = InvalidTransactionId; |
508 | TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time); |
509 | commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId; |
510 | commitTsShared->commitTsActive = false; |
511 | } |
512 | else |
513 | Assert(found); |
514 | } |
515 | |
/*
 * This function must be called ONCE on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 *
 * It is currently a deliberate no-op.
 */
void
BootStrapCommitTs(void)
{
	/*
	 * Nothing to do here at present, unlike most other SLRU modules; segments
	 * are created when the server is started with this module enabled.  See
	 * ActivateCommitTs.
	 */
}
531 | |
532 | /* |
533 | * Initialize (or reinitialize) a page of CommitTs to zeroes. |
534 | * If writeXlog is true, also emit an XLOG record saying we did this. |
535 | * |
536 | * The page is not actually written, just set up in shared memory. |
537 | * The slot number of the new page is returned. |
538 | * |
539 | * Control lock must be held at entry, and will be held at exit. |
540 | */ |
541 | static int |
542 | ZeroCommitTsPage(int pageno, bool writeXlog) |
543 | { |
544 | int slotno; |
545 | |
546 | slotno = SimpleLruZeroPage(CommitTsCtl, pageno); |
547 | |
548 | if (writeXlog) |
549 | WriteZeroPageXlogRec(pageno); |
550 | |
551 | return slotno; |
552 | } |
553 | |
/*
 * This must be called ONCE during postmaster or standalone-backend startup,
 * after StartupXLOG has initialized ShmemVariableCache->nextFullXid.
 *
 * All the work is delegated to ActivateCommitTs(), which reads nextFullXid
 * to find the current page.
 */
void
StartupCommitTs(void)
{
	ActivateCommitTs();
}
563 | |
564 | /* |
565 | * This must be called ONCE during postmaster or standalone-backend startup, |
566 | * after recovery has finished. |
567 | */ |
568 | void |
569 | CompleteCommitTsInitialization(void) |
570 | { |
571 | /* |
572 | * If the feature is not enabled, turn it off for good. This also removes |
573 | * any leftover data. |
574 | * |
575 | * Conversely, we activate the module if the feature is enabled. This is |
576 | * necessary for primary and standby as the activation depends on the |
577 | * control file contents at the beginning of recovery or when a |
578 | * XLOG_PARAMETER_CHANGE is replayed. |
579 | */ |
580 | if (!track_commit_timestamp) |
581 | DeactivateCommitTs(); |
582 | else |
583 | ActivateCommitTs(); |
584 | } |
585 | |
586 | /* |
587 | * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE |
588 | * XLog record during recovery. |
589 | */ |
590 | void |
591 | CommitTsParameterChange(bool newvalue, bool oldvalue) |
592 | { |
593 | /* |
594 | * If the commit_ts module is disabled in this server and we get word from |
595 | * the master server that it is enabled there, activate it so that we can |
596 | * replay future WAL records involving it; also mark it as active on |
597 | * pg_control. If the old value was already set, we already did this, so |
598 | * don't do anything. |
599 | * |
600 | * If the module is disabled in the master, disable it here too, unless |
601 | * the module is enabled locally. |
602 | * |
603 | * Note this only runs in the recovery process, so an unlocked read is |
604 | * fine. |
605 | */ |
606 | if (newvalue) |
607 | { |
608 | if (!commitTsShared->commitTsActive) |
609 | ActivateCommitTs(); |
610 | } |
611 | else if (commitTsShared->commitTsActive) |
612 | DeactivateCommitTs(); |
613 | } |
614 | |
615 | /* |
616 | * Activate this module whenever necessary. |
617 | * This must happen during postmaster or standalone-backend startup, |
618 | * or during WAL replay anytime the track_commit_timestamp setting is |
619 | * changed in the master. |
620 | * |
621 | * The reason why this SLRU needs separate activation/deactivation functions is |
622 | * that it can be enabled/disabled during start and the activation/deactivation |
623 | * on master is propagated to standby via replay. Other SLRUs don't have this |
624 | * property and they can be just initialized during normal startup. |
625 | * |
626 | * This is in charge of creating the currently active segment, if it's not |
627 | * already there. The reason for this is that the server might have been |
628 | * running with this module disabled for a while and thus might have skipped |
629 | * the normal creation point. |
630 | */ |
631 | static void |
632 | ActivateCommitTs(void) |
633 | { |
634 | TransactionId xid; |
635 | int pageno; |
636 | |
637 | /* If we've done this already, there's nothing to do */ |
638 | LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
639 | if (commitTsShared->commitTsActive) |
640 | { |
641 | LWLockRelease(CommitTsLock); |
642 | return; |
643 | } |
644 | LWLockRelease(CommitTsLock); |
645 | |
646 | xid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid); |
647 | pageno = TransactionIdToCTsPage(xid); |
648 | |
649 | /* |
650 | * Re-Initialize our idea of the latest page number. |
651 | */ |
652 | LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); |
653 | CommitTsCtl->shared->latest_page_number = pageno; |
654 | LWLockRelease(CommitTsControlLock); |
655 | |
656 | /* |
657 | * If CommitTs is enabled, but it wasn't in the previous server run, we |
658 | * need to set the oldest and newest values to the next Xid; that way, we |
659 | * will not try to read data that might not have been set. |
660 | * |
661 | * XXX does this have a problem if a server is started with commitTs |
662 | * enabled, then started with commitTs disabled, then restarted with it |
663 | * enabled again? It doesn't look like it does, because there should be a |
664 | * checkpoint that sets the value to InvalidTransactionId at end of |
665 | * recovery; and so any chance of injecting new transactions without |
666 | * CommitTs values would occur after the oldestCommitTsXid has been set to |
667 | * Invalid temporarily. |
668 | */ |
669 | LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
670 | if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId) |
671 | { |
672 | ShmemVariableCache->oldestCommitTsXid = |
673 | ShmemVariableCache->newestCommitTsXid = ReadNewTransactionId(); |
674 | } |
675 | LWLockRelease(CommitTsLock); |
676 | |
677 | /* Create the current segment file, if necessary */ |
678 | if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno)) |
679 | { |
680 | int slotno; |
681 | |
682 | LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); |
683 | slotno = ZeroCommitTsPage(pageno, false); |
684 | SimpleLruWritePage(CommitTsCtl, slotno); |
685 | Assert(!CommitTsCtl->shared->page_dirty[slotno]); |
686 | LWLockRelease(CommitTsControlLock); |
687 | } |
688 | |
689 | /* Change the activation status in shared memory. */ |
690 | LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
691 | commitTsShared->commitTsActive = true; |
692 | LWLockRelease(CommitTsLock); |
693 | } |
694 | |
695 | /* |
696 | * Deactivate this module. |
697 | * |
698 | * This must be called when the track_commit_timestamp parameter is turned off. |
699 | * This happens during postmaster or standalone-backend startup, or during WAL |
700 | * replay. |
701 | * |
702 | * Resets CommitTs into invalid state to make sure we don't hand back |
703 | * possibly-invalid data; also removes segments of old data. |
704 | */ |
705 | static void |
706 | DeactivateCommitTs(void) |
707 | { |
708 | /* |
709 | * Cleanup the status in the shared memory. |
710 | * |
711 | * We reset everything in the commitTsShared record to prevent user from |
712 | * getting confusing data about last committed transaction on the standby |
713 | * when the module was activated repeatedly on the primary. |
714 | */ |
715 | LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
716 | |
717 | commitTsShared->commitTsActive = false; |
718 | commitTsShared->xidLastCommit = InvalidTransactionId; |
719 | TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time); |
720 | commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId; |
721 | |
722 | ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId; |
723 | ShmemVariableCache->newestCommitTsXid = InvalidTransactionId; |
724 | |
725 | LWLockRelease(CommitTsLock); |
726 | |
727 | /* |
728 | * Remove *all* files. This is necessary so that there are no leftover |
729 | * files; in the case where this feature is later enabled after running |
730 | * with it disabled for some time there may be a gap in the file sequence. |
731 | * (We can probably tolerate out-of-sequence files, as they are going to |
732 | * be overwritten anyway when we wrap around, but it seems better to be |
733 | * tidy.) |
734 | */ |
735 | LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); |
736 | (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL); |
737 | LWLockRelease(CommitTsControlLock); |
738 | } |
739 | |
/*
 * This must be called ONCE during postmaster or standalone-backend shutdown
 *
 * Flushes the CommitTs SLRU and then fsyncs the pg_commit_ts directory.
 */
void
ShutdownCommitTs(void)
{
	/* Flush dirty CommitTs pages to disk */
	SimpleLruFlush(CommitTsCtl, false);

	/*
	 * fsync pg_commit_ts to ensure that any files flushed previously are
	 * durably on disk.
	 */
	fsync_fname("pg_commit_ts", true);
}
755 | |
/*
 * Perform a checkpoint --- either during shutdown, or on-the-fly
 *
 * Same steps as ShutdownCommitTs(), except that SimpleLruFlush() is passed
 * true instead of false as its second argument.
 */
void
CheckPointCommitTs(void)
{
	/* Flush dirty CommitTs pages to disk */
	SimpleLruFlush(CommitTsCtl, true);

	/*
	 * fsync pg_commit_ts to ensure that any files flushed previously are
	 * durably on disk.
	 */
	fsync_fname("pg_commit_ts", true);
}
771 | |
772 | /* |
773 | * Make sure that CommitTs has room for a newly-allocated XID. |
774 | * |
775 | * NB: this is called while holding XidGenLock. We want it to be very fast |
776 | * most of the time; even when it's not so fast, no actual I/O need happen |
777 | * unless we're forced to write out a dirty CommitTs or xlog page to make room |
778 | * in shared memory. |
779 | * |
780 | * NB: the current implementation relies on track_commit_timestamp being |
781 | * PGC_POSTMASTER. |
782 | */ |
783 | void |
784 | ExtendCommitTs(TransactionId newestXact) |
785 | { |
786 | int pageno; |
787 | |
788 | /* |
789 | * Nothing to do if module not enabled. Note we do an unlocked read of |
790 | * the flag here, which is okay because this routine is only called from |
791 | * GetNewTransactionId, which is never called in a standby. |
792 | */ |
793 | Assert(!InRecovery); |
794 | if (!commitTsShared->commitTsActive) |
795 | return; |
796 | |
797 | /* |
798 | * No work except at first XID of a page. But beware: just after |
799 | * wraparound, the first XID of page zero is FirstNormalTransactionId. |
800 | */ |
801 | if (TransactionIdToCTsEntry(newestXact) != 0 && |
802 | !TransactionIdEquals(newestXact, FirstNormalTransactionId)) |
803 | return; |
804 | |
805 | pageno = TransactionIdToCTsPage(newestXact); |
806 | |
807 | LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); |
808 | |
809 | /* Zero the page and make an XLOG entry about it */ |
810 | ZeroCommitTsPage(pageno, !InRecovery); |
811 | |
812 | LWLockRelease(CommitTsControlLock); |
813 | } |
814 | |
815 | /* |
816 | * Remove all CommitTs segments before the one holding the passed |
817 | * transaction ID. |
818 | * |
819 | * Note that we don't need to flush XLOG here. |
820 | */ |
821 | void |
822 | TruncateCommitTs(TransactionId oldestXact) |
823 | { |
824 | int cutoffPage; |
825 | |
826 | /* |
827 | * The cutoff point is the start of the segment containing oldestXact. We |
828 | * pass the *page* containing oldestXact to SimpleLruTruncate. |
829 | */ |
830 | cutoffPage = TransactionIdToCTsPage(oldestXact); |
831 | |
832 | /* Check to see if there's any files that could be removed */ |
833 | if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence, |
834 | &cutoffPage)) |
835 | return; /* nothing to remove */ |
836 | |
837 | /* Write XLOG record */ |
838 | WriteTruncateXlogRec(cutoffPage, oldestXact); |
839 | |
840 | /* Now we can remove the old CommitTs segment(s) */ |
841 | SimpleLruTruncate(CommitTsCtl, cutoffPage); |
842 | } |
843 | |
844 | /* |
845 | * Set the limit values between which commit TS can be consulted. |
846 | */ |
847 | void |
848 | SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact) |
849 | { |
850 | /* |
851 | * Be careful not to overwrite values that are either further into the |
852 | * "future" or signal a disabled committs. |
853 | */ |
854 | LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
855 | if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId) |
856 | { |
857 | if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact)) |
858 | ShmemVariableCache->oldestCommitTsXid = oldestXact; |
859 | if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid)) |
860 | ShmemVariableCache->newestCommitTsXid = newestXact; |
861 | } |
862 | else |
863 | { |
864 | Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId); |
865 | ShmemVariableCache->oldestCommitTsXid = oldestXact; |
866 | ShmemVariableCache->newestCommitTsXid = newestXact; |
867 | } |
868 | LWLockRelease(CommitTsLock); |
869 | } |
870 | |
871 | /* |
872 | * Move forwards the oldest commitTS value that can be consulted |
873 | */ |
874 | void |
875 | AdvanceOldestCommitTsXid(TransactionId oldestXact) |
876 | { |
877 | LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
878 | if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId && |
879 | TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact)) |
880 | ShmemVariableCache->oldestCommitTsXid = oldestXact; |
881 | LWLockRelease(CommitTsLock); |
882 | } |
883 | |
884 | |
885 | /* |
886 | * Decide which of two commitTS page numbers is "older" for truncation |
887 | * purposes. |
888 | * |
889 | * We need to use comparison of TransactionIds here in order to do the right |
890 | * thing with wraparound XID arithmetic. However, if we are asked about |
891 | * page number zero, we don't want to hand InvalidTransactionId to |
892 | * TransactionIdPrecedes: it'll get weird about permanent xact IDs. So, |
893 | * offset both xids by FirstNormalTransactionId to avoid that. |
894 | */ |
895 | static bool |
896 | CommitTsPagePrecedes(int page1, int page2) |
897 | { |
898 | TransactionId xid1; |
899 | TransactionId xid2; |
900 | |
901 | xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE; |
902 | xid1 += FirstNormalTransactionId; |
903 | xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE; |
904 | xid2 += FirstNormalTransactionId; |
905 | |
906 | return TransactionIdPrecedes(xid1, xid2); |
907 | } |
908 | |
909 | |
910 | /* |
911 | * Write a ZEROPAGE xlog record |
912 | */ |
913 | static void |
914 | WriteZeroPageXlogRec(int pageno) |
915 | { |
916 | XLogBeginInsert(); |
917 | XLogRegisterData((char *) (&pageno), sizeof(int)); |
918 | (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE); |
919 | } |
920 | |
921 | /* |
922 | * Write a TRUNCATE xlog record |
923 | */ |
924 | static void |
925 | WriteTruncateXlogRec(int pageno, TransactionId oldestXid) |
926 | { |
927 | xl_commit_ts_truncate xlrec; |
928 | |
929 | xlrec.pageno = pageno; |
930 | xlrec.oldestXid = oldestXid; |
931 | |
932 | XLogBeginInsert(); |
933 | XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate); |
934 | (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE); |
935 | } |
936 | |
937 | /* |
938 | * Write a SETTS xlog record |
939 | */ |
940 | static void |
941 | WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, |
942 | TransactionId *subxids, TimestampTz timestamp, |
943 | RepOriginId nodeid) |
944 | { |
945 | xl_commit_ts_set record; |
946 | |
947 | record.timestamp = timestamp; |
948 | record.nodeid = nodeid; |
949 | record.mainxid = mainxid; |
950 | |
951 | XLogBeginInsert(); |
952 | XLogRegisterData((char *) &record, |
953 | offsetof(xl_commit_ts_set, mainxid) + |
954 | sizeof(TransactionId)); |
955 | XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId)); |
956 | XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS); |
957 | } |
958 | |
959 | /* |
960 | * CommitTS resource manager's routines |
961 | */ |
962 | void |
963 | commit_ts_redo(XLogReaderState *record) |
964 | { |
965 | uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; |
966 | |
967 | /* Backup blocks are not used in commit_ts records */ |
968 | Assert(!XLogRecHasAnyBlockRefs(record)); |
969 | |
970 | if (info == COMMIT_TS_ZEROPAGE) |
971 | { |
972 | int pageno; |
973 | int slotno; |
974 | |
975 | memcpy(&pageno, XLogRecGetData(record), sizeof(int)); |
976 | |
977 | LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); |
978 | |
979 | slotno = ZeroCommitTsPage(pageno, false); |
980 | SimpleLruWritePage(CommitTsCtl, slotno); |
981 | Assert(!CommitTsCtl->shared->page_dirty[slotno]); |
982 | |
983 | LWLockRelease(CommitTsControlLock); |
984 | } |
985 | else if (info == COMMIT_TS_TRUNCATE) |
986 | { |
987 | xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record); |
988 | |
989 | AdvanceOldestCommitTsXid(trunc->oldestXid); |
990 | |
991 | /* |
992 | * During XLOG replay, latest_page_number isn't set up yet; insert a |
993 | * suitable value to bypass the sanity test in SimpleLruTruncate. |
994 | */ |
995 | CommitTsCtl->shared->latest_page_number = trunc->pageno; |
996 | |
997 | SimpleLruTruncate(CommitTsCtl, trunc->pageno); |
998 | } |
999 | else if (info == COMMIT_TS_SETTS) |
1000 | { |
1001 | xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record); |
1002 | int nsubxids; |
1003 | TransactionId *subxids; |
1004 | |
1005 | nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) / |
1006 | sizeof(TransactionId)); |
1007 | if (nsubxids > 0) |
1008 | { |
1009 | subxids = palloc(sizeof(TransactionId) * nsubxids); |
1010 | memcpy(subxids, |
1011 | XLogRecGetData(record) + SizeOfCommitTsSet, |
1012 | sizeof(TransactionId) * nsubxids); |
1013 | } |
1014 | else |
1015 | subxids = NULL; |
1016 | |
1017 | TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids, |
1018 | setts->timestamp, setts->nodeid, true); |
1019 | if (subxids) |
1020 | pfree(subxids); |
1021 | } |
1022 | else |
1023 | elog(PANIC, "commit_ts_redo: unknown op code %u" , info); |
1024 | } |
1025 | |