1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * slotfuncs.c |
4 | * Support functions for replication slots |
5 | * |
6 | * Copyright (c) 2012-2019, PostgreSQL Global Development Group |
7 | * |
8 | * IDENTIFICATION |
9 | * src/backend/replication/slotfuncs.c |
10 | * |
11 | *------------------------------------------------------------------------- |
12 | */ |
13 | #include "postgres.h" |
14 | |
15 | #include "access/htup_details.h" |
16 | #include "access/xlog_internal.h" |
17 | #include "funcapi.h" |
18 | #include "miscadmin.h" |
19 | #include "replication/decode.h" |
20 | #include "replication/slot.h" |
21 | #include "replication/logical.h" |
22 | #include "replication/logicalfuncs.h" |
23 | #include "utils/builtins.h" |
24 | #include "utils/inval.h" |
25 | #include "utils/pg_lsn.h" |
26 | #include "utils/resowner.h" |
27 | |
28 | static void |
29 | check_permissions(void) |
30 | { |
31 | if (!superuser() && !has_rolreplication(GetUserId())) |
32 | ereport(ERROR, |
33 | (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), |
34 | (errmsg("must be superuser or replication role to use replication slots" )))); |
35 | } |
36 | |
37 | /* |
38 | * Helper function for creating a new physical replication slot with |
39 | * given arguments. Note that this function doesn't release the created |
40 | * slot. |
41 | * |
42 | * If restart_lsn is a valid value, we use it without WAL reservation |
43 | * routine. So the caller must guarantee that WAL is available. |
44 | */ |
45 | static void |
46 | create_physical_replication_slot(char *name, bool immediately_reserve, |
47 | bool temporary, XLogRecPtr restart_lsn) |
48 | { |
49 | Assert(!MyReplicationSlot); |
50 | |
51 | /* acquire replication slot, this will check for conflicting names */ |
52 | ReplicationSlotCreate(name, false, |
53 | temporary ? RS_TEMPORARY : RS_PERSISTENT); |
54 | |
55 | if (immediately_reserve) |
56 | { |
57 | /* Reserve WAL as the user asked for it */ |
58 | if (XLogRecPtrIsInvalid(restart_lsn)) |
59 | ReplicationSlotReserveWal(); |
60 | else |
61 | MyReplicationSlot->data.restart_lsn = restart_lsn; |
62 | |
63 | /* Write this slot to disk */ |
64 | ReplicationSlotMarkDirty(); |
65 | ReplicationSlotSave(); |
66 | } |
67 | } |
68 | |
69 | /* |
70 | * SQL function for creating a new physical (streaming replication) |
71 | * replication slot. |
72 | */ |
73 | Datum |
74 | pg_create_physical_replication_slot(PG_FUNCTION_ARGS) |
75 | { |
76 | Name name = PG_GETARG_NAME(0); |
77 | bool immediately_reserve = PG_GETARG_BOOL(1); |
78 | bool temporary = PG_GETARG_BOOL(2); |
79 | Datum values[2]; |
80 | bool nulls[2]; |
81 | TupleDesc tupdesc; |
82 | HeapTuple tuple; |
83 | Datum result; |
84 | |
85 | if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) |
86 | elog(ERROR, "return type must be a row type" ); |
87 | |
88 | check_permissions(); |
89 | |
90 | CheckSlotRequirements(); |
91 | |
92 | create_physical_replication_slot(NameStr(*name), |
93 | immediately_reserve, |
94 | temporary, |
95 | InvalidXLogRecPtr); |
96 | |
97 | values[0] = NameGetDatum(&MyReplicationSlot->data.name); |
98 | nulls[0] = false; |
99 | |
100 | if (immediately_reserve) |
101 | { |
102 | values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn); |
103 | nulls[1] = false; |
104 | } |
105 | else |
106 | nulls[1] = true; |
107 | |
108 | tuple = heap_form_tuple(tupdesc, values, nulls); |
109 | result = HeapTupleGetDatum(tuple); |
110 | |
111 | ReplicationSlotRelease(); |
112 | |
113 | PG_RETURN_DATUM(result); |
114 | } |
115 | |
116 | |
117 | /* |
118 | * Helper function for creating a new logical replication slot with |
119 | * given arguments. Note that this function doesn't release the created |
120 | * slot. |
121 | */ |
122 | static void |
123 | create_logical_replication_slot(char *name, char *plugin, |
124 | bool temporary, XLogRecPtr restart_lsn) |
125 | { |
126 | LogicalDecodingContext *ctx = NULL; |
127 | |
128 | Assert(!MyReplicationSlot); |
129 | |
130 | /* |
131 | * Acquire a logical decoding slot, this will check for conflicting names. |
132 | * Initially create persistent slot as ephemeral - that allows us to |
133 | * nicely handle errors during initialization because it'll get dropped if |
134 | * this transaction fails. We'll make it persistent at the end. Temporary |
135 | * slots can be created as temporary from beginning as they get dropped on |
136 | * error as well. |
137 | */ |
138 | ReplicationSlotCreate(name, true, |
139 | temporary ? RS_TEMPORARY : RS_EPHEMERAL); |
140 | |
141 | /* |
142 | * Create logical decoding context, to build the initial snapshot. |
143 | */ |
144 | ctx = CreateInitDecodingContext(plugin, NIL, |
145 | false, /* do not build snapshot */ |
146 | restart_lsn, |
147 | logical_read_local_xlog_page, NULL, NULL, |
148 | NULL); |
149 | |
150 | /* build initial snapshot, might take a while */ |
151 | DecodingContextFindStartpoint(ctx); |
152 | |
153 | /* don't need the decoding context anymore */ |
154 | FreeDecodingContext(ctx); |
155 | } |
156 | |
157 | /* |
158 | * SQL function for creating a new logical replication slot. |
159 | */ |
160 | Datum |
161 | pg_create_logical_replication_slot(PG_FUNCTION_ARGS) |
162 | { |
163 | Name name = PG_GETARG_NAME(0); |
164 | Name plugin = PG_GETARG_NAME(1); |
165 | bool temporary = PG_GETARG_BOOL(2); |
166 | Datum result; |
167 | TupleDesc tupdesc; |
168 | HeapTuple tuple; |
169 | Datum values[2]; |
170 | bool nulls[2]; |
171 | |
172 | if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) |
173 | elog(ERROR, "return type must be a row type" ); |
174 | |
175 | check_permissions(); |
176 | |
177 | CheckLogicalDecodingRequirements(); |
178 | |
179 | create_logical_replication_slot(NameStr(*name), |
180 | NameStr(*plugin), |
181 | temporary, |
182 | InvalidXLogRecPtr); |
183 | |
184 | values[0] = NameGetDatum(&MyReplicationSlot->data.name); |
185 | values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush); |
186 | |
187 | memset(nulls, 0, sizeof(nulls)); |
188 | |
189 | tuple = heap_form_tuple(tupdesc, values, nulls); |
190 | result = HeapTupleGetDatum(tuple); |
191 | |
192 | /* ok, slot is now fully created, mark it as persistent if needed */ |
193 | if (!temporary) |
194 | ReplicationSlotPersist(); |
195 | ReplicationSlotRelease(); |
196 | |
197 | PG_RETURN_DATUM(result); |
198 | } |
199 | |
200 | |
201 | /* |
202 | * SQL function for dropping a replication slot. |
203 | */ |
204 | Datum |
205 | pg_drop_replication_slot(PG_FUNCTION_ARGS) |
206 | { |
207 | Name name = PG_GETARG_NAME(0); |
208 | |
209 | check_permissions(); |
210 | |
211 | CheckSlotRequirements(); |
212 | |
213 | ReplicationSlotDrop(NameStr(*name), true); |
214 | |
215 | PG_RETURN_VOID(); |
216 | } |
217 | |
218 | /* |
219 | * pg_get_replication_slots - SQL SRF showing active replication slots. |
220 | */ |
221 | Datum |
222 | pg_get_replication_slots(PG_FUNCTION_ARGS) |
223 | { |
224 | #define PG_GET_REPLICATION_SLOTS_COLS 11 |
225 | ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; |
226 | TupleDesc tupdesc; |
227 | Tuplestorestate *tupstore; |
228 | MemoryContext per_query_ctx; |
229 | MemoryContext oldcontext; |
230 | int slotno; |
231 | |
232 | /* check to see if caller supports us returning a tuplestore */ |
233 | if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) |
234 | ereport(ERROR, |
235 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
236 | errmsg("set-valued function called in context that cannot accept a set" ))); |
237 | if (!(rsinfo->allowedModes & SFRM_Materialize)) |
238 | ereport(ERROR, |
239 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
240 | errmsg("materialize mode required, but it is not " \ |
241 | "allowed in this context" ))); |
242 | |
243 | /* Build a tuple descriptor for our result type */ |
244 | if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) |
245 | elog(ERROR, "return type must be a row type" ); |
246 | |
247 | /* |
248 | * We don't require any special permission to see this function's data |
249 | * because nothing should be sensitive. The most critical being the slot |
250 | * name, which shouldn't contain anything particularly sensitive. |
251 | */ |
252 | |
253 | per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; |
254 | oldcontext = MemoryContextSwitchTo(per_query_ctx); |
255 | |
256 | tupstore = tuplestore_begin_heap(true, false, work_mem); |
257 | rsinfo->returnMode = SFRM_Materialize; |
258 | rsinfo->setResult = tupstore; |
259 | rsinfo->setDesc = tupdesc; |
260 | |
261 | MemoryContextSwitchTo(oldcontext); |
262 | |
263 | LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); |
264 | for (slotno = 0; slotno < max_replication_slots; slotno++) |
265 | { |
266 | ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno]; |
267 | Datum values[PG_GET_REPLICATION_SLOTS_COLS]; |
268 | bool nulls[PG_GET_REPLICATION_SLOTS_COLS]; |
269 | |
270 | ReplicationSlotPersistency persistency; |
271 | TransactionId xmin; |
272 | TransactionId catalog_xmin; |
273 | XLogRecPtr restart_lsn; |
274 | XLogRecPtr confirmed_flush_lsn; |
275 | pid_t active_pid; |
276 | Oid database; |
277 | NameData slot_name; |
278 | NameData plugin; |
279 | int i; |
280 | |
281 | if (!slot->in_use) |
282 | continue; |
283 | |
284 | SpinLockAcquire(&slot->mutex); |
285 | |
286 | xmin = slot->data.xmin; |
287 | catalog_xmin = slot->data.catalog_xmin; |
288 | database = slot->data.database; |
289 | restart_lsn = slot->data.restart_lsn; |
290 | confirmed_flush_lsn = slot->data.confirmed_flush; |
291 | namecpy(&slot_name, &slot->data.name); |
292 | namecpy(&plugin, &slot->data.plugin); |
293 | active_pid = slot->active_pid; |
294 | persistency = slot->data.persistency; |
295 | |
296 | SpinLockRelease(&slot->mutex); |
297 | |
298 | memset(nulls, 0, sizeof(nulls)); |
299 | |
300 | i = 0; |
301 | values[i++] = NameGetDatum(&slot_name); |
302 | |
303 | if (database == InvalidOid) |
304 | nulls[i++] = true; |
305 | else |
306 | values[i++] = NameGetDatum(&plugin); |
307 | |
308 | if (database == InvalidOid) |
309 | values[i++] = CStringGetTextDatum("physical" ); |
310 | else |
311 | values[i++] = CStringGetTextDatum("logical" ); |
312 | |
313 | if (database == InvalidOid) |
314 | nulls[i++] = true; |
315 | else |
316 | values[i++] = database; |
317 | |
318 | values[i++] = BoolGetDatum(persistency == RS_TEMPORARY); |
319 | values[i++] = BoolGetDatum(active_pid != 0); |
320 | |
321 | if (active_pid != 0) |
322 | values[i++] = Int32GetDatum(active_pid); |
323 | else |
324 | nulls[i++] = true; |
325 | |
326 | if (xmin != InvalidTransactionId) |
327 | values[i++] = TransactionIdGetDatum(xmin); |
328 | else |
329 | nulls[i++] = true; |
330 | |
331 | if (catalog_xmin != InvalidTransactionId) |
332 | values[i++] = TransactionIdGetDatum(catalog_xmin); |
333 | else |
334 | nulls[i++] = true; |
335 | |
336 | if (restart_lsn != InvalidXLogRecPtr) |
337 | values[i++] = LSNGetDatum(restart_lsn); |
338 | else |
339 | nulls[i++] = true; |
340 | |
341 | if (confirmed_flush_lsn != InvalidXLogRecPtr) |
342 | values[i++] = LSNGetDatum(confirmed_flush_lsn); |
343 | else |
344 | nulls[i++] = true; |
345 | |
346 | tuplestore_putvalues(tupstore, tupdesc, values, nulls); |
347 | } |
348 | LWLockRelease(ReplicationSlotControlLock); |
349 | |
350 | tuplestore_donestoring(tupstore); |
351 | |
352 | return (Datum) 0; |
353 | } |
354 | |
355 | /* |
356 | * Helper function for advancing our physical replication slot forward. |
357 | * |
358 | * The LSN position to move to is compared simply to the slot's restart_lsn, |
359 | * knowing that any position older than that would be removed by successive |
360 | * checkpoints. |
361 | */ |
362 | static XLogRecPtr |
363 | pg_physical_replication_slot_advance(XLogRecPtr moveto) |
364 | { |
365 | XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn; |
366 | XLogRecPtr retlsn = startlsn; |
367 | |
368 | if (startlsn < moveto) |
369 | { |
370 | SpinLockAcquire(&MyReplicationSlot->mutex); |
371 | MyReplicationSlot->data.restart_lsn = moveto; |
372 | SpinLockRelease(&MyReplicationSlot->mutex); |
373 | retlsn = moveto; |
374 | } |
375 | |
376 | return retlsn; |
377 | } |
378 | |
/*
 * Helper function for advancing our logical replication slot forward.
 *
 * The slot's restart_lsn is used as start point for reading records,
 * while confirmed_flush is used as base point for the decoding context.
 *
 * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
 * because we need to digest WAL to advance restart_lsn allowing to recycle
 * WAL and removal of old catalog tuples. As decoding is done in fast_forward
 * mode, no changes are generated anyway.
 *
 * moveto is the target position.  The return value is the slot's
 * confirmed_flush as read back after the decoding loop has finished.
 *
 * Operates on MyReplicationSlot.  If an error is raised inside the PG_TRY
 * block, the system caches are invalidated and the error is re-thrown.
 */
