1/*-------------------------------------------------------------------------
2 *
3 * slot.c
4 * Replication slot management.
5 *
6 *
7 * Copyright (c) 2012-2019, PostgreSQL Global Development Group
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/replication/slot.c
12 *
13 * NOTES
14 *
15 * Replication slots are used to keep state about replication streams
16 * originating from this cluster. Their primary purpose is to prevent the
17 * premature removal of WAL or of old tuple versions in a manner that would
18 * interfere with replication; they are also useful for monitoring purposes.
19 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 * on standbys (to support cascading setups). The requirement that slots be
21 * usable on standbys precludes storing them in the system catalogs.
22 *
23 * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24 * directory. Inside that directory the state file will contain the slot's
25 * own data. Additional data can be stored alongside that file if required.
26 * While the server is running, the state data is also cached in memory for
27 * efficiency.
28 *
29 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 * to iterate over the slots, and in exclusive mode to change the in_use flag
32 * of a slot. The remaining data in each slot is protected by its mutex.
33 *
34 *-------------------------------------------------------------------------
35 */
36
37#include "postgres.h"
38
39#include <unistd.h>
40#include <sys/stat.h>
41
42#include "access/transam.h"
43#include "access/xlog_internal.h"
44#include "common/string.h"
45#include "miscadmin.h"
46#include "pgstat.h"
47#include "replication/slot.h"
48#include "storage/fd.h"
49#include "storage/proc.h"
50#include "storage/procarray.h"
51#include "utils/builtins.h"
52
53/*
54 * Replication slot on-disk data structure.
55 */
56typedef struct ReplicationSlotOnDisk
57{
58 /* first part of this struct needs to be version independent */
59
60 /* data not covered by checksum */
61 uint32 magic;
62 pg_crc32c checksum;
63
64 /* data covered by checksum */
65 uint32 version;
66 uint32 length;
67
68 /*
69 * The actual data in the slot that follows can differ based on the above
70 * 'version'.
71 */
72
73 ReplicationSlotPersistentData slotdata;
74} ReplicationSlotOnDisk;
75
76/* size of version independent data */
77#define ReplicationSlotOnDiskConstantSize \
78 offsetof(ReplicationSlotOnDisk, slotdata)
79/* size of the part of the slot not covered by the checksum */
80#define SnapBuildOnDiskNotChecksummedSize \
81 offsetof(ReplicationSlotOnDisk, version)
82/* size of the part covered by the checksum */
83#define SnapBuildOnDiskChecksummedSize \
84 sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize
85/* size of the slot data that is version dependent */
86#define ReplicationSlotOnDiskV2Size \
87 sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
88
89#define SLOT_MAGIC 0x1051CA1 /* format identifier */
90#define SLOT_VERSION 2 /* version for new files */
91
92/* Control array for replication slot management */
93ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
94
95/* My backend's replication slot in the shared memory array */
96ReplicationSlot *MyReplicationSlot = NULL;
97
98/* GUCs */
99int max_replication_slots = 0; /* the maximum number of replication
100 * slots */
101
102static void ReplicationSlotDropAcquired(void);
103static void ReplicationSlotDropPtr(ReplicationSlot *slot);
104
105/* internal persistency functions */
106static void RestoreSlotFromDisk(const char *name);
107static void CreateSlotOnDisk(ReplicationSlot *slot);
108static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
109
110/*
111 * Report shared-memory space needed by ReplicationSlotShmemInit.
112 */
113Size
114ReplicationSlotsShmemSize(void)
115{
116 Size size = 0;
117
118 if (max_replication_slots == 0)
119 return size;
120
121 size = offsetof(ReplicationSlotCtlData, replication_slots);
122 size = add_size(size,
123 mul_size(max_replication_slots, sizeof(ReplicationSlot)));
124
125 return size;
126}
127
128/*
129 * Allocate and initialize walsender-related shared memory.
130 */
131void
132ReplicationSlotsShmemInit(void)
133{
134 bool found;
135
136 if (max_replication_slots == 0)
137 return;
138
139 ReplicationSlotCtl = (ReplicationSlotCtlData *)
140 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
141 &found);
142
143 LWLockRegisterTranche(LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS,
144 "replication_slot_io");
145
146 if (!found)
147 {
148 int i;
149
150 /* First time through, so initialize */
151 MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
152
153 for (i = 0; i < max_replication_slots; i++)
154 {
155 ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
156
157 /* everything else is zeroed by the memset above */
158 SpinLockInit(&slot->mutex);
159 LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
160 ConditionVariableInit(&slot->active_cv);
161 }
162 }
163}
164
165/*
166 * Check whether the passed slot name is valid and report errors at elevel.
167 *
168 * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
169 * the name to be used as a directory name on every supported OS.
170 *
171 * Returns whether the directory name is valid or not if elevel < ERROR.
172 */
173bool
174ReplicationSlotValidateName(const char *name, int elevel)
175{
176 const char *cp;
177
178 if (strlen(name) == 0)
179 {
180 ereport(elevel,
181 (errcode(ERRCODE_INVALID_NAME),
182 errmsg("replication slot name \"%s\" is too short",
183 name)));
184 return false;
185 }
186
187 if (strlen(name) >= NAMEDATALEN)
188 {
189 ereport(elevel,
190 (errcode(ERRCODE_NAME_TOO_LONG),
191 errmsg("replication slot name \"%s\" is too long",
192 name)));
193 return false;
194 }
195
196 for (cp = name; *cp; cp++)
197 {
198 if (!((*cp >= 'a' && *cp <= 'z')
199 || (*cp >= '0' && *cp <= '9')
200 || (*cp == '_')))
201 {
202 ereport(elevel,
203 (errcode(ERRCODE_INVALID_NAME),
204 errmsg("replication slot name \"%s\" contains invalid character",
205 name),
206 errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
207 return false;
208 }
209 }
210 return true;
211}
212
/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot
 * db_specific: logical decoding is db specific; if the slot is going to
 *	   be used for that pass true, otherwise false.
 * persistency: RS_PERSISTENT, RS_EPHEMERAL or RS_TEMPORARY.  Ephemeral
 *	   slots are dropped again when released (see ReplicationSlotRelease());
 *	   temporary ones are dropped by ReplicationSlotCleanup().
 *
 * On success the new slot is owned by this backend: MyReplicationSlot
 * points at it and its active_pid is our PID.  Errors out on an invalid
 * or duplicate name, or when every slot is already in use.
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
					  ReplicationSlotPersistency persistency)
{
	ReplicationSlot *slot = NULL;
	int			i;

	Assert(MyReplicationSlot == NULL);

	/* raises ERROR itself if the name is unusable */
	ReplicationSlotValidateName(name, ERROR);

	/*
	 * If some other backend ran this code concurrently with us, we'd likely
	 * both allocate the same slot, and that would be bad.  We'd also be at
	 * risk of missing a name collision.  Also, we don't want to try to create
	 * a new slot while somebody's busy cleaning up an old one, because we
	 * might both be monkeying with the same directory.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * Check for name collision, and identify an allocatable slot.  We need to
	 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
	 * else can change the in_use flags while we're looking at them.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
					 errmsg("replication slot \"%s\" already exists", name)));
		/* remember the first free slot we see */
		if (!s->in_use && slot == NULL)
			slot = s;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If all slots are in use, we're out of luck. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("all replication slots are in use"),
				 errhint("Free one or increase max_replication_slots.")));

	/*
	 * Since this slot is not in use, nobody should be looking at any part of
	 * it other than the in_use field unless they're trying to allocate it.
	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
	 * be doing that.  So it's safe to initialize the slot.
	 */
	Assert(!slot->in_use);
	Assert(slot->active_pid == 0);

	/* first initialize persistent data */
	memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
	StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
	slot->data.persistency = persistency;

	/* and then data only present in shared memory */
	slot->just_dirtied = false;
	slot->dirty = false;
	slot->effective_xmin = InvalidTransactionId;
	slot->effective_catalog_xmin = InvalidTransactionId;
	slot->candidate_catalog_xmin = InvalidTransactionId;
	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
	slot->candidate_restart_valid = InvalidXLogRecPtr;
	slot->candidate_restart_lsn = InvalidXLogRecPtr;

	/*
	 * Create the slot on disk.  We haven't actually marked the slot allocated
	 * yet, so no special cleanup is required if this errors out.
	 */
	CreateSlotOnDisk(slot);

	/*
	 * We need to briefly prevent any other backend from iterating over the
	 * slots while we flip the in_use flag.  We also need to set the active
	 * flag while holding the ControlLock as otherwise a concurrent
	 * SlotAcquire() could acquire the slot as well.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

	slot->in_use = true;

	/* We can now mark the slot active, and that makes it our slot. */
	SpinLockAcquire(&slot->mutex);
	Assert(slot->active_pid == 0);
	slot->active_pid = MyProcPid;
	SpinLockRelease(&slot->mutex);
	MyReplicationSlot = slot;

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Now that the slot has been marked as in_use and active, it's safe to
	 * let somebody else try to allocate a slot.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);
}
325
/*
 * Find a previously created slot and mark it as used by this backend.
 *
 * name: name of an existing in_use slot; ERROR if no slot matches.
 * nowait: if true, ERROR immediately when the slot is active in another
 * backend; otherwise sleep on the slot's condition variable until it is
 * released, then retry the whole lookup.
 */
