1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * origin.c |
4 | * Logical replication progress tracking support. |
5 | * |
6 | * Copyright (c) 2013-2019, PostgreSQL Global Development Group |
7 | * |
8 | * IDENTIFICATION |
9 | * src/backend/replication/logical/origin.c |
10 | * |
11 | * NOTES |
12 | * |
13 | * This file provides the following: |
14 | * * An infrastructure to name nodes in a replication setup |
 * * A facility to store and persist replication progress in an efficient
 *	 and durable manner.
17 | * |
 * Replication origins consist of a descriptive, user-defined, external
 * name and a short, thus space-efficient, internal 2-byte one. This split
 * exists because replication origins have to be stored in WAL and shared
 * memory, where long descriptors would be inefficient. For now we only use 2
 * bytes for the internal id of a replication origin, as it seems unlikely that
 * there will soon be more than 65k nodes in one replication setup, and using
 * only two bytes allows us to be more space efficient.
25 | * |
 * Replication progress is tracked in a shared memory table
 * (ReplicationState) that's dumped to disk every checkpoint. Entries
 * ('slots') in this table are identified by the internal id. That is because
 * it allows replication progress to be advanced during crash recovery: the
 * original LSN of a transaction (from the originating system) is stored in
 * its commit record, which lets us recover the precise replayed state after a
 * crash without requiring synchronous commits. Allowing logical replication
 * to use asynchronous commit is generally good for performance, but
 * especially important as it allows a single-threaded replay process to keep
 * up with a source that has multiple backends generating changes
 * concurrently. For efficiency and simplicity reasons a backend can set up
 * one replication origin, which is then used as the source of all changes
 * produced by that backend until it is reset.
39 | * |
40 | * This infrastructure is intended to be used in cooperation with logical |
41 | * decoding. When replaying from a remote system the configured origin is |
42 | * provided to output plugins, allowing prevention of replication loops and |
43 | * other filtering. |
44 | * |
45 | * There are several levels of locking at work: |
46 | * |
 * * To create and drop replication origins an exclusive lock on
 *	 pg_replication_origin is required for the duration. That allows us to
 *	 safely and conflict-free assign new origins using a dirty snapshot.
50 | * |
 * * When creating an in-memory replication progress slot the
 *	 ReplicationOrigin LWLock has to be held exclusively; when iterating over
 *	 the replication progress a shared lock has to be held. Advancing the
 *	 replication progress of an origin that has not been set up as the
 *	 session's replication origin also requires that lock (replorigin_advance
 *	 takes it exclusively, because it may have to create a new slot).
56 | * |
 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
 *	 replication progress slot, that slot's lwlock has to be held. That's
 *	 primarily because we do not assume 8-byte writes (of the LSN) are atomic
 *	 on all our platforms, but it also simplifies memory ordering concerns
 *	 between the remote and local lsn. We use a lwlock instead of a spinlock
 *	 so it's less harmful to hold the lock over a WAL write
 *	 (cf. AdvanceReplicationProgress).
64 | * |
65 | * --------------------------------------------------------------------------- |
66 | */ |
67 | |
68 | #include "postgres.h" |
69 | |
70 | #include <unistd.h> |
71 | #include <sys/stat.h> |
72 | |
73 | #include "funcapi.h" |
74 | #include "miscadmin.h" |
75 | |
76 | #include "access/genam.h" |
77 | #include "access/htup_details.h" |
78 | #include "access/table.h" |
79 | #include "access/xact.h" |
80 | |
81 | #include "catalog/catalog.h" |
82 | #include "catalog/indexing.h" |
83 | #include "nodes/execnodes.h" |
84 | |
85 | #include "replication/origin.h" |
86 | #include "replication/logical.h" |
87 | #include "pgstat.h" |
88 | #include "storage/fd.h" |
89 | #include "storage/ipc.h" |
90 | #include "storage/lmgr.h" |
91 | #include "storage/condition_variable.h" |
92 | #include "storage/copydir.h" |
93 | |
94 | #include "utils/builtins.h" |
95 | #include "utils/fmgroids.h" |
96 | #include "utils/pg_lsn.h" |
97 | #include "utils/rel.h" |
98 | #include "utils/syscache.h" |
99 | #include "utils/snapmgr.h" |
100 | |
/*
 * Replay progress of a single remote node.
 *
 * One element of the shared-memory array of max_replication_slots entries
 * (see ReplicationOriginShmemSize).  An element whose roident is
 * InvalidRepOriginId is unused; replorigin_advance claims the first such
 * element when an origin has no slot yet, and replorigin_drop / redo of a
 * drop record reset an element back to that state.
 */
typedef struct ReplicationState
{
	/*
	 * Local identifier for the remote node.  InvalidRepOriginId marks a free
	 * slot.
	 */
	RepOriginId roident;

	/*
	 * Location of the latest commit from the remote side.
	 */
	XLogRecPtr	remote_lsn;

	/*
	 * Remember the local lsn of the commit record so we can XLogFlush() to it
	 * during a checkpoint so we know the commit record actually is safe on
	 * disk.
	 */
	XLogRecPtr	local_lsn;

	/*
	 * PID of backend that's acquired slot, or 0 if none.  While nonzero,
	 * replorigin_drop waits (or errors out when asked not to wait) and
	 * replorigin_advance refuses to touch the slot.
	 */
	int			acquired_by;

	/*
	 * Condition variable that's signalled when acquired_by changes.
	 * replorigin_drop sleeps on it while the slot is busy.
	 */
	ConditionVariable origin_cv;

	/*
	 * Lock protecting remote_lsn and local_lsn.
	 */
	LWLock		lock;
} ReplicationState;
138 | |
/*
 * On disk version of ReplicationState.
 *
 * This is the record format of the replorigin checkpoint file: the file holds
 * one of these per in-use slot, between the magic number and the trailing
 * CRC.  Changing the layout changes the file format.  local_lsn is not stored
 * because CheckPointReplicationOrigin flushes WAL up to it before writing the
 * record.  Writers zero the struct first so padding bytes are deterministic
 * (they are covered by the CRC).
 */
typedef struct ReplicationStateOnDisk
{
	RepOriginId roident;
	XLogRecPtr	remote_lsn;
} ReplicationStateOnDisk;
147 | |
148 | |
/*
 * Header of the "ReplicationOriginState" shared-memory area.
 *
 * tranche_id is the LWLock tranche used for every per-slot lock; states[]
 * holds max_replication_slots entries (see ReplicationOriginShmemSize).
 */
