1 | /*------------------------------------------------------------------------- |
2 | * logical.c |
3 | * PostgreSQL logical decoding coordination |
4 | * |
5 | * Copyright (c) 2012-2019, PostgreSQL Global Development Group |
6 | * |
7 | * IDENTIFICATION |
8 | * src/backend/replication/logical/logical.c |
9 | * |
10 | * NOTES |
 * This file coordinates interaction between the various modules that
 * together provide logical decoding, primarily by providing so-called
 * LogicalDecodingContexts. The goal is to encapsulate most of the
 * internal complexity for consumers of logical decoding, so they can
 * create and consume a change stream with a small amount of code. Built-in
 * consumers are the walsender and the SQL SRF interface, but it's possible
 * to add further ones without changing core code, e.g. to consume changes
 * in a bgworker.
19 | * |
 * The idea is that a consumer provides three callbacks: one to read WAL,
 * one to prepare a data write, and a final one for actually writing (plus
 * an optional one for reporting progress). The consumer supplies these
 * because their implementation depends on the type of consumer. Check
 * logicalfuncs.c for an example implementation of a fairly simple consumer
 * and an implementation of a WAL reading callback that's suitable for
 * simple consumers.
26 | *------------------------------------------------------------------------- |
27 | */ |
28 | |
29 | #include "postgres.h" |
30 | |
31 | #include "miscadmin.h" |
32 | |
33 | #include "access/xact.h" |
34 | #include "access/xlog_internal.h" |
35 | |
36 | #include "replication/decode.h" |
37 | #include "replication/logical.h" |
38 | #include "replication/reorderbuffer.h" |
39 | #include "replication/origin.h" |
40 | #include "replication/snapbuild.h" |
41 | |
42 | #include "storage/proc.h" |
43 | #include "storage/procarray.h" |
44 | |
45 | #include "utils/memutils.h" |
46 | |
/*
 * Data for the errcontext callback (output_plugin_error_callback).
 *
 * Each output plugin callback wrapper fills one of these in on its stack so
 * that errors raised inside the plugin are reported together with the slot
 * name, the plugin name, the callback name and, if available, an LSN.
 */
typedef struct LogicalErrorCallbackState
{
	LogicalDecodingContext *ctx;	/* decoding context; ctx->slot supplies the
									 * slot and plugin names */
	const char *callback_name;	/* callback being run, e.g. "begin" */
	XLogRecPtr	report_location;	/* LSN to report, or InvalidXLogRecPtr
									 * when the callback has none */
} LogicalErrorCallbackState;
54 | |
/*
 * Wrappers around output plugin callbacks.
 *
 * Each *_cb_wrapper pushes output_plugin_error_callback() onto
 * error_context_stack, sets ctx->accept_writes (and, for the data-producing
 * callbacks, the write xid/location), calls the plugin's callback and pops
 * the error context again.  The optional truncate and message callbacks are
 * skipped when the plugin does not provide them.  The begin, commit, change,
 * truncate and message wrappers are installed as ReorderBuffer callbacks by
 * StartupDecodingContext().
 */
static void output_plugin_error_callback(void *arg);
static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
							   bool is_init);
static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
							  XLogRecPtr commit_lsn);
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
							  Relation relation, ReorderBufferChange *change);
static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
								int nrelations, Relation relations[], ReorderBufferChange *change);
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
							   XLogRecPtr message_lsn, bool transactional,
							   const char *prefix, Size message_size, const char *message);

