1/*-------------------------------------------------------------------------
2 *
3 * slotfuncs.c
4 * Support functions for replication slots
5 *
6 * Copyright (c) 2012-2019, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/slotfuncs.c
10 *
11 *-------------------------------------------------------------------------
12 */
13#include "postgres.h"
14
15#include "access/htup_details.h"
16#include "access/xlog_internal.h"
17#include "funcapi.h"
18#include "miscadmin.h"
19#include "replication/decode.h"
20#include "replication/slot.h"
21#include "replication/logical.h"
22#include "replication/logicalfuncs.h"
23#include "utils/builtins.h"
24#include "utils/inval.h"
25#include "utils/pg_lsn.h"
26#include "utils/resowner.h"
27
28static void
29check_permissions(void)
30{
31 if (!superuser() && !has_rolreplication(GetUserId()))
32 ereport(ERROR,
33 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
34 (errmsg("must be superuser or replication role to use replication slots"))));
35}
36
37/*
38 * Helper function for creating a new physical replication slot with
39 * given arguments. Note that this function doesn't release the created
40 * slot.
41 *
42 * If restart_lsn is a valid value, we use it without WAL reservation
43 * routine. So the caller must guarantee that WAL is available.
44 */
45static void
46create_physical_replication_slot(char *name, bool immediately_reserve,
47 bool temporary, XLogRecPtr restart_lsn)
48{
49 Assert(!MyReplicationSlot);
50
51 /* acquire replication slot, this will check for conflicting names */
52 ReplicationSlotCreate(name, false,
53 temporary ? RS_TEMPORARY : RS_PERSISTENT);
54
55 if (immediately_reserve)
56 {
57 /* Reserve WAL as the user asked for it */
58 if (XLogRecPtrIsInvalid(restart_lsn))
59 ReplicationSlotReserveWal();
60 else
61 MyReplicationSlot->data.restart_lsn = restart_lsn;
62
63 /* Write this slot to disk */
64 ReplicationSlotMarkDirty();
65 ReplicationSlotSave();
66 }
67}
68
69/*
70 * SQL function for creating a new physical (streaming replication)
71 * replication slot.
72 */
73Datum
74pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
75{
76 Name name = PG_GETARG_NAME(0);
77 bool immediately_reserve = PG_GETARG_BOOL(1);
78 bool temporary = PG_GETARG_BOOL(2);
79 Datum values[2];
80 bool nulls[2];
81 TupleDesc tupdesc;
82 HeapTuple tuple;
83 Datum result;
84
85 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
86 elog(ERROR, "return type must be a row type");
87
88 check_permissions();
89
90 CheckSlotRequirements();
91
92 create_physical_replication_slot(NameStr(*name),
93 immediately_reserve,
94 temporary,
95 InvalidXLogRecPtr);
96
97 values[0] = NameGetDatum(&MyReplicationSlot->data.name);
98 nulls[0] = false;
99
100 if (immediately_reserve)
101 {
102 values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
103 nulls[1] = false;
104 }
105 else
106 nulls[1] = true;
107
108 tuple = heap_form_tuple(tupdesc, values, nulls);
109 result = HeapTupleGetDatum(tuple);
110
111 ReplicationSlotRelease();
112
113 PG_RETURN_DATUM(result);
114}
115
116
117/*
118 * Helper function for creating a new logical replication slot with
119 * given arguments. Note that this function doesn't release the created
120 * slot.
121 */
122static void
123create_logical_replication_slot(char *name, char *plugin,
124 bool temporary, XLogRecPtr restart_lsn)
125{
126 LogicalDecodingContext *ctx = NULL;
127
128 Assert(!MyReplicationSlot);
129
130 /*
131 * Acquire a logical decoding slot, this will check for conflicting names.
132 * Initially create persistent slot as ephemeral - that allows us to
133 * nicely handle errors during initialization because it'll get dropped if
134 * this transaction fails. We'll make it persistent at the end. Temporary
135 * slots can be created as temporary from beginning as they get dropped on
136 * error as well.
137 */
138 ReplicationSlotCreate(name, true,
139 temporary ? RS_TEMPORARY : RS_EPHEMERAL);
140
141 /*
142 * Create logical decoding context, to build the initial snapshot.
143 */
144 ctx = CreateInitDecodingContext(plugin, NIL,
145 false, /* do not build snapshot */
146 restart_lsn,
147 logical_read_local_xlog_page, NULL, NULL,
148 NULL);
149
150 /* build initial snapshot, might take a while */
151 DecodingContextFindStartpoint(ctx);
152
153 /* don't need the decoding context anymore */
154 FreeDecodingContext(ctx);
155}
156
157/*
158 * SQL function for creating a new logical replication slot.
159 */
160Datum
161pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
162{
163 Name name = PG_GETARG_NAME(0);
164 Name plugin = PG_GETARG_NAME(1);
165 bool temporary = PG_GETARG_BOOL(2);
166 Datum result;
167 TupleDesc tupdesc;
168 HeapTuple tuple;
169 Datum values[2];
170 bool nulls[2];
171
172 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
173 elog(ERROR, "return type must be a row type");
174
175 check_permissions();
176
177 CheckLogicalDecodingRequirements();
178
179 create_logical_replication_slot(NameStr(*name),
180 NameStr(*plugin),
181 temporary,
182 InvalidXLogRecPtr);
183
184 values[0] = NameGetDatum(&MyReplicationSlot->data.name);
185 values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
186
187 memset(nulls, 0, sizeof(nulls));
188
189 tuple = heap_form_tuple(tupdesc, values, nulls);
190 result = HeapTupleGetDatum(tuple);
191
192 /* ok, slot is now fully created, mark it as persistent if needed */
193 if (!temporary)
194 ReplicationSlotPersist();
195 ReplicationSlotRelease();
196
197 PG_RETURN_DATUM(result);
198}
199
200
201/*
202 * SQL function for dropping a replication slot.
203 */
204Datum
205pg_drop_replication_slot(PG_FUNCTION_ARGS)
206{
207 Name name = PG_GETARG_NAME(0);
208
209 check_permissions();
210
211 CheckSlotRequirements();
212
213 ReplicationSlotDrop(NameStr(*name), true);
214
215 PG_RETURN_VOID();
216}
217
218/*
219 * pg_get_replication_slots - SQL SRF showing active replication slots.
220 */
221Datum
222pg_get_replication_slots(PG_FUNCTION_ARGS)
223{
224#define PG_GET_REPLICATION_SLOTS_COLS 11
225 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
226 TupleDesc tupdesc;
227 Tuplestorestate *tupstore;
228 MemoryContext per_query_ctx;
229 MemoryContext oldcontext;
230 int slotno;
231
232 /* check to see if caller supports us returning a tuplestore */
233 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
234 ereport(ERROR,
235 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
236 errmsg("set-valued function called in context that cannot accept a set")));
237 if (!(rsinfo->allowedModes & SFRM_Materialize))
238 ereport(ERROR,
239 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
240 errmsg("materialize mode required, but it is not " \
241 "allowed in this context")));
242
243 /* Build a tuple descriptor for our result type */
244 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
245 elog(ERROR, "return type must be a row type");
246
247 /*
248 * We don't require any special permission to see this function's data
249 * because nothing should be sensitive. The most critical being the slot
250 * name, which shouldn't contain anything particularly sensitive.
251 */
252
253 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
254 oldcontext = MemoryContextSwitchTo(per_query_ctx);
255
256 tupstore = tuplestore_begin_heap(true, false, work_mem);
257 rsinfo->returnMode = SFRM_Materialize;
258 rsinfo->setResult = tupstore;
259 rsinfo->setDesc = tupdesc;
260
261 MemoryContextSwitchTo(oldcontext);
262
263 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
264 for (slotno = 0; slotno < max_replication_slots; slotno++)
265 {
266 ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
267 Datum values[PG_GET_REPLICATION_SLOTS_COLS];
268 bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
269
270 ReplicationSlotPersistency persistency;
271 TransactionId xmin;
272 TransactionId catalog_xmin;
273 XLogRecPtr restart_lsn;
274 XLogRecPtr confirmed_flush_lsn;
275 pid_t active_pid;
276 Oid database;
277 NameData slot_name;
278 NameData plugin;
279 int i;
280
281 if (!slot->in_use)
282 continue;
283
284 SpinLockAcquire(&slot->mutex);
285
286 xmin = slot->data.xmin;
287 catalog_xmin = slot->data.catalog_xmin;
288 database = slot->data.database;
289 restart_lsn = slot->data.restart_lsn;
290 confirmed_flush_lsn = slot->data.confirmed_flush;
291 namecpy(&slot_name, &slot->data.name);
292 namecpy(&plugin, &slot->data.plugin);
293 active_pid = slot->active_pid;
294 persistency = slot->data.persistency;
295
296 SpinLockRelease(&slot->mutex);
297
298 memset(nulls, 0, sizeof(nulls));
299
300 i = 0;
301 values[i++] = NameGetDatum(&slot_name);
302
303 if (database == InvalidOid)
304 nulls[i++] = true;
305 else
306 values[i++] = NameGetDatum(&plugin);
307
308 if (database == InvalidOid)
309 values[i++] = CStringGetTextDatum("physical");
310 else
311 values[i++] = CStringGetTextDatum("logical");
312
313 if (database == InvalidOid)
314 nulls[i++] = true;
315 else
316 values[i++] = database;
317
318 values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
319 values[i++] = BoolGetDatum(active_pid != 0);
320
321 if (active_pid != 0)
322 values[i++] = Int32GetDatum(active_pid);
323 else
324 nulls[i++] = true;
325
326 if (xmin != InvalidTransactionId)
327 values[i++] = TransactionIdGetDatum(xmin);
328 else
329 nulls[i++] = true;
330
331 if (catalog_xmin != InvalidTransactionId)
332 values[i++] = TransactionIdGetDatum(catalog_xmin);
333 else
334 nulls[i++] = true;
335
336 if (restart_lsn != InvalidXLogRecPtr)
337 values[i++] = LSNGetDatum(restart_lsn);
338 else
339 nulls[i++] = true;
340
341 if (confirmed_flush_lsn != InvalidXLogRecPtr)
342 values[i++] = LSNGetDatum(confirmed_flush_lsn);
343 else
344 nulls[i++] = true;
345
346 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
347 }
348 LWLockRelease(ReplicationSlotControlLock);
349
350 tuplestore_donestoring(tupstore);
351
352 return (Datum) 0;
353}
354
355/*
356 * Helper function for advancing our physical replication slot forward.
357 *
358 * The LSN position to move to is compared simply to the slot's restart_lsn,
359 * knowing that any position older than that would be removed by successive
360 * checkpoints.
361 */
362static XLogRecPtr
363pg_physical_replication_slot_advance(XLogRecPtr moveto)
364{
365 XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn;
366 XLogRecPtr retlsn = startlsn;
367
368 if (startlsn < moveto)
369 {
370 SpinLockAcquire(&MyReplicationSlot->mutex);
371 MyReplicationSlot->data.restart_lsn = moveto;
372 SpinLockRelease(&MyReplicationSlot->mutex);
373 retlsn = moveto;
374 }
375
376 return retlsn;
377}
378
379/*
380 * Helper function for advancing our logical replication slot forward.
381 *
382 * The slot's restart_lsn is used as start point for reading records,
383 * while confirmed_lsn is used as base point for the decoding context.
384 *
385 * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
386 * because we need to digest WAL to advance restart_lsn allowing to recycle
387 * WAL and removal of old catalog tuples. As decoding is done in fast_forward
388 * mode, no changes are generated anyway.
389 */
390static XLogRecPtr
391pg_logical_replication_slot_advance(XLogRecPtr moveto)
392{
393 LogicalDecodingContext *ctx;
394 ResourceOwner old_resowner = CurrentResourceOwner;
395 XLogRecPtr startlsn;
396 XLogRecPtr retlsn;
397
398 PG_TRY();
399 {
400 /*
401 * Create our decoding context in fast_forward mode, passing start_lsn
402 * as InvalidXLogRecPtr, so that we start processing from my slot's
403 * confirmed_flush.
404 */
405 ctx = CreateDecodingContext(InvalidXLogRecPtr,
406 NIL,
407 true, /* fast_forward */
408 logical_read_local_xlog_page,
409 NULL, NULL, NULL);
410
411 /*
412 * Start reading at the slot's restart_lsn, which we know to point to
413 * a valid record.
414 */
415 startlsn = MyReplicationSlot->data.restart_lsn;
416
417 /* Initialize our return value in case we don't do anything */
418 retlsn = MyReplicationSlot->data.confirmed_flush;
419
420 /* invalidate non-timetravel entries */
421 InvalidateSystemCaches();
422
423 /* Decode at least one record, until we run out of records */
424 while ((!XLogRecPtrIsInvalid(startlsn) &&
425 startlsn < moveto) ||
426 (!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) &&
427 ctx->reader->EndRecPtr < moveto))
428 {
429 char *errm = NULL;
430 XLogRecord *record;
431
432 /*
433 * Read records. No changes are generated in fast_forward mode,
434 * but snapbuilder/slot statuses are updated properly.
435 */
436 record = XLogReadRecord(ctx->reader, startlsn, &errm);
437 if (errm)
438 elog(ERROR, "%s", errm);
439
440 /* Read sequentially from now on */
441 startlsn = InvalidXLogRecPtr;
442
443 /*
444 * Process the record. Storage-level changes are ignored in
445 * fast_forward mode, but other modules (such as snapbuilder)
446 * might still have critical updates to do.
447 */
448 if (record)
449 LogicalDecodingProcessRecord(ctx, ctx->reader);
450
451 /* Stop once the requested target has been reached */
452 if (moveto <= ctx->reader->EndRecPtr)
453 break;
454
455 CHECK_FOR_INTERRUPTS();
456 }
457
458 /*
459 * Logical decoding could have clobbered CurrentResourceOwner during
460 * transaction management, so restore the executor's value. (This is
461 * a kluge, but it's not worth cleaning up right now.)
462 */
463 CurrentResourceOwner = old_resowner;
464
465 if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
466 {
467 LogicalConfirmReceivedLocation(moveto);
468
469 /*
470 * If only the confirmed_flush LSN has changed the slot won't get
471 * marked as dirty by the above. Callers on the walsender
472 * interface are expected to keep track of their own progress and
473 * don't need it written out. But SQL-interface users cannot
474 * specify their own start positions and it's harder for them to
475 * keep track of their progress, so we should make more of an
476 * effort to save it for them.
477 *
478 * Dirty the slot so it's written out at the next checkpoint.
479 * We'll still lose its position on crash, as documented, but it's
480 * better than always losing the position even on clean restart.
481 */
482 ReplicationSlotMarkDirty();
483 }
484
485 retlsn = MyReplicationSlot->data.confirmed_flush;
486
487 /* free context, call shutdown callback */
488 FreeDecodingContext(ctx);
489
490 InvalidateSystemCaches();
491 }
492 PG_CATCH();
493 {
494 /* clear all timetravel entries */
495 InvalidateSystemCaches();
496
497 PG_RE_THROW();
498 }
499 PG_END_TRY();
500
501 return retlsn;
502}
503
504/*
505 * SQL function for moving the position in a replication slot.
506 */
507Datum
508pg_replication_slot_advance(PG_FUNCTION_ARGS)
509{
510 Name slotname = PG_GETARG_NAME(0);
511 XLogRecPtr moveto = PG_GETARG_LSN(1);
512 XLogRecPtr endlsn;
513 XLogRecPtr minlsn;
514 TupleDesc tupdesc;
515 Datum values[2];
516 bool nulls[2];
517 HeapTuple tuple;
518 Datum result;
519
520 Assert(!MyReplicationSlot);
521
522 check_permissions();
523
524 if (XLogRecPtrIsInvalid(moveto))
525 ereport(ERROR,
526 (errmsg("invalid target WAL LSN")));
527
528 /* Build a tuple descriptor for our result type */
529 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
530 elog(ERROR, "return type must be a row type");
531
532 /*
533 * We can't move slot past what's been flushed/replayed so clamp the
534 * target position accordingly.
535 */
536 if (!RecoveryInProgress())
537 moveto = Min(moveto, GetFlushRecPtr());
538 else
539 moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
540
541 /* Acquire the slot so we "own" it */
542 ReplicationSlotAcquire(NameStr(*slotname), true);
543
544 /* A slot whose restart_lsn has never been reserved cannot be advanced */
545 if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
546 ereport(ERROR,
547 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
548 errmsg("cannot advance replication slot that has not previously reserved WAL")));
549
550 /*
551 * Check if the slot is not moving backwards. Physical slots rely simply
552 * on restart_lsn as a minimum point, while logical slots have confirmed
553 * consumption up to confirmed_lsn, meaning that in both cases data older
554 * than that is not available anymore.
555 */
556 if (OidIsValid(MyReplicationSlot->data.database))
557 minlsn = MyReplicationSlot->data.confirmed_flush;
558 else
559 minlsn = MyReplicationSlot->data.restart_lsn;
560
561 if (moveto < minlsn)
562 ereport(ERROR,
563 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
564 errmsg("cannot advance replication slot to %X/%X, minimum is %X/%X",
565 (uint32) (moveto >> 32), (uint32) moveto,
566 (uint32) (minlsn >> 32), (uint32) minlsn)));
567
568 /* Do the actual slot update, depending on the slot type */
569 if (OidIsValid(MyReplicationSlot->data.database))
570 endlsn = pg_logical_replication_slot_advance(moveto);
571 else
572 endlsn = pg_physical_replication_slot_advance(moveto);
573
574 values[0] = NameGetDatum(&MyReplicationSlot->data.name);
575 nulls[0] = false;
576
577 /* Update the on disk state when lsn was updated. */
578 if (XLogRecPtrIsInvalid(endlsn))
579 {
580 ReplicationSlotMarkDirty();
581 ReplicationSlotsComputeRequiredXmin(false);
582 ReplicationSlotsComputeRequiredLSN();
583 ReplicationSlotSave();
584 }
585
586 ReplicationSlotRelease();
587
588 /* Return the reached position. */
589 values[1] = LSNGetDatum(endlsn);
590 nulls[1] = false;
591
592 tuple = heap_form_tuple(tupdesc, values, nulls);
593 result = HeapTupleGetDatum(tuple);
594
595 PG_RETURN_DATUM(result);
596}
597
598/*
599 * Helper function of copying a replication slot.
600 */
601static Datum
602copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
603{
604 Name src_name = PG_GETARG_NAME(0);
605 Name dst_name = PG_GETARG_NAME(1);
606 ReplicationSlot *src = NULL;
607 XLogRecPtr src_restart_lsn;
608 bool src_islogical;
609 bool temporary;
610 char *plugin;
611 Datum values[2];
612 bool nulls[2];
613 Datum result;
614 TupleDesc tupdesc;
615 HeapTuple tuple;
616
617 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
618 elog(ERROR, "return type must be a row type");
619
620 check_permissions();
621
622 if (logical_slot)
623 CheckLogicalDecodingRequirements();
624 else
625 CheckSlotRequirements();
626
627 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
628
629 /*
630 * We need to prevent the source slot's reserved WAL from being removed,
631 * but we don't want to lock that slot for very long, and it can advance
632 * in the meantime. So obtain the source slot's data, and create a new
633 * slot using its restart_lsn. Afterwards we lock the source slot again
634 * and verify that the data we copied (name, type) has not changed
635 * incompatibly. No inconvenient WAL removal can occur once the new slot
636 * is created -- but since WAL removal could have occurred before we
637 * managed to create the new slot, we advance the new slot's restart_lsn
638 * to the source slot's updated restart_lsn the second time we lock it.
639 */
640 for (int i = 0; i < max_replication_slots; i++)
641 {
642 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
643
644 if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
645 {
646 SpinLockAcquire(&s->mutex);
647 src_islogical = SlotIsLogical(s);
648 src_restart_lsn = s->data.restart_lsn;
649 temporary = s->data.persistency == RS_TEMPORARY;
650 plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL;
651 SpinLockRelease(&s->mutex);
652
653 src = s;
654 break;
655 }
656 }
657
658 LWLockRelease(ReplicationSlotControlLock);
659
660 if (src == NULL)
661 ereport(ERROR,
662 (errcode(ERRCODE_UNDEFINED_OBJECT),
663 errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
664
665 /* Check type of replication slot */
666 if (src_islogical != logical_slot)
667 ereport(ERROR,
668 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
669 src_islogical ?
670 errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
671 NameStr(*src_name)) :
672 errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
673 NameStr(*src_name))));
674
675 /* Copying non-reserved slot doesn't make sense */
676 if (XLogRecPtrIsInvalid(src_restart_lsn))
677 {
678 Assert(!logical_slot);
679 ereport(ERROR,
680 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
681 (errmsg("cannot copy a replication slot that doesn't reserve WAL"))));
682 }
683
684 /* Overwrite params from optional arguments */
685 if (PG_NARGS() >= 3)
686 temporary = PG_GETARG_BOOL(2);
687 if (PG_NARGS() >= 4)
688 {
689 Assert(logical_slot);
690 plugin = NameStr(*(PG_GETARG_NAME(3)));
691 }
692
693 /* Create new slot and acquire it */
694 if (logical_slot)
695 create_logical_replication_slot(NameStr(*dst_name),
696 plugin,
697 temporary,
698 src_restart_lsn);
699 else
700 create_physical_replication_slot(NameStr(*dst_name),
701 true,
702 temporary,
703 src_restart_lsn);
704
705 /*
706 * Update the destination slot to current values of the source slot;
707 * recheck that the source slot is still the one we saw previously.
708 */
709 {
710 TransactionId copy_effective_xmin;
711 TransactionId copy_effective_catalog_xmin;
712 TransactionId copy_xmin;
713 TransactionId copy_catalog_xmin;
714 XLogRecPtr copy_restart_lsn;
715 bool copy_islogical;
716 char *copy_name;
717
718 /* Copy data of source slot again */
719 SpinLockAcquire(&src->mutex);
720 copy_effective_xmin = src->effective_xmin;
721 copy_effective_catalog_xmin = src->effective_catalog_xmin;
722
723 copy_xmin = src->data.xmin;
724 copy_catalog_xmin = src->data.catalog_xmin;
725 copy_restart_lsn = src->data.restart_lsn;
726
727 /* for existence check */
728 copy_name = pstrdup(NameStr(src->data.name));
729 copy_islogical = SlotIsLogical(src);
730 SpinLockRelease(&src->mutex);
731
732 /*
733 * Check if the source slot still exists and is valid. We regard it as
734 * invalid if the type of replication slot or name has been changed,
735 * or the restart_lsn either is invalid or has gone backward. (The
736 * restart_lsn could go backwards if the source slot is dropped and
737 * copied from an older slot during installation.)
738 *
739 * Since erroring out will release and drop the destination slot we
740 * don't need to release it here.
741 */
742 if (copy_restart_lsn < src_restart_lsn ||
743 src_islogical != copy_islogical ||
744 strcmp(copy_name, NameStr(*src_name)) != 0)
745 ereport(ERROR,
746 (errmsg("could not copy replication slot \"%s\"",
747 NameStr(*src_name)),
748 errdetail("The source replication slot was modified incompatibly during the copy operation.")));
749
750 /* Install copied values again */
751 SpinLockAcquire(&MyReplicationSlot->mutex);
752 MyReplicationSlot->effective_xmin = copy_effective_xmin;
753 MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
754
755 MyReplicationSlot->data.xmin = copy_xmin;
756 MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
757 MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
758 SpinLockRelease(&MyReplicationSlot->mutex);
759
760 ReplicationSlotMarkDirty();
761 ReplicationSlotsComputeRequiredXmin(false);
762 ReplicationSlotsComputeRequiredLSN();
763 ReplicationSlotSave();
764
765#ifdef USE_ASSERT_CHECKING
766 /* Check that the restart_lsn is available */
767 {
768 XLogSegNo segno;
769
770 XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
771 Assert(XLogGetLastRemovedSegno() < segno);
772 }
773#endif
774 }
775
776 /* target slot fully created, mark as persistent if needed */
777 if (logical_slot && !temporary)
778 ReplicationSlotPersist();
779
780 /* All done. Set up the return values */
781 values[0] = NameGetDatum(dst_name);
782 nulls[0] = false;
783 if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush))
784 {
785 values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
786 nulls[1] = false;
787 }
788 else
789 nulls[1] = true;
790
791 tuple = heap_form_tuple(tupdesc, values, nulls);
792 result = HeapTupleGetDatum(tuple);
793
794 ReplicationSlotRelease();
795
796 PG_RETURN_DATUM(result);
797}
798
799/* The wrappers below are all to appease opr_sanity */
800Datum
801pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
802{
803 return copy_replication_slot(fcinfo, true);
804}
805
806Datum
807pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
808{
809 return copy_replication_slot(fcinfo, true);
810}
811
812Datum
813pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
814{
815 return copy_replication_slot(fcinfo, true);
816}
817
818Datum
819pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
820{
821 return copy_replication_slot(fcinfo, false);
822}
823
824Datum
825pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
826{
827 return copy_replication_slot(fcinfo, false);
828}
829