1/*-------------------------------------------------------------------------
2 *
3 * origin.c
4 * Logical replication progress tracking support.
5 *
6 * Copyright (c) 2013-2019, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/logical/origin.c
10 *
11 * NOTES
12 *
13 * This file provides the following:
14 * * An infrastructure to name nodes in a replication setup
 * * A facility to store and persist replication progress in an efficient
 *   and durable manner.
17 *
 * Replication origins consist of a descriptive, user-defined, external
 * name and a short, thus space-efficient, internal 2-byte one. This split
 * exists because replication origins have to be stored in WAL and shared
 * memory, and long descriptors would be inefficient. For now we only use
 * 2 bytes for the internal id of a replication origin, as it seems unlikely
 * that there will soon be more than 65k nodes in one replication setup; and
 * using only two bytes allows us to be more space efficient.
25 *
 * Replication progress is tracked in a shared memory table
 * (ReplicationState) that's dumped to disk every checkpoint. Entries
 * ('slots') in this table are identified by the internal id. That's the case
 * because it allows replication progress to be advanced during crash
 * recovery. To allow doing so we store the original LSN (from the originating
 * system) of a transaction in the commit record. That allows us to recover
 * the precise replayed state after crash recovery, without requiring
 * synchronous commits. Allowing logical replication to use asynchronous
 * commit is generally good for performance, but especially important as it
 * allows a single-threaded replay process to keep up with a source that has
 * multiple backends generating changes concurrently. For efficiency and
 * simplicity reasons a backend can set up one replication origin that is
 * from then on used as the source of changes produced by the backend, until
 * it is reset again.
39 *
40 * This infrastructure is intended to be used in cooperation with logical
41 * decoding. When replaying from a remote system the configured origin is
42 * provided to output plugins, allowing prevention of replication loops and
43 * other filtering.
44 *
45 * There are several levels of locking at work:
46 *
 * * To create and drop replication origins an exclusive lock on
 *   pg_replication_origin is required for the duration. That allows us to
 *   safely and conflict-free assign new origins using a dirty snapshot.
50 *
 * * When creating an in-memory replication progress slot, ReplicationOriginLock
 *   has to be held exclusively; when iterating over the replication progress
 *   a shared lock has to be held. Advancing the progress of an origin that is
 *   not set up as the session's replication origin (replorigin_advance) takes
 *   the lock exclusively, since it may have to create a new slot.
56 *
 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
 *   replication progress slot, that slot's lwlock has to be held. That's
 *   primarily because we do not assume 8 byte writes (the LSN) are atomic on
 *   all our platforms, but it also simplifies memory ordering concerns
 *   between the remote and local lsn. We use a lwlock instead of a spinlock
 *   so it's less harmful to hold the lock over a WAL write
 *   (cf. replorigin_advance).
64 *
65 * ---------------------------------------------------------------------------
66 */
67
68#include "postgres.h"
69
70#include <unistd.h>
71#include <sys/stat.h>
72
73#include "funcapi.h"
74#include "miscadmin.h"
75
76#include "access/genam.h"
77#include "access/htup_details.h"
78#include "access/table.h"
79#include "access/xact.h"
80
81#include "catalog/catalog.h"
82#include "catalog/indexing.h"
83#include "nodes/execnodes.h"
84
85#include "replication/origin.h"
86#include "replication/logical.h"
87#include "pgstat.h"
88#include "storage/fd.h"
89#include "storage/ipc.h"
90#include "storage/lmgr.h"
91#include "storage/condition_variable.h"
92#include "storage/copydir.h"
93
94#include "utils/builtins.h"
95#include "utils/fmgroids.h"
96#include "utils/pg_lsn.h"
97#include "utils/rel.h"
98#include "utils/syscache.h"
99#include "utils/snapmgr.h"
100
/*
 * Replay progress of a single remote node.
 *
 * The shared-memory array replication_states holds max_replication_slots of
 * these.  An entry whose roident is InvalidRepOriginId is unused and may be
 * claimed for a new origin.
 */
typedef struct ReplicationState
{
	/*
	 * Local identifier for the remote node; InvalidRepOriginId marks a free
	 * entry.
	 */
	RepOriginId roident;

	/*
	 * Location of the latest commit from the remote side.
	 */
	XLogRecPtr	remote_lsn;

	/*
	 * Remember the local lsn of the commit record so we can XLogFlush() to it
	 * during a checkpoint so we know the commit record actually is safe on
	 * disk.
	 */
	XLogRecPtr	local_lsn;

	/*
	 * PID of backend that's acquired slot, or 0 if none.  Changed while
	 * holding ReplicationOriginLock exclusively.
	 */
	int			acquired_by;

	/*
	 * Condition variable that's signalled when acquired_by changes; backends
	 * waiting to drop an acquired origin sleep on it.
	 */
	ConditionVariable origin_cv;

	/*
	 * Lock protecting remote_lsn and local_lsn.
	 */
	LWLock		lock;
} ReplicationState;
138
/*
 * On disk version of ReplicationState.
 *
 * CheckPointReplicationOrigin() writes one of these for every in-use
 * ReplicationState and StartupReplicationOrigin() reads them back, so this
 * struct's layout (including padding, which the writer zeroes before filling
 * and which is covered by the file's CRC) is the checkpoint file format.
 * local_lsn is not stored: the checkpoint flushes WAL up to each entry's
 * local_lsn before writing the entry.
 */
typedef struct ReplicationStateOnDisk
{
	RepOriginId roident;		/* internal id of the origin */
	XLogRecPtr	remote_lsn;		/* latest replayed remote commit LSN */
} ReplicationStateOnDisk;
147
148
/*
 * Header of the shared-memory area used for replication progress tracking,
 * immediately followed by max_replication_slots ReplicationState entries.
 */