/* Load an output plugin library and let it fill in its callback table. */
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
72 | |
73 | /* |
74 | * Make sure the current settings & environment are capable of doing logical |
75 | * decoding. |
76 | */ |
77 | void |
78 | CheckLogicalDecodingRequirements(void) |
79 | { |
80 | CheckSlotRequirements(); |
81 | |
82 | /* |
83 | * NB: Adding a new requirement likely means that RestoreSlotFromDisk() |
84 | * needs the same check. |
85 | */ |
86 | |
87 | if (wal_level < WAL_LEVEL_LOGICAL) |
88 | ereport(ERROR, |
89 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
90 | errmsg("logical decoding requires wal_level >= logical" ))); |
91 | |
92 | if (MyDatabaseId == InvalidOid) |
93 | ereport(ERROR, |
94 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
95 | errmsg("logical decoding requires a database connection" ))); |
96 | |
97 | /* ---- |
98 | * TODO: We got to change that someday soon... |
99 | * |
100 | * There's basically three things missing to allow this: |
101 | * 1) We need to be able to correctly and quickly identify the timeline a |
102 | * LSN belongs to |
103 | * 2) We need to force hot_standby_feedback to be enabled at all times so |
104 | * the primary cannot remove rows we need. |
105 | * 3) support dropping replication slots referring to a database, in |
106 | * dbase_redo. There can't be any active ones due to HS recovery |
107 | * conflicts, so that should be relatively easy. |
108 | * ---- |
109 | */ |
110 | if (RecoveryInProgress()) |
111 | ereport(ERROR, |
112 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
113 | errmsg("logical decoding cannot be used while in recovery" ))); |
114 | } |
115 | |
116 | /* |
117 | * Helper function for CreateInitDecodingContext() and |
118 | * CreateDecodingContext() performing common tasks. |
119 | */ |
120 | static LogicalDecodingContext * |
121 | StartupDecodingContext(List *output_plugin_options, |
122 | XLogRecPtr start_lsn, |
123 | TransactionId xmin_horizon, |
124 | bool need_full_snapshot, |
125 | bool fast_forward, |
126 | XLogPageReadCB read_page, |
127 | LogicalOutputPluginWriterPrepareWrite prepare_write, |
128 | LogicalOutputPluginWriterWrite do_write, |
129 | LogicalOutputPluginWriterUpdateProgress update_progress) |
130 | { |
131 | ReplicationSlot *slot; |
132 | MemoryContext context, |
133 | old_context; |
134 | LogicalDecodingContext *ctx; |
135 | |
136 | /* shorter lines... */ |
137 | slot = MyReplicationSlot; |
138 | |
139 | context = AllocSetContextCreate(CurrentMemoryContext, |
140 | "Logical decoding context" , |
141 | ALLOCSET_DEFAULT_SIZES); |
142 | old_context = MemoryContextSwitchTo(context); |
143 | ctx = palloc0(sizeof(LogicalDecodingContext)); |
144 | |
145 | ctx->context = context; |
146 | |
147 | /* |
148 | * (re-)load output plugins, so we detect a bad (removed) output plugin |
149 | * now. |
150 | */ |
151 | if (!fast_forward) |
152 | LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin)); |
153 | |
154 | /* |
155 | * Now that the slot's xmin has been set, we can announce ourselves as a |
156 | * logical decoding backend which doesn't need to be checked individually |
157 | * when computing the xmin horizon because the xmin is enforced via |
158 | * replication slots. |
159 | * |
160 | * We can only do so if we're outside of a transaction (i.e. the case when |
161 | * streaming changes via walsender), otherwise an already setup |
162 | * snapshot/xid would end up being ignored. That's not a particularly |
163 | * bothersome restriction since the SQL interface can't be used for |
164 | * streaming anyway. |
165 | */ |
166 | if (!IsTransactionOrTransactionBlock()) |
167 | { |
168 | LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); |
169 | MyPgXact->vacuumFlags |= PROC_IN_LOGICAL_DECODING; |
170 | LWLockRelease(ProcArrayLock); |
171 | } |
172 | |
173 | ctx->slot = slot; |
174 | |
175 | ctx->reader = XLogReaderAllocate(wal_segment_size, read_page, ctx); |
176 | if (!ctx->reader) |
177 | ereport(ERROR, |
178 | (errcode(ERRCODE_OUT_OF_MEMORY), |
179 | errmsg("out of memory" ))); |
180 | |
181 | ctx->reorder = ReorderBufferAllocate(); |
182 | ctx->snapshot_builder = |
183 | AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn, |
184 | need_full_snapshot); |
185 | |
186 | ctx->reorder->private_data = ctx; |
187 | |
188 | /* wrap output plugin callbacks, so we can add error context information */ |
189 | ctx->reorder->begin = begin_cb_wrapper; |
190 | ctx->reorder->apply_change = change_cb_wrapper; |
191 | ctx->reorder->apply_truncate = truncate_cb_wrapper; |
192 | ctx->reorder->commit = commit_cb_wrapper; |
193 | ctx->reorder->message = message_cb_wrapper; |
194 | |
195 | ctx->out = makeStringInfo(); |
196 | ctx->prepare_write = prepare_write; |
197 | ctx->write = do_write; |
198 | ctx->update_progress = update_progress; |
199 | |
200 | ctx->output_plugin_options = output_plugin_options; |
201 | |
202 | ctx->fast_forward = fast_forward; |
203 | |
204 | MemoryContextSwitchTo(old_context); |
205 | |
206 | return ctx; |
207 | } |
208 | |
/*
 * Create a new decoding context, for a new logical slot.
 *
 * plugin -- contains the name of the output plugin; it is recorded in the
 *		slot's data.plugin.
 * output_plugin_options -- contains options passed to the output plugin.
 *		NOTE(review): this argument is currently unused.  NIL is passed to
 *		StartupDecodingContext() below, so the plugin's startup callback
 *		(invoked with is_init = true) never sees these options.  Confirm
 *		whether that is intended.
 * need_full_snapshot -- if true, the slot's effective_xmin (not just the
 *		catalog xmin) is pinned at the computed horizon; see the comment
 *		before the ProcArrayLock section below.
 * restart_lsn -- if given as invalid, it's this routine's responsibility to
 *		mark WAL as reserved by setting a convenient restart_lsn for the slot.
 *		Otherwise, decoding is set up to start from the given LSN without
 *		marking WAL as reserved beforehand.  In that scenario, it's up to the
 *		caller to guarantee that WAL remains available.
 * read_page, prepare_write, do_write, update_progress --
 *		callbacks that perform the use-case dependent, actual, work.
 *
 * Must be called with MyReplicationSlot acquired (checked below), and not
 * inside a transaction that has already been assigned an xid.
 *
 * Needs to be called while in a memory context that's at least as long lived
 * as the decoding context because further memory contexts will be created
 * inside it.
 *
 * Returns an initialized decoding context after calling the output plugin's
 * startup function.
 */
