| 1 | /*------------------------------------------------------------------------- |
| 2 | * |
| 3 | * slotfuncs.c |
| 4 | * Support functions for replication slots |
| 5 | * |
| 6 | * Copyright (c) 2012-2019, PostgreSQL Global Development Group |
| 7 | * |
| 8 | * IDENTIFICATION |
| 9 | * src/backend/replication/slotfuncs.c |
| 10 | * |
| 11 | *------------------------------------------------------------------------- |
| 12 | */ |
| 13 | #include "postgres.h" |
| 14 | |
| 15 | #include "access/htup_details.h" |
| 16 | #include "access/xlog_internal.h" |
| 17 | #include "funcapi.h" |
| 18 | #include "miscadmin.h" |
| 19 | #include "replication/decode.h" |
| 20 | #include "replication/slot.h" |
| 21 | #include "replication/logical.h" |
| 22 | #include "replication/logicalfuncs.h" |
| 23 | #include "utils/builtins.h" |
| 24 | #include "utils/inval.h" |
| 25 | #include "utils/pg_lsn.h" |
| 26 | #include "utils/resowner.h" |
| 27 | |
| 28 | static void |
| 29 | check_permissions(void) |
| 30 | { |
| 31 | if (!superuser() && !has_rolreplication(GetUserId())) |
| 32 | ereport(ERROR, |
| 33 | (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), |
| 34 | (errmsg("must be superuser or replication role to use replication slots" )))); |
| 35 | } |
| 36 | |
| 37 | /* |
| 38 | * Helper function for creating a new physical replication slot with |
| 39 | * given arguments. Note that this function doesn't release the created |
| 40 | * slot. |
| 41 | * |
| 42 | * If restart_lsn is a valid value, we use it without WAL reservation |
| 43 | * routine. So the caller must guarantee that WAL is available. |
| 44 | */ |
| 45 | static void |
| 46 | create_physical_replication_slot(char *name, bool immediately_reserve, |
| 47 | bool temporary, XLogRecPtr restart_lsn) |
| 48 | { |
| 49 | Assert(!MyReplicationSlot); |
| 50 | |
| 51 | /* acquire replication slot, this will check for conflicting names */ |
| 52 | ReplicationSlotCreate(name, false, |
| 53 | temporary ? RS_TEMPORARY : RS_PERSISTENT); |
| 54 | |
| 55 | if (immediately_reserve) |
| 56 | { |
| 57 | /* Reserve WAL as the user asked for it */ |
| 58 | if (XLogRecPtrIsInvalid(restart_lsn)) |
| 59 | ReplicationSlotReserveWal(); |
| 60 | else |
| 61 | MyReplicationSlot->data.restart_lsn = restart_lsn; |
| 62 | |
| 63 | /* Write this slot to disk */ |
| 64 | ReplicationSlotMarkDirty(); |
| 65 | ReplicationSlotSave(); |
| 66 | } |
| 67 | } |
| 68 | |
| 69 | /* |
| 70 | * SQL function for creating a new physical (streaming replication) |
| 71 | * replication slot. |
| 72 | */ |
| 73 | Datum |
| 74 | pg_create_physical_replication_slot(PG_FUNCTION_ARGS) |
| 75 | { |
| 76 | Name name = PG_GETARG_NAME(0); |
| 77 | bool immediately_reserve = PG_GETARG_BOOL(1); |
| 78 | bool temporary = PG_GETARG_BOOL(2); |
| 79 | Datum values[2]; |
| 80 | bool nulls[2]; |
| 81 | TupleDesc tupdesc; |
| 82 | HeapTuple tuple; |
| 83 | Datum result; |
| 84 | |
| 85 | if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) |
| 86 | elog(ERROR, "return type must be a row type" ); |
| 87 | |
| 88 | check_permissions(); |
| 89 | |
| 90 | CheckSlotRequirements(); |
| 91 | |
| 92 | create_physical_replication_slot(NameStr(*name), |
| 93 | immediately_reserve, |
| 94 | temporary, |
| 95 | InvalidXLogRecPtr); |
| 96 | |
| 97 | values[0] = NameGetDatum(&MyReplicationSlot->data.name); |
| 98 | nulls[0] = false; |
| 99 | |
| 100 | if (immediately_reserve) |
| 101 | { |
| 102 | values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn); |
| 103 | nulls[1] = false; |
| 104 | } |
| 105 | else |
| 106 | nulls[1] = true; |
| 107 | |
| 108 | tuple = heap_form_tuple(tupdesc, values, nulls); |
| 109 | result = HeapTupleGetDatum(tuple); |
| 110 | |
| 111 | ReplicationSlotRelease(); |
| 112 | |
| 113 | PG_RETURN_DATUM(result); |
| 114 | } |
| 115 | |
| 116 | |
| 117 | /* |
| 118 | * Helper function for creating a new logical replication slot with |
| 119 | * given arguments. Note that this function doesn't release the created |
| 120 | * slot. |
| 121 | */ |
| 122 | static void |
| 123 | create_logical_replication_slot(char *name, char *plugin, |
| 124 | bool temporary, XLogRecPtr restart_lsn) |
| 125 | { |
| 126 | LogicalDecodingContext *ctx = NULL; |
| 127 | |
| 128 | Assert(!MyReplicationSlot); |
| 129 | |
| 130 | /* |
| 131 | * Acquire a logical decoding slot, this will check for conflicting names. |
| 132 | * Initially create persistent slot as ephemeral - that allows us to |
| 133 | * nicely handle errors during initialization because it'll get dropped if |
| 134 | * this transaction fails. We'll make it persistent at the end. Temporary |
| 135 | * slots can be created as temporary from beginning as they get dropped on |
| 136 | * error as well. |
| 137 | */ |
| 138 | ReplicationSlotCreate(name, true, |
| 139 | temporary ? RS_TEMPORARY : RS_EPHEMERAL); |
| 140 | |
| 141 | /* |
| 142 | * Create logical decoding context, to build the initial snapshot. |
| 143 | */ |
| 144 | ctx = CreateInitDecodingContext(plugin, NIL, |
| 145 | false, /* do not build snapshot */ |
| 146 | restart_lsn, |
| 147 | logical_read_local_xlog_page, NULL, NULL, |
| 148 | NULL); |
| 149 | |
| 150 | /* build initial snapshot, might take a while */ |
| 151 | DecodingContextFindStartpoint(ctx); |
| 152 | |
| 153 | /* don't need the decoding context anymore */ |
| 154 | FreeDecodingContext(ctx); |
| 155 | } |
| 156 | |
| 157 | /* |
| 158 | * SQL function for creating a new logical replication slot. |
| 159 | */ |
| 160 | Datum |
| 161 | pg_create_logical_replication_slot(PG_FUNCTION_ARGS) |
| 162 | { |
| 163 | Name name = PG_GETARG_NAME(0); |
| 164 | Name plugin = PG_GETARG_NAME(1); |
| 165 | bool temporary = PG_GETARG_BOOL(2); |
| 166 | Datum result; |
| 167 | TupleDesc tupdesc; |
| 168 | HeapTuple tuple; |
| 169 | Datum values[2]; |
| 170 | bool nulls[2]; |
| 171 | |
| 172 | if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) |
| 173 | elog(ERROR, "return type must be a row type" ); |
| 174 | |
| 175 | check_permissions(); |
| 176 | |
| 177 | CheckLogicalDecodingRequirements(); |
| 178 | |
| 179 | create_logical_replication_slot(NameStr(*name), |
| 180 | NameStr(*plugin), |
| 181 | temporary, |
| 182 | InvalidXLogRecPtr); |
| 183 | |
| 184 | values[0] = NameGetDatum(&MyReplicationSlot->data.name); |
| 185 | values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush); |
| 186 | |
| 187 | memset(nulls, 0, sizeof(nulls)); |
| 188 | |
| 189 | tuple = heap_form_tuple(tupdesc, values, nulls); |
| 190 | result = HeapTupleGetDatum(tuple); |
| 191 | |
| 192 | /* ok, slot is now fully created, mark it as persistent if needed */ |
| 193 | if (!temporary) |
| 194 | ReplicationSlotPersist(); |
| 195 | ReplicationSlotRelease(); |
| 196 | |
| 197 | PG_RETURN_DATUM(result); |
| 198 | } |
| 199 | |
| 200 | |
| 201 | /* |
| 202 | * SQL function for dropping a replication slot. |
| 203 | */ |
| 204 | Datum |
| 205 | pg_drop_replication_slot(PG_FUNCTION_ARGS) |
| 206 | { |
| 207 | Name name = PG_GETARG_NAME(0); |
| 208 | |
| 209 | check_permissions(); |
| 210 | |
| 211 | CheckSlotRequirements(); |
| 212 | |
| 213 | ReplicationSlotDrop(NameStr(*name), true); |
| 214 | |
| 215 | PG_RETURN_VOID(); |
| 216 | } |
| 217 | |
| 218 | /* |
| 219 | * pg_get_replication_slots - SQL SRF showing active replication slots. |
| 220 | */ |
| 221 | Datum |
| 222 | pg_get_replication_slots(PG_FUNCTION_ARGS) |
| 223 | { |
| 224 | #define PG_GET_REPLICATION_SLOTS_COLS 11 |
| 225 | ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; |
| 226 | TupleDesc tupdesc; |
| 227 | Tuplestorestate *tupstore; |
| 228 | MemoryContext per_query_ctx; |
| 229 | MemoryContext oldcontext; |
| 230 | int slotno; |
| 231 | |
| 232 | /* check to see if caller supports us returning a tuplestore */ |
| 233 | if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) |
| 234 | ereport(ERROR, |
| 235 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| 236 | errmsg("set-valued function called in context that cannot accept a set" ))); |
| 237 | if (!(rsinfo->allowedModes & SFRM_Materialize)) |
| 238 | ereport(ERROR, |
| 239 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| 240 | errmsg("materialize mode required, but it is not " \ |
| 241 | "allowed in this context" ))); |
| 242 | |
| 243 | /* Build a tuple descriptor for our result type */ |
| 244 | if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) |
| 245 | elog(ERROR, "return type must be a row type" ); |
| 246 | |
| 247 | /* |
| 248 | * We don't require any special permission to see this function's data |
| 249 | * because nothing should be sensitive. The most critical being the slot |
| 250 | * name, which shouldn't contain anything particularly sensitive. |
| 251 | */ |
| 252 | |
| 253 | per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; |
| 254 | oldcontext = MemoryContextSwitchTo(per_query_ctx); |
| 255 | |
| 256 | tupstore = tuplestore_begin_heap(true, false, work_mem); |
| 257 | rsinfo->returnMode = SFRM_Materialize; |
| 258 | rsinfo->setResult = tupstore; |
| 259 | rsinfo->setDesc = tupdesc; |
| 260 | |
| 261 | MemoryContextSwitchTo(oldcontext); |
| 262 | |
| 263 | LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); |
| 264 | for (slotno = 0; slotno < max_replication_slots; slotno++) |
| 265 | { |
| 266 | ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno]; |
| 267 | Datum values[PG_GET_REPLICATION_SLOTS_COLS]; |
| 268 | bool nulls[PG_GET_REPLICATION_SLOTS_COLS]; |
| 269 | |
| 270 | ReplicationSlotPersistency persistency; |
| 271 | TransactionId xmin; |
| 272 | TransactionId catalog_xmin; |
| 273 | XLogRecPtr restart_lsn; |
| 274 | XLogRecPtr confirmed_flush_lsn; |
| 275 | pid_t active_pid; |
| 276 | Oid database; |
| 277 | NameData slot_name; |
| 278 | NameData plugin; |
| 279 | int i; |
| 280 | |
| 281 | if (!slot->in_use) |
| 282 | continue; |
| 283 | |
| 284 | SpinLockAcquire(&slot->mutex); |
| 285 | |
| 286 | xmin = slot->data.xmin; |
| 287 | catalog_xmin = slot->data.catalog_xmin; |
| 288 | database = slot->data.database; |
| 289 | restart_lsn = slot->data.restart_lsn; |
| 290 | confirmed_flush_lsn = slot->data.confirmed_flush; |
| 291 | namecpy(&slot_name, &slot->data.name); |
| 292 | namecpy(&plugin, &slot->data.plugin); |
| 293 | active_pid = slot->active_pid; |
| 294 | persistency = slot->data.persistency; |
| 295 | |
| 296 | SpinLockRelease(&slot->mutex); |
| 297 | |
| 298 | memset(nulls, 0, sizeof(nulls)); |
| 299 | |
| 300 | i = 0; |
| 301 | values[i++] = NameGetDatum(&slot_name); |
| 302 | |
| 303 | if (database == InvalidOid) |
| 304 | nulls[i++] = true; |
| 305 | else |
| 306 | values[i++] = NameGetDatum(&plugin); |
| 307 | |
| 308 | if (database == InvalidOid) |
| 309 | values[i++] = CStringGetTextDatum("physical" ); |
| 310 | else |
| 311 | values[i++] = CStringGetTextDatum("logical" ); |
| 312 | |
| 313 | if (database == InvalidOid) |
| 314 | nulls[i++] = true; |
| 315 | else |
| 316 | values[i++] = database; |
| 317 | |
| 318 | values[i++] = BoolGetDatum(persistency == RS_TEMPORARY); |
| 319 | values[i++] = BoolGetDatum(active_pid != 0); |
| 320 | |
| 321 | if (active_pid != 0) |
| 322 | values[i++] = Int32GetDatum(active_pid); |
| 323 | else |
| 324 | nulls[i++] = true; |
| 325 | |
| 326 | if (xmin != InvalidTransactionId) |
| 327 | values[i++] = TransactionIdGetDatum(xmin); |
| 328 | else |
| 329 | nulls[i++] = true; |
| 330 | |
| 331 | if (catalog_xmin != InvalidTransactionId) |
| 332 | values[i++] = TransactionIdGetDatum(catalog_xmin); |
| 333 | else |
| 334 | nulls[i++] = true; |
| 335 | |
| 336 | if (restart_lsn != InvalidXLogRecPtr) |
| 337 | values[i++] = LSNGetDatum(restart_lsn); |
| 338 | else |
| 339 | nulls[i++] = true; |
| 340 | |
| 341 | if (confirmed_flush_lsn != InvalidXLogRecPtr) |
| 342 | values[i++] = LSNGetDatum(confirmed_flush_lsn); |
| 343 | else |
| 344 | nulls[i++] = true; |
| 345 | |
| 346 | tuplestore_putvalues(tupstore, tupdesc, values, nulls); |
| 347 | } |
| 348 | LWLockRelease(ReplicationSlotControlLock); |
| 349 | |
| 350 | tuplestore_donestoring(tupstore); |
| 351 | |
| 352 | return (Datum) 0; |
| 353 | } |
| 354 | |
| 355 | /* |
| 356 | * Helper function for advancing our physical replication slot forward. |
| 357 | * |
| 358 | * The LSN position to move to is compared simply to the slot's restart_lsn, |
| 359 | * knowing that any position older than that would be removed by successive |
| 360 | * checkpoints. |
| 361 | */ |
| 362 | static XLogRecPtr |
| 363 | pg_physical_replication_slot_advance(XLogRecPtr moveto) |
| 364 | { |
| 365 | XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn; |
| 366 | XLogRecPtr retlsn = startlsn; |
| 367 | |
| 368 | if (startlsn < moveto) |
| 369 | { |
| 370 | SpinLockAcquire(&MyReplicationSlot->mutex); |
| 371 | MyReplicationSlot->data.restart_lsn = moveto; |
| 372 | SpinLockRelease(&MyReplicationSlot->mutex); |
| 373 | retlsn = moveto; |
| 374 | } |
| 375 | |
| 376 | return retlsn; |
| 377 | } |
| 378 | |
/*
 * Helper function for advancing our logical replication slot forward.
 *
 * The slot's restart_lsn is used as start point for reading records,
 * while confirmed_flush is used as base point for the decoding context.
 *
 * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
 * because we need to digest WAL to advance restart_lsn allowing to recycle
 * WAL and removal of old catalog tuples. As decoding is done in fast_forward
 * mode, no changes are generated anyway.
 *
 * Must be called with the slot acquired as MyReplicationSlot.
 *
 * Reads and processes WAL records until the reader's EndRecPtr reaches
 * moveto.  If the reader ended up with a valid EndRecPtr, the function then
 * confirms moveto through LogicalConfirmReceivedLocation() and marks the
 * slot dirty.
 *
 * System caches are invalidated before decoding starts, and again on both
 * the normal and the error exit path.
 *
 * Returns the slot's confirmed_flush after the advance.
 */