typedef struct ReplicationStateCtl
{
	/* LWLock tranche id used when initializing every states[i].lock */
	int			tranche_id;
	/* the progress entries; replication_states points here */
	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
154
/* external variables */

/*
 * Replication origin that this backend's changes are attributed to, or
 * InvalidRepOriginId if none.  NOTE(review): set by code outside this chunk,
 * presumably alongside replorigin_session_setup() -- confirm.
 */
RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */

/*
 * Remote LSN and timestamp of the transaction currently being replayed.
 * Presumably recorded in the local commit record so progress can be
 * recovered after a crash (see file header) -- TODO confirm against callers.
 */
XLogRecPtr	replorigin_session_origin_lsn = InvalidXLogRecPtr;
TimestampTz replorigin_session_origin_timestamp = 0;

/*
 * Base address into a shared memory array of replication states of size
 * max_replication_slots.
 *
 * XXX: Should we use a separate variable to size this rather than
 * max_replication_slots?
 *
 * replication_states points at replication_states_ctl->states.  Both are set
 * by ReplicationOriginShmemInit() and remain NULL when max_replication_slots
 * is 0.
 */
static ReplicationState *replication_states;
static ReplicationStateCtl *replication_states_ctl;

/*
 * Backend-local, cached element from ReplicationState for use in a backend
 * replaying remote commits, so we don't have to search ReplicationState for
 * the backend's current RepOriginId.  Set by replorigin_session_setup() and
 * cleared again by ReplicationOriginExitCleanup() at process exit.
 */
static ReplicationState *session_replication_state = NULL;

/* Magic number at the start of the replication origin checkpoint file. */
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
179
180static void
181replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
182{
183 if (!superuser())
184 ereport(ERROR,
185 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
186 errmsg("only superusers can query or manipulate replication origins")));
187
188 if (check_slots && max_replication_slots == 0)
189 ereport(ERROR,
190 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
191 errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
192
193 if (!recoveryOK && RecoveryInProgress())
194 ereport(ERROR,
195 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
196 errmsg("cannot manipulate replication origins during recovery")));
197
198}
199
200
201/* ---------------------------------------------------------------------------
202 * Functions for working with replication origins themselves.
203 * ---------------------------------------------------------------------------
204 */
205
206/*
207 * Check for a persistent replication origin identified by name.
208 *
209 * Returns InvalidOid if the node isn't known yet and missing_ok is true.
210 */
211RepOriginId
212replorigin_by_name(char *roname, bool missing_ok)
213{
214 Form_pg_replication_origin ident;
215 Oid roident = InvalidOid;
216 HeapTuple tuple;
217 Datum roname_d;
218
219 roname_d = CStringGetTextDatum(roname);
220
221 tuple = SearchSysCache1(REPLORIGNAME, roname_d);
222 if (HeapTupleIsValid(tuple))
223 {
224 ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
225 roident = ident->roident;
226 ReleaseSysCache(tuple);
227 }
228 else if (!missing_ok)
229 ereport(ERROR,
230 (errcode(ERRCODE_UNDEFINED_OBJECT),
231 errmsg("replication origin \"%s\" does not exist",
232 roname)));
233
234 return roident;
235}
236
237/*
238 * Create a replication origin.
239 *
240 * Needs to be called in a transaction.
241 */
242RepOriginId
243replorigin_create(char *roname)
244{
245 Oid roident;
246 HeapTuple tuple = NULL;
247 Relation rel;
248 Datum roname_d;
249 SnapshotData SnapshotDirty;
250 SysScanDesc scan;
251 ScanKeyData key;
252
253 roname_d = CStringGetTextDatum(roname);
254
255 Assert(IsTransactionState());
256
257 /*
258 * We need the numeric replication origin to be 16bit wide, so we cannot
259 * rely on the normal oid allocation. Instead we simply scan
260 * pg_replication_origin for the first unused id. That's not particularly
261 * efficient, but this should be a fairly infrequent operation - we can
262 * easily spend a bit more code on this when it turns out it needs to be
263 * faster.
264 *
265 * We handle concurrency by taking an exclusive lock (allowing reads!)
266 * over the table for the duration of the search. Because we use a "dirty
267 * snapshot" we can read rows that other in-progress sessions have
268 * written, even though they would be invisible with normal snapshots. Due
269 * to the exclusive lock there's no danger that new rows can appear while
270 * we're checking.
271 */
272 InitDirtySnapshot(SnapshotDirty);
273
274 rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
275
276 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
277 {
278 bool nulls[Natts_pg_replication_origin];
279 Datum values[Natts_pg_replication_origin];
280 bool collides;
281
282 CHECK_FOR_INTERRUPTS();
283
284 ScanKeyInit(&key,
285 Anum_pg_replication_origin_roident,
286 BTEqualStrategyNumber, F_OIDEQ,
287 ObjectIdGetDatum(roident));
288
289 scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
290 true /* indexOK */ ,
291 &SnapshotDirty,
292 1, &key);
293
294 collides = HeapTupleIsValid(systable_getnext(scan));
295
296 systable_endscan(scan);
297
298 if (!collides)
299 {
300 /*
301 * Ok, found an unused roident, insert the new row and do a CCI,
302 * so our callers can look it up if they want to.
303 */
304 memset(&nulls, 0, sizeof(nulls));
305
306 values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
307 values[Anum_pg_replication_origin_roname - 1] = roname_d;
308
309 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
310 CatalogTupleInsert(rel, tuple);
311 CommandCounterIncrement();
312 break;
313 }
314 }
315
316 /* now release lock again, */
317 table_close(rel, ExclusiveLock);
318
319 if (tuple == NULL)
320 ereport(ERROR,
321 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
322 errmsg("could not find free replication origin OID")));
323
324 heap_freetuple(tuple);
325 return roident;
326}
327
328
/*
 * Drop replication origin.
 *
 * First clears the origin's in-memory progress slot, if one exists, and
 * WAL-logs the drop (the XLOG_REPLORIGIN_DROP record is only emitted when
 * such a slot is found).  Then deletes the origin's pg_replication_origin
 * row.  If the slot is currently acquired by a backend, raises ERROR when
 * nowait is true; otherwise sleeps on the slot's condition variable until it
 * is released and rescans.
 *
 * Needs to be called in a transaction.  Raises ERROR if no catalog row
 * exists for roident.
 */