typedef struct ReplicationStateCtl
{
	int			tranche_id;
	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
154 | |
/*
 * external variables
 *
 * Session-wide replication origin identity.  NOTE(review): these are
 * presumably set by the replay process and picked up when commit records are
 * written (the file header says the originating LSN is stored in the commit
 * record); their writers and readers are outside this part of the file --
 * confirm against the rest of origin.c / xact.c.
 */
RepOriginId replorigin_session_origin = InvalidRepOriginId;	/* assumed identity */
XLogRecPtr	replorigin_session_origin_lsn = InvalidXLogRecPtr;
TimestampTz replorigin_session_origin_timestamp = 0;

/*
 * Base address into a shared memory array of replication states of size
 * max_replication_slots.
 *
 * replication_states_ctl points at the shared-memory header and
 * replication_states at its states[] member; both are set up by
 * ReplicationOriginShmemInit and stay unset when max_replication_slots is 0.
 *
 * XXX: Should we use a separate variable to size this rather than
 * max_replication_slots?
 */
static ReplicationState *replication_states;
static ReplicationStateCtl *replication_states_ctl;

/*
 * Backend-local, cached element from ReplicationState for use in a backend
 * replaying remote commits, so we don't have to search ReplicationState for
 * the backend's current RepOriginId.
 */
static ReplicationState *session_replication_state = NULL;

/*
 * Magic for on disk files.  Written first into the checkpoint file by
 * CheckPointReplicationOrigin and verified by StartupReplicationOrigin.
 */
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
179 | |
180 | static void |
181 | replorigin_check_prerequisites(bool check_slots, bool recoveryOK) |
182 | { |
183 | if (!superuser()) |
184 | ereport(ERROR, |
185 | (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), |
186 | errmsg("only superusers can query or manipulate replication origins" ))); |
187 | |
188 | if (check_slots && max_replication_slots == 0) |
189 | ereport(ERROR, |
190 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
191 | errmsg("cannot query or manipulate replication origin when max_replication_slots = 0" ))); |
192 | |
193 | if (!recoveryOK && RecoveryInProgress()) |
194 | ereport(ERROR, |
195 | (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION), |
196 | errmsg("cannot manipulate replication origins during recovery" ))); |
197 | |
198 | } |
199 | |
200 | |
201 | /* --------------------------------------------------------------------------- |
202 | * Functions for working with replication origins themselves. |
203 | * --------------------------------------------------------------------------- |
204 | */ |
205 | |
206 | /* |
207 | * Check for a persistent replication origin identified by name. |
208 | * |
209 | * Returns InvalidOid if the node isn't known yet and missing_ok is true. |
210 | */ |
211 | RepOriginId |
212 | replorigin_by_name(char *roname, bool missing_ok) |
213 | { |
214 | Form_pg_replication_origin ident; |
215 | Oid roident = InvalidOid; |
216 | HeapTuple tuple; |
217 | Datum roname_d; |
218 | |
219 | roname_d = CStringGetTextDatum(roname); |
220 | |
221 | tuple = SearchSysCache1(REPLORIGNAME, roname_d); |
222 | if (HeapTupleIsValid(tuple)) |
223 | { |
224 | ident = (Form_pg_replication_origin) GETSTRUCT(tuple); |
225 | roident = ident->roident; |
226 | ReleaseSysCache(tuple); |
227 | } |
228 | else if (!missing_ok) |
229 | ereport(ERROR, |
230 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
231 | errmsg("replication origin \"%s\" does not exist" , |
232 | roname))); |
233 | |
234 | return roident; |
235 | } |
236 | |
237 | /* |
238 | * Create a replication origin. |
239 | * |
240 | * Needs to be called in a transaction. |
241 | */ |
242 | RepOriginId |
243 | replorigin_create(char *roname) |
244 | { |
245 | Oid roident; |
246 | HeapTuple tuple = NULL; |
247 | Relation rel; |
248 | Datum roname_d; |
249 | SnapshotData SnapshotDirty; |
250 | SysScanDesc scan; |
251 | ScanKeyData key; |
252 | |
253 | roname_d = CStringGetTextDatum(roname); |
254 | |
255 | Assert(IsTransactionState()); |
256 | |
257 | /* |
258 | * We need the numeric replication origin to be 16bit wide, so we cannot |
259 | * rely on the normal oid allocation. Instead we simply scan |
260 | * pg_replication_origin for the first unused id. That's not particularly |
261 | * efficient, but this should be a fairly infrequent operation - we can |
262 | * easily spend a bit more code on this when it turns out it needs to be |
263 | * faster. |
264 | * |
265 | * We handle concurrency by taking an exclusive lock (allowing reads!) |
266 | * over the table for the duration of the search. Because we use a "dirty |
267 | * snapshot" we can read rows that other in-progress sessions have |
268 | * written, even though they would be invisible with normal snapshots. Due |
269 | * to the exclusive lock there's no danger that new rows can appear while |
270 | * we're checking. |
271 | */ |
272 | InitDirtySnapshot(SnapshotDirty); |
273 | |
274 | rel = table_open(ReplicationOriginRelationId, ExclusiveLock); |
275 | |
276 | for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++) |
277 | { |
278 | bool nulls[Natts_pg_replication_origin]; |
279 | Datum values[Natts_pg_replication_origin]; |
280 | bool collides; |
281 | |
282 | CHECK_FOR_INTERRUPTS(); |
283 | |
284 | ScanKeyInit(&key, |
285 | Anum_pg_replication_origin_roident, |
286 | BTEqualStrategyNumber, F_OIDEQ, |
287 | ObjectIdGetDatum(roident)); |
288 | |
289 | scan = systable_beginscan(rel, ReplicationOriginIdentIndex, |
290 | true /* indexOK */ , |
291 | &SnapshotDirty, |
292 | 1, &key); |
293 | |
294 | collides = HeapTupleIsValid(systable_getnext(scan)); |
295 | |
296 | systable_endscan(scan); |
297 | |
298 | if (!collides) |
299 | { |
300 | /* |
301 | * Ok, found an unused roident, insert the new row and do a CCI, |
302 | * so our callers can look it up if they want to. |
303 | */ |
304 | memset(&nulls, 0, sizeof(nulls)); |
305 | |
306 | values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident); |
307 | values[Anum_pg_replication_origin_roname - 1] = roname_d; |
308 | |
309 | tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls); |
310 | CatalogTupleInsert(rel, tuple); |
311 | CommandCounterIncrement(); |
312 | break; |
313 | } |
314 | } |
315 | |
316 | /* now release lock again, */ |
317 | table_close(rel, ExclusiveLock); |
318 | |
319 | if (tuple == NULL) |
320 | ereport(ERROR, |
321 | (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
322 | errmsg("could not find free replication origin OID" ))); |
323 | |
324 | heap_freetuple(tuple); |
325 | return roident; |
326 | } |
327 | |
328 | |
329 | /* |
330 | * Drop replication origin. |
331 | * |
332 | * Needs to be called in a transaction. |
333 | */ |
334 | void |
335 | replorigin_drop(RepOriginId roident, bool nowait) |
336 | { |
337 | HeapTuple tuple; |
338 | Relation rel; |
339 | int i; |
340 | |
341 | Assert(IsTransactionState()); |
342 | |
343 | /* |
344 | * To interlock against concurrent drops, we hold ExclusiveLock on |
345 | * pg_replication_origin throughout this function. |
346 | */ |
347 | rel = table_open(ReplicationOriginRelationId, ExclusiveLock); |
348 | |
349 | /* |
350 | * First, clean up the slot state info, if there is any matching slot. |
351 | */ |
352 | restart: |
353 | tuple = NULL; |
354 | LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); |
355 | |
356 | for (i = 0; i < max_replication_slots; i++) |
357 | { |
358 | ReplicationState *state = &replication_states[i]; |
359 | |
360 | if (state->roident == roident) |
361 | { |
362 | /* found our slot, is it busy? */ |
363 | if (state->acquired_by != 0) |
364 | { |
365 | ConditionVariable *cv; |
366 | |
367 | if (nowait) |
368 | ereport(ERROR, |
369 | (errcode(ERRCODE_OBJECT_IN_USE), |
370 | errmsg("could not drop replication origin with OID %d, in use by PID %d" , |
371 | state->roident, |
372 | state->acquired_by))); |
373 | |
374 | /* |
375 | * We must wait and then retry. Since we don't know which CV |
376 | * to wait on until here, we can't readily use |
377 | * ConditionVariablePrepareToSleep (calling it here would be |
378 | * wrong, since we could miss the signal if we did so); just |
379 | * use ConditionVariableSleep directly. |
380 | */ |
381 | cv = &state->origin_cv; |
382 | |
383 | LWLockRelease(ReplicationOriginLock); |
384 | |
385 | ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP); |
386 | goto restart; |
387 | } |
388 | |
389 | /* first make a WAL log entry */ |
390 | { |
391 | xl_replorigin_drop xlrec; |
392 | |
393 | xlrec.node_id = roident; |
394 | XLogBeginInsert(); |
395 | XLogRegisterData((char *) (&xlrec), sizeof(xlrec)); |
396 | XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP); |
397 | } |
398 | |
399 | /* then clear the in-memory slot */ |
400 | state->roident = InvalidRepOriginId; |
401 | state->remote_lsn = InvalidXLogRecPtr; |
402 | state->local_lsn = InvalidXLogRecPtr; |
403 | break; |
404 | } |
405 | } |
406 | LWLockRelease(ReplicationOriginLock); |
407 | ConditionVariableCancelSleep(); |
408 | |
409 | /* |
410 | * Now, we can delete the catalog entry. |
411 | */ |
412 | tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident)); |
413 | if (!HeapTupleIsValid(tuple)) |
414 | elog(ERROR, "cache lookup failed for replication origin with oid %u" , |
415 | roident); |
416 | |
417 | CatalogTupleDelete(rel, &tuple->t_self); |
418 | ReleaseSysCache(tuple); |
419 | |
420 | CommandCounterIncrement(); |
421 | |
422 | /* now release lock again */ |
423 | table_close(rel, ExclusiveLock); |
424 | } |
425 | |
426 | |
427 | /* |
428 | * Lookup replication origin via it's oid and return the name. |
429 | * |
430 | * The external name is palloc'd in the calling context. |
431 | * |
432 | * Returns true if the origin is known, false otherwise. |
433 | */ |
434 | bool |
435 | replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname) |
436 | { |
437 | HeapTuple tuple; |
438 | Form_pg_replication_origin ric; |
439 | |
440 | Assert(OidIsValid((Oid) roident)); |
441 | Assert(roident != InvalidRepOriginId); |
442 | Assert(roident != DoNotReplicateId); |
443 | |
444 | tuple = SearchSysCache1(REPLORIGIDENT, |
445 | ObjectIdGetDatum((Oid) roident)); |
446 | |
447 | if (HeapTupleIsValid(tuple)) |
448 | { |
449 | ric = (Form_pg_replication_origin) GETSTRUCT(tuple); |
450 | *roname = text_to_cstring(&ric->roname); |
451 | ReleaseSysCache(tuple); |
452 | |
453 | return true; |
454 | } |
455 | else |
456 | { |
457 | *roname = NULL; |
458 | |
459 | if (!missing_ok) |
460 | ereport(ERROR, |
461 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
462 | errmsg("replication origin with OID %u does not exist" , |
463 | roident))); |
464 | |
465 | return false; |
466 | } |
467 | } |
468 | |
469 | |
470 | /* --------------------------------------------------------------------------- |
471 | * Functions for handling replication progress. |
472 | * --------------------------------------------------------------------------- |
473 | */ |
474 | |
475 | Size |
476 | ReplicationOriginShmemSize(void) |
477 | { |
478 | Size size = 0; |
479 | |
480 | /* |
481 | * XXX: max_replication_slots is arguably the wrong thing to use, as here |
482 | * we keep the replay state of *remote* transactions. But for now it seems |
483 | * sufficient to reuse it, lest we introduce a separate GUC. |
484 | */ |
485 | if (max_replication_slots == 0) |
486 | return size; |
487 | |
488 | size = add_size(size, offsetof(ReplicationStateCtl, states)); |
489 | |
490 | size = add_size(size, |
491 | mul_size(max_replication_slots, sizeof(ReplicationState))); |
492 | return size; |
493 | } |
494 | |
495 | void |
496 | ReplicationOriginShmemInit(void) |
497 | { |
498 | bool found; |
499 | |
500 | if (max_replication_slots == 0) |
501 | return; |
502 | |
503 | replication_states_ctl = (ReplicationStateCtl *) |
504 | ShmemInitStruct("ReplicationOriginState" , |
505 | ReplicationOriginShmemSize(), |
506 | &found); |
507 | replication_states = replication_states_ctl->states; |
508 | |
509 | if (!found) |
510 | { |
511 | int i; |
512 | |
513 | replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN; |
514 | |
515 | MemSet(replication_states, 0, ReplicationOriginShmemSize()); |
516 | |
517 | for (i = 0; i < max_replication_slots; i++) |
518 | { |
519 | LWLockInitialize(&replication_states[i].lock, |
520 | replication_states_ctl->tranche_id); |
521 | ConditionVariableInit(&replication_states[i].origin_cv); |
522 | } |
523 | } |
524 | |
525 | LWLockRegisterTranche(replication_states_ctl->tranche_id, |
526 | "replication_origin" ); |
527 | } |
528 | |
529 | /* --------------------------------------------------------------------------- |
530 | * Perform a checkpoint of each replication origin's progress with respect to |
531 | * the replayed remote_lsn. Make sure that all transactions we refer to in the |
532 | * checkpoint (local_lsn) are actually on-disk. This might not yet be the case |
533 | * if the transactions were originally committed asynchronously. |
534 | * |
535 | * We store checkpoints in the following format: |
536 | * +-------+------------------------+------------------+-----+--------+ |
537 | * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF |
538 | * +-------+------------------------+------------------+-----+--------+ |
539 | * |
540 | * So its just the magic, followed by the statically sized |
541 | * ReplicationStateOnDisk structs. Note that the maximum number of |
542 | * ReplicationState is determined by max_replication_slots. |
543 | * --------------------------------------------------------------------------- |
544 | */ |
545 | void |
546 | CheckPointReplicationOrigin(void) |
547 | { |
548 | const char *tmppath = "pg_logical/replorigin_checkpoint.tmp" ; |
549 | const char *path = "pg_logical/replorigin_checkpoint" ; |
550 | int tmpfd; |
551 | int i; |
552 | uint32 magic = REPLICATION_STATE_MAGIC; |
553 | pg_crc32c crc; |
554 | |
555 | if (max_replication_slots == 0) |
556 | return; |
557 | |
558 | INIT_CRC32C(crc); |
559 | |
560 | /* make sure no old temp file is remaining */ |
561 | if (unlink(tmppath) < 0 && errno != ENOENT) |
562 | ereport(PANIC, |
563 | (errcode_for_file_access(), |
564 | errmsg("could not remove file \"%s\": %m" , |
565 | tmppath))); |
566 | |
567 | /* |
568 | * no other backend can perform this at the same time, we're protected by |
569 | * CheckpointLock. |
570 | */ |
571 | tmpfd = OpenTransientFile(tmppath, |
572 | O_CREAT | O_EXCL | O_WRONLY | PG_BINARY); |
573 | if (tmpfd < 0) |
574 | ereport(PANIC, |
575 | (errcode_for_file_access(), |
576 | errmsg("could not create file \"%s\": %m" , |
577 | tmppath))); |
578 | |
579 | /* write magic */ |
580 | errno = 0; |
581 | if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic)) |
582 | { |
583 | /* if write didn't set errno, assume problem is no disk space */ |
584 | if (errno == 0) |
585 | errno = ENOSPC; |
586 | ereport(PANIC, |
587 | (errcode_for_file_access(), |
588 | errmsg("could not write to file \"%s\": %m" , |
589 | tmppath))); |
590 | } |
591 | COMP_CRC32C(crc, &magic, sizeof(magic)); |
592 | |
593 | /* prevent concurrent creations/drops */ |
594 | LWLockAcquire(ReplicationOriginLock, LW_SHARED); |
595 | |
596 | /* write actual data */ |
597 | for (i = 0; i < max_replication_slots; i++) |
598 | { |
599 | ReplicationStateOnDisk disk_state; |
600 | ReplicationState *curstate = &replication_states[i]; |
601 | XLogRecPtr local_lsn; |
602 | |
603 | if (curstate->roident == InvalidRepOriginId) |
604 | continue; |
605 | |
606 | /* zero, to avoid uninitialized padding bytes */ |
607 | memset(&disk_state, 0, sizeof(disk_state)); |
608 | |
609 | LWLockAcquire(&curstate->lock, LW_SHARED); |
610 | |
611 | disk_state.roident = curstate->roident; |
612 | |
613 | disk_state.remote_lsn = curstate->remote_lsn; |
614 | local_lsn = curstate->local_lsn; |
615 | |
616 | LWLockRelease(&curstate->lock); |
617 | |
618 | /* make sure we only write out a commit that's persistent */ |
619 | XLogFlush(local_lsn); |
620 | |
621 | errno = 0; |
622 | if ((write(tmpfd, &disk_state, sizeof(disk_state))) != |
623 | sizeof(disk_state)) |
624 | { |
625 | /* if write didn't set errno, assume problem is no disk space */ |
626 | if (errno == 0) |
627 | errno = ENOSPC; |
628 | ereport(PANIC, |
629 | (errcode_for_file_access(), |
630 | errmsg("could not write to file \"%s\": %m" , |
631 | tmppath))); |
632 | } |
633 | |
634 | COMP_CRC32C(crc, &disk_state, sizeof(disk_state)); |
635 | } |
636 | |
637 | LWLockRelease(ReplicationOriginLock); |
638 | |
639 | /* write out the CRC */ |
640 | FIN_CRC32C(crc); |
641 | errno = 0; |
642 | if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc)) |
643 | { |
644 | /* if write didn't set errno, assume problem is no disk space */ |
645 | if (errno == 0) |
646 | errno = ENOSPC; |
647 | ereport(PANIC, |
648 | (errcode_for_file_access(), |
649 | errmsg("could not write to file \"%s\": %m" , |
650 | tmppath))); |
651 | } |
652 | |
653 | if (CloseTransientFile(tmpfd)) |
654 | ereport(PANIC, |
655 | (errcode_for_file_access(), |
656 | errmsg("could not close file \"%s\": %m" , |
657 | tmppath))); |
658 | |
659 | /* fsync, rename to permanent file, fsync file and directory */ |
660 | durable_rename(tmppath, path, PANIC); |
661 | } |
662 | |
663 | /* |
664 | * Recover replication replay status from checkpoint data saved earlier by |
665 | * CheckPointReplicationOrigin. |
666 | * |
667 | * This only needs to be called at startup and *not* during every checkpoint |
668 | * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All |
669 | * state thereafter can be recovered by looking at commit records. |
670 | */ |
671 | void |
672 | StartupReplicationOrigin(void) |
673 | { |
674 | const char *path = "pg_logical/replorigin_checkpoint" ; |
675 | int fd; |
676 | int readBytes; |
677 | uint32 magic = REPLICATION_STATE_MAGIC; |
678 | int last_state = 0; |
679 | pg_crc32c file_crc; |
680 | pg_crc32c crc; |
681 | |
682 | /* don't want to overwrite already existing state */ |
683 | #ifdef USE_ASSERT_CHECKING |
684 | static bool already_started = false; |
685 | |
686 | Assert(!already_started); |
687 | already_started = true; |
688 | #endif |
689 | |
690 | if (max_replication_slots == 0) |
691 | return; |
692 | |
693 | INIT_CRC32C(crc); |
694 | |
695 | elog(DEBUG2, "starting up replication origin progress state" ); |
696 | |
697 | fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); |
698 | |
699 | /* |
700 | * might have had max_replication_slots == 0 last run, or we just brought |
701 | * up a standby. |
702 | */ |
703 | if (fd < 0 && errno == ENOENT) |
704 | return; |
705 | else if (fd < 0) |
706 | ereport(PANIC, |
707 | (errcode_for_file_access(), |
708 | errmsg("could not open file \"%s\": %m" , |
709 | path))); |
710 | |
711 | /* verify magic, that is written even if nothing was active */ |
712 | readBytes = read(fd, &magic, sizeof(magic)); |
713 | if (readBytes != sizeof(magic)) |
714 | { |
715 | if (readBytes < 0) |
716 | ereport(PANIC, |
717 | (errcode_for_file_access(), |
718 | errmsg("could not read file \"%s\": %m" , |
719 | path))); |
720 | else |
721 | ereport(PANIC, |
722 | (errcode(ERRCODE_DATA_CORRUPTED), |
723 | errmsg("could not read file \"%s\": read %d of %zu" , |
724 | path, readBytes, sizeof(magic)))); |
725 | } |
726 | COMP_CRC32C(crc, &magic, sizeof(magic)); |
727 | |
728 | if (magic != REPLICATION_STATE_MAGIC) |
729 | ereport(PANIC, |
730 | (errmsg("replication checkpoint has wrong magic %u instead of %u" , |
731 | magic, REPLICATION_STATE_MAGIC))); |
732 | |
733 | /* we can skip locking here, no other access is possible */ |
734 | |
735 | /* recover individual states, until there are no more to be found */ |
736 | while (true) |
737 | { |
738 | ReplicationStateOnDisk disk_state; |
739 | |
740 | readBytes = read(fd, &disk_state, sizeof(disk_state)); |
741 | |
742 | /* no further data */ |
743 | if (readBytes == sizeof(crc)) |
744 | { |
745 | /* not pretty, but simple ... */ |
746 | file_crc = *(pg_crc32c *) &disk_state; |
747 | break; |
748 | } |
749 | |
750 | if (readBytes < 0) |
751 | { |
752 | ereport(PANIC, |
753 | (errcode_for_file_access(), |
754 | errmsg("could not read file \"%s\": %m" , |
755 | path))); |
756 | } |
757 | |
758 | if (readBytes != sizeof(disk_state)) |
759 | { |
760 | ereport(PANIC, |
761 | (errcode_for_file_access(), |
762 | errmsg("could not read file \"%s\": read %d of %zu" , |
763 | path, readBytes, sizeof(disk_state)))); |
764 | } |
765 | |
766 | COMP_CRC32C(crc, &disk_state, sizeof(disk_state)); |
767 | |
768 | if (last_state == max_replication_slots) |
769 | ereport(PANIC, |
770 | (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), |
771 | errmsg("could not find free replication state, increase max_replication_slots" ))); |
772 | |
773 | /* copy data to shared memory */ |
774 | replication_states[last_state].roident = disk_state.roident; |
775 | replication_states[last_state].remote_lsn = disk_state.remote_lsn; |
776 | last_state++; |
777 | |
778 | elog(LOG, "recovered replication state of node %u to %X/%X" , |
779 | disk_state.roident, |
780 | (uint32) (disk_state.remote_lsn >> 32), |
781 | (uint32) disk_state.remote_lsn); |
782 | } |
783 | |
784 | /* now check checksum */ |
785 | FIN_CRC32C(crc); |
786 | if (file_crc != crc) |
787 | ereport(PANIC, |
788 | (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), |
789 | errmsg("replication slot checkpoint has wrong checksum %u, expected %u" , |
790 | crc, file_crc))); |
791 | |
792 | if (CloseTransientFile(fd)) |
793 | ereport(PANIC, |
794 | (errcode_for_file_access(), |
795 | errmsg("could not close file \"%s\": %m" , |
796 | path))); |
797 | } |
798 | |
799 | void |
800 | replorigin_redo(XLogReaderState *record) |
801 | { |
802 | uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; |
803 | |
804 | switch (info) |
805 | { |
806 | case XLOG_REPLORIGIN_SET: |
807 | { |
808 | xl_replorigin_set *xlrec = |
809 | (xl_replorigin_set *) XLogRecGetData(record); |
810 | |
811 | replorigin_advance(xlrec->node_id, |
812 | xlrec->remote_lsn, record->EndRecPtr, |
813 | xlrec->force /* backward */ , |
814 | false /* WAL log */ ); |
815 | break; |
816 | } |
817 | case XLOG_REPLORIGIN_DROP: |
818 | { |
819 | xl_replorigin_drop *xlrec; |
820 | int i; |
821 | |
822 | xlrec = (xl_replorigin_drop *) XLogRecGetData(record); |
823 | |
824 | for (i = 0; i < max_replication_slots; i++) |
825 | { |
826 | ReplicationState *state = &replication_states[i]; |
827 | |
828 | /* found our slot */ |
829 | if (state->roident == xlrec->node_id) |
830 | { |
831 | /* reset entry */ |
832 | state->roident = InvalidRepOriginId; |
833 | state->remote_lsn = InvalidXLogRecPtr; |
834 | state->local_lsn = InvalidXLogRecPtr; |
835 | break; |
836 | } |
837 | } |
838 | break; |
839 | } |
840 | default: |
841 | elog(PANIC, "replorigin_redo: unknown op code %u" , info); |
842 | } |
843 | } |
844 | |
845 | |
/*
 * Tell the replication origin progress machinery that a commit from 'node'
 * that originated at the LSN remote_commit on the remote node was replayed
 * successfully and that we don't need to do so again. In combination with
 * setting up replorigin_session_origin_lsn and replorigin_session_origin
 * that ensures we won't lose knowledge about that after a crash if the
 * transaction had a persistent effect (think of asynchronous commits).
 *
 * local_commit needs to be a local LSN of the commit so that we can make sure
 * upon a checkpoint that enough WAL has been persisted to disk.
 *
 * Needs to be called with a RowExclusiveLock on pg_replication_origin,
 * unless running in recovery.
 *
 * Parameters:
 *	node		  - origin to advance.  DoNotReplicateId is silently ignored;
 *					InvalidRepOriginId must not be passed (asserted).
 *	remote_commit - new remote LSN for the origin.
 *	local_commit  - local LSN of the commit; InvalidXLogRecPtr leaves the
 *					stored local_lsn untouched.
 *	go_backward   - if true, store the given LSNs even when they are older
 *					than the current ones; also recorded as 'force' in the
 *					WAL record.
 *	wal_log		  - if true, emit an XLOG_REPLORIGIN_SET record (redo calls
 *					this with false).
 *
 * If the origin has no slot yet, the first free slot is claimed for it.
 * Raises ERROR if the origin's slot is currently acquired by a backend
 * (acquired_by != 0), or if a new slot is needed and none is free.
 */