void
ReplicationSlotAcquire(const char *name, bool nowait)
{
	ReplicationSlot *slot;
	int			active_pid;
	int			i;

retry:
	Assert(MyReplicationSlot == NULL);

	/*
	 * Search for the named slot and mark it active if we find it.  If the
	 * slot is already active, we exit the loop with active_pid set to the PID
	 * of the backend that owns it.
	 */
	active_pid = 0;
	slot = NULL;
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
		{
			/*
			 * This is the slot we want; check if it's active under some other
			 * process.  In single user mode, we don't need this check.
			 */
			if (IsUnderPostmaster)
			{
				/*
				 * Get ready to sleep on it in case it is active.  (We may end
				 * up not sleeping, but we don't want to do this while holding
				 * the spinlock.)
				 */
				ConditionVariablePrepareToSleep(&s->active_cv);

				SpinLockAcquire(&s->mutex);

				/* claim ownership iff nobody else currently holds the slot */
				active_pid = s->active_pid;
				if (active_pid == 0)
					active_pid = s->active_pid = MyProcPid;

				SpinLockRelease(&s->mutex);
			}
			else
				/* single-user mode: no concurrency, just take it */
				active_pid = MyProcPid;
			slot = s;

			break;
		}
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If we did not find the slot, error out. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist", name)));

	/*
	 * If we found the slot but it's already active in another backend, we
	 * either error out or retry after a short wait, as caller specified.
	 */
	if (active_pid != MyProcPid)
	{
		if (nowait)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							name, active_pid)));

		/* Wait here until we get signaled, and then restart */
		ConditionVariableSleep(&slot->active_cv,
							   WAIT_EVENT_REPLICATION_SLOT_DROP);
		ConditionVariableCancelSleep();
		goto retry;
	}
	else
		ConditionVariableCancelSleep(); /* no sleep needed after all */

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);

	/* We made this slot active, so it's ours now. */
	MyReplicationSlot = slot;
}
416
/*
 * Release the replication slot that this backend considers to own.
 *
 * This or another backend can re-acquire the slot later.
 * Resources this slot requires will be preserved.
 *
 * Note: an RS_EPHEMERAL slot does not survive release -- it is dropped
 * here, directory and all.
 */