static XLogRecPtr
pg_logical_replication_slot_advance(XLogRecPtr moveto)
{
	LogicalDecodingContext *ctx;
	/* remembered so it can be restored once decoding is done, see below */
	ResourceOwner old_resowner = CurrentResourceOwner;
	XLogRecPtr startlsn;
	XLogRecPtr retlsn;

	PG_TRY();
	{
		/*
		 * Create our decoding context in fast_forward mode, passing start_lsn
		 * as InvalidXLogRecPtr, so that we start processing from my slot's
		 * confirmed_flush.
		 */
		ctx = CreateDecodingContext(InvalidXLogRecPtr,
		NIL,
		true, /* fast_forward */
		logical_read_local_xlog_page,
		NULL, NULL, NULL);

		/*
		 * Start reading at the slot's restart_lsn, which we know to point to
		 * a valid record.
		 */
		startlsn = MyReplicationSlot->data.restart_lsn;

		/* Initialize our return value in case we don't do anything */
		retlsn = MyReplicationSlot->data.confirmed_flush;

		/* invalidate non-timetravel entries */
		InvalidateSystemCaches();

		/*
		 * Decode at least one record, until we run out of records.
		 *
		 * startlsn is only valid before the first read (it is reset inside
		 * the loop), so the first half of the condition governs entry into
		 * the loop and the second half, based on the reader's EndRecPtr,
		 * governs every later iteration.
		 */
		while ((!XLogRecPtrIsInvalid(startlsn) &&
		startlsn < moveto) ||
		(!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) &&
		ctx->reader->EndRecPtr < moveto))
		{
			char *errm = NULL;
			XLogRecord *record;

			/*
			 * Read records. No changes are generated in fast_forward mode,
			 * but snapbuilder/slot statuses are updated properly.
			 */
			record = XLogReadRecord(ctx->reader, startlsn, &errm);
			if (errm)
			elog(ERROR, "%s" , errm);

			/* Read sequentially from now on */
			startlsn = InvalidXLogRecPtr;

			/*
			 * Process the record. Storage-level changes are ignored in
			 * fast_forward mode, but other modules (such as snapbuilder)
			 * might still have critical updates to do.
			 */
			if (record)
			LogicalDecodingProcessRecord(ctx, ctx->reader);

			/* Stop once the requested target has been reached */
			if (moveto <= ctx->reader->EndRecPtr)
			break;

			CHECK_FOR_INTERRUPTS();
		}

		/*
		 * Logical decoding could have clobbered CurrentResourceOwner during
		 * transaction management, so restore the executor's value. (This is
		 * a kluge, but it's not worth cleaning up right now.)
		 */
		CurrentResourceOwner = old_resowner;

		/* Only confirm a position if the reader reports an end position. */
		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
		{
			LogicalConfirmReceivedLocation(moveto);

			/*
			 * If only the confirmed_flush LSN has changed the slot won't get
			 * marked as dirty by the above. Callers on the walsender
			 * interface are expected to keep track of their own progress and
			 * don't need it written out. But SQL-interface users cannot
			 * specify their own start positions and it's harder for them to
			 * keep track of their progress, so we should make more of an
			 * effort to save it for them.
			 *
			 * Dirty the slot so it's written out at the next checkpoint.
			 * We'll still lose its position on crash, as documented, but it's
			 * better than always losing the position even on clean restart.
			 */
			ReplicationSlotMarkDirty();
		}

		/* Report whatever position the slot has confirmed by now. */
		retlsn = MyReplicationSlot->data.confirmed_flush;

		/* free context, call shutdown callback */
		FreeDecodingContext(ctx);

		InvalidateSystemCaches();
	}
	PG_CATCH();
	{
		/* clear all timetravel entries */
		InvalidateSystemCaches();

		PG_RE_THROW();
	}
	PG_END_TRY();

	return retlsn;
}
503 | |
504 | /* |
505 | * SQL function for moving the position in a replication slot. |
506 | */ |
507 | Datum |
508 | pg_replication_slot_advance(PG_FUNCTION_ARGS) |
509 | { |
510 | Name slotname = PG_GETARG_NAME(0); |
511 | XLogRecPtr moveto = PG_GETARG_LSN(1); |
512 | XLogRecPtr endlsn; |
513 | XLogRecPtr minlsn; |
514 | TupleDesc tupdesc; |
515 | Datum values[2]; |
516 | bool nulls[2]; |
517 | HeapTuple tuple; |
518 | Datum result; |
519 | |
520 | Assert(!MyReplicationSlot); |
521 | |
522 | check_permissions(); |
523 | |
524 | if (XLogRecPtrIsInvalid(moveto)) |
525 | ereport(ERROR, |
526 | (errmsg("invalid target WAL LSN" ))); |
527 | |
528 | /* Build a tuple descriptor for our result type */ |
529 | if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) |
530 | elog(ERROR, "return type must be a row type" ); |
531 | |
532 | /* |
533 | * We can't move slot past what's been flushed/replayed so clamp the |
534 | * target position accordingly. |
535 | */ |
536 | if (!RecoveryInProgress()) |
537 | moveto = Min(moveto, GetFlushRecPtr()); |
538 | else |
539 | moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID)); |
540 | |
541 | /* Acquire the slot so we "own" it */ |
542 | ReplicationSlotAcquire(NameStr(*slotname), true); |
543 | |
544 | /* A slot whose restart_lsn has never been reserved cannot be advanced */ |
545 | if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) |
546 | ereport(ERROR, |
547 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
548 | errmsg("cannot advance replication slot that has not previously reserved WAL" ))); |
549 | |
550 | /* |
551 | * Check if the slot is not moving backwards. Physical slots rely simply |
552 | * on restart_lsn as a minimum point, while logical slots have confirmed |
553 | * consumption up to confirmed_lsn, meaning that in both cases data older |
554 | * than that is not available anymore. |
555 | */ |
556 | if (OidIsValid(MyReplicationSlot->data.database)) |
557 | minlsn = MyReplicationSlot->data.confirmed_flush; |
558 | else |
559 | minlsn = MyReplicationSlot->data.restart_lsn; |
560 | |
561 | if (moveto < minlsn) |
562 | ereport(ERROR, |
563 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
564 | errmsg("cannot advance replication slot to %X/%X, minimum is %X/%X" , |
565 | (uint32) (moveto >> 32), (uint32) moveto, |
566 | (uint32) (minlsn >> 32), (uint32) minlsn))); |
567 | |
568 | /* Do the actual slot update, depending on the slot type */ |
569 | if (OidIsValid(MyReplicationSlot->data.database)) |
570 | endlsn = pg_logical_replication_slot_advance(moveto); |
571 | else |
572 | endlsn = pg_physical_replication_slot_advance(moveto); |
573 | |
574 | values[0] = NameGetDatum(&MyReplicationSlot->data.name); |
575 | nulls[0] = false; |
576 | |
577 | /* Update the on disk state when lsn was updated. */ |
578 | if (XLogRecPtrIsInvalid(endlsn)) |
579 | { |
580 | ReplicationSlotMarkDirty(); |
581 | ReplicationSlotsComputeRequiredXmin(false); |
582 | ReplicationSlotsComputeRequiredLSN(); |
583 | ReplicationSlotSave(); |
584 | } |
585 | |
586 | ReplicationSlotRelease(); |
587 | |
588 | /* Return the reached position. */ |
589 | values[1] = LSNGetDatum(endlsn); |
590 | nulls[1] = false; |
591 | |
592 | tuple = heap_form_tuple(tupdesc, values, nulls); |
593 | result = HeapTupleGetDatum(tuple); |
594 | |
595 | PG_RETURN_DATUM(result); |
596 | } |
597 | |
598 | /* |
599 | * Helper function of copying a replication slot. |
600 | */ |
601 | static Datum |
602 | copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) |
603 | { |
604 | Name src_name = PG_GETARG_NAME(0); |
605 | Name dst_name = PG_GETARG_NAME(1); |
606 | ReplicationSlot *src = NULL; |
607 | XLogRecPtr src_restart_lsn; |
608 | bool src_islogical; |
609 | bool temporary; |
610 | char *plugin; |
611 | Datum values[2]; |
612 | bool nulls[2]; |
613 | Datum result; |
614 | TupleDesc tupdesc; |
615 | HeapTuple tuple; |
616 | |
617 | if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) |
618 | elog(ERROR, "return type must be a row type" ); |
619 | |
620 | check_permissions(); |
621 | |
622 | if (logical_slot) |
623 | CheckLogicalDecodingRequirements(); |
624 | else |
625 | CheckSlotRequirements(); |
626 | |
627 | LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); |
628 | |
629 | /* |
630 | * We need to prevent the source slot's reserved WAL from being removed, |
631 | * but we don't want to lock that slot for very long, and it can advance |
632 | * in the meantime. So obtain the source slot's data, and create a new |
633 | * slot using its restart_lsn. Afterwards we lock the source slot again |
634 | * and verify that the data we copied (name, type) has not changed |
635 | * incompatibly. No inconvenient WAL removal can occur once the new slot |
636 | * is created -- but since WAL removal could have occurred before we |
637 | * managed to create the new slot, we advance the new slot's restart_lsn |
638 | * to the source slot's updated restart_lsn the second time we lock it. |
639 | */ |
640 | for (int i = 0; i < max_replication_slots; i++) |
641 | { |
642 | ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; |
643 | |
644 | if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0) |
645 | { |
646 | SpinLockAcquire(&s->mutex); |
647 | src_islogical = SlotIsLogical(s); |
648 | src_restart_lsn = s->data.restart_lsn; |
649 | temporary = s->data.persistency == RS_TEMPORARY; |
650 | plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL; |
651 | SpinLockRelease(&s->mutex); |
652 | |
653 | src = s; |
654 | break; |
655 | } |
656 | } |
657 | |
658 | LWLockRelease(ReplicationSlotControlLock); |
659 | |
660 | if (src == NULL) |
661 | ereport(ERROR, |
662 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
663 | errmsg("replication slot \"%s\" does not exist" , NameStr(*src_name)))); |
664 | |
665 | /* Check type of replication slot */ |
666 | if (src_islogical != logical_slot) |
667 | ereport(ERROR, |
668 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
669 | src_islogical ? |
670 | errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot" , |
671 | NameStr(*src_name)) : |
672 | errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot" , |
673 | NameStr(*src_name)))); |
674 | |
675 | /* Copying non-reserved slot doesn't make sense */ |
676 | if (XLogRecPtrIsInvalid(src_restart_lsn)) |
677 | { |
678 | Assert(!logical_slot); |
679 | ereport(ERROR, |
680 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
681 | (errmsg("cannot copy a replication slot that doesn't reserve WAL" )))); |
682 | } |
683 | |
684 | /* Overwrite params from optional arguments */ |
685 | if (PG_NARGS() >= 3) |
686 | temporary = PG_GETARG_BOOL(2); |
687 | if (PG_NARGS() >= 4) |
688 | { |
689 | Assert(logical_slot); |
690 | plugin = NameStr(*(PG_GETARG_NAME(3))); |
691 | } |
692 | |
693 | /* Create new slot and acquire it */ |
694 | if (logical_slot) |
695 | create_logical_replication_slot(NameStr(*dst_name), |
696 | plugin, |
697 | temporary, |
698 | src_restart_lsn); |
699 | else |
700 | create_physical_replication_slot(NameStr(*dst_name), |
701 | true, |
702 | temporary, |
703 | src_restart_lsn); |
704 | |
705 | /* |
706 | * Update the destination slot to current values of the source slot; |
707 | * recheck that the source slot is still the one we saw previously. |
708 | */ |
709 | { |
710 | TransactionId copy_effective_xmin; |
711 | TransactionId copy_effective_catalog_xmin; |
712 | TransactionId copy_xmin; |
713 | TransactionId copy_catalog_xmin; |
714 | XLogRecPtr copy_restart_lsn; |
715 | bool copy_islogical; |
716 | char *copy_name; |
717 | |
718 | /* Copy data of source slot again */ |
719 | SpinLockAcquire(&src->mutex); |
720 | copy_effective_xmin = src->effective_xmin; |
721 | copy_effective_catalog_xmin = src->effective_catalog_xmin; |
722 | |
723 | copy_xmin = src->data.xmin; |
724 | copy_catalog_xmin = src->data.catalog_xmin; |
725 | copy_restart_lsn = src->data.restart_lsn; |
726 | |
727 | /* for existence check */ |
728 | copy_name = pstrdup(NameStr(src->data.name)); |
729 | copy_islogical = SlotIsLogical(src); |
730 | SpinLockRelease(&src->mutex); |
731 | |
732 | /* |
733 | * Check if the source slot still exists and is valid. We regard it as |
734 | * invalid if the type of replication slot or name has been changed, |
735 | * or the restart_lsn either is invalid or has gone backward. (The |
736 | * restart_lsn could go backwards if the source slot is dropped and |
737 | * copied from an older slot during installation.) |
738 | * |
739 | * Since erroring out will release and drop the destination slot we |
740 | * don't need to release it here. |
741 | */ |
742 | if (copy_restart_lsn < src_restart_lsn || |
743 | src_islogical != copy_islogical || |
744 | strcmp(copy_name, NameStr(*src_name)) != 0) |
745 | ereport(ERROR, |
746 | (errmsg("could not copy replication slot \"%s\"" , |
747 | NameStr(*src_name)), |
748 | errdetail("The source replication slot was modified incompatibly during the copy operation." ))); |
749 | |
750 | /* Install copied values again */ |
751 | SpinLockAcquire(&MyReplicationSlot->mutex); |
752 | MyReplicationSlot->effective_xmin = copy_effective_xmin; |
753 | MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin; |
754 | |
755 | MyReplicationSlot->data.xmin = copy_xmin; |
756 | MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin; |
757 | MyReplicationSlot->data.restart_lsn = copy_restart_lsn; |
758 | SpinLockRelease(&MyReplicationSlot->mutex); |
759 | |
760 | ReplicationSlotMarkDirty(); |
761 | ReplicationSlotsComputeRequiredXmin(false); |
762 | ReplicationSlotsComputeRequiredLSN(); |
763 | ReplicationSlotSave(); |
764 | |
765 | #ifdef USE_ASSERT_CHECKING |
766 | /* Check that the restart_lsn is available */ |
767 | { |
768 | XLogSegNo segno; |
769 | |
770 | XLByteToSeg(copy_restart_lsn, segno, wal_segment_size); |
771 | Assert(XLogGetLastRemovedSegno() < segno); |
772 | } |
773 | #endif |
774 | } |
775 | |
776 | /* target slot fully created, mark as persistent if needed */ |
777 | if (logical_slot && !temporary) |
778 | ReplicationSlotPersist(); |
779 | |
780 | /* All done. Set up the return values */ |
781 | values[0] = NameGetDatum(dst_name); |
782 | nulls[0] = false; |
783 | if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush)) |
784 | { |
785 | values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush); |
786 | nulls[1] = false; |
787 | } |
788 | else |
789 | nulls[1] = true; |
790 | |
791 | tuple = heap_form_tuple(tupdesc, values, nulls); |
792 | result = HeapTupleGetDatum(tuple); |
793 | |
794 | ReplicationSlotRelease(); |
795 | |
796 | PG_RETURN_DATUM(result); |
797 | } |
798 | |
799 | /* The wrappers below are all to appease opr_sanity */ |
800 | Datum |
801 | pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS) |
802 | { |
803 | return copy_replication_slot(fcinfo, true); |
804 | } |
805 | |
806 | Datum |
807 | pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS) |
808 | { |
809 | return copy_replication_slot(fcinfo, true); |
810 | } |
811 | |
812 | Datum |
813 | pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS) |
814 | { |
815 | return copy_replication_slot(fcinfo, true); |
816 | } |
817 | |
818 | Datum |
819 | pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS) |
820 | { |
821 | return copy_replication_slot(fcinfo, false); |
822 | } |
823 | |
824 | Datum |
825 | pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS) |
826 | { |
827 | return copy_replication_slot(fcinfo, false); |
828 | } |
829 | |