void
replorigin_advance(RepOriginId node,
				   XLogRecPtr remote_commit, XLogRecPtr local_commit,
				   bool go_backward, bool wal_log)
{
	int			i;
	ReplicationState *replication_state = NULL;
	ReplicationState *free_state = NULL;

	Assert(node != InvalidRepOriginId);

	/* we don't track DoNotReplicateId */
	if (node == DoNotReplicateId)
		return;

	/*
	 * XXX: For the case where this is called by WAL replay, it'd be more
	 * efficient to restore into a backend local hashtable and only dump into
	 * shmem after recovery is finished. Let's wait with implementing that
	 * till it's shown to be a measurable expense
	 */

	/* Lock exclusively, as we may have to create a new table entry. */
	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

	/*
	 * Search for either an existing slot for the origin, or a free one we can
	 * use.
	 */
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationState *curstate = &replication_states[i];

		/* remember where to insert if necessary */
		if (curstate->roident == InvalidRepOriginId &&
			free_state == NULL)
		{
			free_state = curstate;
			continue;
		}

		/* not our slot */
		if (curstate->roident != node)
		{
			continue;
		}

		/* ok, found slot */
		replication_state = curstate;

		/* the slot lock is held from here until the LSNs are updated */
		LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);

		/* Make sure it's not used by somebody else */
		if (replication_state->acquired_by != 0)
		{
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication origin with OID %d is already active for PID %d",
							replication_state->roident,
							replication_state->acquired_by)));
		}

		break;
	}

	if (replication_state == NULL && free_state == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("could not find free replication state slot for replication origin with OID %u",
						node),
				 errhint("Increase max_replication_slots and try again.")));

	if (replication_state == NULL)
	{
		/* initialize new slot */
		LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
		replication_state = free_state;
		/* free slots are expected to have been fully reset by a drop */
		Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
		Assert(replication_state->local_lsn == InvalidXLogRecPtr);
		replication_state->roident = node;
	}

	Assert(replication_state->roident != InvalidRepOriginId);

	/*
	 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
	 * and the standby gets the message. Primarily this will be called during
	 * WAL replay (of commit records) where no WAL logging is necessary.
	 * The WAL record is written while both locks are still held.
	 */
	if (wal_log)
	{
		xl_replorigin_set xlrec;

		xlrec.remote_lsn = remote_commit;
		xlrec.node_id = node;
		xlrec.force = go_backward;

		XLogBeginInsert();
		XLogRegisterData((char *) (&xlrec), sizeof(xlrec));

		XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
	}

	/*
	 * Due to - harmless - race conditions during a checkpoint we could see
	 * values here that are older than the ones we already have in memory.
	 * Don't overwrite those.
	 */
	if (go_backward || replication_state->remote_lsn < remote_commit)
		replication_state->remote_lsn = remote_commit;
	if (local_commit != InvalidXLogRecPtr &&
		(go_backward || replication_state->local_lsn < local_commit))
		replication_state->local_lsn = local_commit;
	LWLockRelease(&replication_state->lock);

	/*
	 * Release *after* changing the LSNs, slot isn't acquired and thus could
	 * otherwise be dropped anytime.
	 */
	LWLockRelease(ReplicationOriginLock);
}
981 | |
982 | |
983 | XLogRecPtr |
984 | replorigin_get_progress(RepOriginId node, bool flush) |
985 | { |
986 | int i; |
987 | XLogRecPtr local_lsn = InvalidXLogRecPtr; |
988 | XLogRecPtr remote_lsn = InvalidXLogRecPtr; |
989 | |
990 | /* prevent slots from being concurrently dropped */ |
991 | LWLockAcquire(ReplicationOriginLock, LW_SHARED); |
992 | |
993 | for (i = 0; i < max_replication_slots; i++) |
994 | { |
995 | ReplicationState *state; |
996 | |
997 | state = &replication_states[i]; |
998 | |
999 | if (state->roident == node) |
1000 | { |
1001 | LWLockAcquire(&state->lock, LW_SHARED); |
1002 | |
1003 | remote_lsn = state->remote_lsn; |
1004 | local_lsn = state->local_lsn; |
1005 | |
1006 | LWLockRelease(&state->lock); |
1007 | |
1008 | break; |
1009 | } |
1010 | } |
1011 | |
1012 | LWLockRelease(ReplicationOriginLock); |
1013 | |
1014 | if (flush && local_lsn != InvalidXLogRecPtr) |
1015 | XLogFlush(local_lsn); |
1016 | |
1017 | return remote_lsn; |
1018 | } |
1019 | |
1020 | /* |
1021 | * Tear down a (possibly) configured session replication origin during process |
1022 | * exit. |
1023 | */ |
1024 | static void |
1025 | ReplicationOriginExitCleanup(int code, Datum arg) |
1026 | { |
1027 | ConditionVariable *cv = NULL; |
1028 | |
1029 | LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); |
1030 | |
1031 | if (session_replication_state != NULL && |
1032 | session_replication_state->acquired_by == MyProcPid) |
1033 | { |
1034 | cv = &session_replication_state->origin_cv; |
1035 | |
1036 | session_replication_state->acquired_by = 0; |
1037 | session_replication_state = NULL; |
1038 | } |
1039 | |
1040 | LWLockRelease(ReplicationOriginLock); |
1041 | |
1042 | if (cv) |
1043 | ConditionVariableBroadcast(cv); |
1044 | } |
1045 | |
1046 | /* |
1047 | * Setup a replication origin in the shared memory struct if it doesn't |
1048 | * already exists and cache access to the specific ReplicationSlot so the |
1049 | * array doesn't have to be searched when calling |
1050 | * replorigin_session_advance(). |
1051 | * |
1052 | * Obviously only one such cached origin can exist per process and the current |
1053 | * cached value can only be set again after the previous value is torn down |
1054 | * with replorigin_session_reset(). |
1055 | */ |
1056 | void |
1057 | replorigin_session_setup(RepOriginId node) |
1058 | { |
1059 | static bool registered_cleanup; |
1060 | int i; |
1061 | int free_slot = -1; |
1062 | |
1063 | if (!registered_cleanup) |
1064 | { |
1065 | on_shmem_exit(ReplicationOriginExitCleanup, 0); |
1066 | registered_cleanup = true; |
1067 | } |
1068 | |
1069 | Assert(max_replication_slots > 0); |
1070 | |
1071 | if (session_replication_state != NULL) |
1072 | ereport(ERROR, |
1073 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
1074 | errmsg("cannot setup replication origin when one is already setup" ))); |
1075 | |
1076 | /* Lock exclusively, as we may have to create a new table entry. */ |
1077 | LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); |
1078 | |
1079 | /* |
1080 | * Search for either an existing slot for the origin, or a free one we can |
1081 | * use. |
1082 | */ |
1083 | for (i = 0; i < max_replication_slots; i++) |
1084 | { |
1085 | ReplicationState *curstate = &replication_states[i]; |
1086 | |
1087 | /* remember where to insert if necessary */ |
1088 | if (curstate->roident == InvalidRepOriginId && |
1089 | free_slot == -1) |
1090 | { |
1091 | free_slot = i; |
1092 | continue; |
1093 | } |
1094 | |
1095 | /* not our slot */ |
1096 | if (curstate->roident != node) |
1097 | continue; |
1098 | |
1099 | else if (curstate->acquired_by != 0) |
1100 | { |
1101 | ereport(ERROR, |
1102 | (errcode(ERRCODE_OBJECT_IN_USE), |
1103 | errmsg("replication origin %d is already active for PID %d" , |
1104 | curstate->roident, curstate->acquired_by))); |
1105 | } |
1106 | |
1107 | /* ok, found slot */ |
1108 | session_replication_state = curstate; |
1109 | } |
1110 | |
1111 | |
1112 | if (session_replication_state == NULL && free_slot == -1) |
1113 | ereport(ERROR, |
1114 | (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), |
1115 | errmsg("could not find free replication state slot for replication origin with OID %u" , |
1116 | node), |
1117 | errhint("Increase max_replication_slots and try again." ))); |
1118 | else if (session_replication_state == NULL) |
1119 | { |
1120 | /* initialize new slot */ |
1121 | session_replication_state = &replication_states[free_slot]; |
1122 | Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr); |
1123 | Assert(session_replication_state->local_lsn == InvalidXLogRecPtr); |
1124 | session_replication_state->roident = node; |
1125 | } |
1126 | |
1127 | |
1128 | Assert(session_replication_state->roident != InvalidRepOriginId); |
1129 | |
1130 | session_replication_state->acquired_by = MyProcPid; |
1131 | |
1132 | LWLockRelease(ReplicationOriginLock); |
1133 | |
1134 | /* probably this one is pointless */ |
1135 | ConditionVariableBroadcast(&session_replication_state->origin_cv); |
1136 | } |
1137 | |
1138 | /* |
1139 | * Reset replay state previously setup in this session. |
1140 | * |
1141 | * This function may only be called if an origin was setup with |
1142 | * replorigin_session_setup(). |
1143 | */ |
1144 | void |
1145 | replorigin_session_reset(void) |
1146 | { |
1147 | ConditionVariable *cv; |
1148 | |
1149 | Assert(max_replication_slots != 0); |
1150 | |
1151 | if (session_replication_state == NULL) |
1152 | ereport(ERROR, |
1153 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
1154 | errmsg("no replication origin is configured" ))); |
1155 | |
1156 | LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); |
1157 | |
1158 | session_replication_state->acquired_by = 0; |
1159 | cv = &session_replication_state->origin_cv; |
1160 | session_replication_state = NULL; |
1161 | |
1162 | LWLockRelease(ReplicationOriginLock); |
1163 | |
1164 | ConditionVariableBroadcast(cv); |
1165 | } |
1166 | |
1167 | /* |
1168 | * Do the same work replorigin_advance() does, just on the session's |
1169 | * configured origin. |
1170 | * |
1171 | * This is noticeably cheaper than using replorigin_advance(). |
1172 | */ |
1173 | void |
1174 | replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit) |
1175 | { |
1176 | Assert(session_replication_state != NULL); |
1177 | Assert(session_replication_state->roident != InvalidRepOriginId); |
1178 | |
1179 | LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE); |
1180 | if (session_replication_state->local_lsn < local_commit) |
1181 | session_replication_state->local_lsn = local_commit; |
1182 | if (session_replication_state->remote_lsn < remote_commit) |
1183 | session_replication_state->remote_lsn = remote_commit; |
1184 | LWLockRelease(&session_replication_state->lock); |
1185 | } |
1186 | |
1187 | /* |
1188 | * Ask the machinery about the point up to which we successfully replayed |
1189 | * changes from an already setup replication origin. |
1190 | */ |
1191 | XLogRecPtr |
1192 | replorigin_session_get_progress(bool flush) |
1193 | { |
1194 | XLogRecPtr remote_lsn; |
1195 | XLogRecPtr local_lsn; |
1196 | |
1197 | Assert(session_replication_state != NULL); |
1198 | |
1199 | LWLockAcquire(&session_replication_state->lock, LW_SHARED); |
1200 | remote_lsn = session_replication_state->remote_lsn; |
1201 | local_lsn = session_replication_state->local_lsn; |
1202 | LWLockRelease(&session_replication_state->lock); |
1203 | |
1204 | if (flush && local_lsn != InvalidXLogRecPtr) |
1205 | XLogFlush(local_lsn); |
1206 | |
1207 | return remote_lsn; |
1208 | } |
1209 | |
1210 | |
1211 | |
1212 | /* --------------------------------------------------------------------------- |
1213 | * SQL functions for working with replication origin. |
1214 | * |
1215 | * These mostly should be fairly short wrappers around more generic functions. |
1216 | * --------------------------------------------------------------------------- |
1217 | */ |
1218 | |
1219 | /* |
1220 | * Create replication origin for the passed in name, and return the assigned |
1221 | * oid. |
1222 | */ |
1223 | Datum |
1224 | pg_replication_origin_create(PG_FUNCTION_ARGS) |
1225 | { |
1226 | char *name; |
1227 | RepOriginId roident; |
1228 | |
1229 | replorigin_check_prerequisites(false, false); |
1230 | |
1231 | name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); |
1232 | |
1233 | /* Replication origins "pg_xxx" are reserved for internal use */ |
1234 | if (IsReservedName(name)) |
1235 | ereport(ERROR, |
1236 | (errcode(ERRCODE_RESERVED_NAME), |
1237 | errmsg("replication origin name \"%s\" is reserved" , |
1238 | name), |
1239 | errdetail("Origin names starting with \"pg_\" are reserved." ))); |
1240 | |
1241 | /* |
1242 | * If built with appropriate switch, whine when regression-testing |
1243 | * conventions for replication origin names are violated. |
1244 | */ |
1245 | #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS |
1246 | if (strncmp(name, "regress_" , 8) != 0) |
1247 | elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"" ); |
1248 | #endif |
1249 | |
1250 | roident = replorigin_create(name); |
1251 | |
1252 | pfree(name); |
1253 | |
1254 | PG_RETURN_OID(roident); |
1255 | } |
1256 | |
1257 | /* |
1258 | * Drop replication origin. |
1259 | */ |
1260 | Datum |
1261 | pg_replication_origin_drop(PG_FUNCTION_ARGS) |
1262 | { |
1263 | char *name; |
1264 | RepOriginId roident; |
1265 | |
1266 | replorigin_check_prerequisites(false, false); |
1267 | |
1268 | name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); |
1269 | |
1270 | roident = replorigin_by_name(name, false); |
1271 | Assert(OidIsValid(roident)); |
1272 | |
1273 | replorigin_drop(roident, true); |
1274 | |
1275 | pfree(name); |
1276 | |
1277 | PG_RETURN_VOID(); |
1278 | } |
1279 | |
1280 | /* |
1281 | * Return oid of a replication origin. |
1282 | */ |
1283 | Datum |
1284 | pg_replication_origin_oid(PG_FUNCTION_ARGS) |
1285 | { |
1286 | char *name; |
1287 | RepOriginId roident; |
1288 | |
1289 | replorigin_check_prerequisites(false, false); |
1290 | |
1291 | name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); |
1292 | roident = replorigin_by_name(name, true); |
1293 | |
1294 | pfree(name); |
1295 | |
1296 | if (OidIsValid(roident)) |
1297 | PG_RETURN_OID(roident); |
1298 | PG_RETURN_NULL(); |
1299 | } |
1300 | |
1301 | /* |
1302 | * Setup a replication origin for this session. |
1303 | */ |
1304 | Datum |
1305 | pg_replication_origin_session_setup(PG_FUNCTION_ARGS) |
1306 | { |
1307 | char *name; |
1308 | RepOriginId origin; |
1309 | |
1310 | replorigin_check_prerequisites(true, false); |
1311 | |
1312 | name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); |
1313 | origin = replorigin_by_name(name, false); |
1314 | replorigin_session_setup(origin); |
1315 | |
1316 | replorigin_session_origin = origin; |
1317 | |
1318 | pfree(name); |
1319 | |
1320 | PG_RETURN_VOID(); |
1321 | } |
1322 | |
1323 | /* |
1324 | * Reset previously setup origin in this session |
1325 | */ |
1326 | Datum |
1327 | pg_replication_origin_session_reset(PG_FUNCTION_ARGS) |
1328 | { |
1329 | replorigin_check_prerequisites(true, false); |
1330 | |
1331 | replorigin_session_reset(); |
1332 | |
1333 | replorigin_session_origin = InvalidRepOriginId; |
1334 | replorigin_session_origin_lsn = InvalidXLogRecPtr; |
1335 | replorigin_session_origin_timestamp = 0; |
1336 | |
1337 | PG_RETURN_VOID(); |
1338 | } |
1339 | |
1340 | /* |
1341 | * Has a replication origin been setup for this session. |
1342 | */ |
1343 | Datum |
1344 | pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS) |
1345 | { |
1346 | replorigin_check_prerequisites(false, false); |
1347 | |
1348 | PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId); |
1349 | } |
1350 | |
1351 | |
1352 | /* |
1353 | * Return the replication progress for origin setup in the current session. |
1354 | * |
1355 | * If 'flush' is set to true it is ensured that the returned value corresponds |
1356 | * to a local transaction that has been flushed. This is useful if asynchronous |
1357 | * commits are used when replaying replicated transactions. |
1358 | */ |
1359 | Datum |
1360 | pg_replication_origin_session_progress(PG_FUNCTION_ARGS) |
1361 | { |
1362 | XLogRecPtr remote_lsn = InvalidXLogRecPtr; |
1363 | bool flush = PG_GETARG_BOOL(0); |
1364 | |
1365 | replorigin_check_prerequisites(true, false); |
1366 | |
1367 | if (session_replication_state == NULL) |
1368 | ereport(ERROR, |
1369 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
1370 | errmsg("no replication origin is configured" ))); |
1371 | |
1372 | remote_lsn = replorigin_session_get_progress(flush); |
1373 | |
1374 | if (remote_lsn == InvalidXLogRecPtr) |
1375 | PG_RETURN_NULL(); |
1376 | |
1377 | PG_RETURN_LSN(remote_lsn); |
1378 | } |
1379 | |
1380 | Datum |
1381 | pg_replication_origin_xact_setup(PG_FUNCTION_ARGS) |
1382 | { |
1383 | XLogRecPtr location = PG_GETARG_LSN(0); |
1384 | |
1385 | replorigin_check_prerequisites(true, false); |
1386 | |
1387 | if (session_replication_state == NULL) |
1388 | ereport(ERROR, |
1389 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
1390 | errmsg("no replication origin is configured" ))); |
1391 | |
1392 | replorigin_session_origin_lsn = location; |
1393 | replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1); |
1394 | |
1395 | PG_RETURN_VOID(); |
1396 | } |
1397 | |
1398 | Datum |
1399 | pg_replication_origin_xact_reset(PG_FUNCTION_ARGS) |
1400 | { |
1401 | replorigin_check_prerequisites(true, false); |
1402 | |
1403 | replorigin_session_origin_lsn = InvalidXLogRecPtr; |
1404 | replorigin_session_origin_timestamp = 0; |
1405 | |
1406 | PG_RETURN_VOID(); |
1407 | } |
1408 | |
1409 | |
1410 | Datum |
1411 | pg_replication_origin_advance(PG_FUNCTION_ARGS) |
1412 | { |
1413 | text *name = PG_GETARG_TEXT_PP(0); |
1414 | XLogRecPtr remote_commit = PG_GETARG_LSN(1); |
1415 | RepOriginId node; |
1416 | |
1417 | replorigin_check_prerequisites(true, false); |
1418 | |
1419 | /* lock to prevent the replication origin from vanishing */ |
1420 | LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); |
1421 | |
1422 | node = replorigin_by_name(text_to_cstring(name), false); |
1423 | |
1424 | /* |
1425 | * Can't sensibly pass a local commit to be flushed at checkpoint - this |
1426 | * xact hasn't committed yet. This is why this function should be used to |
1427 | * set up the initial replication state, but not for replay. |
1428 | */ |
1429 | replorigin_advance(node, remote_commit, InvalidXLogRecPtr, |
1430 | true /* go backward */ , true /* WAL log */ ); |
1431 | |
1432 | UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); |
1433 | |
1434 | PG_RETURN_VOID(); |
1435 | } |
1436 | |
1437 | |
1438 | /* |
1439 | * Return the replication progress for an individual replication origin. |
1440 | * |
1441 | * If 'flush' is set to true it is ensured that the returned value corresponds |
1442 | * to a local transaction that has been flushed. This is useful if asynchronous |
1443 | * commits are used when replaying replicated transactions. |
1444 | */ |
1445 | Datum |
1446 | pg_replication_origin_progress(PG_FUNCTION_ARGS) |
1447 | { |
1448 | char *name; |
1449 | bool flush; |
1450 | RepOriginId roident; |
1451 | XLogRecPtr remote_lsn = InvalidXLogRecPtr; |
1452 | |
1453 | replorigin_check_prerequisites(true, true); |
1454 | |
1455 | name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); |
1456 | flush = PG_GETARG_BOOL(1); |
1457 | |
1458 | roident = replorigin_by_name(name, false); |
1459 | Assert(OidIsValid(roident)); |
1460 | |
1461 | remote_lsn = replorigin_get_progress(roident, flush); |
1462 | |
1463 | if (remote_lsn == InvalidXLogRecPtr) |
1464 | PG_RETURN_NULL(); |
1465 | |
1466 | PG_RETURN_LSN(remote_lsn); |
1467 | } |
1468 | |
1469 | |
1470 | Datum |
1471 | pg_show_replication_origin_status(PG_FUNCTION_ARGS) |
1472 | { |
1473 | ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; |
1474 | TupleDesc tupdesc; |
1475 | Tuplestorestate *tupstore; |
1476 | MemoryContext per_query_ctx; |
1477 | MemoryContext oldcontext; |
1478 | int i; |
1479 | #define REPLICATION_ORIGIN_PROGRESS_COLS 4 |
1480 | |
1481 | /* we want to return 0 rows if slot is set to zero */ |
1482 | replorigin_check_prerequisites(false, true); |
1483 | |
1484 | if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) |
1485 | ereport(ERROR, |
1486 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
1487 | errmsg("set-valued function called in context that cannot accept a set" ))); |
1488 | if (!(rsinfo->allowedModes & SFRM_Materialize)) |
1489 | ereport(ERROR, |
1490 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
1491 | errmsg("materialize mode required, but it is not allowed in this context" ))); |
1492 | if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) |
1493 | elog(ERROR, "return type must be a row type" ); |
1494 | |
1495 | if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS) |
1496 | elog(ERROR, "wrong function definition" ); |
1497 | |
1498 | per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; |
1499 | oldcontext = MemoryContextSwitchTo(per_query_ctx); |
1500 | |
1501 | tupstore = tuplestore_begin_heap(true, false, work_mem); |
1502 | rsinfo->returnMode = SFRM_Materialize; |
1503 | rsinfo->setResult = tupstore; |
1504 | rsinfo->setDesc = tupdesc; |
1505 | |
1506 | MemoryContextSwitchTo(oldcontext); |
1507 | |
1508 | |
1509 | /* prevent slots from being concurrently dropped */ |
1510 | LWLockAcquire(ReplicationOriginLock, LW_SHARED); |
1511 | |
1512 | /* |
1513 | * Iterate through all possible replication_states, display if they are |
1514 | * filled. Note that we do not take any locks, so slightly corrupted/out |
1515 | * of date values are a possibility. |
1516 | */ |
1517 | for (i = 0; i < max_replication_slots; i++) |
1518 | { |
1519 | ReplicationState *state; |
1520 | Datum values[REPLICATION_ORIGIN_PROGRESS_COLS]; |
1521 | bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS]; |
1522 | char *roname; |
1523 | |
1524 | state = &replication_states[i]; |
1525 | |
1526 | /* unused slot, nothing to display */ |
1527 | if (state->roident == InvalidRepOriginId) |
1528 | continue; |
1529 | |
1530 | memset(values, 0, sizeof(values)); |
1531 | memset(nulls, 1, sizeof(nulls)); |
1532 | |
1533 | values[0] = ObjectIdGetDatum(state->roident); |
1534 | nulls[0] = false; |
1535 | |
1536 | /* |
1537 | * We're not preventing the origin to be dropped concurrently, so |
1538 | * silently accept that it might be gone. |
1539 | */ |
1540 | if (replorigin_by_oid(state->roident, true, |
1541 | &roname)) |
1542 | { |
1543 | values[1] = CStringGetTextDatum(roname); |
1544 | nulls[1] = false; |
1545 | } |
1546 | |
1547 | LWLockAcquire(&state->lock, LW_SHARED); |
1548 | |
1549 | values[2] = LSNGetDatum(state->remote_lsn); |
1550 | nulls[2] = false; |
1551 | |
1552 | values[3] = LSNGetDatum(state->local_lsn); |
1553 | nulls[3] = false; |
1554 | |
1555 | LWLockRelease(&state->lock); |
1556 | |
1557 | tuplestore_putvalues(tupstore, tupdesc, values, nulls); |
1558 | } |
1559 | |
1560 | tuplestore_donestoring(tupstore); |
1561 | |
1562 | LWLockRelease(ReplicationOriginLock); |
1563 | |
1564 | #undef REPLICATION_ORIGIN_PROGRESS_COLS |
1565 | |
1566 | return (Datum) 0; |
1567 | } |
1568 | |