void
replorigin_drop(RepOriginId roident, bool nowait)
{
	HeapTuple	tuple;
	Relation	rel;
	int			i;

	Assert(IsTransactionState());

	/*
	 * To interlock against concurrent drops, we hold ExclusiveLock on
	 * pg_replication_origin throughout this function.
	 */
	rel = table_open(ReplicationOriginRelationId, ExclusiveLock);

	/*
	 * First, clean up the slot state info, if there is any matching slot.
	 * We come back here after waiting for a busy slot to be released.
	 */
restart:
	tuple = NULL;
	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationState *state = &replication_states[i];

		if (state->roident == roident)
		{
			/* found our slot, is it busy? */
			if (state->acquired_by != 0)
			{
				ConditionVariable *cv;

				if (nowait)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_IN_USE),
							 errmsg("could not drop replication origin with OID %d, in use by PID %d",
									state->roident,
									state->acquired_by)));

				/*
				 * We must wait and then retry. Since we don't know which CV
				 * to wait on until here, we can't readily use
				 * ConditionVariablePrepareToSleep (calling it here would be
				 * wrong, since we could miss the signal if we did so); just
				 * use ConditionVariableSleep directly.
				 */
				cv = &state->origin_cv;

				LWLockRelease(ReplicationOriginLock);

				ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
				goto restart;
			}

			/* first make a WAL log entry */
			{
				xl_replorigin_drop xlrec;

				xlrec.node_id = roident;
				XLogBeginInsert();
				XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
				XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
			}

			/* then clear the in-memory slot */
			state->roident = InvalidRepOriginId;
			state->remote_lsn = InvalidXLogRecPtr;
			state->local_lsn = InvalidXLogRecPtr;
			break;
		}
	}
	LWLockRelease(ReplicationOriginLock);

	/* Undo any condition-variable sleep state left by the retry path above. */
	ConditionVariableCancelSleep();

	/*
	 * Now, we can delete the catalog entry.
	 *
	 * NOTE(review): the in-memory slot has already been cleared and the drop
	 * WAL-logged at this point; if this lookup fails, the ERROR below does
	 * not restore either of them.  Confirm callers guarantee the row exists.
	 */
	tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for replication origin with oid %u",
			 roident);

	CatalogTupleDelete(rel, &tuple->t_self);
	ReleaseSysCache(tuple);

	CommandCounterIncrement();

	/* now release lock again */
	table_close(rel, ExclusiveLock);
}
425
426
427/*
428 * Lookup replication origin via it's oid and return the name.
429 *
430 * The external name is palloc'd in the calling context.
431 *
432 * Returns true if the origin is known, false otherwise.
433 */
434bool
435replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
436{
437 HeapTuple tuple;
438 Form_pg_replication_origin ric;
439
440 Assert(OidIsValid((Oid) roident));
441 Assert(roident != InvalidRepOriginId);
442 Assert(roident != DoNotReplicateId);
443
444 tuple = SearchSysCache1(REPLORIGIDENT,
445 ObjectIdGetDatum((Oid) roident));
446
447 if (HeapTupleIsValid(tuple))
448 {
449 ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
450 *roname = text_to_cstring(&ric->roname);
451 ReleaseSysCache(tuple);
452
453 return true;
454 }
455 else
456 {
457 *roname = NULL;
458
459 if (!missing_ok)
460 ereport(ERROR,
461 (errcode(ERRCODE_UNDEFINED_OBJECT),
462 errmsg("replication origin with OID %u does not exist",
463 roident)));
464
465 return false;
466 }
467}
468
469
470/* ---------------------------------------------------------------------------
471 * Functions for handling replication progress.
472 * ---------------------------------------------------------------------------
473 */
474
475Size
476ReplicationOriginShmemSize(void)
477{
478 Size size = 0;
479
480 /*
481 * XXX: max_replication_slots is arguably the wrong thing to use, as here
482 * we keep the replay state of *remote* transactions. But for now it seems
483 * sufficient to reuse it, lest we introduce a separate GUC.
484 */
485 if (max_replication_slots == 0)
486 return size;
487
488 size = add_size(size, offsetof(ReplicationStateCtl, states));
489
490 size = add_size(size,
491 mul_size(max_replication_slots, sizeof(ReplicationState)));
492 return size;
493}
494
495void
496ReplicationOriginShmemInit(void)
497{
498 bool found;
499
500 if (max_replication_slots == 0)
501 return;
502
503 replication_states_ctl = (ReplicationStateCtl *)
504 ShmemInitStruct("ReplicationOriginState",
505 ReplicationOriginShmemSize(),
506 &found);
507 replication_states = replication_states_ctl->states;
508
509 if (!found)
510 {
511 int i;
512
513 replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN;
514
515 MemSet(replication_states, 0, ReplicationOriginShmemSize());
516
517 for (i = 0; i < max_replication_slots; i++)
518 {
519 LWLockInitialize(&replication_states[i].lock,
520 replication_states_ctl->tranche_id);
521 ConditionVariableInit(&replication_states[i].origin_cv);
522 }
523 }
524
525 LWLockRegisterTranche(replication_states_ctl->tranche_id,
526 "replication_origin");
527}
528
529/* ---------------------------------------------------------------------------
530 * Perform a checkpoint of each replication origin's progress with respect to
531 * the replayed remote_lsn. Make sure that all transactions we refer to in the
532 * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
533 * if the transactions were originally committed asynchronously.
534 *
535 * We store checkpoints in the following format:
536 * +-------+------------------------+------------------+-----+--------+
537 * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
538 * +-------+------------------------+------------------+-----+--------+
539 *
540 * So its just the magic, followed by the statically sized
541 * ReplicationStateOnDisk structs. Note that the maximum number of
542 * ReplicationState is determined by max_replication_slots.
543 * ---------------------------------------------------------------------------
544 */
545void
546CheckPointReplicationOrigin(void)
547{
548 const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
549 const char *path = "pg_logical/replorigin_checkpoint";
550 int tmpfd;
551 int i;
552 uint32 magic = REPLICATION_STATE_MAGIC;
553 pg_crc32c crc;
554
555 if (max_replication_slots == 0)
556 return;
557
558 INIT_CRC32C(crc);
559
560 /* make sure no old temp file is remaining */
561 if (unlink(tmppath) < 0 && errno != ENOENT)
562 ereport(PANIC,
563 (errcode_for_file_access(),
564 errmsg("could not remove file \"%s\": %m",
565 tmppath)));
566
567 /*
568 * no other backend can perform this at the same time, we're protected by
569 * CheckpointLock.
570 */
571 tmpfd = OpenTransientFile(tmppath,
572 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
573 if (tmpfd < 0)
574 ereport(PANIC,
575 (errcode_for_file_access(),
576 errmsg("could not create file \"%s\": %m",
577 tmppath)));
578
579 /* write magic */
580 errno = 0;
581 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
582 {
583 /* if write didn't set errno, assume problem is no disk space */
584 if (errno == 0)
585 errno = ENOSPC;
586 ereport(PANIC,
587 (errcode_for_file_access(),
588 errmsg("could not write to file \"%s\": %m",
589 tmppath)));
590 }
591 COMP_CRC32C(crc, &magic, sizeof(magic));
592
593 /* prevent concurrent creations/drops */
594 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
595
596 /* write actual data */
597 for (i = 0; i < max_replication_slots; i++)
598 {
599 ReplicationStateOnDisk disk_state;
600 ReplicationState *curstate = &replication_states[i];
601 XLogRecPtr local_lsn;
602
603 if (curstate->roident == InvalidRepOriginId)
604 continue;
605
606 /* zero, to avoid uninitialized padding bytes */
607 memset(&disk_state, 0, sizeof(disk_state));
608
609 LWLockAcquire(&curstate->lock, LW_SHARED);
610
611 disk_state.roident = curstate->roident;
612
613 disk_state.remote_lsn = curstate->remote_lsn;
614 local_lsn = curstate->local_lsn;
615
616 LWLockRelease(&curstate->lock);
617
618 /* make sure we only write out a commit that's persistent */
619 XLogFlush(local_lsn);
620
621 errno = 0;
622 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
623 sizeof(disk_state))
624 {
625 /* if write didn't set errno, assume problem is no disk space */
626 if (errno == 0)
627 errno = ENOSPC;
628 ereport(PANIC,
629 (errcode_for_file_access(),
630 errmsg("could not write to file \"%s\": %m",
631 tmppath)));
632 }
633
634 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
635 }
636
637 LWLockRelease(ReplicationOriginLock);
638
639 /* write out the CRC */
640 FIN_CRC32C(crc);
641 errno = 0;
642 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
643 {
644 /* if write didn't set errno, assume problem is no disk space */
645 if (errno == 0)
646 errno = ENOSPC;
647 ereport(PANIC,
648 (errcode_for_file_access(),
649 errmsg("could not write to file \"%s\": %m",
650 tmppath)));
651 }
652
653 if (CloseTransientFile(tmpfd))
654 ereport(PANIC,
655 (errcode_for_file_access(),
656 errmsg("could not close file \"%s\": %m",
657 tmppath)));
658
659 /* fsync, rename to permanent file, fsync file and directory */
660 durable_rename(tmppath, path, PANIC);
661}
662
663/*
664 * Recover replication replay status from checkpoint data saved earlier by
665 * CheckPointReplicationOrigin.
666 *
667 * This only needs to be called at startup and *not* during every checkpoint
668 * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
669 * state thereafter can be recovered by looking at commit records.
670 */
671void
672StartupReplicationOrigin(void)
673{
674 const char *path = "pg_logical/replorigin_checkpoint";
675 int fd;
676 int readBytes;
677 uint32 magic = REPLICATION_STATE_MAGIC;
678 int last_state = 0;
679 pg_crc32c file_crc;
680 pg_crc32c crc;
681
682 /* don't want to overwrite already existing state */
683#ifdef USE_ASSERT_CHECKING
684 static bool already_started = false;
685
686 Assert(!already_started);
687 already_started = true;
688#endif
689
690 if (max_replication_slots == 0)
691 return;
692
693 INIT_CRC32C(crc);
694
695 elog(DEBUG2, "starting up replication origin progress state");
696
697 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
698
699 /*
700 * might have had max_replication_slots == 0 last run, or we just brought
701 * up a standby.
702 */
703 if (fd < 0 && errno == ENOENT)
704 return;
705 else if (fd < 0)
706 ereport(PANIC,
707 (errcode_for_file_access(),
708 errmsg("could not open file \"%s\": %m",
709 path)));
710
711 /* verify magic, that is written even if nothing was active */
712 readBytes = read(fd, &magic, sizeof(magic));
713 if (readBytes != sizeof(magic))
714 {
715 if (readBytes < 0)
716 ereport(PANIC,
717 (errcode_for_file_access(),
718 errmsg("could not read file \"%s\": %m",
719 path)));
720 else
721 ereport(PANIC,
722 (errcode(ERRCODE_DATA_CORRUPTED),
723 errmsg("could not read file \"%s\": read %d of %zu",
724 path, readBytes, sizeof(magic))));
725 }
726 COMP_CRC32C(crc, &magic, sizeof(magic));
727
728 if (magic != REPLICATION_STATE_MAGIC)
729 ereport(PANIC,
730 (errmsg("replication checkpoint has wrong magic %u instead of %u",
731 magic, REPLICATION_STATE_MAGIC)));
732
733 /* we can skip locking here, no other access is possible */
734
735 /* recover individual states, until there are no more to be found */
736 while (true)
737 {
738 ReplicationStateOnDisk disk_state;
739
740 readBytes = read(fd, &disk_state, sizeof(disk_state));
741
742 /* no further data */
743 if (readBytes == sizeof(crc))
744 {
745 /* not pretty, but simple ... */
746 file_crc = *(pg_crc32c *) &disk_state;
747 break;
748 }
749
750 if (readBytes < 0)
751 {
752 ereport(PANIC,
753 (errcode_for_file_access(),
754 errmsg("could not read file \"%s\": %m",
755 path)));
756 }
757
758 if (readBytes != sizeof(disk_state))
759 {
760 ereport(PANIC,
761 (errcode_for_file_access(),
762 errmsg("could not read file \"%s\": read %d of %zu",
763 path, readBytes, sizeof(disk_state))));
764 }
765
766 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
767
768 if (last_state == max_replication_slots)
769 ereport(PANIC,
770 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
771 errmsg("could not find free replication state, increase max_replication_slots")));
772
773 /* copy data to shared memory */
774 replication_states[last_state].roident = disk_state.roident;
775 replication_states[last_state].remote_lsn = disk_state.remote_lsn;
776 last_state++;
777
778 elog(LOG, "recovered replication state of node %u to %X/%X",
779 disk_state.roident,
780 (uint32) (disk_state.remote_lsn >> 32),
781 (uint32) disk_state.remote_lsn);
782 }
783
784 /* now check checksum */
785 FIN_CRC32C(crc);
786 if (file_crc != crc)
787 ereport(PANIC,
788 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
789 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
790 crc, file_crc)));
791
792 if (CloseTransientFile(fd))
793 ereport(PANIC,
794 (errcode_for_file_access(),
795 errmsg("could not close file \"%s\": %m",
796 path)));
797}
798
799void
800replorigin_redo(XLogReaderState *record)
801{
802 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
803
804 switch (info)
805 {
806 case XLOG_REPLORIGIN_SET:
807 {
808 xl_replorigin_set *xlrec =
809 (xl_replorigin_set *) XLogRecGetData(record);
810
811 replorigin_advance(xlrec->node_id,
812 xlrec->remote_lsn, record->EndRecPtr,
813 xlrec->force /* backward */ ,
814 false /* WAL log */ );
815 break;
816 }
817 case XLOG_REPLORIGIN_DROP:
818 {
819 xl_replorigin_drop *xlrec;
820 int i;
821
822 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
823
824 for (i = 0; i < max_replication_slots; i++)
825 {
826 ReplicationState *state = &replication_states[i];
827
828 /* found our slot */
829 if (state->roident == xlrec->node_id)
830 {
831 /* reset entry */
832 state->roident = InvalidRepOriginId;
833 state->remote_lsn = InvalidXLogRecPtr;
834 state->local_lsn = InvalidXLogRecPtr;
835 break;
836 }
837 }
838 break;
839 }
840 default:
841 elog(PANIC, "replorigin_redo: unknown op code %u", info);
842 }
843}
844
845
/*
 * Tell the replication origin progress machinery that a commit from 'node'
 * that originated at the LSN remote_commit on the remote node was replayed
 * successfully and that we don't need to do so again. In combination with
 * setting up replorigin_session_origin_lsn and replorigin_session_origin
 * that ensures we won't lose knowledge about that after a crash if the
 * transaction had a persistent effect (think of asynchronous commits).
 *
 * local_commit needs to be a local LSN of the commit so that we can make sure
 * upon a checkpoint that enough WAL has been persisted to disk.  If it is
 * InvalidXLogRecPtr, the stored local_lsn is left unchanged.
 *
 * go_backward: if true, the stored LSNs are overwritten even if that moves
 * them backwards; otherwise they only ever advance.
 * wal_log: if true, emit an XLOG_REPLORIGIN_SET record so the change is
 * durable and reaches standbys; replorigin_redo() passes false.
 *
 * Returns immediately for DoNotReplicateId.  Creates an in-memory slot for
 * 'node' if it has none.  Raises ERROR if the slot is acquired by a backend
 * (acquired_by != 0), or if there is no slot and no free one.
 *
 * Needs to be called with a RowExclusiveLock on pg_replication_origin,
 * unless running in recovery.
 */