void
ReplicationSlotRelease(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL && slot->active_pid != 0);

	if (slot->data.persistency == RS_EPHEMERAL)
	{
		/*
		 * Delete the slot.  There is no !PANIC case where this is allowed to
		 * fail, all that may happen is an incomplete cleanup of the on-disk
		 * data.
		 */
		ReplicationSlotDropAcquired();
	}

	/*
	 * If slot needed to temporarily restrain both data and catalog xmin to
	 * create the catalog snapshot, remove that temporary constraint.
	 * Snapshots can only be exported while the initial snapshot is still
	 * acquired.
	 */
	if (!TransactionIdIsValid(slot->data.xmin) &&
		TransactionIdIsValid(slot->effective_xmin))
	{
		SpinLockAcquire(&slot->mutex);
		slot->effective_xmin = InvalidTransactionId;
		SpinLockRelease(&slot->mutex);
		/* the system-wide minimum xmin may advance now */
		ReplicationSlotsComputeRequiredXmin(false);
	}

	if (slot->data.persistency == RS_PERSISTENT)
	{
		/*
		 * Mark persistent slot inactive.  We're not freeing it, just
		 * disconnecting, but wake up others that may be waiting for it.
		 */
		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);
		ConditionVariableBroadcast(&slot->active_cv);
	}

	MyReplicationSlot = NULL;

	/* might not have been set when we've been a plain slot */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
	LWLockRelease(ProcArrayLock);
}
474
/*
 * Cleanup all temporary slots created in current session.
 *
 * Drops every slot still marked active by this process; such slots are
 * asserted to be RS_TEMPORARY.
 */