LogicalDecodingContext *
CreateInitDecodingContext(char *plugin,
						  List *output_plugin_options,
						  bool need_full_snapshot,
						  XLogRecPtr restart_lsn,
						  XLogPageReadCB read_page,
						  LogicalOutputPluginWriterPrepareWrite prepare_write,
						  LogicalOutputPluginWriterWrite do_write,
						  LogicalOutputPluginWriterUpdateProgress update_progress)
{
	TransactionId xmin_horizon = InvalidTransactionId;
	ReplicationSlot *slot;
	LogicalDecodingContext *ctx;
	MemoryContext old_context;

	/* shorter lines... */
	slot = MyReplicationSlot;

	/* first some sanity checks that are unlikely to be violated */
	if (slot == NULL)
		elog(ERROR, "cannot perform logical decoding without an acquired slot");

	if (plugin == NULL)
		elog(ERROR, "cannot initialize logical decoding without a specified plugin");

	/* Make sure the passed slot is suitable. These are user facing errors. */
	if (SlotIsPhysical(slot))
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("cannot use physical replication slot for logical decoding")));

	if (slot->data.database != MyDatabaseId)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("replication slot \"%s\" was not created in this database",
						NameStr(slot->data.name))));

	/* refuse to run in a transaction that already has a top-level xid */
	if (IsTransactionState() &&
		GetTopTransactionIdIfAny() != InvalidTransactionId)
		ereport(ERROR,
				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
				 errmsg("cannot create logical replication slot in transaction that has performed writes")));

	/* register output plugin name with slot */
	SpinLockAcquire(&slot->mutex);
	StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
	SpinLockRelease(&slot->mutex);

	/*
	 * Either let the slot machinery pick and reserve a restart_lsn, or record
	 * the caller-supplied one (the caller then guarantees WAL availability).
	 *
	 * NOTE(review): in the first case the local restart_lsn stays invalid.
	 * It is still what gets passed to StartupDecodingContext() as start_lsn
	 * below, not the LSN chosen for the slot.
	 */
	if (XLogRecPtrIsInvalid(restart_lsn))
		ReplicationSlotReserveWal();
	else
	{
		SpinLockAcquire(&slot->mutex);
		slot->data.restart_lsn = restart_lsn;
		SpinLockRelease(&slot->mutex);
	}

	/* ----
	 * This is a bit tricky: We need to determine a safe xmin horizon to start
	 * decoding from, to avoid starting from a running xacts record referring
	 * to xids whose rows have been vacuumed or pruned
	 * already. GetOldestSafeDecodingTransactionId() returns such a value, but
	 * without further interlock its return value might immediately be out of
	 * date.
	 *
	 * So we have to acquire the ProcArrayLock to prevent computation of new
	 * xmin horizons by other backends, get the safe decoding xid, and inform
	 * the slot machinery about the new limit. Once that's done the
	 * ProcArrayLock can be released as the slot machinery now is
	 * protecting against vacuum.
	 *
	 * Note that, temporarily, the data xmin, not just the catalog xmin, has
	 * to be reserved if a data snapshot is to be exported. Otherwise the
	 * initial data snapshot created here is not guaranteed to be valid.
	 * After that the data xmin doesn't need to be managed anymore and the
	 * global xmin should be recomputed. As we are fine with losing the pegged
	 * data xmin after a crash - no chance a snapshot would get exported
	 * anymore - we can get away with just setting the slot's
	 * effective_xmin. ReplicationSlotRelease will reset it again.
	 *
	 * ----
	 */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);

	SpinLockAcquire(&slot->mutex);
	slot->effective_catalog_xmin = xmin_horizon;
	slot->data.catalog_xmin = xmin_horizon;
	if (need_full_snapshot)
		slot->effective_xmin = xmin_horizon;
	SpinLockRelease(&slot->mutex);

	/*
	 * Called while we still hold ProcArrayLock; the 'true' argument
	 * presumably tells it the lock is already held -- confirm in slot.c.
	 */
	ReplicationSlotsComputeRequiredXmin(true);

	LWLockRelease(ProcArrayLock);

	/* persist the new plugin name, restart_lsn and catalog_xmin */
	ReplicationSlotMarkDirty();
	ReplicationSlotSave();

	ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
								 need_full_snapshot, false,
								 read_page, prepare_write, do_write,
								 update_progress);

	/* call output plugin initialization callback */
	old_context = MemoryContextSwitchTo(ctx->context);
	if (ctx->callbacks.startup_cb != NULL)
		startup_cb_wrapper(ctx, &ctx->options, true);
	MemoryContextSwitchTo(old_context);

	/* the plugin's startup callback decides whether it wants rewrites */
	ctx->reorder->output_rewrites = ctx->options.receive_rewrites;

	return ctx;
}
344 | |
345 | /* |
346 | * Create a new decoding context, for a logical slot that has previously been |
347 | * used already. |
348 | * |
349 | * start_lsn |
350 | * The LSN at which to start decoding. If InvalidXLogRecPtr, restart |
351 | * from the slot's confirmed_flush; otherwise, start from the specified |
352 | * location (but move it forwards to confirmed_flush if it's older than |
353 | * that, see below). |
354 | * |
355 | * output_plugin_options |
356 | * options passed to the output plugin. |
357 | * |
358 | * fast_forward |
359 | * bypass the generation of logical changes. |
360 | * |
361 | * read_page, prepare_write, do_write, update_progress |
362 | * callbacks that have to be filled to perform the use-case dependent, |
363 | * actual work. |
364 | * |
365 | * Needs to be called while in a memory context that's at least as long lived |
366 | * as the decoding context because further memory contexts will be created |
367 | * inside it. |
368 | * |
369 | * Returns an initialized decoding context after calling the output plugin's |
370 | * startup function. |
371 | */ |
372 | LogicalDecodingContext * |
373 | CreateDecodingContext(XLogRecPtr start_lsn, |
374 | List *output_plugin_options, |
375 | bool fast_forward, |
376 | XLogPageReadCB read_page, |
377 | LogicalOutputPluginWriterPrepareWrite prepare_write, |
378 | LogicalOutputPluginWriterWrite do_write, |
379 | LogicalOutputPluginWriterUpdateProgress update_progress) |
380 | { |
381 | LogicalDecodingContext *ctx; |
382 | ReplicationSlot *slot; |
383 | MemoryContext old_context; |
384 | |
385 | /* shorter lines... */ |
386 | slot = MyReplicationSlot; |
387 | |
388 | /* first some sanity checks that are unlikely to be violated */ |
389 | if (slot == NULL) |
390 | elog(ERROR, "cannot perform logical decoding without an acquired slot" ); |
391 | |
392 | /* make sure the passed slot is suitable, these are user facing errors */ |
393 | if (SlotIsPhysical(slot)) |
394 | ereport(ERROR, |
395 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
396 | (errmsg("cannot use physical replication slot for logical decoding" )))); |
397 | |
398 | if (slot->data.database != MyDatabaseId) |
399 | ereport(ERROR, |
400 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
401 | (errmsg("replication slot \"%s\" was not created in this database" , |
402 | NameStr(slot->data.name))))); |
403 | |
404 | if (start_lsn == InvalidXLogRecPtr) |
405 | { |
406 | /* continue from last position */ |
407 | start_lsn = slot->data.confirmed_flush; |
408 | } |
409 | else if (start_lsn < slot->data.confirmed_flush) |
410 | { |
411 | /* |
412 | * It might seem like we should error out in this case, but it's |
413 | * pretty common for a client to acknowledge a LSN it doesn't have to |
414 | * do anything for, and thus didn't store persistently, because the |
415 | * xlog records didn't result in anything relevant for logical |
416 | * decoding. Clients have to be able to do that to support synchronous |
417 | * replication. |
418 | */ |
419 | elog(DEBUG1, "cannot stream from %X/%X, minimum is %X/%X, forwarding" , |
420 | (uint32) (start_lsn >> 32), (uint32) start_lsn, |
421 | (uint32) (slot->data.confirmed_flush >> 32), |
422 | (uint32) slot->data.confirmed_flush); |
423 | |
424 | start_lsn = slot->data.confirmed_flush; |
425 | } |
426 | |
427 | ctx = StartupDecodingContext(output_plugin_options, |
428 | start_lsn, InvalidTransactionId, false, |
429 | fast_forward, read_page, prepare_write, |
430 | do_write, update_progress); |
431 | |
432 | /* call output plugin initialization callback */ |
433 | old_context = MemoryContextSwitchTo(ctx->context); |
434 | if (ctx->callbacks.startup_cb != NULL) |
435 | startup_cb_wrapper(ctx, &ctx->options, false); |
436 | MemoryContextSwitchTo(old_context); |
437 | |
438 | ctx->reorder->output_rewrites = ctx->options.receive_rewrites; |
439 | |
440 | ereport(LOG, |
441 | (errmsg("starting logical decoding for slot \"%s\"" , |
442 | NameStr(slot->data.name)), |
443 | errdetail("Streaming transactions committing after %X/%X, reading WAL from %X/%X." , |
444 | (uint32) (slot->data.confirmed_flush >> 32), |
445 | (uint32) slot->data.confirmed_flush, |
446 | (uint32) (slot->data.restart_lsn >> 32), |
447 | (uint32) slot->data.restart_lsn))); |
448 | |
449 | return ctx; |
450 | } |
451 | |
452 | /* |
453 | * Returns true if a consistent initial decoding snapshot has been built. |
454 | */ |
455 | bool |
456 | DecodingContextReady(LogicalDecodingContext *ctx) |
457 | { |
458 | return SnapBuildCurrentState(ctx->snapshot_builder) == SNAPBUILD_CONSISTENT; |
459 | } |
460 | |
461 | /* |
462 | * Read from the decoding slot, until it is ready to start extracting changes. |
463 | */ |
464 | void |
465 | DecodingContextFindStartpoint(LogicalDecodingContext *ctx) |
466 | { |
467 | XLogRecPtr startptr; |
468 | ReplicationSlot *slot = ctx->slot; |
469 | |
470 | /* Initialize from where to start reading WAL. */ |
471 | startptr = slot->data.restart_lsn; |
472 | |
473 | elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X" , |
474 | (uint32) (slot->data.restart_lsn >> 32), |
475 | (uint32) slot->data.restart_lsn); |
476 | |
477 | /* Wait for a consistent starting point */ |
478 | for (;;) |
479 | { |
480 | XLogRecord *record; |
481 | char *err = NULL; |
482 | |
483 | /* the read_page callback waits for new WAL */ |
484 | record = XLogReadRecord(ctx->reader, startptr, &err); |
485 | if (err) |
486 | elog(ERROR, "%s" , err); |
487 | if (!record) |
488 | elog(ERROR, "no record found" ); /* shouldn't happen */ |
489 | |
490 | startptr = InvalidXLogRecPtr; |
491 | |
492 | LogicalDecodingProcessRecord(ctx, ctx->reader); |
493 | |
494 | /* only continue till we found a consistent spot */ |
495 | if (DecodingContextReady(ctx)) |
496 | break; |
497 | |
498 | CHECK_FOR_INTERRUPTS(); |
499 | } |
500 | |
501 | SpinLockAcquire(&slot->mutex); |
502 | slot->data.confirmed_flush = ctx->reader->EndRecPtr; |
503 | SpinLockRelease(&slot->mutex); |
504 | } |
505 | |
506 | /* |
507 | * Free a previously allocated decoding context, invoking the shutdown |
508 | * callback if necessary. |
509 | */ |
510 | void |
511 | FreeDecodingContext(LogicalDecodingContext *ctx) |
512 | { |
513 | if (ctx->callbacks.shutdown_cb != NULL) |
514 | shutdown_cb_wrapper(ctx); |
515 | |
516 | ReorderBufferFree(ctx->reorder); |
517 | FreeSnapshotBuilder(ctx->snapshot_builder); |
518 | XLogReaderFree(ctx->reader); |
519 | MemoryContextDelete(ctx->context); |
520 | } |
521 | |
522 | /* |
523 | * Prepare a write using the context's output routine. |
524 | */ |
525 | void |
526 | OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write) |
527 | { |
528 | if (!ctx->accept_writes) |
529 | elog(ERROR, "writes are only accepted in commit, begin and change callbacks" ); |
530 | |
531 | ctx->prepare_write(ctx, ctx->write_location, ctx->write_xid, last_write); |
532 | ctx->prepared_write = true; |
533 | } |
534 | |
535 | /* |
536 | * Perform a write using the context's output routine. |
537 | */ |
538 | void |
539 | OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write) |
540 | { |
541 | if (!ctx->prepared_write) |
542 | elog(ERROR, "OutputPluginPrepareWrite needs to be called before OutputPluginWrite" ); |
543 | |
544 | ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write); |
545 | ctx->prepared_write = false; |
546 | } |
547 | |
548 | /* |
549 | * Update progress tracking (if supported). |
550 | */ |
551 | void |
552 | OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx) |
553 | { |
554 | if (!ctx->update_progress) |
555 | return; |
556 | |
557 | ctx->update_progress(ctx, ctx->write_location, ctx->write_xid); |
558 | } |
559 | |
560 | /* |
561 | * Load the output plugin, lookup its output plugin init function, and check |
562 | * that it provides the required callbacks. |
563 | */ |
564 | static void |
565 | LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin) |
566 | { |
567 | LogicalOutputPluginInit plugin_init; |
568 | |
569 | plugin_init = (LogicalOutputPluginInit) |
570 | load_external_function(plugin, "_PG_output_plugin_init" , false, NULL); |
571 | |
572 | if (plugin_init == NULL) |
573 | elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol" ); |
574 | |
575 | /* ask the output plugin to fill the callback struct */ |
576 | plugin_init(callbacks); |
577 | |
578 | if (callbacks->begin_cb == NULL) |
579 | elog(ERROR, "output plugins have to register a begin callback" ); |
580 | if (callbacks->change_cb == NULL) |
581 | elog(ERROR, "output plugins have to register a change callback" ); |
582 | if (callbacks->commit_cb == NULL) |
583 | elog(ERROR, "output plugins have to register a commit callback" ); |
584 | } |
585 | |
586 | static void |
587 | output_plugin_error_callback(void *arg) |
588 | { |
589 | LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg; |
590 | |
591 | /* not all callbacks have an associated LSN */ |
592 | if (state->report_location != InvalidXLogRecPtr) |
593 | errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X" , |
594 | NameStr(state->ctx->slot->data.name), |
595 | NameStr(state->ctx->slot->data.plugin), |
596 | state->callback_name, |
597 | (uint32) (state->report_location >> 32), |
598 | (uint32) state->report_location); |
599 | else |
600 | errcontext("slot \"%s\", output plugin \"%s\", in the %s callback" , |
601 | NameStr(state->ctx->slot->data.name), |
602 | NameStr(state->ctx->slot->data.plugin), |
603 | state->callback_name); |
604 | } |
605 | |
606 | static void |
607 | startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init) |
608 | { |
609 | LogicalErrorCallbackState state; |
610 | ErrorContextCallback errcallback; |
611 | |
612 | Assert(!ctx->fast_forward); |
613 | |
614 | /* Push callback + info on the error context stack */ |
615 | state.ctx = ctx; |
616 | state.callback_name = "startup" ; |
617 | state.report_location = InvalidXLogRecPtr; |
618 | errcallback.callback = output_plugin_error_callback; |
619 | errcallback.arg = (void *) &state; |
620 | errcallback.previous = error_context_stack; |
621 | error_context_stack = &errcallback; |
622 | |
623 | /* set output state */ |
624 | ctx->accept_writes = false; |
625 | |
626 | /* do the actual work: call callback */ |
627 | ctx->callbacks.startup_cb(ctx, opt, is_init); |
628 | |
629 | /* Pop the error context stack */ |
630 | error_context_stack = errcallback.previous; |
631 | } |
632 | |
633 | static void |
634 | shutdown_cb_wrapper(LogicalDecodingContext *ctx) |
635 | { |
636 | LogicalErrorCallbackState state; |
637 | ErrorContextCallback errcallback; |
638 | |
639 | Assert(!ctx->fast_forward); |
640 | |
641 | /* Push callback + info on the error context stack */ |
642 | state.ctx = ctx; |
643 | state.callback_name = "shutdown" ; |
644 | state.report_location = InvalidXLogRecPtr; |
645 | errcallback.callback = output_plugin_error_callback; |
646 | errcallback.arg = (void *) &state; |
647 | errcallback.previous = error_context_stack; |
648 | error_context_stack = &errcallback; |
649 | |
650 | /* set output state */ |
651 | ctx->accept_writes = false; |
652 | |
653 | /* do the actual work: call callback */ |
654 | ctx->callbacks.shutdown_cb(ctx); |
655 | |
656 | /* Pop the error context stack */ |
657 | error_context_stack = errcallback.previous; |
658 | } |
659 | |
660 | |
661 | /* |
662 | * Callbacks for ReorderBuffer which add in some more information and then call |
663 | * output_plugin.h plugins. |
664 | */ |
665 | static void |
666 | begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) |
667 | { |
668 | LogicalDecodingContext *ctx = cache->private_data; |
669 | LogicalErrorCallbackState state; |
670 | ErrorContextCallback errcallback; |
671 | |
672 | Assert(!ctx->fast_forward); |
673 | |
674 | /* Push callback + info on the error context stack */ |
675 | state.ctx = ctx; |
676 | state.callback_name = "begin" ; |
677 | state.report_location = txn->first_lsn; |
678 | errcallback.callback = output_plugin_error_callback; |
679 | errcallback.arg = (void *) &state; |
680 | errcallback.previous = error_context_stack; |
681 | error_context_stack = &errcallback; |
682 | |
683 | /* set output state */ |
684 | ctx->accept_writes = true; |
685 | ctx->write_xid = txn->xid; |
686 | ctx->write_location = txn->first_lsn; |
687 | |
688 | /* do the actual work: call callback */ |
689 | ctx->callbacks.begin_cb(ctx, txn); |
690 | |
691 | /* Pop the error context stack */ |
692 | error_context_stack = errcallback.previous; |
693 | } |
694 | |
695 | static void |
696 | commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, |
697 | XLogRecPtr commit_lsn) |
698 | { |
699 | LogicalDecodingContext *ctx = cache->private_data; |
700 | LogicalErrorCallbackState state; |
701 | ErrorContextCallback errcallback; |
702 | |
703 | Assert(!ctx->fast_forward); |
704 | |
705 | /* Push callback + info on the error context stack */ |
706 | state.ctx = ctx; |
707 | state.callback_name = "commit" ; |
708 | state.report_location = txn->final_lsn; /* beginning of commit record */ |
709 | errcallback.callback = output_plugin_error_callback; |
710 | errcallback.arg = (void *) &state; |
711 | errcallback.previous = error_context_stack; |
712 | error_context_stack = &errcallback; |
713 | |
714 | /* set output state */ |
715 | ctx->accept_writes = true; |
716 | ctx->write_xid = txn->xid; |
717 | ctx->write_location = txn->end_lsn; /* points to the end of the record */ |
718 | |
719 | /* do the actual work: call callback */ |
720 | ctx->callbacks.commit_cb(ctx, txn, commit_lsn); |
721 | |
722 | /* Pop the error context stack */ |
723 | error_context_stack = errcallback.previous; |
724 | } |
725 | |
726 | static void |
727 | change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, |
728 | Relation relation, ReorderBufferChange *change) |
729 | { |
730 | LogicalDecodingContext *ctx = cache->private_data; |
731 | LogicalErrorCallbackState state; |
732 | ErrorContextCallback errcallback; |
733 | |
734 | Assert(!ctx->fast_forward); |
735 | |
736 | /* Push callback + info on the error context stack */ |
737 | state.ctx = ctx; |
738 | state.callback_name = "change" ; |
739 | state.report_location = change->lsn; |
740 | errcallback.callback = output_plugin_error_callback; |
741 | errcallback.arg = (void *) &state; |
742 | errcallback.previous = error_context_stack; |
743 | error_context_stack = &errcallback; |
744 | |
745 | /* set output state */ |
746 | ctx->accept_writes = true; |
747 | ctx->write_xid = txn->xid; |
748 | |
749 | /* |
750 | * report this change's lsn so replies from clients can give an up2date |
751 | * answer. This won't ever be enough (and shouldn't be!) to confirm |
752 | * receipt of this transaction, but it might allow another transaction's |
753 | * commit to be confirmed with one message. |
754 | */ |
755 | ctx->write_location = change->lsn; |
756 | |
757 | ctx->callbacks.change_cb(ctx, txn, relation, change); |
758 | |
759 | /* Pop the error context stack */ |
760 | error_context_stack = errcallback.previous; |
761 | } |
762 | |
763 | static void |
764 | truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, |
765 | int nrelations, Relation relations[], ReorderBufferChange *change) |
766 | { |
767 | LogicalDecodingContext *ctx = cache->private_data; |
768 | LogicalErrorCallbackState state; |
769 | ErrorContextCallback errcallback; |
770 | |
771 | Assert(!ctx->fast_forward); |
772 | |
773 | if (!ctx->callbacks.truncate_cb) |
774 | return; |
775 | |
776 | /* Push callback + info on the error context stack */ |
777 | state.ctx = ctx; |
778 | state.callback_name = "truncate" ; |
779 | state.report_location = change->lsn; |
780 | errcallback.callback = output_plugin_error_callback; |
781 | errcallback.arg = (void *) &state; |
782 | errcallback.previous = error_context_stack; |
783 | error_context_stack = &errcallback; |
784 | |
785 | /* set output state */ |
786 | ctx->accept_writes = true; |
787 | ctx->write_xid = txn->xid; |
788 | |
789 | /* |
790 | * report this change's lsn so replies from clients can give an up2date |
791 | * answer. This won't ever be enough (and shouldn't be!) to confirm |
792 | * receipt of this transaction, but it might allow another transaction's |
793 | * commit to be confirmed with one message. |
794 | */ |
795 | ctx->write_location = change->lsn; |
796 | |
797 | ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change); |
798 | |
799 | /* Pop the error context stack */ |
800 | error_context_stack = errcallback.previous; |
801 | } |
802 | |
803 | bool |
804 | filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) |
805 | { |
806 | LogicalErrorCallbackState state; |
807 | ErrorContextCallback errcallback; |
808 | bool ret; |
809 | |
810 | Assert(!ctx->fast_forward); |
811 | |
812 | /* Push callback + info on the error context stack */ |
813 | state.ctx = ctx; |
814 | state.callback_name = "filter_by_origin" ; |
815 | state.report_location = InvalidXLogRecPtr; |
816 | errcallback.callback = output_plugin_error_callback; |
817 | errcallback.arg = (void *) &state; |
818 | errcallback.previous = error_context_stack; |
819 | error_context_stack = &errcallback; |
820 | |
821 | /* set output state */ |
822 | ctx->accept_writes = false; |
823 | |
824 | /* do the actual work: call callback */ |
825 | ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id); |
826 | |
827 | /* Pop the error context stack */ |
828 | error_context_stack = errcallback.previous; |
829 | |
830 | return ret; |
831 | } |
832 | |
833 | static void |
834 | message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, |
835 | XLogRecPtr message_lsn, bool transactional, |
836 | const char *prefix, Size message_size, const char *message) |
837 | { |
838 | LogicalDecodingContext *ctx = cache->private_data; |
839 | LogicalErrorCallbackState state; |
840 | ErrorContextCallback errcallback; |
841 | |
842 | Assert(!ctx->fast_forward); |
843 | |
844 | if (ctx->callbacks.message_cb == NULL) |
845 | return; |
846 | |
847 | /* Push callback + info on the error context stack */ |
848 | state.ctx = ctx; |
849 | state.callback_name = "message" ; |
850 | state.report_location = message_lsn; |
851 | errcallback.callback = output_plugin_error_callback; |
852 | errcallback.arg = (void *) &state; |
853 | errcallback.previous = error_context_stack; |
854 | error_context_stack = &errcallback; |
855 | |
856 | /* set output state */ |
857 | ctx->accept_writes = true; |
858 | ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; |
859 | ctx->write_location = message_lsn; |
860 | |
861 | /* do the actual work: call callback */ |
862 | ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix, |
863 | message_size, message); |
864 | |
865 | /* Pop the error context stack */ |
866 | error_context_stack = errcallback.previous; |
867 | } |
868 | |
869 | /* |
870 | * Set the required catalog xmin horizon for historic snapshots in the current |
871 | * replication slot. |
872 | * |
873 | * Note that in the most cases, we won't be able to immediately use the xmin |
874 | * to increase the xmin horizon: we need to wait till the client has confirmed |
875 | * receiving current_lsn with LogicalConfirmReceivedLocation(). |
876 | */ |
877 | void |
878 | LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) |
879 | { |
880 | bool updated_xmin = false; |
881 | ReplicationSlot *slot; |
882 | |
883 | slot = MyReplicationSlot; |
884 | |
885 | Assert(slot != NULL); |
886 | |
887 | SpinLockAcquire(&slot->mutex); |
888 | |
889 | /* |
890 | * don't overwrite if we already have a newer xmin. This can happen if we |
891 | * restart decoding in a slot. |
892 | */ |
893 | if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin)) |
894 | { |
895 | } |
896 | |
897 | /* |
898 | * If the client has already confirmed up to this lsn, we directly can |
899 | * mark this as accepted. This can happen if we restart decoding in a |
900 | * slot. |
901 | */ |
902 | else if (current_lsn <= slot->data.confirmed_flush) |
903 | { |
904 | slot->candidate_catalog_xmin = xmin; |
905 | slot->candidate_xmin_lsn = current_lsn; |
906 | |
907 | /* our candidate can directly be used */ |
908 | updated_xmin = true; |
909 | } |
910 | |
911 | /* |
912 | * Only increase if the previous values have been applied, otherwise we |
913 | * might never end up updating if the receiver acks too slowly. |
914 | */ |
915 | else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr) |
916 | { |
917 | slot->candidate_catalog_xmin = xmin; |
918 | slot->candidate_xmin_lsn = current_lsn; |
919 | } |
920 | SpinLockRelease(&slot->mutex); |
921 | |
922 | /* candidate already valid with the current flush position, apply */ |
923 | if (updated_xmin) |
924 | LogicalConfirmReceivedLocation(slot->data.confirmed_flush); |
925 | } |
926 | |
927 | /* |
928 | * Mark the minimal LSN (restart_lsn) we need to read to replay all |
929 | * transactions that have not yet committed at current_lsn. |
930 | * |
931 | * Just like LogicalIncreaseXminForSlot this only takes effect when the |
932 | * client has confirmed to have received current_lsn. |
933 | */ |
934 | void |
935 | LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn) |
936 | { |
937 | bool updated_lsn = false; |
938 | ReplicationSlot *slot; |
939 | |
940 | slot = MyReplicationSlot; |
941 | |
942 | Assert(slot != NULL); |
943 | Assert(restart_lsn != InvalidXLogRecPtr); |
944 | Assert(current_lsn != InvalidXLogRecPtr); |
945 | |
946 | SpinLockAcquire(&slot->mutex); |
947 | |
948 | /* don't overwrite if have a newer restart lsn */ |
949 | if (restart_lsn <= slot->data.restart_lsn) |
950 | { |
951 | } |
952 | |
953 | /* |
954 | * We might have already flushed far enough to directly accept this lsn, |
955 | * in this case there is no need to check for existing candidate LSNs |
956 | */ |
957 | else if (current_lsn <= slot->data.confirmed_flush) |
958 | { |
959 | slot->candidate_restart_valid = current_lsn; |
960 | slot->candidate_restart_lsn = restart_lsn; |
961 | |
962 | /* our candidate can directly be used */ |
963 | updated_lsn = true; |
964 | } |
965 | |
966 | /* |
967 | * Only increase if the previous values have been applied, otherwise we |
968 | * might never end up updating if the receiver acks too slowly. A missed |
969 | * value here will just cause some extra effort after reconnecting. |
970 | */ |
971 | if (slot->candidate_restart_valid == InvalidXLogRecPtr) |
972 | { |
973 | slot->candidate_restart_valid = current_lsn; |
974 | slot->candidate_restart_lsn = restart_lsn; |
975 | |
976 | elog(DEBUG1, "got new restart lsn %X/%X at %X/%X" , |
977 | (uint32) (restart_lsn >> 32), (uint32) restart_lsn, |
978 | (uint32) (current_lsn >> 32), (uint32) current_lsn); |
979 | } |
980 | else |
981 | { |
982 | elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X" , |
983 | (uint32) (restart_lsn >> 32), (uint32) restart_lsn, |
984 | (uint32) (current_lsn >> 32), (uint32) current_lsn, |
985 | (uint32) (slot->candidate_restart_lsn >> 32), |
986 | (uint32) slot->candidate_restart_lsn, |
987 | (uint32) (slot->candidate_restart_valid >> 32), |
988 | (uint32) slot->candidate_restart_valid, |
989 | (uint32) (slot->data.confirmed_flush >> 32), |
990 | (uint32) slot->data.confirmed_flush |
991 | ); |
992 | } |
993 | SpinLockRelease(&slot->mutex); |
994 | |
995 | /* candidates are already valid with the current flush position, apply */ |
996 | if (updated_lsn) |
997 | LogicalConfirmReceivedLocation(slot->data.confirmed_flush); |
998 | } |
999 | |
/*
 * Handle a consumer's confirmation having received all changes up to lsn.
 *
 * Always records lsn as MyReplicationSlot's confirmed_flush.  If a pending
 * catalog_xmin candidate (candidate_xmin_lsn) and/or restart_lsn candidate
 * (candidate_restart_valid) is now covered by lsn, the candidate is copied
 * into the slot's persistent data and cleared, and the slot is marked dirty
 * and saved.  Only after the save is effective_catalog_xmin advanced and the
 * global required xmin/LSN recomputed, so that a crash can never leave
 * catalog rows removed that the on-disk slot still claims to need.
 *
 * MyReplicationSlot is dereferenced unconditionally, so it must be set.
 */