static XLogRecPtr
pg_logical_replication_slot_advance(XLogRecPtr moveto)
{
	LogicalDecodingContext *ctx;
	ResourceOwner old_resowner = CurrentResourceOwner;
	XLogRecPtr	startlsn;
	XLogRecPtr	retlsn;

	PG_TRY();
	{
		/*
		 * Create our decoding context in fast_forward mode, passing start_lsn
		 * as InvalidXLogRecPtr, so that we start processing from my slot's
		 * confirmed_flush.
		 */
		ctx = CreateDecodingContext(InvalidXLogRecPtr,
									NIL,
									true,	/* fast_forward */
									logical_read_local_xlog_page,
									NULL, NULL, NULL);

		/*
		 * Start reading at the slot's restart_lsn, which we know to point to
		 * a valid record.
		 */
		startlsn = MyReplicationSlot->data.restart_lsn;

		/* Initialize our return value in case we don't do anything */
		retlsn = MyReplicationSlot->data.confirmed_flush;

		/* invalidate non-timetravel entries */
		InvalidateSystemCaches();

		/*
		 * Decode at least one record, until we run out of records.  The first
		 * iteration is driven by startlsn; after that startlsn is reset to
		 * InvalidXLogRecPtr and the reader's EndRecPtr drives the loop.
		 */
		while ((!XLogRecPtrIsInvalid(startlsn) &&
				startlsn < moveto) ||
			   (!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) &&
				ctx->reader->EndRecPtr < moveto))
		{
			char	   *errm = NULL;
			XLogRecord *record;

			/*
			 * Read records. No changes are generated in fast_forward mode,
			 * but snapbuilder/slot statuses are updated properly.
			 */
			record = XLogReadRecord(ctx->reader, startlsn, &errm);
			if (errm)
				elog(ERROR, "%s", errm);

			/* Read sequentially from now on */
			startlsn = InvalidXLogRecPtr;

			/*
			 * Process the record. Storage-level changes are ignored in
			 * fast_forward mode, but other modules (such as snapbuilder)
			 * might still have critical updates to do.
			 */
			if (record)
				LogicalDecodingProcessRecord(ctx, ctx->reader);

			/* Stop once the requested target has been reached */
			if (moveto <= ctx->reader->EndRecPtr)
				break;

			CHECK_FOR_INTERRUPTS();
		}

		/*
		 * Logical decoding could have clobbered CurrentResourceOwner during
		 * transaction management, so restore the executor's value. (This is
		 * a kluge, but it's not worth cleaning up right now.)
		 */
		CurrentResourceOwner = old_resowner;

		/* Only confirm a new position if the reader actually read WAL */
		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
		{
			LogicalConfirmReceivedLocation(moveto);

			/*
			 * If only the confirmed_flush LSN has changed the slot won't get
			 * marked as dirty by the above. Callers on the walsender
			 * interface are expected to keep track of their own progress and
			 * don't need it written out. But SQL-interface users cannot
			 * specify their own start positions and it's harder for them to
			 * keep track of their progress, so we should make more of an
			 * effort to save it for them.
			 *
			 * Dirty the slot so it's written out at the next checkpoint.
			 * We'll still lose its position on crash, as documented, but it's
			 * better than always losing the position even on clean restart.
			 */
			ReplicationSlotMarkDirty();
		}

		/* Report whatever confirmed_flush ended up as */
		retlsn = MyReplicationSlot->data.confirmed_flush;

		/* free context, call shutdown callback */
		FreeDecodingContext(ctx);

		InvalidateSystemCaches();
	}
	PG_CATCH();
	{
		/* clear all timetravel entries */
		InvalidateSystemCaches();

		PG_RE_THROW();
	}
	PG_END_TRY();

	return retlsn;
}
| 503 | |
| 504 | /* |
| 505 | * SQL function for moving the position in a replication slot. |
| 506 | */ |
| 507 | Datum |
| 508 | pg_replication_slot_advance(PG_FUNCTION_ARGS) |
| 509 | { |
| 510 | Name slotname = PG_GETARG_NAME(0); |
| 511 | XLogRecPtr moveto = PG_GETARG_LSN(1); |
| 512 | XLogRecPtr endlsn; |
| 513 | XLogRecPtr minlsn; |
| 514 | TupleDesc tupdesc; |
| 515 | Datum values[2]; |
| 516 | bool nulls[2]; |
| 517 | HeapTuple tuple; |
| 518 | Datum result; |
| 519 | |
| 520 | Assert(!MyReplicationSlot); |
| 521 | |
| 522 | check_permissions(); |
| 523 | |
| 524 | if (XLogRecPtrIsInvalid(moveto)) |
| 525 | ereport(ERROR, |
| 526 | (errmsg("invalid target WAL LSN" ))); |
| 527 | |
| 528 | /* Build a tuple descriptor for our result type */ |
| 529 | if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) |
| 530 | elog(ERROR, "return type must be a row type" ); |
| 531 | |
| 532 | /* |
| 533 | * We can't move slot past what's been flushed/replayed so clamp the |
| 534 | * target position accordingly. |
| 535 | */ |
| 536 | if (!RecoveryInProgress()) |
| 537 | moveto = Min(moveto, GetFlushRecPtr()); |
| 538 | else |
| 539 | moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID)); |
| 540 | |
| 541 | /* Acquire the slot so we "own" it */ |
| 542 | ReplicationSlotAcquire(NameStr(*slotname), true); |
| 543 | |
| 544 | /* A slot whose restart_lsn has never been reserved cannot be advanced */ |
| 545 | if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) |
| 546 | ereport(ERROR, |
| 547 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| 548 | errmsg("cannot advance replication slot that has not previously reserved WAL" ))); |
| 549 | |
| 550 | /* |
| 551 | * Check if the slot is not moving backwards. Physical slots rely simply |
| 552 | * on restart_lsn as a minimum point, while logical slots have confirmed |
| 553 | * consumption up to confirmed_lsn, meaning that in both cases data older |
| 554 | * than that is not available anymore. |
| 555 | */ |
| 556 | if (OidIsValid(MyReplicationSlot->data.database)) |
| 557 | minlsn = MyReplicationSlot->data.confirmed_flush; |
| 558 | else |
| 559 | minlsn = MyReplicationSlot->data.restart_lsn; |
| 560 | |
| 561 | if (moveto < minlsn) |
| 562 | ereport(ERROR, |
| 563 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| 564 | errmsg("cannot advance replication slot to %X/%X, minimum is %X/%X" , |
| 565 | (uint32) (moveto >> 32), (uint32) moveto, |
| 566 | (uint32) (minlsn >> 32), (uint32) minlsn))); |
| 567 | |
| 568 | /* Do the actual slot update, depending on the slot type */ |
| 569 | if (OidIsValid(MyReplicationSlot->data.database)) |
| 570 | endlsn = pg_logical_replication_slot_advance(moveto); |
| 571 | else |
| 572 | endlsn = pg_physical_replication_slot_advance(moveto); |
| 573 | |
| 574 | values[0] = NameGetDatum(&MyReplicationSlot->data.name); |
| 575 | nulls[0] = false; |
| 576 | |
| 577 | /* Update the on disk state when lsn was updated. */ |
| 578 | if (XLogRecPtrIsInvalid(endlsn)) |
| 579 | { |
| 580 | ReplicationSlotMarkDirty(); |
| 581 | ReplicationSlotsComputeRequiredXmin(false); |
| 582 | ReplicationSlotsComputeRequiredLSN(); |
| 583 | ReplicationSlotSave(); |
| 584 | } |
| 585 | |
| 586 | ReplicationSlotRelease(); |
| 587 | |
| 588 | /* Return the reached position. */ |
| 589 | values[1] = LSNGetDatum(endlsn); |
| 590 | nulls[1] = false; |
| 591 | |
| 592 | tuple = heap_form_tuple(tupdesc, values, nulls); |
| 593 | result = HeapTupleGetDatum(tuple); |
| 594 | |
| 595 | PG_RETURN_DATUM(result); |
| 596 | } |
| 597 | |
| 598 | /* |
| 599 | * Helper function of copying a replication slot. |
| 600 | */ |
| 601 | static Datum |
| 602 | copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) |
| 603 | { |
| 604 | Name src_name = PG_GETARG_NAME(0); |
| 605 | Name dst_name = PG_GETARG_NAME(1); |
| 606 | ReplicationSlot *src = NULL; |
| 607 | XLogRecPtr src_restart_lsn; |
| 608 | bool src_islogical; |
| 609 | bool temporary; |
| 610 | char *plugin; |
| 611 | Datum values[2]; |
| 612 | bool nulls[2]; |
| 613 | Datum result; |
| 614 | TupleDesc tupdesc; |
| 615 | HeapTuple tuple; |
| 616 | |
| 617 | if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) |
| 618 | elog(ERROR, "return type must be a row type" ); |
| 619 | |
| 620 | check_permissions(); |
| 621 | |
| 622 | if (logical_slot) |
| 623 | CheckLogicalDecodingRequirements(); |
| 624 | else |
| 625 | CheckSlotRequirements(); |
| 626 | |
| 627 | LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); |
| 628 | |
| 629 | /* |
| 630 | * We need to prevent the source slot's reserved WAL from being removed, |
| 631 | * but we don't want to lock that slot for very long, and it can advance |
| 632 | * in the meantime. So obtain the source slot's data, and create a new |
| 633 | * slot using its restart_lsn. Afterwards we lock the source slot again |
| 634 | * and verify that the data we copied (name, type) has not changed |
| 635 | * incompatibly. No inconvenient WAL removal can occur once the new slot |
| 636 | * is created -- but since WAL removal could have occurred before we |
| 637 | * managed to create the new slot, we advance the new slot's restart_lsn |
| 638 | * to the source slot's updated restart_lsn the second time we lock it. |
| 639 | */ |
| 640 | for (int i = 0; i < max_replication_slots; i++) |
| 641 | { |
| 642 | ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; |
| 643 | |
| 644 | if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0) |
| 645 | { |
| 646 | SpinLockAcquire(&s->mutex); |
| 647 | src_islogical = SlotIsLogical(s); |
| 648 | src_restart_lsn = s->data.restart_lsn; |
| 649 | temporary = s->data.persistency == RS_TEMPORARY; |
| 650 | plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL; |
| 651 | SpinLockRelease(&s->mutex); |
| 652 | |
| 653 | src = s; |
| 654 | break; |
| 655 | } |
| 656 | } |
| 657 | |
| 658 | LWLockRelease(ReplicationSlotControlLock); |
| 659 | |
| 660 | if (src == NULL) |
| 661 | ereport(ERROR, |
| 662 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
| 663 | errmsg("replication slot \"%s\" does not exist" , NameStr(*src_name)))); |
| 664 | |
| 665 | /* Check type of replication slot */ |
| 666 | if (src_islogical != logical_slot) |
| 667 | ereport(ERROR, |
| 668 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| 669 | src_islogical ? |
| 670 | errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot" , |
| 671 | NameStr(*src_name)) : |
| 672 | errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot" , |
| 673 | NameStr(*src_name)))); |
| 674 | |
| 675 | /* Copying non-reserved slot doesn't make sense */ |
| 676 | if (XLogRecPtrIsInvalid(src_restart_lsn)) |
| 677 | { |
| 678 | Assert(!logical_slot); |
| 679 | ereport(ERROR, |
| 680 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| 681 | (errmsg("cannot copy a replication slot that doesn't reserve WAL" )))); |
| 682 | } |
| 683 | |
| 684 | /* Overwrite params from optional arguments */ |
| 685 | if (PG_NARGS() >= 3) |
| 686 | temporary = PG_GETARG_BOOL(2); |
| 687 | if (PG_NARGS() >= 4) |
| 688 | { |
| 689 | Assert(logical_slot); |
| 690 | plugin = NameStr(*(PG_GETARG_NAME(3))); |
| 691 | } |
| 692 | |
| 693 | /* Create new slot and acquire it */ |
| 694 | if (logical_slot) |
| 695 | create_logical_replication_slot(NameStr(*dst_name), |
| 696 | plugin, |
| 697 | temporary, |
| 698 | src_restart_lsn); |
| 699 | else |
| 700 | create_physical_replication_slot(NameStr(*dst_name), |
| 701 | true, |
| 702 | temporary, |
| 703 | src_restart_lsn); |
| 704 | |
| 705 | /* |
| 706 | * Update the destination slot to current values of the source slot; |
| 707 | * recheck that the source slot is still the one we saw previously. |
| 708 | */ |
| 709 | { |
| 710 | TransactionId copy_effective_xmin; |
| 711 | TransactionId copy_effective_catalog_xmin; |
| 712 | TransactionId copy_xmin; |
| 713 | TransactionId copy_catalog_xmin; |
| 714 | XLogRecPtr copy_restart_lsn; |
| 715 | bool copy_islogical; |
| 716 | char *copy_name; |
| 717 | |
| 718 | /* Copy data of source slot again */ |
| 719 | SpinLockAcquire(&src->mutex); |
| 720 | copy_effective_xmin = src->effective_xmin; |
| 721 | copy_effective_catalog_xmin = src->effective_catalog_xmin; |
| 722 | |
| 723 | copy_xmin = src->data.xmin; |
| 724 | copy_catalog_xmin = src->data.catalog_xmin; |
| 725 | copy_restart_lsn = src->data.restart_lsn; |
| 726 | |
| 727 | /* for existence check */ |
| 728 | copy_name = pstrdup(NameStr(src->data.name)); |
| 729 | copy_islogical = SlotIsLogical(src); |
| 730 | SpinLockRelease(&src->mutex); |
| 731 | |
| 732 | /* |
| 733 | * Check if the source slot still exists and is valid. We regard it as |
| 734 | * invalid if the type of replication slot or name has been changed, |
| 735 | * or the restart_lsn either is invalid or has gone backward. (The |
| 736 | * restart_lsn could go backwards if the source slot is dropped and |
| 737 | * copied from an older slot during installation.) |
| 738 | * |
| 739 | * Since erroring out will release and drop the destination slot we |
| 740 | * don't need to release it here. |
| 741 | */ |
| 742 | if (copy_restart_lsn < src_restart_lsn || |
| 743 | src_islogical != copy_islogical || |
| 744 | strcmp(copy_name, NameStr(*src_name)) != 0) |
| 745 | ereport(ERROR, |
| 746 | (errmsg("could not copy replication slot \"%s\"" , |
| 747 | NameStr(*src_name)), |
| 748 | errdetail("The source replication slot was modified incompatibly during the copy operation." ))); |
| 749 | |
| 750 | /* Install copied values again */ |
| 751 | SpinLockAcquire(&MyReplicationSlot->mutex); |
| 752 | MyReplicationSlot->effective_xmin = copy_effective_xmin; |
| 753 | MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin; |
| 754 | |
| 755 | MyReplicationSlot->data.xmin = copy_xmin; |
| 756 | MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin; |
| 757 | MyReplicationSlot->data.restart_lsn = copy_restart_lsn; |
| 758 | SpinLockRelease(&MyReplicationSlot->mutex); |
| 759 | |
| 760 | ReplicationSlotMarkDirty(); |
| 761 | ReplicationSlotsComputeRequiredXmin(false); |
| 762 | ReplicationSlotsComputeRequiredLSN(); |
| 763 | ReplicationSlotSave(); |
| 764 | |
| 765 | #ifdef USE_ASSERT_CHECKING |
| 766 | /* Check that the restart_lsn is available */ |
| 767 | { |
| 768 | XLogSegNo segno; |
| 769 | |
| 770 | XLByteToSeg(copy_restart_lsn, segno, wal_segment_size); |
| 771 | Assert(XLogGetLastRemovedSegno() < segno); |
| 772 | } |
| 773 | #endif |
| 774 | } |
| 775 | |
| 776 | /* target slot fully created, mark as persistent if needed */ |
| 777 | if (logical_slot && !temporary) |
| 778 | ReplicationSlotPersist(); |
| 779 | |
| 780 | /* All done. Set up the return values */ |
| 781 | values[0] = NameGetDatum(dst_name); |
| 782 | nulls[0] = false; |
| 783 | if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush)) |
| 784 | { |
| 785 | values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush); |
| 786 | nulls[1] = false; |
| 787 | } |
| 788 | else |
| 789 | nulls[1] = true; |
| 790 | |
| 791 | tuple = heap_form_tuple(tupdesc, values, nulls); |
| 792 | result = HeapTupleGetDatum(tuple); |
| 793 | |
| 794 | ReplicationSlotRelease(); |
| 795 | |
| 796 | PG_RETURN_DATUM(result); |
| 797 | } |
| 798 | |
| 799 | /* The wrappers below are all to appease opr_sanity */ |
| 800 | Datum |
| 801 | pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS) |
| 802 | { |
| 803 | return copy_replication_slot(fcinfo, true); |
| 804 | } |
| 805 | |
| 806 | Datum |
| 807 | pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS) |
| 808 | { |
| 809 | return copy_replication_slot(fcinfo, true); |
| 810 | } |
| 811 | |
| 812 | Datum |
| 813 | pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS) |
| 814 | { |
| 815 | return copy_replication_slot(fcinfo, true); |
| 816 | } |
| 817 | |
| 818 | Datum |
| 819 | pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS) |
| 820 | { |
| 821 | return copy_replication_slot(fcinfo, false); |
| 822 | } |
| 823 | |
| 824 | Datum |
| 825 | pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS) |
| 826 | { |
| 827 | return copy_replication_slot(fcinfo, false); |
| 828 | } |
| 829 | |