void
ReplicationSlotCleanup(void)
{
	int			i;

	Assert(MyReplicationSlot == NULL);

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (!s->in_use)
			continue;

		SpinLockAcquire(&s->mutex);
		if (s->active_pid == MyProcPid)
		{
			/* only temporary slots may still be held by us here */
			Assert(s->data.persistency == RS_TEMPORARY);
			SpinLockRelease(&s->mutex);
			LWLockRelease(ReplicationSlotControlLock);	/* avoid deadlock */

			ReplicationSlotDropPtr(s);

			/* wake up anyone waiting on this slot */
			ConditionVariableBroadcast(&s->active_cv);

			/*
			 * The slot array may have changed while the control lock was
			 * released, so rescan from the beginning.
			 */
			goto restart;
		}
		else
			SpinLockRelease(&s->mutex);
	}

	LWLockRelease(ReplicationSlotControlLock);
}
512
513/*
514 * Permanently drop replication slot identified by the passed in name.
515 */
516void
517ReplicationSlotDrop(const char *name, bool nowait)
518{
519 Assert(MyReplicationSlot == NULL);
520
521 ReplicationSlotAcquire(name, nowait);
522
523 ReplicationSlotDropAcquired();
524}
525
526/*
527 * Permanently drop the currently acquired replication slot.
528 */
529static void
530ReplicationSlotDropAcquired(void)
531{
532 ReplicationSlot *slot = MyReplicationSlot;
533
534 Assert(MyReplicationSlot != NULL);
535
536 /* slot isn't acquired anymore */
537 MyReplicationSlot = NULL;
538
539 ReplicationSlotDropPtr(slot);
540}
541
/*
 * Permanently drop the replication slot which will be released by the point
 * this function returns.
 */
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
	char		path[MAXPGPATH];
	char		tmppath[MAXPGPATH];

	/*
	 * If some other backend ran this code concurrently with us, we might try
	 * to delete a slot with a certain name while someone else was trying to
	 * create a slot with the same name.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/* Generate pathnames. */
	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

	/*
	 * Rename the slot directory on disk, so that we'll no longer recognize
	 * this as a valid slot.  Note that if this fails, we've got to mark the
	 * slot inactive before bailing out.  If we're dropping an ephemeral or a
	 * temporary slot, we better never fail hard as the caller won't expect
	 * the slot to survive and this might get called during error handling.
	 */
	if (rename(path, tmppath) == 0)
	{
		/*
		 * We need to fsync() the directory we just renamed and its parent to
		 * make sure that our changes are on disk in a crash-safe fashion.  If
		 * fsync() fails, we can't be sure whether the changes are on disk or
		 * not.  For now, we handle that by panicking;
		 * StartupReplicationSlots() will try to straighten it out after
		 * restart.
		 */
		START_CRIT_SECTION();
		fsync_fname(tmppath, true);
		fsync_fname("pg_replslot", true);
		END_CRIT_SECTION();
	}
	else
	{
		/* only a persistent slot may survive a failed drop */
		bool		fail_softly = slot->data.persistency != RS_PERSISTENT;

		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);

		/* wake up anyone waiting on this slot */
		ConditionVariableBroadcast(&slot->active_cv);

		ereport(fail_softly ? WARNING : ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						path, tmppath)));
	}

	/*
	 * The slot is definitely gone.  Lock out concurrent scans of the array
	 * long enough to kill it.  It's OK to clear the active PID here without
	 * grabbing the mutex because nobody else can be scanning the array here,
	 * and nobody can be attached to this slot and thus access it without
	 * scanning the array.
	 *
	 * Also wake up processes waiting for it.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	slot->active_pid = 0;
	slot->in_use = false;
	LWLockRelease(ReplicationSlotControlLock);
	ConditionVariableBroadcast(&slot->active_cv);

	/*
	 * Slot is dead and doesn't prevent resource removal anymore, recompute
	 * limits.
	 */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * If removing the directory fails, the worst thing that will happen is
	 * that the user won't be able to create a new slot with the same name
	 * until the next server restart.  We warn about it, but that's all.
	 */
	if (!rmtree(tmppath, true))
		ereport(WARNING,
				(errmsg("could not remove directory \"%s\"", tmppath)));

	/*
	 * We release this at the very end, so that nobody starts trying to create
	 * a slot while we're still cleaning up the detritus of the old one.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);
}
639
640/*
641 * Serialize the currently acquired slot's state from memory to disk, thereby
642 * guaranteeing the current state will survive a crash.
643 */
644void
645ReplicationSlotSave(void)
646{
647 char path[MAXPGPATH];
648
649 Assert(MyReplicationSlot != NULL);
650
651 sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
652 SaveSlotToPath(MyReplicationSlot, path, ERROR);
653}
654
655/*
656 * Signal that it would be useful if the currently acquired slot would be
657 * flushed out to disk.
658 *
659 * Note that the actual flush to disk can be delayed for a long time, if
660 * required for correctness explicitly do a ReplicationSlotSave().
661 */
662void
663ReplicationSlotMarkDirty(void)
664{
665 ReplicationSlot *slot = MyReplicationSlot;
666
667 Assert(MyReplicationSlot != NULL);
668
669 SpinLockAcquire(&slot->mutex);
670 MyReplicationSlot->just_dirtied = true;
671 MyReplicationSlot->dirty = true;
672 SpinLockRelease(&slot->mutex);
673}
674
675/*
676 * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
677 * guaranteeing it will be there after an eventual crash.
678 */
679void
680ReplicationSlotPersist(void)
681{
682 ReplicationSlot *slot = MyReplicationSlot;
683
684 Assert(slot != NULL);
685 Assert(slot->data.persistency != RS_PERSISTENT);
686
687 SpinLockAcquire(&slot->mutex);
688 slot->data.persistency = RS_PERSISTENT;
689 SpinLockRelease(&slot->mutex);
690
691 ReplicationSlotMarkDirty();
692 ReplicationSlotSave();
693}
694
695/*
696 * Compute the oldest xmin across all slots and store it in the ProcArray.
697 *
698 * If already_locked is true, ProcArrayLock has already been acquired
699 * exclusively.
700 */
701void
702ReplicationSlotsComputeRequiredXmin(bool already_locked)
703{
704 int i;
705 TransactionId agg_xmin = InvalidTransactionId;
706 TransactionId agg_catalog_xmin = InvalidTransactionId;
707
708 Assert(ReplicationSlotCtl != NULL);
709
710 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
711
712 for (i = 0; i < max_replication_slots; i++)
713 {
714 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
715 TransactionId effective_xmin;
716 TransactionId effective_catalog_xmin;
717
718 if (!s->in_use)
719 continue;
720
721 SpinLockAcquire(&s->mutex);
722 effective_xmin = s->effective_xmin;
723 effective_catalog_xmin = s->effective_catalog_xmin;
724 SpinLockRelease(&s->mutex);
725
726 /* check the data xmin */
727 if (TransactionIdIsValid(effective_xmin) &&
728 (!TransactionIdIsValid(agg_xmin) ||
729 TransactionIdPrecedes(effective_xmin, agg_xmin)))
730 agg_xmin = effective_xmin;
731
732 /* check the catalog xmin */
733 if (TransactionIdIsValid(effective_catalog_xmin) &&
734 (!TransactionIdIsValid(agg_catalog_xmin) ||
735 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
736 agg_catalog_xmin = effective_catalog_xmin;
737 }
738
739 LWLockRelease(ReplicationSlotControlLock);
740
741 ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
742}
743
744/*
745 * Compute the oldest restart LSN across all slots and inform xlog module.
746 */
747void
748ReplicationSlotsComputeRequiredLSN(void)
749{
750 int i;
751 XLogRecPtr min_required = InvalidXLogRecPtr;
752
753 Assert(ReplicationSlotCtl != NULL);
754
755 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
756 for (i = 0; i < max_replication_slots; i++)
757 {
758 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
759 XLogRecPtr restart_lsn;
760
761 if (!s->in_use)
762 continue;
763
764 SpinLockAcquire(&s->mutex);
765 restart_lsn = s->data.restart_lsn;
766 SpinLockRelease(&s->mutex);
767
768 if (restart_lsn != InvalidXLogRecPtr &&
769 (min_required == InvalidXLogRecPtr ||
770 restart_lsn < min_required))
771 min_required = restart_lsn;
772 }
773 LWLockRelease(ReplicationSlotControlLock);
774
775 XLogSetReplicationSlotMinimumLSN(min_required);
776}
777
778/*
779 * Compute the oldest WAL LSN required by *logical* decoding slots..
780 *
781 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
782 * slots exist.
783 *
784 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
785 * ignores physical replication slots.
786 *
787 * The results aren't required frequently, so we don't maintain a precomputed
788 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
789 */
790XLogRecPtr
791ReplicationSlotsComputeLogicalRestartLSN(void)
792{
793 XLogRecPtr result = InvalidXLogRecPtr;
794 int i;
795
796 if (max_replication_slots <= 0)
797 return InvalidXLogRecPtr;
798
799 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
800
801 for (i = 0; i < max_replication_slots; i++)
802 {
803 ReplicationSlot *s;
804 XLogRecPtr restart_lsn;
805
806 s = &ReplicationSlotCtl->replication_slots[i];
807
808 /* cannot change while ReplicationSlotCtlLock is held */
809 if (!s->in_use)
810 continue;
811
812 /* we're only interested in logical slots */
813 if (!SlotIsLogical(s))
814 continue;
815
816 /* read once, it's ok if it increases while we're checking */
817 SpinLockAcquire(&s->mutex);
818 restart_lsn = s->data.restart_lsn;
819 SpinLockRelease(&s->mutex);
820
821 if (result == InvalidXLogRecPtr ||
822 restart_lsn < result)
823 result = restart_lsn;
824 }
825
826 LWLockRelease(ReplicationSlotControlLock);
827
828 return result;
829}
830
831/*
832 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
833 * passed database oid.
834 *
835 * Returns true if there are any slots referencing the database. *nslots will
836 * be set to the absolute number of slots in the database, *nactive to ones
837 * currently active.
838 */
839bool
840ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
841{
842 int i;
843
844 *nslots = *nactive = 0;
845
846 if (max_replication_slots <= 0)
847 return false;
848
849 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
850 for (i = 0; i < max_replication_slots; i++)
851 {
852 ReplicationSlot *s;
853
854 s = &ReplicationSlotCtl->replication_slots[i];
855
856 /* cannot change while ReplicationSlotCtlLock is held */
857 if (!s->in_use)
858 continue;
859
860 /* only logical slots are database specific, skip */
861 if (!SlotIsLogical(s))
862 continue;
863
864 /* not our database, skip */
865 if (s->data.database != dboid)
866 continue;
867
868 /* count slots with spinlock held */
869 SpinLockAcquire(&s->mutex);
870 (*nslots)++;
871 if (s->active_pid != 0)
872 (*nactive)++;
873 SpinLockRelease(&s->mutex);
874 }
875 LWLockRelease(ReplicationSlotControlLock);
876
877 if (*nslots > 0)
878 return true;
879 return false;
880}
881
/*
 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
 * passed database oid. The caller should hold an exclusive lock on the
 * pg_database oid for the database to prevent creation of new slots on the db
 * or replay from existing slots.
 *
 * Another session that concurrently acquires an existing slot on the target DB
 * (most likely to drop it) may cause this function to ERROR. If that happens
 * it may have dropped some but not all slots.
 *
 * This routine isn't as efficient as it could be - but we don't drop
 * databases often, especially databases with lots of slots.
 */
void
ReplicationSlotsDropDBSlots(Oid dboid)
{
	int			i;

	if (max_replication_slots <= 0)
		return;

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s;
		char	   *slotname;
		int			active_pid;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotCtlLock is held */
		if (!s->in_use)
			continue;

		/* only logical slots are database specific, skip */
		if (!SlotIsLogical(s))
			continue;

		/* not our database, skip */
		if (s->data.database != dboid)
			continue;

		/* acquire slot, so ReplicationSlotDropAcquired can be reused */
		SpinLockAcquire(&s->mutex);
		/* can't change while ReplicationSlotControlLock is held */
		slotname = NameStr(s->data.name);
		active_pid = s->active_pid;
		if (active_pid == 0)
		{
			/* claim it for ourselves, same as a normal acquire */
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
		}
		SpinLockRelease(&s->mutex);

		/*
		 * Even though we hold an exclusive lock on the database object a
		 * logical slot for that DB can still be active, e.g. if it's
		 * concurrently being dropped by a backend connected to another DB.
		 *
		 * That's fairly unlikely in practice, so we'll just bail out.
		 */
		if (active_pid)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							slotname, active_pid)));

		/*
		 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
		 * holding ReplicationSlotControlLock over filesystem operations,
		 * release ReplicationSlotControlLock and use
		 * ReplicationSlotDropAcquired.
		 *
		 * As that means the set of slots could change, restart scan from the
		 * beginning each time we release the lock.
		 */
		LWLockRelease(ReplicationSlotControlLock);
		ReplicationSlotDropAcquired();
		goto restart;
	}
	LWLockRelease(ReplicationSlotControlLock);
}
965
966
967/*
968 * Check whether the server's configuration supports using replication
969 * slots.
970 */
971void
972CheckSlotRequirements(void)
973{
974 /*
975 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
976 * needs the same check.
977 */
978
979 if (max_replication_slots == 0)
980 ereport(ERROR,
981 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
982 (errmsg("replication slots can only be used if max_replication_slots > 0"))));
983
984 if (wal_level < WAL_LEVEL_REPLICA)
985 ereport(ERROR,
986 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
987 errmsg("replication slots can only be used if wal_level >= replica")));
988}
989
/*
 * Reserve WAL for the currently active slot.
 *
 * Compute and set restart_lsn in a manner that's appropriate for the type of
 * the slot and concurrency safe.
 */