void
LogicalConfirmReceivedLocation(XLogRecPtr lsn)
{
	Assert(lsn != InvalidXLogRecPtr);

	/*
	 * Do an unlocked check for candidate_lsn first.  NOTE(review): this reads
	 * the candidate fields without the mutex, presumably safe because they
	 * are only written by the backend owning the slot -- confirm.
	 */
	if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
		MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
	{
		bool		updated_xmin = false;
		bool		updated_restart = false;

		SpinLockAcquire(&MyReplicationSlot->mutex);

		MyReplicationSlot->data.confirmed_flush = lsn;

		/* if we're past the location required for bumping xmin, do so */
		if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
			MyReplicationSlot->candidate_xmin_lsn <= lsn)
		{
			/*
			 * We have to write the changed xmin to disk *before* we change
			 * the in-memory value, otherwise after a crash we wouldn't know
			 * that some catalog tuples might have been removed already.
			 *
			 * Ensure that by first writing to ->xmin and only update
			 * ->effective_xmin once the new state is synced to disk. After a
			 * crash ->effective_xmin is set to ->xmin.
			 *
			 * NOTE(review): if candidate_catalog_xmin is invalid or equal to
			 * the current catalog_xmin, the candidate fields (including
			 * candidate_xmin_lsn) are left as they are rather than cleared.
			 */
			if (TransactionIdIsValid(MyReplicationSlot->candidate_catalog_xmin) &&
				MyReplicationSlot->data.catalog_xmin != MyReplicationSlot->candidate_catalog_xmin)
			{
				MyReplicationSlot->data.catalog_xmin = MyReplicationSlot->candidate_catalog_xmin;
				MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
				MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
				updated_xmin = true;
			}
		}

		/* likewise promote a restart_lsn candidate that lsn now covers */
		if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr &&
			MyReplicationSlot->candidate_restart_valid <= lsn)
		{
			Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);

			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
			updated_restart = true;
		}

		SpinLockRelease(&MyReplicationSlot->mutex);

		/* first write new xmin to disk, so we know what's up after a crash */
		if (updated_xmin || updated_restart)
		{
			ReplicationSlotMarkDirty();
			ReplicationSlotSave();
			elog(DEBUG1, "updated xmin: %u restart: %u" , updated_xmin, updated_restart);
		}

		/*
		 * Now the new xmin is safely on disk, we can let the global value
		 * advance. We do not take ProcArrayLock or similar since we only
		 * advance xmin here and there's not much harm done by a concurrent
		 * computation missing that.
		 *
		 * NOTE(review): ReplicationSlotsComputeRequiredLSN() is only called
		 * when the xmin changed; an advance of restart_lsn alone does not
		 * trigger a recomputation here -- confirm that is intended.
		 */
		if (updated_xmin)
		{
			SpinLockAcquire(&MyReplicationSlot->mutex);
			MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
			SpinLockRelease(&MyReplicationSlot->mutex);

			ReplicationSlotsComputeRequiredXmin(false);
			ReplicationSlotsComputeRequiredLSN();
		}
	}
	else
	{
		/* no candidates pending: just record the confirmed position */
		SpinLockAcquire(&MyReplicationSlot->mutex);
		MyReplicationSlot->data.confirmed_flush = lsn;
		SpinLockRelease(&MyReplicationSlot->mutex);
	}
}
1086 | |