1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * slot.c |
4 | * Replication slot management. |
5 | * |
6 | * |
7 | * Copyright (c) 2012-2019, PostgreSQL Global Development Group |
8 | * |
9 | * |
10 | * IDENTIFICATION |
11 | * src/backend/replication/slot.c |
12 | * |
13 | * NOTES |
14 | * |
15 | * Replication slots are used to keep state about replication streams |
16 | * originating from this cluster. Their primary purpose is to prevent the |
17 | * premature removal of WAL or of old tuple versions in a manner that would |
18 | * interfere with replication; they are also useful for monitoring purposes. |
19 | * Slots need to be permanent (to allow restarts), crash-safe, and allocatable |
20 | * on standbys (to support cascading setups). The requirement that slots be |
21 | * usable on standbys precludes storing them in the system catalogs. |
22 | * |
23 | * Each replication slot gets its own directory inside the $PGDATA/pg_replslot |
24 | * directory. Inside that directory the state file will contain the slot's |
25 | * own data. Additional data can be stored alongside that file if required. |
26 | * While the server is running, the state data is also cached in memory for |
27 | * efficiency. |
28 | * |
29 | * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate |
30 | * or free a slot. ReplicationSlotControlLock must be taken in shared mode |
31 | * to iterate over the slots, and in exclusive mode to change the in_use flag |
32 | * of a slot. The remaining data in each slot is protected by its mutex. |
33 | * |
34 | *------------------------------------------------------------------------- |
35 | */ |
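
/*
 * Illustrative sketch of the locking protocol described above, as a reader
 * scanning the slot array would typically follow it (the code below is a
 * sketch only, not used anywhere in this file):
 *
 *	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 *	for (int i = 0; i < max_replication_slots; i++)
 *	{
 *		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 *
 *		if (!s->in_use)			-- in_use only changes under exclusive lock
 *			continue;
 *
 *		SpinLockAcquire(&s->mutex);	-- remaining fields need the mutex
 *		... read s->data, s->active_pid ...
 *		SpinLockRelease(&s->mutex);
 *	}
 *	LWLockRelease(ReplicationSlotControlLock);
 */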
36 | |
37 | #include "postgres.h" |
38 | |
39 | #include <unistd.h> |
40 | #include <sys/stat.h> |
41 | |
42 | #include "access/transam.h" |
43 | #include "access/xlog_internal.h" |
44 | #include "common/string.h" |
45 | #include "miscadmin.h" |
46 | #include "pgstat.h" |
47 | #include "replication/slot.h" |
48 | #include "storage/fd.h" |
49 | #include "storage/proc.h" |
50 | #include "storage/procarray.h" |
51 | #include "utils/builtins.h" |
52 | |
53 | /* |
54 | * Replication slot on-disk data structure. |
55 | */ |
56 | typedef struct ReplicationSlotOnDisk |
57 | { |
58 | /* first part of this struct needs to be version independent */ |
59 | |
60 | /* data not covered by checksum */ |
61 | uint32 magic; |
62 | pg_crc32c checksum; |
63 | |
64 | /* data covered by checksum */ |
65 | uint32 version; |
66 | uint32 length; |
67 | |
68 | /* |
69 | * The actual data in the slot that follows can differ based on the above |
70 | * 'version'. |
71 | */ |
72 | |
73 | ReplicationSlotPersistentData slotdata; |
74 | } ReplicationSlotOnDisk; |
75 | |
76 | /* size of version independent data */ |
77 | #define ReplicationSlotOnDiskConstantSize \ |
78 | offsetof(ReplicationSlotOnDisk, slotdata) |
79 | /* size of the part of the slot not covered by the checksum */ |
80 | #define SnapBuildOnDiskNotChecksummedSize \ |
81 | offsetof(ReplicationSlotOnDisk, version) |
82 | /* size of the part covered by the checksum */ |
83 | #define SnapBuildOnDiskChecksummedSize \ |
84 | (sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize) |
85 | /* size of the slot data that is version dependent */ |
86 | #define ReplicationSlotOnDiskV2Size \ |
87 | (sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize) |
88 | |
89 | #define SLOT_MAGIC 0x1051CA1 /* format identifier */ |
90 | #define SLOT_VERSION 2 /* version for new files */ |
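
/*
 * For orientation, a sketch of the resulting "state" file layout; the offsets
 * shown assume the common case of four 4-byte header fields with no padding
 * before 'slotdata':
 *
 *	offset  0	magic		(not covered by checksum)
 *	offset  4	checksum	(not covered by checksum)
 *	offset  8	version		(covered by checksum)
 *	offset 12	length		(covered by checksum)
 *	offset 16	slotdata	(covered by checksum; ReplicationSlotPersistentData)
 */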
91 | |
92 | /* Control array for replication slot management */ |
93 | ReplicationSlotCtlData *ReplicationSlotCtl = NULL; |
94 | |
95 | /* My backend's replication slot in the shared memory array */ |
96 | ReplicationSlot *MyReplicationSlot = NULL; |
97 | |
98 | /* GUCs */ |
99 | int max_replication_slots = 0; /* the maximum number of replication |
100 | * slots */ |
101 | |
102 | static void ReplicationSlotDropAcquired(void); |
103 | static void ReplicationSlotDropPtr(ReplicationSlot *slot); |
104 | |
105 | /* internal persistency functions */ |
106 | static void RestoreSlotFromDisk(const char *name); |
107 | static void CreateSlotOnDisk(ReplicationSlot *slot); |
108 | static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel); |
109 | |
110 | /* |
111 | * Report shared-memory space needed by ReplicationSlotsShmemInit. |
112 | */ |
113 | Size |
114 | ReplicationSlotsShmemSize(void) |
115 | { |
116 | Size size = 0; |
117 | |
118 | if (max_replication_slots == 0) |
119 | return size; |
120 | |
121 | size = offsetof(ReplicationSlotCtlData, replication_slots); |
122 | size = add_size(size, |
123 | mul_size(max_replication_slots, sizeof(ReplicationSlot))); |
124 | |
125 | return size; |
126 | } |
127 | |
128 | /* |
129 | * Allocate and initialize shared memory for replication slots. |
130 | */ |
131 | void |
132 | ReplicationSlotsShmemInit(void) |
133 | { |
134 | bool found; |
135 | |
136 | if (max_replication_slots == 0) |
137 | return; |
138 | |
139 | ReplicationSlotCtl = (ReplicationSlotCtlData *) |
140 | ShmemInitStruct("ReplicationSlot Ctl" , ReplicationSlotsShmemSize(), |
141 | &found); |
142 | |
143 | LWLockRegisterTranche(LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS, |
144 | "replication_slot_io" ); |
145 | |
146 | if (!found) |
147 | { |
148 | int i; |
149 | |
150 | /* First time through, so initialize */ |
151 | MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize()); |
152 | |
153 | for (i = 0; i < max_replication_slots; i++) |
154 | { |
155 | ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i]; |
156 | |
157 | /* everything else is zeroed by the memset above */ |
158 | SpinLockInit(&slot->mutex); |
159 | LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS); |
160 | ConditionVariableInit(&slot->active_cv); |
161 | } |
162 | } |
163 | } |
164 | |
165 | /* |
166 | * Check whether the passed slot name is valid and report errors at elevel. |
167 | * |
168 | * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1}, which should allow |
169 | * the name to be used as a directory name on every supported OS. |
170 | * |
171 | * Returns whether the name is valid; if elevel >= ERROR, an invalid name errors out instead. |
172 | */ |
173 | bool |
174 | ReplicationSlotValidateName(const char *name, int elevel) |
175 | { |
176 | const char *cp; |
177 | |
178 | if (strlen(name) == 0) |
179 | { |
180 | ereport(elevel, |
181 | (errcode(ERRCODE_INVALID_NAME), |
182 | errmsg("replication slot name \"%s\" is too short" , |
183 | name))); |
184 | return false; |
185 | } |
186 | |
187 | if (strlen(name) >= NAMEDATALEN) |
188 | { |
189 | ereport(elevel, |
190 | (errcode(ERRCODE_NAME_TOO_LONG), |
191 | errmsg("replication slot name \"%s\" is too long" , |
192 | name))); |
193 | return false; |
194 | } |
195 | |
196 | for (cp = name; *cp; cp++) |
197 | { |
198 | if (!((*cp >= 'a' && *cp <= 'z') |
199 | || (*cp >= '0' && *cp <= '9') |
200 | || (*cp == '_'))) |
201 | { |
202 | ereport(elevel, |
203 | (errcode(ERRCODE_INVALID_NAME), |
204 | errmsg("replication slot name \"%s\" contains invalid character" , |
205 | name), |
206 | errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character." ))); |
207 | return false; |
208 | } |
209 | } |
210 | return true; |
211 | } |
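
/*
 * For example (hypothetical names), "standby_1_slot" passes the checks above,
 * while "Standby-1" does not: upper-case letters and '-' are outside
 * [a-z0-9_].  A sketch of the non-erroring call style:
 *
 *	if (!ReplicationSlotValidateName(name, WARNING))
 *		... fall back to some generated name ...
 */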
212 | |
213 | /* |
214 | * Create a new replication slot and mark it as used by this backend. |
215 | * |
216 | * name: Name of the slot |
217 | * db_specific: logical decoding is database-specific; pass true if the slot |
218 | * is going to be used for logical decoding, false otherwise. |
219 | */ |
220 | void |
221 | ReplicationSlotCreate(const char *name, bool db_specific, |
222 | ReplicationSlotPersistency persistency) |
223 | { |
224 | ReplicationSlot *slot = NULL; |
225 | int i; |
226 | |
227 | Assert(MyReplicationSlot == NULL); |
228 | |
229 | ReplicationSlotValidateName(name, ERROR); |
230 | |
231 | /* |
232 | * If some other backend ran this code concurrently with us, we'd likely |
233 | * both allocate the same slot, and that would be bad. We'd also be at |
234 | * risk of missing a name collision. Also, we don't want to try to create |
235 | * a new slot while somebody's busy cleaning up an old one, because we |
236 | * might both be monkeying with the same directory. |
237 | */ |
238 | LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE); |
239 | |
240 | /* |
241 | * Check for name collision, and identify an allocatable slot. We need to |
242 | * hold ReplicationSlotControlLock in shared mode for this, so that nobody |
243 | * else can change the in_use flags while we're looking at them. |
244 | */ |
245 | LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); |
246 | for (i = 0; i < max_replication_slots; i++) |
247 | { |
248 | ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; |
249 | |
250 | if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0) |
251 | ereport(ERROR, |
252 | (errcode(ERRCODE_DUPLICATE_OBJECT), |
253 | errmsg("replication slot \"%s\" already exists" , name))); |
254 | if (!s->in_use && slot == NULL) |
255 | slot = s; |
256 | } |
257 | LWLockRelease(ReplicationSlotControlLock); |
258 | |
259 | /* If all slots are in use, we're out of luck. */ |
260 | if (slot == NULL) |
261 | ereport(ERROR, |
262 | (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), |
263 | errmsg("all replication slots are in use" ), |
264 | errhint("Free one or increase max_replication_slots." ))); |
265 | |
266 | /* |
267 | * Since this slot is not in use, nobody should be looking at any part of |
268 | * it other than the in_use field unless they're trying to allocate it. |
269 | * And since we hold ReplicationSlotAllocationLock, nobody except us can |
270 | * be doing that. So it's safe to initialize the slot. |
271 | */ |
272 | Assert(!slot->in_use); |
273 | Assert(slot->active_pid == 0); |
274 | |
275 | /* first initialize persistent data */ |
276 | memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData)); |
277 | StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN); |
278 | slot->data.database = db_specific ? MyDatabaseId : InvalidOid; |
279 | slot->data.persistency = persistency; |
280 | |
281 | /* and then data only present in shared memory */ |
282 | slot->just_dirtied = false; |
283 | slot->dirty = false; |
284 | slot->effective_xmin = InvalidTransactionId; |
285 | slot->effective_catalog_xmin = InvalidTransactionId; |
286 | slot->candidate_catalog_xmin = InvalidTransactionId; |
287 | slot->candidate_xmin_lsn = InvalidXLogRecPtr; |
288 | slot->candidate_restart_valid = InvalidXLogRecPtr; |
289 | slot->candidate_restart_lsn = InvalidXLogRecPtr; |
290 | |
291 | /* |
292 | * Create the slot on disk. We haven't actually marked the slot allocated |
293 | * yet, so no special cleanup is required if this errors out. |
294 | */ |
295 | CreateSlotOnDisk(slot); |
296 | |
297 | /* |
298 | * We need to briefly prevent any other backend from iterating over the |
299 | * slots while we flip the in_use flag. We also need to set the active |
300 | * flag while holding the ControlLock as otherwise a concurrent |
301 | * SlotAcquire() could acquire the slot as well. |
302 | */ |
303 | LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); |
304 | |
305 | slot->in_use = true; |
306 | |
307 | /* We can now mark the slot active, and that makes it our slot. */ |
308 | SpinLockAcquire(&slot->mutex); |
309 | Assert(slot->active_pid == 0); |
310 | slot->active_pid = MyProcPid; |
311 | SpinLockRelease(&slot->mutex); |
312 | MyReplicationSlot = slot; |
313 | |
314 | LWLockRelease(ReplicationSlotControlLock); |
315 | |
316 | /* |
317 | * Now that the slot has been marked as in_use and active, it's safe to |
318 | * let somebody else try to allocate a slot. |
319 | */ |
320 | LWLockRelease(ReplicationSlotAllocationLock); |
321 | |
322 | /* Let everybody know we've modified this slot */ |
323 | ConditionVariableBroadcast(&slot->active_cv); |
324 | } |
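
/*
 * Illustrative creation sequence, loosely modeled on how logical decoding
 * sets up a slot (a sketch only; error handling omitted, slot name made up):
 *
 *	ReplicationSlotCreate("my_logical_slot", true, RS_EPHEMERAL);
 *	... reserve WAL, build the initial snapshot ...
 *	ReplicationSlotPersist();	-- upgrade to RS_PERSISTENT once usable
 *	ReplicationSlotRelease();
 */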
325 | |
326 | /* |
327 | * Find a previously created slot and mark it as used by this backend. |
328 | */ |
329 | void |
330 | ReplicationSlotAcquire(const char *name, bool nowait) |
331 | { |
332 | ReplicationSlot *slot; |
333 | int active_pid; |
334 | int i; |
335 | |
336 | retry: |
337 | Assert(MyReplicationSlot == NULL); |
338 | |
339 | /* |
340 | * Search for the named slot and mark it active if we find it. If the |
341 | * slot is already active, we exit the loop with active_pid set to the PID |
342 | * of the backend that owns it. |
343 | */ |
344 | active_pid = 0; |
345 | slot = NULL; |
346 | LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); |
347 | for (i = 0; i < max_replication_slots; i++) |
348 | { |
349 | ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; |
350 | |
351 | if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0) |
352 | { |
353 | /* |
354 | * This is the slot we want; check if it's active under some other |
355 | * process. In single user mode, we don't need this check. |
356 | */ |
357 | if (IsUnderPostmaster) |
358 | { |
359 | /* |
360 | * Get ready to sleep on it in case it is active. (We may end |
361 | * up not sleeping, but we don't want to do this while holding |
362 | * the spinlock.) |
363 | */ |
364 | ConditionVariablePrepareToSleep(&s->active_cv); |
365 | |
366 | SpinLockAcquire(&s->mutex); |
367 | |
368 | active_pid = s->active_pid; |
369 | if (active_pid == 0) |
370 | active_pid = s->active_pid = MyProcPid; |
371 | |
372 | SpinLockRelease(&s->mutex); |
373 | } |
374 | else |
375 | active_pid = MyProcPid; |
376 | slot = s; |
377 | |
378 | break; |
379 | } |
380 | } |
381 | LWLockRelease(ReplicationSlotControlLock); |
382 | |
383 | /* If we did not find the slot, error out. */ |
384 | if (slot == NULL) |
385 | ereport(ERROR, |
386 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
387 | errmsg("replication slot \"%s\" does not exist" , name))); |
388 | |
389 | /* |
390 | * If we found the slot but it's already active in another backend, we |
391 | * either error out or retry after a short wait, as caller specified. |
392 | */ |
393 | if (active_pid != MyProcPid) |
394 | { |
395 | if (nowait) |
396 | ereport(ERROR, |
397 | (errcode(ERRCODE_OBJECT_IN_USE), |
398 | errmsg("replication slot \"%s\" is active for PID %d" , |
399 | name, active_pid))); |
400 | |
401 | /* Wait here until we get signaled, and then restart */ |
402 | ConditionVariableSleep(&slot->active_cv, |
403 | WAIT_EVENT_REPLICATION_SLOT_DROP); |
404 | ConditionVariableCancelSleep(); |
405 | goto retry; |
406 | } |
407 | else |
408 | ConditionVariableCancelSleep(); /* no sleep needed after all */ |
409 | |
410 | /* Let everybody know we've modified this slot */ |
411 | ConditionVariableBroadcast(&slot->active_cv); |
412 | |
413 | /* We made this slot active, so it's ours now. */ |
414 | MyReplicationSlot = slot; |
415 | } |
416 | |
417 | /* |
418 | * Release the replication slot that this backend considers to own. |
419 | * |
420 | * This or another backend can re-acquire the slot later. |
421 | * Resources this slot requires will be preserved. |
422 | */ |
423 | void |
424 | ReplicationSlotRelease(void) |
425 | { |
426 | ReplicationSlot *slot = MyReplicationSlot; |
427 | |
428 | Assert(slot != NULL && slot->active_pid != 0); |
429 | |
430 | if (slot->data.persistency == RS_EPHEMERAL) |
431 | { |
432 | /* |
433 | * Delete the slot. Short of a PANIC, this is not allowed to fail; the |
434 | * worst that may happen is an incomplete cleanup of the on-disk |
435 | * data. |
436 | */ |
437 | ReplicationSlotDropAcquired(); |
438 | } |
439 | |
440 | /* |
441 | * If the slot needed to temporarily restrain both data and catalog xmin to |
442 | * create the catalog snapshot, remove that temporary constraint. |
443 | * Snapshots can only be exported while the initial snapshot is still |
444 | * acquired. |
445 | */ |
446 | if (!TransactionIdIsValid(slot->data.xmin) && |
447 | TransactionIdIsValid(slot->effective_xmin)) |
448 | { |
449 | SpinLockAcquire(&slot->mutex); |
450 | slot->effective_xmin = InvalidTransactionId; |
451 | SpinLockRelease(&slot->mutex); |
452 | ReplicationSlotsComputeRequiredXmin(false); |
453 | } |
454 | |
455 | if (slot->data.persistency == RS_PERSISTENT) |
456 | { |
457 | /* |
458 | * Mark persistent slot inactive. We're not freeing it, just |
459 | * disconnecting, but wake up others that may be waiting for it. |
460 | */ |
461 | SpinLockAcquire(&slot->mutex); |
462 | slot->active_pid = 0; |
463 | SpinLockRelease(&slot->mutex); |
464 | ConditionVariableBroadcast(&slot->active_cv); |
465 | } |
466 | |
467 | MyReplicationSlot = NULL; |
468 | |
469 | /* the flag might not have been set if this was a physical (plain) slot */ |
470 | LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); |
471 | MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING; |
472 | LWLockRelease(ProcArrayLock); |
473 | } |
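
/*
 * Typical acquire/use/release pattern for an existing slot (a sketch; the
 * slot name is made up):
 *
 *	ReplicationSlotAcquire("my_slot", true);	-- ERROR if someone else owns it
 *	... stream changes, advance LSNs, mark dirty, save ...
 *	ReplicationSlotRelease();			-- slot stays around, now inactive
 *
 * With nowait = false the acquire instead sleeps on the slot's active_cv
 * until the current owner releases or drops the slot, and then retries.
 */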
474 | |
475 | /* |
476 | * Cleanup all temporary slots created in current session. |
477 | */ |
478 | void |
479 | ReplicationSlotCleanup(void) |
480 | { |
481 | int i; |
482 | |
483 | Assert(MyReplicationSlot == NULL); |
484 | |
485 | restart: |
486 | LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); |
487 | for (i = 0; i < max_replication_slots; i++) |
488 | { |
489 | ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; |
490 | |
491 | if (!s->in_use) |
492 | continue; |
493 | |
494 | SpinLockAcquire(&s->mutex); |
495 | if (s->active_pid == MyProcPid) |
496 | { |
497 | Assert(s->data.persistency == RS_TEMPORARY); |
498 | SpinLockRelease(&s->mutex); |
499 | LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */ |
500 | |
501 | ReplicationSlotDropPtr(s); |
502 | |
503 | ConditionVariableBroadcast(&s->active_cv); |
504 | goto restart; |
505 | } |
506 | else |
507 | SpinLockRelease(&s->mutex); |
508 | } |
509 | |
510 | LWLockRelease(ReplicationSlotControlLock); |
511 | } |
512 | |
513 | /* |
514 | * Permanently drop replication slot identified by the passed in name. |
515 | */ |
516 | void |
517 | ReplicationSlotDrop(const char *name, bool nowait) |
518 | { |
519 | Assert(MyReplicationSlot == NULL); |
520 | |
521 | ReplicationSlotAcquire(name, nowait); |
522 | |
523 | ReplicationSlotDropAcquired(); |
524 | } |
525 | |
526 | /* |
527 | * Permanently drop the currently acquired replication slot. |
528 | */ |
529 | static void |
530 | ReplicationSlotDropAcquired(void) |
531 | { |
532 | ReplicationSlot *slot = MyReplicationSlot; |
533 | |
534 | Assert(MyReplicationSlot != NULL); |
535 | |
536 | /* slot isn't acquired anymore */ |
537 | MyReplicationSlot = NULL; |
538 | |
539 | ReplicationSlotDropPtr(slot); |
540 | } |
541 | |
542 | /* |
543 | * Permanently drop the given replication slot; it will have been released by |
544 | * the time this function returns. |
545 | */ |
546 | static void |
547 | ReplicationSlotDropPtr(ReplicationSlot *slot) |
548 | { |
549 | char path[MAXPGPATH]; |
550 | char tmppath[MAXPGPATH]; |
551 | |
552 | /* |
553 | * If some other backend ran this code concurrently with us, we might try |
554 | * to delete a slot with a certain name while someone else was trying to |
555 | * create a slot with the same name. |
556 | */ |
557 | LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE); |
558 | |
559 | /* Generate pathnames. */ |
560 | sprintf(path, "pg_replslot/%s" , NameStr(slot->data.name)); |
561 | sprintf(tmppath, "pg_replslot/%s.tmp" , NameStr(slot->data.name)); |
562 | |
563 | /* |
564 | * Rename the slot directory on disk, so that we'll no longer recognize |
565 | * this as a valid slot. Note that if this fails, we've got to mark the |
566 | * slot inactive before bailing out. If we're dropping an ephemeral or a |
567 | * temporary slot, we better never fail hard as the caller won't expect |
568 | * the slot to survive and this might get called during error handling. |
569 | */ |
570 | if (rename(path, tmppath) == 0) |
571 | { |
572 | /* |
573 | * We need to fsync() the directory we just renamed and its parent to |
574 | * make sure that our changes are on disk in a crash-safe fashion. If |
575 | * fsync() fails, we can't be sure whether the changes are on disk or |
576 | * not. For now, we handle that by panicking; |
577 | * StartupReplicationSlots() will try to straighten it out after |
578 | * restart. |
579 | */ |
580 | START_CRIT_SECTION(); |
581 | fsync_fname(tmppath, true); |
582 | fsync_fname("pg_replslot" , true); |
583 | END_CRIT_SECTION(); |
584 | } |
585 | else |
586 | { |
587 | bool fail_softly = slot->data.persistency != RS_PERSISTENT; |
588 | |
589 | SpinLockAcquire(&slot->mutex); |
590 | slot->active_pid = 0; |
591 | SpinLockRelease(&slot->mutex); |
592 | |
593 | /* wake up anyone waiting on this slot */ |
594 | ConditionVariableBroadcast(&slot->active_cv); |
595 | |
596 | ereport(fail_softly ? WARNING : ERROR, |
597 | (errcode_for_file_access(), |
598 | errmsg("could not rename file \"%s\" to \"%s\": %m" , |
599 | path, tmppath))); |
600 | } |
601 | |
602 | /* |
603 | * The slot is definitely gone. Lock out concurrent scans of the array |
604 | * long enough to kill it. It's OK to clear the active PID here without |
605 | * grabbing the mutex because nobody else can be scanning the array here, |
606 | * and nobody can be attached to this slot and thus access it without |
607 | * scanning the array. |
608 | * |
609 | * Also wake up processes waiting for it. |
610 | */ |
611 | LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); |
612 | slot->active_pid = 0; |
613 | slot->in_use = false; |
614 | LWLockRelease(ReplicationSlotControlLock); |
615 | ConditionVariableBroadcast(&slot->active_cv); |
616 | |
617 | /* |
618 | * Slot is dead and doesn't prevent resource removal anymore, recompute |
619 | * limits. |
620 | */ |
621 | ReplicationSlotsComputeRequiredXmin(false); |
622 | ReplicationSlotsComputeRequiredLSN(); |
623 | |
624 | /* |
625 | * If removing the directory fails, the worst thing that will happen is |
626 | * that the user won't be able to create a new slot with the same name |
627 | * until the next server restart. We warn about it, but that's all. |
628 | */ |
629 | if (!rmtree(tmppath, true)) |
630 | ereport(WARNING, |
631 | (errmsg("could not remove directory \"%s\"" , tmppath))); |
632 | |
633 | /* |
634 | * We release this at the very end, so that nobody starts trying to create |
635 | * a slot while we're still cleaning up the detritus of the old one. |
636 | */ |
637 | LWLockRelease(ReplicationSlotAllocationLock); |
638 | } |
639 | |
640 | /* |
641 | * Serialize the currently acquired slot's state from memory to disk, thereby |
642 | * guaranteeing the current state will survive a crash. |
643 | */ |
644 | void |
645 | ReplicationSlotSave(void) |
646 | { |
647 | char path[MAXPGPATH]; |
648 | |
649 | Assert(MyReplicationSlot != NULL); |
650 | |
651 | sprintf(path, "pg_replslot/%s" , NameStr(MyReplicationSlot->data.name)); |
652 | SaveSlotToPath(MyReplicationSlot, path, ERROR); |
653 | } |
654 | |
655 | /* |
656 | * Signal that it would be useful if the currently acquired slot were |
657 | * flushed out to disk at some point. |
658 | * |
659 | * Note that the actual flush to disk can be delayed for a long time; if it |
660 | * is required for correctness, explicitly call ReplicationSlotSave(). |
661 | */ |
662 | void |
663 | ReplicationSlotMarkDirty(void) |
664 | { |
665 | ReplicationSlot *slot = MyReplicationSlot; |
666 | |
667 | Assert(MyReplicationSlot != NULL); |
668 | |
669 | SpinLockAcquire(&slot->mutex); |
670 | MyReplicationSlot->just_dirtied = true; |
671 | MyReplicationSlot->dirty = true; |
672 | SpinLockRelease(&slot->mutex); |
673 | } |
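
/*
 * The usual update pattern is to modify the in-memory state under the slot's
 * mutex, mark the slot dirty, and, only when immediate durability is needed,
 * save it explicitly.  A sketch, with a made-up new_restart_lsn variable:
 *
 *	SpinLockAcquire(&MyReplicationSlot->mutex);
 *	MyReplicationSlot->data.restart_lsn = new_restart_lsn;
 *	SpinLockRelease(&MyReplicationSlot->mutex);
 *
 *	ReplicationSlotMarkDirty();
 *	ReplicationSlotSave();		-- optional; a later checkpoint flushes it too
 */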
674 | |
675 | /* |
676 | * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot, |
677 | * guaranteeing it will still be there after a possible crash. |
678 | */ |
679 | void |
680 | ReplicationSlotPersist(void) |
681 | { |
682 | ReplicationSlot *slot = MyReplicationSlot; |
683 | |
684 | Assert(slot != NULL); |
685 | Assert(slot->data.persistency != RS_PERSISTENT); |
686 | |
687 | SpinLockAcquire(&slot->mutex); |
688 | slot->data.persistency = RS_PERSISTENT; |
689 | SpinLockRelease(&slot->mutex); |
690 | |
691 | ReplicationSlotMarkDirty(); |
692 | ReplicationSlotSave(); |
693 | } |
694 | |
695 | /* |
696 | * Compute the oldest xmin across all slots and store it in the ProcArray. |
697 | * |
698 | * If already_locked is true, ProcArrayLock has already been acquired |
699 | * exclusively. |
700 | */ |
701 | void |
702 | ReplicationSlotsComputeRequiredXmin(bool already_locked) |
703 | { |
704 | int i; |
705 | TransactionId agg_xmin = InvalidTransactionId; |
706 | TransactionId agg_catalog_xmin = InvalidTransactionId; |
707 | |
708 | Assert(ReplicationSlotCtl != NULL); |
709 | |
710 | LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); |
711 | |
712 | for (i = 0; i < max_replication_slots; i++) |
713 | { |
714 | ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; |
715 | TransactionId effective_xmin; |
716 | TransactionId effective_catalog_xmin; |
717 | |
718 | if (!s->in_use) |
719 | continue; |
720 | |
721 | SpinLockAcquire(&s->mutex); |
722 | effective_xmin = s->effective_xmin; |
723 | effective_catalog_xmin = s->effective_catalog_xmin; |
724 | SpinLockRelease(&s->mutex); |
725 | |
726 | /* check the data xmin */ |
727 | if (TransactionIdIsValid(effective_xmin) && |
728 | (!TransactionIdIsValid(agg_xmin) || |
729 | TransactionIdPrecedes(effective_xmin, agg_xmin))) |
730 | agg_xmin = effective_xmin; |
731 | |
732 | /* check the catalog xmin */ |
733 | if (TransactionIdIsValid(effective_catalog_xmin) && |
734 | (!TransactionIdIsValid(agg_catalog_xmin) || |
735 | TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin))) |
736 | agg_catalog_xmin = effective_catalog_xmin; |
737 | } |
738 | |
739 | LWLockRelease(ReplicationSlotControlLock); |
740 | |
741 | ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked); |
742 | } |
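
/*
 * For intuition about the aggregation above (numbers made up): if one slot
 * holds effective_xmin = 700 and another holds only effective_catalog_xmin =
 * 650, the values published via ProcArraySetReplicationSlotXmin() are
 * xmin = 700 and catalog_xmin = 650; the procarray then holds back the
 * corresponding removal horizons to (at most) those values.
 */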
743 | |
744 | /* |
745 | * Compute the oldest restart LSN across all slots and inform xlog module. |
746 | */ |
747 | void |
748 | ReplicationSlotsComputeRequiredLSN(void) |
749 | { |
750 | int i; |
751 | XLogRecPtr min_required = InvalidXLogRecPtr; |
752 | |
753 | Assert(ReplicationSlotCtl != NULL); |
754 | |
755 | LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); |
756 | for (i = 0; i < max_replication_slots; i++) |
757 | { |
758 | ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; |
759 | XLogRecPtr restart_lsn; |
760 | |
761 | if (!s->in_use) |
762 | continue; |
763 | |
764 | SpinLockAcquire(&s->mutex); |
765 | restart_lsn = s->data.restart_lsn; |
766 | SpinLockRelease(&s->mutex); |
767 | |
768 | if (restart_lsn != InvalidXLogRecPtr && |
769 | (min_required == InvalidXLogRecPtr || |
770 | restart_lsn < min_required)) |
771 | min_required = restart_lsn; |
772 | } |
773 | LWLockRelease(ReplicationSlotControlLock); |
774 | |
775 | XLogSetReplicationSlotMinimumLSN(min_required); |
776 | } |
777 | |
778 | /* |
779 | * Compute the oldest WAL LSN required by *logical* decoding slots. |
780 | * |
781 | * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical |
782 | * slots exist. |
783 | * |
784 | * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it |
785 | * ignores physical replication slots. |
786 | * |
787 | * The results aren't required frequently, so we don't maintain a precomputed |
788 | * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin(). |
789 | */ |
790 | XLogRecPtr |
791 | ReplicationSlotsComputeLogicalRestartLSN(void) |
792 | { |
793 | XLogRecPtr result = InvalidXLogRecPtr; |
794 | int i; |
795 | |
796 | if (max_replication_slots <= 0) |
797 | return InvalidXLogRecPtr; |
798 | |
799 | LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); |
800 | |
801 | for (i = 0; i < max_replication_slots; i++) |
802 | { |
803 | ReplicationSlot *s; |
804 | XLogRecPtr restart_lsn; |
805 | |
806 | s = &ReplicationSlotCtl->replication_slots[i]; |
807 | |
808 | /* cannot change while ReplicationSlotControlLock is held */ |
809 | if (!s->in_use) |
810 | continue; |
811 | |
812 | /* we're only interested in logical slots */ |
813 | if (!SlotIsLogical(s)) |
814 | continue; |
815 | |
816 | /* read once, it's ok if it increases while we're checking */ |
817 | SpinLockAcquire(&s->mutex); |
818 | restart_lsn = s->data.restart_lsn; |
819 | SpinLockRelease(&s->mutex); |
820 | |
821 | if (result == InvalidXLogRecPtr || |
822 | restart_lsn < result) |
823 | result = restart_lsn; |
824 | } |
825 | |
826 | LWLockRelease(ReplicationSlotControlLock); |
827 | |
828 | return result; |
829 | } |
830 | |
831 | /* |
832 | * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the |
833 | * passed database oid. |
834 | * |
835 | * Returns true if there are any slots referencing the database. *nslots will |
836 | * be set to the total number of slots for the database, *nactive to the |
837 | * number currently active. |
838 | */ |
839 | bool |
840 | ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive) |
841 | { |
842 | int i; |
843 | |
844 | *nslots = *nactive = 0; |
845 | |
846 | if (max_replication_slots <= 0) |
847 | return false; |
848 | |
849 | LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); |
850 | for (i = 0; i < max_replication_slots; i++) |
851 | { |
852 | ReplicationSlot *s; |
853 | |
854 | s = &ReplicationSlotCtl->replication_slots[i]; |
855 | |
856 | /* cannot change while ReplicationSlotControlLock is held */ |
857 | if (!s->in_use) |
858 | continue; |
859 | |
860 | /* only logical slots are database specific, skip */ |
861 | if (!SlotIsLogical(s)) |
862 | continue; |
863 | |
864 | /* not our database, skip */ |
865 | if (s->data.database != dboid) |
866 | continue; |
867 | |
868 | /* count slots with spinlock held */ |
869 | SpinLockAcquire(&s->mutex); |
870 | (*nslots)++; |
871 | if (s->active_pid != 0) |
872 | (*nactive)++; |
873 | SpinLockRelease(&s->mutex); |
874 | } |
875 | LWLockRelease(ReplicationSlotControlLock); |
876 | |
877 | if (*nslots > 0) |
878 | return true; |
879 | return false; |
880 | } |
881 | |
882 | /* |
883 | * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the |
884 | * passed database oid. The caller should hold an exclusive lock on the |
885 | * pg_database oid for the database to prevent creation of new slots on the db |
886 | * or replay from existing slots. |
887 | * |
888 | * Another session that concurrently acquires an existing slot on the target DB |
889 | * (most likely to drop it) may cause this function to ERROR. If that happens |
890 | * it may have dropped some but not all slots. |
891 | * |
892 | * This routine isn't as efficient as it could be - but we don't drop |
893 | * databases often, especially databases with lots of slots. |
894 | */ |
895 | void |
896 | ReplicationSlotsDropDBSlots(Oid dboid) |
897 | { |
898 | int i; |
899 | |
900 | if (max_replication_slots <= 0) |
901 | return; |
902 | |
903 | restart: |
904 | LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); |
905 | for (i = 0; i < max_replication_slots; i++) |
906 | { |
907 | ReplicationSlot *s; |
908 | char *slotname; |
909 | int active_pid; |
910 | |
911 | s = &ReplicationSlotCtl->replication_slots[i]; |
912 | |
913 | /* cannot change while ReplicationSlotControlLock is held */ |
914 | if (!s->in_use) |
915 | continue; |
916 | |
917 | /* only logical slots are database specific, skip */ |
918 | if (!SlotIsLogical(s)) |
919 | continue; |
920 | |
921 | /* not our database, skip */ |
922 | if (s->data.database != dboid) |
923 | continue; |
924 | |
925 | /* acquire slot, so ReplicationSlotDropAcquired can be reused */ |
926 | SpinLockAcquire(&s->mutex); |
927 | /* can't change while ReplicationSlotControlLock is held */ |
928 | slotname = NameStr(s->data.name); |
929 | active_pid = s->active_pid; |
930 | if (active_pid == 0) |
931 | { |
932 | MyReplicationSlot = s; |
933 | s->active_pid = MyProcPid; |
934 | } |
935 | SpinLockRelease(&s->mutex); |
936 | |
937 | /* |
938 | * Even though we hold an exclusive lock on the database object a |
939 | * logical slot for that DB can still be active, e.g. if it's |
940 | * concurrently being dropped by a backend connected to another DB. |
941 | * |
942 | * That's fairly unlikely in practice, so we'll just bail out. |
943 | */ |
944 | if (active_pid) |
945 | ereport(ERROR, |
946 | (errcode(ERRCODE_OBJECT_IN_USE), |
947 | errmsg("replication slot \"%s\" is active for PID %d" , |
948 | slotname, active_pid))); |
949 | |
950 | /* |
951 | * To avoid duplicating ReplicationSlotDropAcquired() and to avoid |
952 | * holding ReplicationSlotControlLock over filesystem operations, |
953 | * release ReplicationSlotControlLock and use |
954 | * ReplicationSlotDropAcquired. |
955 | * |
956 | * As that means the set of slots could change, restart scan from the |
957 | * beginning each time we release the lock. |
958 | */ |
959 | LWLockRelease(ReplicationSlotControlLock); |
960 | ReplicationSlotDropAcquired(); |
961 | goto restart; |
962 | } |
963 | LWLockRelease(ReplicationSlotControlLock); |
964 | } |
965 | |
966 | |
967 | /* |
968 | * Check whether the server's configuration supports using replication |
969 | * slots. |
970 | */ |
971 | void |
972 | CheckSlotRequirements(void) |
973 | { |
974 | /* |
975 | * NB: Adding a new requirement likely means that RestoreSlotFromDisk() |
976 | * needs the same check. |
977 | */ |
978 | |
979 | if (max_replication_slots == 0) |
980 | ereport(ERROR, |
981 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
982 | (errmsg("replication slots can only be used if max_replication_slots > 0" )))); |
983 | |
984 | if (wal_level < WAL_LEVEL_REPLICA) |
985 | ereport(ERROR, |
986 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
987 | errmsg("replication slots can only be used if wal_level >= replica" ))); |
988 | } |
989 | |
990 | /* |
991 | * Reserve WAL for the currently active slot. |
992 | * |
993 | * Compute and set restart_lsn in a manner that's appropriate for the slot's |
994 | * type and that is safe against concurrent WAL removal. |
995 | */ |
996 | void |
997 | ReplicationSlotReserveWal(void) |
998 | { |
999 | ReplicationSlot *slot = MyReplicationSlot; |
1000 | |
1001 | Assert(slot != NULL); |
1002 | Assert(slot->data.restart_lsn == InvalidXLogRecPtr); |
1003 | |
1004 | /* |
1005 | * The replication slot mechanism is used to prevent removal of required |
1006 | * WAL. As there is no interlock between this routine and checkpoints, WAL |
1007 | * segments could concurrently be removed when a now stale return value of |
1008 | * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that |
1009 | * this happens we'll just retry. |
1010 | */ |
1011 | while (true) |
1012 | { |
1013 | XLogSegNo segno; |
1014 | XLogRecPtr restart_lsn; |
1015 | |
1016 | /* |
1017 | * For logical slots log a standby snapshot and start logical decoding |
1018 | * at exactly that position. That allows the slot to start up more |
1019 | * quickly. |
1020 | * |
1021 | * That's not needed (or indeed helpful) for physical slots as they'll |
1022 | * start replay at the last logged checkpoint anyway. Instead return |
1023 | * the location of the last redo LSN. While that slightly increases |
1024 | * the chance that we have to retry, it's where a base backup has to |
1025 | * start replay at. |
1026 | */ |
1027 | if (!RecoveryInProgress() && SlotIsLogical(slot)) |
1028 | { |
1029 | XLogRecPtr flushptr; |
1030 | |
1031 | /* start at current insert position */ |
1032 | restart_lsn = GetXLogInsertRecPtr(); |
1033 | SpinLockAcquire(&slot->mutex); |
1034 | slot->data.restart_lsn = restart_lsn; |
1035 | SpinLockRelease(&slot->mutex); |
1036 | |
1037 | /* make sure we have enough information to start */ |
1038 | flushptr = LogStandbySnapshot(); |
1039 | |
1040 | /* and make sure it's fsynced to disk */ |
1041 | XLogFlush(flushptr); |
1042 | } |
1043 | else |
1044 | { |
1045 | restart_lsn = GetRedoRecPtr(); |
1046 | SpinLockAcquire(&slot->mutex); |
1047 | slot->data.restart_lsn = restart_lsn; |
1048 | SpinLockRelease(&slot->mutex); |
1049 | } |
1050 | |
1051 | /* prevent WAL removal as fast as possible */ |
1052 | ReplicationSlotsComputeRequiredLSN(); |
1053 | |
1054 | /* |
1055 | * If all required WAL is still there, great, otherwise retry. The |
1056 | * slot should prevent further removal of WAL, unless there's a |
1057 | * concurrent ReplicationSlotsComputeRequiredLSN() after we've written |
1058 | * the new restart_lsn above, so normally we should never need to loop |
1059 | * more than twice. |
1060 | */ |
1061 | XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size); |
1062 | if (XLogGetLastRemovedSegno() < segno) |
1063 | break; |
1064 | } |
1065 | } |
1066 | |
1067 | /* |
1068 | * Flush all replication slots to disk. |
1069 | * |
1070 | * This needn't actually be part of a checkpoint, but it's a convenient |
1071 | * location. |
1072 | */ |
1073 | void |
1074 | CheckPointReplicationSlots(void) |
1075 | { |
1076 | int i; |
1077 | |
1078 | elog(DEBUG1, "performing replication slot checkpoint" ); |
1079 | |
1080 | /* |
1081 | * Prevent any slot from being created/dropped while we're active. As we |
1082 | * explicitly do *not* want to block iterating over replication_slots or |
1083 | * acquiring a slot we cannot take the control lock - but that's OK, |
1084 | * because holding ReplicationSlotAllocationLock is strictly stronger, and |
1085 | * enough to guarantee that nobody can change the in_use bits on us. |
1086 | */ |
1087 | LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED); |
1088 | |
1089 | for (i = 0; i < max_replication_slots; i++) |
1090 | { |
1091 | ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; |
1092 | char path[MAXPGPATH]; |
1093 | |
1094 | if (!s->in_use) |
1095 | continue; |
1096 | |
1097 | /* save the slot to disk, locking is handled in SaveSlotToPath() */ |
1098 | sprintf(path, "pg_replslot/%s" , NameStr(s->data.name)); |
1099 | SaveSlotToPath(s, path, LOG); |
1100 | } |
1101 | LWLockRelease(ReplicationSlotAllocationLock); |
1102 | } |
1103 | |
1104 | /* |
1105 | * Load all replication slots from disk into memory at server startup. This |
1106 | * needs to be run before we start crash recovery. |
1107 | */ |
1108 | void |
1109 | StartupReplicationSlots(void) |
1110 | { |
1111 | DIR *replication_dir; |
1112 | struct dirent *replication_de; |
1113 | |
1114 | elog(DEBUG1, "starting up replication slots" ); |
1115 | |
1116 | /* restore all slots by iterating over all on-disk entries */ |
1117 | replication_dir = AllocateDir("pg_replslot" ); |
1118 | while ((replication_de = ReadDir(replication_dir, "pg_replslot" )) != NULL) |
1119 | { |
1120 | struct stat statbuf; |
1121 | char path[MAXPGPATH + 12]; |
1122 | |
1123 | if (strcmp(replication_de->d_name, "." ) == 0 || |
1124 | strcmp(replication_de->d_name, ".." ) == 0) |
1125 | continue; |
1126 | |
1127 | snprintf(path, sizeof(path), "pg_replslot/%s" , replication_de->d_name); |
1128 | |
1129 | /* we only ever create directories here, so skip anything that isn't ours */ |
1130 | if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode)) |
1131 | continue; |
1132 | |
1133 | /* we crashed while a slot was being setup or deleted, clean up */ |
1134 | if (pg_str_endswith(replication_de->d_name, ".tmp" )) |
1135 | { |
1136 | if (!rmtree(path, true)) |
1137 | { |
1138 | ereport(WARNING, |
1139 | (errmsg("could not remove directory \"%s\"" , |
1140 | path))); |
1141 | continue; |
1142 | } |
1143 | fsync_fname("pg_replslot" , true); |
1144 | continue; |
1145 | } |
1146 | |
1147 | /* looks like a slot in a normal state, restore */ |
1148 | RestoreSlotFromDisk(replication_de->d_name); |
1149 | } |
1150 | FreeDir(replication_dir); |
1151 | |
1152 | /* currently no slots exist, we're done. */ |
1153 | if (max_replication_slots <= 0) |
1154 | return; |
1155 | |
1156 | /* Now that we have recovered all the data, compute replication xmin */ |
1157 | ReplicationSlotsComputeRequiredXmin(false); |
1158 | ReplicationSlotsComputeRequiredLSN(); |
1159 | } |
1160 | |
1161 | /* ---- |
1162 | * Manipulation of on-disk state of replication slots |
1163 | * |
1164 | * NB: none of the routines below should take any notice whether a slot is the |
1165 | * current one or not, that's all handled a layer above. |
1166 | * ---- |
1167 | */ |
1168 | static void |
1169 | CreateSlotOnDisk(ReplicationSlot *slot) |
1170 | { |
1171 | char tmppath[MAXPGPATH]; |
1172 | char path[MAXPGPATH]; |
1173 | struct stat st; |
1174 | |
1175 | /* |
1176 | * No need to take out the io_in_progress_lock: nobody else can see this |
1177 | * slot yet, so nobody else will write. We're reusing SaveSlotToPath(), which |
1178 | * takes out the lock itself; if we took the lock here, we'd deadlock. |
1179 | */ |
1180 | |
1181 | sprintf(path, "pg_replslot/%s" , NameStr(slot->data.name)); |
1182 | sprintf(tmppath, "pg_replslot/%s.tmp" , NameStr(slot->data.name)); |
1183 | |
1184 | /* |
1185 | * It's just barely possible that some previous effort to create or drop a |
1186 | * slot with this name left a temp directory lying around. If that seems |
1187 | * to be the case, try to remove it. If the rmtree() fails, we'll error |
1188 | * out at the MakePGDirectory() below, so we don't bother checking |
1189 | * success. |
1190 | */ |
1191 | if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode)) |
1192 | rmtree(tmppath, true); |
1193 | |
1194 | /* Create and fsync the temporary slot directory. */ |
1195 | if (MakePGDirectory(tmppath) < 0) |
1196 | ereport(ERROR, |
1197 | (errcode_for_file_access(), |
1198 | errmsg("could not create directory \"%s\": %m" , |
1199 | tmppath))); |
1200 | fsync_fname(tmppath, true); |
1201 | |
1202 | /* Write the actual state file. */ |
1203 | slot->dirty = true; /* signal that we really need to write */ |
1204 | SaveSlotToPath(slot, tmppath, ERROR); |
1205 | |
1206 | /* Rename the directory into place. */ |
1207 | if (rename(tmppath, path) != 0) |
1208 | ereport(ERROR, |
1209 | (errcode_for_file_access(), |
1210 | errmsg("could not rename file \"%s\" to \"%s\": %m" , |
1211 | tmppath, path))); |
1212 | |
1213 | /* |
1214 | * If we'd now fail - really unlikely - we wouldn't know whether this slot |
1215 | * would persist after an OS crash or not - so, force a restart. The |
1216 | * restart would try to fsync this again till it works. |
1217 | */ |
1218 | START_CRIT_SECTION(); |
1219 | |
1220 | fsync_fname(path, true); |
1221 | fsync_fname("pg_replslot" , true); |
1222 | |
1223 | END_CRIT_SECTION(); |
1224 | } |
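
/*
 * Resulting on-disk lifecycle for a slot named "s1" (name illustrative):
 *
 *	pg_replslot/s1.tmp/		directory created and fsync()ed
 *	pg_replslot/s1.tmp/state	written and fsync()ed by SaveSlotToPath()
 *	pg_replslot/s1/			rename()d into place, then fsync()ed again
 */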
1225 | |
1226 | /* |
1227 | * Shared functionality between saving and creating a replication slot. |
1228 | */ |
1229 | static void |
1230 | SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) |
1231 | { |
1232 | char tmppath[MAXPGPATH]; |
1233 | char path[MAXPGPATH]; |
1234 | int fd; |
1235 | ReplicationSlotOnDisk cp; |
1236 | bool was_dirty; |
1237 | |
1238 | /* first check whether there's something to write out */ |
1239 | SpinLockAcquire(&slot->mutex); |
1240 | was_dirty = slot->dirty; |
1241 | slot->just_dirtied = false; |
1242 | SpinLockRelease(&slot->mutex); |
1243 | |
1244 | /* and don't do anything if there's nothing to write */ |
1245 | if (!was_dirty) |
1246 | return; |
1247 | |
1248 | LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE); |
1249 | |
1250 | /* silence valgrind :( */ |
1251 | memset(&cp, 0, sizeof(ReplicationSlotOnDisk)); |
1252 | |
1253 | sprintf(tmppath, "%s/state.tmp" , dir); |
1254 | sprintf(path, "%s/state" , dir); |
1255 | |
1256 | fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY); |
1257 | if (fd < 0) |
1258 | { |
1259 | ereport(elevel, |
1260 | (errcode_for_file_access(), |
1261 | errmsg("could not create file \"%s\": %m" , |
1262 | tmppath))); |
1263 | return; |
1264 | } |
1265 | |
1266 | cp.magic = SLOT_MAGIC; |
1267 | INIT_CRC32C(cp.checksum); |
1268 | cp.version = SLOT_VERSION; |
1269 | cp.length = ReplicationSlotOnDiskV2Size; |
1270 | |
1271 | SpinLockAcquire(&slot->mutex); |
1272 | |
1273 | memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData)); |
1274 | |
1275 | SpinLockRelease(&slot->mutex); |
1276 | |
1277 | COMP_CRC32C(cp.checksum, |
1278 | (char *) (&cp) + SnapBuildOnDiskNotChecksummedSize, |
1279 | SnapBuildOnDiskChecksummedSize); |
1280 | FIN_CRC32C(cp.checksum); |
1281 | |
1282 | errno = 0; |
1283 | pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE); |
1284 | if ((write(fd, &cp, sizeof(cp))) != sizeof(cp)) |
1285 | { |
1286 | int save_errno = errno; |
1287 | |
1288 | pgstat_report_wait_end(); |
1289 | CloseTransientFile(fd); |
1290 | |
1291 | /* if write didn't set errno, assume problem is no disk space */ |
1292 | errno = save_errno ? save_errno : ENOSPC; |
1293 | ereport(elevel, |
1294 | (errcode_for_file_access(), |
1295 | errmsg("could not write to file \"%s\": %m" , |
1296 | tmppath))); |
1297 | return; |
1298 | } |
1299 | pgstat_report_wait_end(); |
1300 | |
1301 | /* fsync the temporary file */ |
1302 | pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC); |
1303 | if (pg_fsync(fd) != 0) |
1304 | { |
1305 | int save_errno = errno; |
1306 | |
1307 | pgstat_report_wait_end(); |
1308 | CloseTransientFile(fd); |
1309 | errno = save_errno; |
1310 | ereport(elevel, |
1311 | (errcode_for_file_access(), |
1312 | errmsg("could not fsync file \"%s\": %m" , |
1313 | tmppath))); |
1314 | return; |
1315 | } |
1316 | pgstat_report_wait_end(); |
1317 | |
1318 | if (CloseTransientFile(fd)) |
1319 | { |
1320 | ereport(elevel, |
1321 | (errcode_for_file_access(), |
1322 | errmsg("could not close file \"%s\": %m" , |
1323 | tmppath))); |
1324 | return; |
1325 | } |
1326 | |
1327 | /* rename to permanent file, fsync file and directory */ |
1328 | if (rename(tmppath, path) != 0) |
1329 | { |
1330 | ereport(elevel, |
1331 | (errcode_for_file_access(), |
1332 | errmsg("could not rename file \"%s\" to \"%s\": %m" , |
1333 | tmppath, path))); |
1334 | return; |
1335 | } |
1336 | |
1337 | /* |
1338 | * Check CreateSlotOnDisk() for the reasoning of using a critical section. |
1339 | */ |
1340 | START_CRIT_SECTION(); |
1341 | |
1342 | fsync_fname(path, false); |
1343 | fsync_fname(dir, true); |
1344 | fsync_fname("pg_replslot" , true); |
1345 | |
1346 | END_CRIT_SECTION(); |
1347 | |
1348 | /* |
1349 | * The write was successful; clear the dirty bit, unless somebody dirtied |
1350 | * the slot again in the meantime. |
1351 | */ |
1352 | SpinLockAcquire(&slot->mutex); |
1353 | if (!slot->just_dirtied) |
1354 | slot->dirty = false; |
1355 | SpinLockRelease(&slot->mutex); |
1356 | |
1357 | LWLockRelease(&slot->io_in_progress_lock); |
1358 | } |
1359 | |
1360 | /* |
1361 | * Load a single slot from disk into memory. |
1362 | */ |
1363 | static void |
1364 | RestoreSlotFromDisk(const char *name) |
1365 | { |
1366 | ReplicationSlotOnDisk cp; |
1367 | int i; |
1368 | char slotdir[MAXPGPATH + 12]; |
1369 | char path[MAXPGPATH + 22]; |
1370 | int fd; |
1371 | bool restored = false; |
1372 | int readBytes; |
1373 | pg_crc32c checksum; |
1374 | |
1375 | /* no need to lock here, no concurrent access allowed yet */ |
1376 | |
1377 | /* delete temp file if it exists */ |
1378 | sprintf(slotdir, "pg_replslot/%s" , name); |
1379 | sprintf(path, "%s/state.tmp" , slotdir); |
1380 | if (unlink(path) < 0 && errno != ENOENT) |
1381 | ereport(PANIC, |
1382 | (errcode_for_file_access(), |
1383 | errmsg("could not remove file \"%s\": %m" , path))); |
1384 | |
1385 | sprintf(path, "%s/state" , slotdir); |
1386 | |
1387 | elog(DEBUG1, "restoring replication slot from \"%s\"" , path); |
1388 | |
1389 | fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); |
1390 | |
1391 | /* |
1392 | * Failure to open the state file is not expected here, because we only |
1393 | * rename() the directory into place after fsync()ing the state file. |
1394 | */ |
1395 | if (fd < 0) |
1396 | ereport(PANIC, |
1397 | (errcode_for_file_access(), |
1398 | errmsg("could not open file \"%s\": %m" , path))); |
1399 | |
1400 | /* |
1401 | * Sync state file before we're reading from it. We might have crashed |
1402 | * while it wasn't synced yet and we shouldn't continue on that basis. |
1403 | */ |
1404 | pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC); |
1405 | if (pg_fsync(fd) != 0) |
1406 | ereport(PANIC, |
1407 | (errcode_for_file_access(), |
1408 | errmsg("could not fsync file \"%s\": %m" , |
1409 | path))); |
1410 | pgstat_report_wait_end(); |
1411 | |
1412 | /* Also sync the parent directory */ |
1413 | START_CRIT_SECTION(); |
1414 | fsync_fname(slotdir, true); |
1415 | END_CRIT_SECTION(); |
1416 | |
1417 | /* read part of statefile that's guaranteed to be version independent */ |
1418 | pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ); |
1419 | readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize); |
1420 | pgstat_report_wait_end(); |
1421 | if (readBytes != ReplicationSlotOnDiskConstantSize) |
1422 | { |
1423 | if (readBytes < 0) |
1424 | ereport(PANIC, |
1425 | (errcode_for_file_access(), |
1426 | errmsg("could not read file \"%s\": %m" , path))); |
1427 | else |
1428 | ereport(PANIC, |
1429 | (errcode(ERRCODE_DATA_CORRUPTED), |
1430 | errmsg("could not read file \"%s\": read %d of %zu" , |
1431 | path, readBytes, |
1432 | (Size) ReplicationSlotOnDiskConstantSize))); |
1433 | } |
1434 | |
1435 | /* verify magic */ |
1436 | if (cp.magic != SLOT_MAGIC) |
1437 | ereport(PANIC, |
1438 | (errcode(ERRCODE_DATA_CORRUPTED), |
1439 | errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u" , |
1440 | path, cp.magic, SLOT_MAGIC))); |
1441 | |
1442 | /* verify version */ |
1443 | if (cp.version != SLOT_VERSION) |
1444 | ereport(PANIC, |
1445 | (errcode(ERRCODE_DATA_CORRUPTED), |
1446 | errmsg("replication slot file \"%s\" has unsupported version %u" , |
1447 | path, cp.version))); |
1448 | |
1449 | /* boundary check on length */ |
1450 | if (cp.length != ReplicationSlotOnDiskV2Size) |
1451 | ereport(PANIC, |
1452 | (errcode(ERRCODE_DATA_CORRUPTED), |
1453 | errmsg("replication slot file \"%s\" has corrupted length %u" , |
1454 | path, cp.length))); |
1455 | |
1456 | /* Now that we know the size, read the entire file */ |
1457 | pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ); |
1458 | readBytes = read(fd, |
1459 | (char *) &cp + ReplicationSlotOnDiskConstantSize, |
1460 | cp.length); |
1461 | pgstat_report_wait_end(); |
1462 | if (readBytes != cp.length) |
1463 | { |
1464 | if (readBytes < 0) |
1465 | ereport(PANIC, |
1466 | (errcode_for_file_access(), |
1467 | errmsg("could not read file \"%s\": %m" , path))); |
1468 | else |
1469 | ereport(PANIC, |
1470 | (errcode(ERRCODE_DATA_CORRUPTED), |
1471 | errmsg("could not read file \"%s\": read %d of %zu" , |
1472 | path, readBytes, (Size) cp.length))); |
1473 | } |
1474 | |
1475 | if (CloseTransientFile(fd)) |
1476 | ereport(PANIC, |
1477 | (errcode_for_file_access(), |
1478 | errmsg("could not close file \"%s\": %m" , path))); |
1479 | |
1480 | /* now verify the CRC */ |
1481 | INIT_CRC32C(checksum); |
1482 | COMP_CRC32C(checksum, |
1483 | (char *) &cp + SnapBuildOnDiskNotChecksummedSize, |
1484 | SnapBuildOnDiskChecksummedSize); |
1485 | FIN_CRC32C(checksum); |
1486 | |
1487 | if (!EQ_CRC32C(checksum, cp.checksum)) |
1488 | ereport(PANIC, |
1489 | (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u" , |
1490 | path, checksum, cp.checksum))); |
1491 | |
1492 | /* |
1493 | * If we crashed with an ephemeral slot active, don't restore but delete |
1494 | * it. |
1495 | */ |
1496 | if (cp.slotdata.persistency != RS_PERSISTENT) |
1497 | { |
1498 | if (!rmtree(slotdir, true)) |
1499 | { |
1500 | ereport(WARNING, |
1501 | (errmsg("could not remove directory \"%s\"" , |
1502 | slotdir))); |
1503 | } |
1504 | fsync_fname("pg_replslot" , true); |
1505 | return; |
1506 | } |
1507 | |
1508 | /* |
1509 | * Verify that requirements for the specific slot type are met. That's |
1510 | * important because if these aren't met we're not guaranteed to retain |
1511 | * all the necessary resources for the slot. |
1512 | * |
1513 | * NB: We have to do so *after* the above checks for ephemeral slots, |
1514 | * because otherwise a slot that shouldn't exist anymore could prevent |
1515 | * restarts. |
1516 | * |
1517 | * NB: Changing the requirements here also requires adapting |
1518 | * CheckSlotRequirements() and CheckLogicalDecodingRequirements(). |
1519 | */ |
1520 | if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL) |
1521 | ereport(FATAL, |
1522 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
1523 | errmsg("logical replication slot \"%s\" exists, but wal_level < logical" , |
1524 | NameStr(cp.slotdata.name)), |
1525 | errhint("Change wal_level to be logical or higher." ))); |
1526 | else if (wal_level < WAL_LEVEL_REPLICA) |
1527 | ereport(FATAL, |
1528 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
1529 | errmsg("physical replication slot \"%s\" exists, but wal_level < replica" , |
1530 | NameStr(cp.slotdata.name)), |
1531 | errhint("Change wal_level to be replica or higher." ))); |
1532 | |
1533 | /* nothing can be active yet, don't lock anything */ |
1534 | for (i = 0; i < max_replication_slots; i++) |
1535 | { |
1536 | ReplicationSlot *slot; |
1537 | |
1538 | slot = &ReplicationSlotCtl->replication_slots[i]; |
1539 | |
1540 | if (slot->in_use) |
1541 | continue; |
1542 | |
1543 | /* restore the entire set of persistent data */ |
1544 | memcpy(&slot->data, &cp.slotdata, |
1545 | sizeof(ReplicationSlotPersistentData)); |
1546 | |
1547 | /* initialize in memory state */ |
1548 | slot->effective_xmin = cp.slotdata.xmin; |
1549 | slot->effective_catalog_xmin = cp.slotdata.catalog_xmin; |
1550 | |
1551 | slot->candidate_catalog_xmin = InvalidTransactionId; |
1552 | slot->candidate_xmin_lsn = InvalidXLogRecPtr; |
1553 | slot->candidate_restart_lsn = InvalidXLogRecPtr; |
1554 | slot->candidate_restart_valid = InvalidXLogRecPtr; |
1555 | |
1556 | slot->in_use = true; |
1557 | slot->active_pid = 0; |
1558 | |
1559 | restored = true; |
1560 | break; |
1561 | } |
1562 | |
1563 | if (!restored) |
1564 | ereport(FATAL, |
1565 | (errmsg("too many replication slots active before shutdown" ), |
1566 | errhint("Increase max_replication_slots and try again." ))); |
1567 | } |
1568 | |