void
ReplicationSlotReserveWal(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL);
	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);

	/*
	 * The replication slot mechanism is used to prevent removal of required
	 * WAL. As there is no interlock between this routine and checkpoints, WAL
	 * segments could concurrently be removed when a now stale return value of
	 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
	 * this happens we'll just retry.
	 */
	while (true)
	{
		XLogSegNo	segno;
		XLogRecPtr	restart_lsn;

		/*
		 * For logical slots log a standby snapshot and start logical decoding
		 * at exactly that position. That allows the slot to start up more
		 * quickly.
		 *
		 * That's not needed (or indeed helpful) for physical slots as they'll
		 * start replay at the last logged checkpoint anyway. Instead return
		 * the location of the last redo LSN. While that slightly increases
		 * the chance that we have to retry, it's where a base backup has to
		 * start replay at.
		 */
		if (!RecoveryInProgress() && SlotIsLogical(slot))
		{
			XLogRecPtr	flushptr;

			/* start at current insert position */
			restart_lsn = GetXLogInsertRecPtr();
			SpinLockAcquire(&slot->mutex);
			slot->data.restart_lsn = restart_lsn;
			SpinLockRelease(&slot->mutex);

			/* make sure we have enough information to start */
			flushptr = LogStandbySnapshot();

			/* and make sure it's fsynced to disk */
			XLogFlush(flushptr);
		}
		else
		{
			/* physical slot, or logical slot during recovery */
			restart_lsn = GetRedoRecPtr();
			SpinLockAcquire(&slot->mutex);
			slot->data.restart_lsn = restart_lsn;
			SpinLockRelease(&slot->mutex);
		}

		/* prevent WAL removal as fast as possible */
		ReplicationSlotsComputeRequiredLSN();

		/*
		 * If all required WAL is still there, great, otherwise retry. The
		 * slot should prevent further removal of WAL, unless there's a
		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
		 * the new restart_lsn above, so normally we should never need to loop
		 * more than twice.
		 */
		XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
		if (XLogGetLastRemovedSegno() < segno)
			break;
	}
}
1066
1067/*
1068 * Flush all replication slots to disk.
1069 *
1070 * This needn't actually be part of a checkpoint, but it's a convenient
1071 * location.
1072 */
1073void
1074CheckPointReplicationSlots(void)
1075{
1076 int i;
1077
1078 elog(DEBUG1, "performing replication slot checkpoint");
1079
1080 /*
1081 * Prevent any slot from being created/dropped while we're active. As we
1082 * explicitly do *not* want to block iterating over replication_slots or
1083 * acquiring a slot we cannot take the control lock - but that's OK,
1084 * because holding ReplicationSlotAllocationLock is strictly stronger, and
1085 * enough to guarantee that nobody can change the in_use bits on us.
1086 */
1087 LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
1088
1089 for (i = 0; i < max_replication_slots; i++)
1090 {
1091 ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1092 char path[MAXPGPATH];
1093
1094 if (!s->in_use)
1095 continue;
1096
1097 /* save the slot to disk, locking is handled in SaveSlotToPath() */
1098 sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
1099 SaveSlotToPath(s, path, LOG);
1100 }
1101 LWLockRelease(ReplicationSlotAllocationLock);
1102}
1103
1104/*
1105 * Load all replication slots from disk into memory at server startup. This
1106 * needs to be run before we start crash recovery.
1107 */
1108void
1109StartupReplicationSlots(void)
1110{
1111 DIR *replication_dir;
1112 struct dirent *replication_de;
1113
1114 elog(DEBUG1, "starting up replication slots");
1115
1116 /* restore all slots by iterating over all on-disk entries */
1117 replication_dir = AllocateDir("pg_replslot");
1118 while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
1119 {
1120 struct stat statbuf;
1121 char path[MAXPGPATH + 12];
1122
1123 if (strcmp(replication_de->d_name, ".") == 0 ||
1124 strcmp(replication_de->d_name, "..") == 0)
1125 continue;
1126
1127 snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
1128
1129 /* we're only creating directories here, skip if it's not our's */
1130 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
1131 continue;
1132
1133 /* we crashed while a slot was being setup or deleted, clean up */
1134 if (pg_str_endswith(replication_de->d_name, ".tmp"))
1135 {
1136 if (!rmtree(path, true))
1137 {
1138 ereport(WARNING,
1139 (errmsg("could not remove directory \"%s\"",
1140 path)));
1141 continue;
1142 }
1143 fsync_fname("pg_replslot", true);
1144 continue;
1145 }
1146
1147 /* looks like a slot in a normal state, restore */
1148 RestoreSlotFromDisk(replication_de->d_name);
1149 }
1150 FreeDir(replication_dir);
1151
1152 /* currently no slots exist, we're done. */
1153 if (max_replication_slots <= 0)
1154 return;
1155
1156 /* Now that we have recovered all the data, compute replication xmin */
1157 ReplicationSlotsComputeRequiredXmin(false);
1158 ReplicationSlotsComputeRequiredLSN();
1159}
1160
1161/* ----
1162 * Manipulation of on-disk state of replication slots
1163 *
1164 * NB: none of the routines below should take any notice whether a slot is the
1165 * current one or not, that's all handled a layer above.
1166 * ----
1167 */
1168static void
1169CreateSlotOnDisk(ReplicationSlot *slot)
1170{
1171 char tmppath[MAXPGPATH];
1172 char path[MAXPGPATH];
1173 struct stat st;
1174
1175 /*
1176 * No need to take out the io_in_progress_lock, nobody else can see this
1177 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
1178 * takes out the lock, if we'd take the lock here, we'd deadlock.
1179 */
1180
1181 sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
1182 sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
1183
1184 /*
1185 * It's just barely possible that some previous effort to create or drop a
1186 * slot with this name left a temp directory lying around. If that seems
1187 * to be the case, try to remove it. If the rmtree() fails, we'll error
1188 * out at the MakePGDirectory() below, so we don't bother checking
1189 * success.
1190 */
1191 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
1192 rmtree(tmppath, true);
1193
1194 /* Create and fsync the temporary slot directory. */
1195 if (MakePGDirectory(tmppath) < 0)
1196 ereport(ERROR,
1197 (errcode_for_file_access(),
1198 errmsg("could not create directory \"%s\": %m",
1199 tmppath)));
1200 fsync_fname(tmppath, true);
1201
1202 /* Write the actual state file. */
1203 slot->dirty = true; /* signal that we really need to write */
1204 SaveSlotToPath(slot, tmppath, ERROR);
1205
1206 /* Rename the directory into place. */
1207 if (rename(tmppath, path) != 0)
1208 ereport(ERROR,
1209 (errcode_for_file_access(),
1210 errmsg("could not rename file \"%s\" to \"%s\": %m",
1211 tmppath, path)));
1212
1213 /*
1214 * If we'd now fail - really unlikely - we wouldn't know whether this slot
1215 * would persist after an OS crash or not - so, force a restart. The
1216 * restart would try to fsync this again till it works.
1217 */
1218 START_CRIT_SECTION();
1219
1220 fsync_fname(path, true);
1221 fsync_fname("pg_replslot", true);
1222
1223 END_CRIT_SECTION();
1224}
1225
1226/*
1227 * Shared functionality between saving and creating a replication slot.
1228 */
1229static void
1230SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
1231{
1232 char tmppath[MAXPGPATH];
1233 char path[MAXPGPATH];
1234 int fd;
1235 ReplicationSlotOnDisk cp;
1236 bool was_dirty;
1237
1238 /* first check whether there's something to write out */
1239 SpinLockAcquire(&slot->mutex);
1240 was_dirty = slot->dirty;
1241 slot->just_dirtied = false;
1242 SpinLockRelease(&slot->mutex);
1243
1244 /* and don't do anything if there's nothing to write */
1245 if (!was_dirty)
1246 return;
1247
1248 LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
1249
1250 /* silence valgrind :( */
1251 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
1252
1253 sprintf(tmppath, "%s/state.tmp", dir);
1254 sprintf(path, "%s/state", dir);
1255
1256 fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1257 if (fd < 0)
1258 {
1259 ereport(elevel,
1260 (errcode_for_file_access(),
1261 errmsg("could not create file \"%s\": %m",
1262 tmppath)));
1263 return;
1264 }
1265
1266 cp.magic = SLOT_MAGIC;
1267 INIT_CRC32C(cp.checksum);
1268 cp.version = SLOT_VERSION;
1269 cp.length = ReplicationSlotOnDiskV2Size;
1270
1271 SpinLockAcquire(&slot->mutex);
1272
1273 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
1274
1275 SpinLockRelease(&slot->mutex);
1276
1277 COMP_CRC32C(cp.checksum,
1278 (char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
1279 SnapBuildOnDiskChecksummedSize);
1280 FIN_CRC32C(cp.checksum);
1281
1282 errno = 0;
1283 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
1284 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
1285 {
1286 int save_errno = errno;
1287
1288 pgstat_report_wait_end();
1289 CloseTransientFile(fd);
1290
1291 /* if write didn't set errno, assume problem is no disk space */
1292 errno = save_errno ? save_errno : ENOSPC;
1293 ereport(elevel,
1294 (errcode_for_file_access(),
1295 errmsg("could not write to file \"%s\": %m",
1296 tmppath)));
1297 return;
1298 }
1299 pgstat_report_wait_end();
1300
1301 /* fsync the temporary file */
1302 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
1303 if (pg_fsync(fd) != 0)
1304 {
1305 int save_errno = errno;
1306
1307 pgstat_report_wait_end();
1308 CloseTransientFile(fd);
1309 errno = save_errno;
1310 ereport(elevel,
1311 (errcode_for_file_access(),
1312 errmsg("could not fsync file \"%s\": %m",
1313 tmppath)));
1314 return;
1315 }
1316 pgstat_report_wait_end();
1317
1318 if (CloseTransientFile(fd))
1319 {
1320 ereport(elevel,
1321 (errcode_for_file_access(),
1322 errmsg("could not close file \"%s\": %m",
1323 tmppath)));
1324 return;
1325 }
1326
1327 /* rename to permanent file, fsync file and directory */
1328 if (rename(tmppath, path) != 0)
1329 {
1330 ereport(elevel,
1331 (errcode_for_file_access(),
1332 errmsg("could not rename file \"%s\" to \"%s\": %m",
1333 tmppath, path)));
1334 return;
1335 }
1336
1337 /*
1338 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
1339 */
1340 START_CRIT_SECTION();
1341
1342 fsync_fname(path, false);
1343 fsync_fname(dir, true);
1344 fsync_fname("pg_replslot", true);
1345
1346 END_CRIT_SECTION();
1347
1348 /*
1349 * Successfully wrote, unset dirty bit, unless somebody dirtied again
1350 * already.
1351 */
1352 SpinLockAcquire(&slot->mutex);
1353 if (!slot->just_dirtied)
1354 slot->dirty = false;
1355 SpinLockRelease(&slot->mutex);
1356
1357 LWLockRelease(&slot->io_in_progress_lock);
1358}
1359
1360/*
1361 * Load a single slot from disk into memory.
1362 */
1363static void
1364RestoreSlotFromDisk(const char *name)
1365{
1366 ReplicationSlotOnDisk cp;
1367 int i;
1368 char slotdir[MAXPGPATH + 12];
1369 char path[MAXPGPATH + 22];
1370 int fd;
1371 bool restored = false;
1372 int readBytes;
1373 pg_crc32c checksum;
1374
1375 /* no need to lock here, no concurrent access allowed yet */
1376
1377 /* delete temp file if it exists */
1378 sprintf(slotdir, "pg_replslot/%s", name);
1379 sprintf(path, "%s/state.tmp", slotdir);
1380 if (unlink(path) < 0 && errno != ENOENT)
1381 ereport(PANIC,
1382 (errcode_for_file_access(),
1383 errmsg("could not remove file \"%s\": %m", path)));
1384
1385 sprintf(path, "%s/state", slotdir);
1386
1387 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
1388
1389 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1390
1391 /*
1392 * We do not need to handle this as we are rename()ing the directory into
1393 * place only after we fsync()ed the state file.
1394 */
1395 if (fd < 0)
1396 ereport(PANIC,
1397 (errcode_for_file_access(),
1398 errmsg("could not open file \"%s\": %m", path)));
1399
1400 /*
1401 * Sync state file before we're reading from it. We might have crashed
1402 * while it wasn't synced yet and we shouldn't continue on that basis.
1403 */
1404 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
1405 if (pg_fsync(fd) != 0)
1406 ereport(PANIC,
1407 (errcode_for_file_access(),
1408 errmsg("could not fsync file \"%s\": %m",
1409 path)));
1410 pgstat_report_wait_end();
1411
1412 /* Also sync the parent directory */
1413 START_CRIT_SECTION();
1414 fsync_fname(slotdir, true);
1415 END_CRIT_SECTION();
1416
1417 /* read part of statefile that's guaranteed to be version independent */
1418 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
1419 readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
1420 pgstat_report_wait_end();
1421 if (readBytes != ReplicationSlotOnDiskConstantSize)
1422 {
1423 if (readBytes < 0)
1424 ereport(PANIC,
1425 (errcode_for_file_access(),
1426 errmsg("could not read file \"%s\": %m", path)));
1427 else
1428 ereport(PANIC,
1429 (errcode(ERRCODE_DATA_CORRUPTED),
1430 errmsg("could not read file \"%s\": read %d of %zu",
1431 path, readBytes,
1432 (Size) ReplicationSlotOnDiskConstantSize)));
1433 }
1434
1435 /* verify magic */
1436 if (cp.magic != SLOT_MAGIC)
1437 ereport(PANIC,
1438 (errcode(ERRCODE_DATA_CORRUPTED),
1439 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
1440 path, cp.magic, SLOT_MAGIC)));
1441
1442 /* verify version */
1443 if (cp.version != SLOT_VERSION)
1444 ereport(PANIC,
1445 (errcode(ERRCODE_DATA_CORRUPTED),
1446 errmsg("replication slot file \"%s\" has unsupported version %u",
1447 path, cp.version)));
1448
1449 /* boundary check on length */
1450 if (cp.length != ReplicationSlotOnDiskV2Size)
1451 ereport(PANIC,
1452 (errcode(ERRCODE_DATA_CORRUPTED),
1453 errmsg("replication slot file \"%s\" has corrupted length %u",
1454 path, cp.length)));
1455
1456 /* Now that we know the size, read the entire file */
1457 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
1458 readBytes = read(fd,
1459 (char *) &cp + ReplicationSlotOnDiskConstantSize,
1460 cp.length);
1461 pgstat_report_wait_end();
1462 if (readBytes != cp.length)
1463 {
1464 if (readBytes < 0)
1465 ereport(PANIC,
1466 (errcode_for_file_access(),
1467 errmsg("could not read file \"%s\": %m", path)));
1468 else
1469 ereport(PANIC,
1470 (errcode(ERRCODE_DATA_CORRUPTED),
1471 errmsg("could not read file \"%s\": read %d of %zu",
1472 path, readBytes, (Size) cp.length)));
1473 }
1474
1475 if (CloseTransientFile(fd))
1476 ereport(PANIC,
1477 (errcode_for_file_access(),
1478 errmsg("could not close file \"%s\": %m", path)));
1479
1480 /* now verify the CRC */
1481 INIT_CRC32C(checksum);
1482 COMP_CRC32C(checksum,
1483 (char *) &cp + SnapBuildOnDiskNotChecksummedSize,
1484 SnapBuildOnDiskChecksummedSize);
1485 FIN_CRC32C(checksum);
1486
1487 if (!EQ_CRC32C(checksum, cp.checksum))
1488 ereport(PANIC,
1489 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
1490 path, checksum, cp.checksum)));
1491
1492 /*
1493 * If we crashed with an ephemeral slot active, don't restore but delete
1494 * it.
1495 */
1496 if (cp.slotdata.persistency != RS_PERSISTENT)
1497 {
1498 if (!rmtree(slotdir, true))
1499 {
1500 ereport(WARNING,
1501 (errmsg("could not remove directory \"%s\"",
1502 slotdir)));
1503 }
1504 fsync_fname("pg_replslot", true);
1505 return;
1506 }
1507
1508 /*
1509 * Verify that requirements for the specific slot type are met. That's
1510 * important because if these aren't met we're not guaranteed to retain
1511 * all the necessary resources for the slot.
1512 *
1513 * NB: We have to do so *after* the above checks for ephemeral slots,
1514 * because otherwise a slot that shouldn't exist anymore could prevent
1515 * restarts.
1516 *
1517 * NB: Changing the requirements here also requires adapting
1518 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
1519 */
1520 if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
1521 ereport(FATAL,
1522 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1523 errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
1524 NameStr(cp.slotdata.name)),
1525 errhint("Change wal_level to be logical or higher.")));
1526 else if (wal_level < WAL_LEVEL_REPLICA)
1527 ereport(FATAL,
1528 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1529 errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
1530 NameStr(cp.slotdata.name)),
1531 errhint("Change wal_level to be replica or higher.")));
1532
1533 /* nothing can be active yet, don't lock anything */
1534 for (i = 0; i < max_replication_slots; i++)
1535 {
1536 ReplicationSlot *slot;
1537
1538 slot = &ReplicationSlotCtl->replication_slots[i];
1539
1540 if (slot->in_use)
1541 continue;
1542
1543 /* restore the entire set of persistent data */
1544 memcpy(&slot->data, &cp.slotdata,
1545 sizeof(ReplicationSlotPersistentData));
1546
1547 /* initialize in memory state */
1548 slot->effective_xmin = cp.slotdata.xmin;
1549 slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
1550
1551 slot->candidate_catalog_xmin = InvalidTransactionId;
1552 slot->candidate_xmin_lsn = InvalidXLogRecPtr;
1553 slot->candidate_restart_lsn = InvalidXLogRecPtr;
1554 slot->candidate_restart_valid = InvalidXLogRecPtr;
1555
1556 slot->in_use = true;
1557 slot->active_pid = 0;
1558
1559 restored = true;
1560 break;
1561 }
1562
1563 if (!restored)
1564 ereport(FATAL,
1565 (errmsg("too many replication slots active before shutdown"),
1566 errhint("Increase max_replication_slots and try again.")));
1567}
1568