void
replorigin_advance(RepOriginId node,
				   XLogRecPtr remote_commit, XLogRecPtr local_commit,
				   bool go_backward, bool wal_log)
{
	int			i;
	ReplicationState *replication_state = NULL;
	ReplicationState *free_state = NULL;

	Assert(node != InvalidRepOriginId);

	/* we don't track DoNotReplicateId */
	if (node == DoNotReplicateId)
		return;

	/*
	 * XXX: For the case where this is called by WAL replay, it'd be more
	 * efficient to restore into a backend local hashtable and only dump into
	 * shmem after recovery is finished. Let's wait with implementing that
	 * till it's shown to be a measurable expense
	 */

	/* Lock exclusively, as we may have to create a new table entry. */
	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

	/*
	 * Search for either an existing slot for the origin, or a free one we can
	 * use.
	 */
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationState *curstate = &replication_states[i];

		/* remember where to insert if necessary */
		if (curstate->roident == InvalidRepOriginId &&
			free_state == NULL)
		{
			free_state = curstate;
			continue;
		}

		/* not our slot */
		if (curstate->roident != node)
		{
			continue;
		}

		/* ok, found slot */
		replication_state = curstate;

		/*
		 * Take the slot's own lock here; it stays held, together with
		 * ReplicationOriginLock, until the LSNs have been updated below.
		 */
		LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);

		/* Make sure it's not used by somebody else */
		if (replication_state->acquired_by != 0)
		{
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication origin with OID %d is already active for PID %d",
							replication_state->roident,
							replication_state->acquired_by)));
		}

		break;
	}

	if (replication_state == NULL && free_state == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("could not find free replication state slot for replication origin with OID %u",
						node),
				 errhint("Increase max_replication_slots and try again.")));

	if (replication_state == NULL)
	{
		/* initialize new slot */
		LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
		replication_state = free_state;
		Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
		Assert(replication_state->local_lsn == InvalidXLogRecPtr);
		replication_state->roident = node;
	}

	Assert(replication_state->roident != InvalidRepOriginId);

	/*
	 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
	 * and the standby gets the message. Primarily this will be called during
	 * WAL replay (of commit records) where no WAL logging is necessary.
	 */
	if (wal_log)
	{
		xl_replorigin_set xlrec;

		xlrec.remote_lsn = remote_commit;
		xlrec.node_id = node;
		xlrec.force = go_backward;

		XLogBeginInsert();
		XLogRegisterData((char *) (&xlrec), sizeof(xlrec));

		XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
	}

	/*
	 * Due to - harmless - race conditions during a checkpoint we could see
	 * values here that are older than the ones we already have in memory.
	 * Don't overwrite those.
	 */
	if (go_backward || replication_state->remote_lsn < remote_commit)
		replication_state->remote_lsn = remote_commit;
	if (local_commit != InvalidXLogRecPtr &&
		(go_backward || replication_state->local_lsn < local_commit))
		replication_state->local_lsn = local_commit;
	LWLockRelease(&replication_state->lock);

	/*
	 * Release *after* changing the LSNs, slot isn't acquired and thus could
	 * otherwise be dropped anytime.
	 */
	LWLockRelease(ReplicationOriginLock);
}
981
982
983XLogRecPtr
984replorigin_get_progress(RepOriginId node, bool flush)
985{
986 int i;
987 XLogRecPtr local_lsn = InvalidXLogRecPtr;
988 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
989
990 /* prevent slots from being concurrently dropped */
991 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
992
993 for (i = 0; i < max_replication_slots; i++)
994 {
995 ReplicationState *state;
996
997 state = &replication_states[i];
998
999 if (state->roident == node)
1000 {
1001 LWLockAcquire(&state->lock, LW_SHARED);
1002
1003 remote_lsn = state->remote_lsn;
1004 local_lsn = state->local_lsn;
1005
1006 LWLockRelease(&state->lock);
1007
1008 break;
1009 }
1010 }
1011
1012 LWLockRelease(ReplicationOriginLock);
1013
1014 if (flush && local_lsn != InvalidXLogRecPtr)
1015 XLogFlush(local_lsn);
1016
1017 return remote_lsn;
1018}
1019
1020/*
1021 * Tear down a (possibly) configured session replication origin during process
1022 * exit.
1023 */
1024static void
1025ReplicationOriginExitCleanup(int code, Datum arg)
1026{
1027 ConditionVariable *cv = NULL;
1028
1029 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1030
1031 if (session_replication_state != NULL &&
1032 session_replication_state->acquired_by == MyProcPid)
1033 {
1034 cv = &session_replication_state->origin_cv;
1035
1036 session_replication_state->acquired_by = 0;
1037 session_replication_state = NULL;
1038 }
1039
1040 LWLockRelease(ReplicationOriginLock);
1041
1042 if (cv)
1043 ConditionVariableBroadcast(cv);
1044}
1045
1046/*
1047 * Setup a replication origin in the shared memory struct if it doesn't
1048 * already exists and cache access to the specific ReplicationSlot so the
1049 * array doesn't have to be searched when calling
1050 * replorigin_session_advance().
1051 *
1052 * Obviously only one such cached origin can exist per process and the current
1053 * cached value can only be set again after the previous value is torn down
1054 * with replorigin_session_reset().
1055 */
1056void
1057replorigin_session_setup(RepOriginId node)
1058{
1059 static bool registered_cleanup;
1060 int i;
1061 int free_slot = -1;
1062
1063 if (!registered_cleanup)
1064 {
1065 on_shmem_exit(ReplicationOriginExitCleanup, 0);
1066 registered_cleanup = true;
1067 }
1068
1069 Assert(max_replication_slots > 0);
1070
1071 if (session_replication_state != NULL)
1072 ereport(ERROR,
1073 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1074 errmsg("cannot setup replication origin when one is already setup")));
1075
1076 /* Lock exclusively, as we may have to create a new table entry. */
1077 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1078
1079 /*
1080 * Search for either an existing slot for the origin, or a free one we can
1081 * use.
1082 */
1083 for (i = 0; i < max_replication_slots; i++)
1084 {
1085 ReplicationState *curstate = &replication_states[i];
1086
1087 /* remember where to insert if necessary */
1088 if (curstate->roident == InvalidRepOriginId &&
1089 free_slot == -1)
1090 {
1091 free_slot = i;
1092 continue;
1093 }
1094
1095 /* not our slot */
1096 if (curstate->roident != node)
1097 continue;
1098
1099 else if (curstate->acquired_by != 0)
1100 {
1101 ereport(ERROR,
1102 (errcode(ERRCODE_OBJECT_IN_USE),
1103 errmsg("replication origin %d is already active for PID %d",
1104 curstate->roident, curstate->acquired_by)));
1105 }
1106
1107 /* ok, found slot */
1108 session_replication_state = curstate;
1109 }
1110
1111
1112 if (session_replication_state == NULL && free_slot == -1)
1113 ereport(ERROR,
1114 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1115 errmsg("could not find free replication state slot for replication origin with OID %u",
1116 node),
1117 errhint("Increase max_replication_slots and try again.")));
1118 else if (session_replication_state == NULL)
1119 {
1120 /* initialize new slot */
1121 session_replication_state = &replication_states[free_slot];
1122 Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1123 Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1124 session_replication_state->roident = node;
1125 }
1126
1127
1128 Assert(session_replication_state->roident != InvalidRepOriginId);
1129
1130 session_replication_state->acquired_by = MyProcPid;
1131
1132 LWLockRelease(ReplicationOriginLock);
1133
1134 /* probably this one is pointless */
1135 ConditionVariableBroadcast(&session_replication_state->origin_cv);
1136}
1137
1138/*
1139 * Reset replay state previously setup in this session.
1140 *
1141 * This function may only be called if an origin was setup with
1142 * replorigin_session_setup().
1143 */
1144void
1145replorigin_session_reset(void)
1146{
1147 ConditionVariable *cv;
1148
1149 Assert(max_replication_slots != 0);
1150
1151 if (session_replication_state == NULL)
1152 ereport(ERROR,
1153 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1154 errmsg("no replication origin is configured")));
1155
1156 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1157
1158 session_replication_state->acquired_by = 0;
1159 cv = &session_replication_state->origin_cv;
1160 session_replication_state = NULL;
1161
1162 LWLockRelease(ReplicationOriginLock);
1163
1164 ConditionVariableBroadcast(cv);
1165}
1166
1167/*
1168 * Do the same work replorigin_advance() does, just on the session's
1169 * configured origin.
1170 *
1171 * This is noticeably cheaper than using replorigin_advance().
1172 */
1173void
1174replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1175{
1176 Assert(session_replication_state != NULL);
1177 Assert(session_replication_state->roident != InvalidRepOriginId);
1178
1179 LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1180 if (session_replication_state->local_lsn < local_commit)
1181 session_replication_state->local_lsn = local_commit;
1182 if (session_replication_state->remote_lsn < remote_commit)
1183 session_replication_state->remote_lsn = remote_commit;
1184 LWLockRelease(&session_replication_state->lock);
1185}
1186
1187/*
1188 * Ask the machinery about the point up to which we successfully replayed
1189 * changes from an already setup replication origin.
1190 */
1191XLogRecPtr
1192replorigin_session_get_progress(bool flush)
1193{
1194 XLogRecPtr remote_lsn;
1195 XLogRecPtr local_lsn;
1196
1197 Assert(session_replication_state != NULL);
1198
1199 LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1200 remote_lsn = session_replication_state->remote_lsn;
1201 local_lsn = session_replication_state->local_lsn;
1202 LWLockRelease(&session_replication_state->lock);
1203
1204 if (flush && local_lsn != InvalidXLogRecPtr)
1205 XLogFlush(local_lsn);
1206
1207 return remote_lsn;
1208}
1209
1210
1211
1212/* ---------------------------------------------------------------------------
 * SQL functions for working with replication origins.
1214 *
1215 * These mostly should be fairly short wrappers around more generic functions.
1216 * ---------------------------------------------------------------------------
1217 */
1218
1219/*
1220 * Create replication origin for the passed in name, and return the assigned
1221 * oid.
1222 */
1223Datum
1224pg_replication_origin_create(PG_FUNCTION_ARGS)
1225{
1226 char *name;
1227 RepOriginId roident;
1228
1229 replorigin_check_prerequisites(false, false);
1230
1231 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1232
1233 /* Replication origins "pg_xxx" are reserved for internal use */
1234 if (IsReservedName(name))
1235 ereport(ERROR,
1236 (errcode(ERRCODE_RESERVED_NAME),
1237 errmsg("replication origin name \"%s\" is reserved",
1238 name),
1239 errdetail("Origin names starting with \"pg_\" are reserved.")));
1240
1241 /*
1242 * If built with appropriate switch, whine when regression-testing
1243 * conventions for replication origin names are violated.
1244 */
1245#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1246 if (strncmp(name, "regress_", 8) != 0)
1247 elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1248#endif
1249
1250 roident = replorigin_create(name);
1251
1252 pfree(name);
1253
1254 PG_RETURN_OID(roident);
1255}
1256
1257/*
1258 * Drop replication origin.
1259 */
1260Datum
1261pg_replication_origin_drop(PG_FUNCTION_ARGS)
1262{
1263 char *name;
1264 RepOriginId roident;
1265
1266 replorigin_check_prerequisites(false, false);
1267
1268 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1269
1270 roident = replorigin_by_name(name, false);
1271 Assert(OidIsValid(roident));
1272
1273 replorigin_drop(roident, true);
1274
1275 pfree(name);
1276
1277 PG_RETURN_VOID();
1278}
1279
1280/*
1281 * Return oid of a replication origin.
1282 */
1283Datum
1284pg_replication_origin_oid(PG_FUNCTION_ARGS)
1285{
1286 char *name;
1287 RepOriginId roident;
1288
1289 replorigin_check_prerequisites(false, false);
1290
1291 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1292 roident = replorigin_by_name(name, true);
1293
1294 pfree(name);
1295
1296 if (OidIsValid(roident))
1297 PG_RETURN_OID(roident);
1298 PG_RETURN_NULL();
1299}
1300
1301/*
1302 * Setup a replication origin for this session.
1303 */
1304Datum
1305pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1306{
1307 char *name;
1308 RepOriginId origin;
1309
1310 replorigin_check_prerequisites(true, false);
1311
1312 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1313 origin = replorigin_by_name(name, false);
1314 replorigin_session_setup(origin);
1315
1316 replorigin_session_origin = origin;
1317
1318 pfree(name);
1319
1320 PG_RETURN_VOID();
1321}
1322
1323/*
1324 * Reset previously setup origin in this session
1325 */
1326Datum
1327pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1328{
1329 replorigin_check_prerequisites(true, false);
1330
1331 replorigin_session_reset();
1332
1333 replorigin_session_origin = InvalidRepOriginId;
1334 replorigin_session_origin_lsn = InvalidXLogRecPtr;
1335 replorigin_session_origin_timestamp = 0;
1336
1337 PG_RETURN_VOID();
1338}
1339
1340/*
1341 * Has a replication origin been setup for this session.
1342 */
1343Datum
1344pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1345{
1346 replorigin_check_prerequisites(false, false);
1347
1348 PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1349}
1350
1351
1352/*
1353 * Return the replication progress for origin setup in the current session.
1354 *
1355 * If 'flush' is set to true it is ensured that the returned value corresponds
1356 * to a local transaction that has been flushed. This is useful if asynchronous
1357 * commits are used when replaying replicated transactions.
1358 */
1359Datum
1360pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1361{
1362 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1363 bool flush = PG_GETARG_BOOL(0);
1364
1365 replorigin_check_prerequisites(true, false);
1366
1367 if (session_replication_state == NULL)
1368 ereport(ERROR,
1369 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1370 errmsg("no replication origin is configured")));
1371
1372 remote_lsn = replorigin_session_get_progress(flush);
1373
1374 if (remote_lsn == InvalidXLogRecPtr)
1375 PG_RETURN_NULL();
1376
1377 PG_RETURN_LSN(remote_lsn);
1378}
1379
1380Datum
1381pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1382{
1383 XLogRecPtr location = PG_GETARG_LSN(0);
1384
1385 replorigin_check_prerequisites(true, false);
1386
1387 if (session_replication_state == NULL)
1388 ereport(ERROR,
1389 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1390 errmsg("no replication origin is configured")));
1391
1392 replorigin_session_origin_lsn = location;
1393 replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1394
1395 PG_RETURN_VOID();
1396}
1397
1398Datum
1399pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1400{
1401 replorigin_check_prerequisites(true, false);
1402
1403 replorigin_session_origin_lsn = InvalidXLogRecPtr;
1404 replorigin_session_origin_timestamp = 0;
1405
1406 PG_RETURN_VOID();
1407}
1408
1409
1410Datum
1411pg_replication_origin_advance(PG_FUNCTION_ARGS)
1412{
1413 text *name = PG_GETARG_TEXT_PP(0);
1414 XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1415 RepOriginId node;
1416
1417 replorigin_check_prerequisites(true, false);
1418
1419 /* lock to prevent the replication origin from vanishing */
1420 LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1421
1422 node = replorigin_by_name(text_to_cstring(name), false);
1423
1424 /*
1425 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1426 * xact hasn't committed yet. This is why this function should be used to
1427 * set up the initial replication state, but not for replay.
1428 */
1429 replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1430 true /* go backward */ , true /* WAL log */ );
1431
1432 UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1433
1434 PG_RETURN_VOID();
1435}
1436
1437
1438/*
1439 * Return the replication progress for an individual replication origin.
1440 *
1441 * If 'flush' is set to true it is ensured that the returned value corresponds
1442 * to a local transaction that has been flushed. This is useful if asynchronous
1443 * commits are used when replaying replicated transactions.
1444 */
1445Datum
1446pg_replication_origin_progress(PG_FUNCTION_ARGS)
1447{
1448 char *name;
1449 bool flush;
1450 RepOriginId roident;
1451 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1452
1453 replorigin_check_prerequisites(true, true);
1454
1455 name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1456 flush = PG_GETARG_BOOL(1);
1457
1458 roident = replorigin_by_name(name, false);
1459 Assert(OidIsValid(roident));
1460
1461 remote_lsn = replorigin_get_progress(roident, flush);
1462
1463 if (remote_lsn == InvalidXLogRecPtr)
1464 PG_RETURN_NULL();
1465
1466 PG_RETURN_LSN(remote_lsn);
1467}
1468
1469
1470Datum
1471pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1472{
1473 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1474 TupleDesc tupdesc;
1475 Tuplestorestate *tupstore;
1476 MemoryContext per_query_ctx;
1477 MemoryContext oldcontext;
1478 int i;
1479#define REPLICATION_ORIGIN_PROGRESS_COLS 4
1480
1481 /* we want to return 0 rows if slot is set to zero */
1482 replorigin_check_prerequisites(false, true);
1483
1484 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1485 ereport(ERROR,
1486 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1487 errmsg("set-valued function called in context that cannot accept a set")));
1488 if (!(rsinfo->allowedModes & SFRM_Materialize))
1489 ereport(ERROR,
1490 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1491 errmsg("materialize mode required, but it is not allowed in this context")));
1492 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1493 elog(ERROR, "return type must be a row type");
1494
1495 if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1496 elog(ERROR, "wrong function definition");
1497
1498 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1499 oldcontext = MemoryContextSwitchTo(per_query_ctx);
1500
1501 tupstore = tuplestore_begin_heap(true, false, work_mem);
1502 rsinfo->returnMode = SFRM_Materialize;
1503 rsinfo->setResult = tupstore;
1504 rsinfo->setDesc = tupdesc;
1505
1506 MemoryContextSwitchTo(oldcontext);
1507
1508
1509 /* prevent slots from being concurrently dropped */
1510 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1511
1512 /*
1513 * Iterate through all possible replication_states, display if they are
1514 * filled. Note that we do not take any locks, so slightly corrupted/out
1515 * of date values are a possibility.
1516 */
1517 for (i = 0; i < max_replication_slots; i++)
1518 {
1519 ReplicationState *state;
1520 Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1521 bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1522 char *roname;
1523
1524 state = &replication_states[i];
1525
1526 /* unused slot, nothing to display */
1527 if (state->roident == InvalidRepOriginId)
1528 continue;
1529
1530 memset(values, 0, sizeof(values));
1531 memset(nulls, 1, sizeof(nulls));
1532
1533 values[0] = ObjectIdGetDatum(state->roident);
1534 nulls[0] = false;
1535
1536 /*
1537 * We're not preventing the origin to be dropped concurrently, so
1538 * silently accept that it might be gone.
1539 */
1540 if (replorigin_by_oid(state->roident, true,
1541 &roname))
1542 {
1543 values[1] = CStringGetTextDatum(roname);
1544 nulls[1] = false;
1545 }
1546
1547 LWLockAcquire(&state->lock, LW_SHARED);
1548
1549 values[2] = LSNGetDatum(state->remote_lsn);
1550 nulls[2] = false;
1551
1552 values[3] = LSNGetDatum(state->local_lsn);
1553 nulls[3] = false;
1554
1555 LWLockRelease(&state->lock);
1556
1557 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1558 }
1559
1560 tuplestore_donestoring(tupstore);
1561
1562 LWLockRelease(ReplicationOriginLock);
1563
1564#undef REPLICATION_ORIGIN_PROGRESS_COLS
1565
1566 return (Datum) 0;
1567}
1568