/*-------------------------------------------------------------------------
 * worker.c
 *	   PostgreSQL logical replication worker (apply)
 *
 * Copyright (c) 2016-2019, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/logical/worker.c
 *
 * NOTES
 *	  This file contains the worker which applies logical changes as they come
 *	  from the remote logical replication stream.
 *
 *	  The main worker (apply) is started by the logical replication worker
 *	  launcher for every enabled subscription in a database.  It uses the
 *	  walsender protocol to communicate with the publisher.
 *
 *	  This module includes server-facing code and shares the libpqwalreceiver
 *	  module with the walreceiver for the libpq-specific functionality.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/table.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "postmaster/walwriter.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/timeout.h"

#define NAPTIME_PER_CYCLE	1000	/* max sleep time between cycles (1s) */

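/*
 * An entry pairing the local end-of-commit LSN with the corresponding
 * remote commit LSN.  A list of these is used to figure out which remote
 * positions can be reported as flushed (see get_flush_position()).
 */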
typedef struct FlushPosition
{
	dlist_node	node;
	XLogRecPtr	local_end;
	XLogRecPtr	remote_end;
} FlushPosition;

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);

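/*
 * State passed to slot_store_error_callback() so that type-conversion
 * errors can report which relation and column were being processed.
 */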
typedef struct SlotErrCallbackArg
{
	LogicalRepRelMapEntry *rel;
	int			local_attnum;
	int			remote_attnum;
} SlotErrCallbackArg;

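/*
 * ApplyMessageContext is reset after each replication protocol message,
 * while ApplyContext lives for the whole lifetime of the worker.
 */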
static MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;

WalReceiverConn *wrconn = NULL;

Subscription *MySubscription = NULL;
bool		MySubscriptionValid = false;

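/* True while we are applying a remote transaction (between BEGIN and COMMIT) */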
bool		in_remote_transaction = false;
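/* Final LSN of the remote transaction being applied, from its BEGIN message */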
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);

static void store_flush_position(XLogRecPtr remote_lsn);

static void maybe_reread_subscription(void);

/* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false;

/*
 * Should this worker apply changes for the given relation?
 *
 * This is mainly needed for initial relation data sync, which runs in a
 * separate worker process in parallel, so we need some way to skip changes
 * coming to the main apply worker while a table is being synchronized.
 *
 * Note we need a less-than-or-equal comparison for the SYNCDONE state
 * because statelsn might hold the position of the end of the initial-sync
 * consistent-point WAL record + 1 (i.e. the start of the next record), and
 * the next record can be the COMMIT of the transaction we are now processing
 * (which is what we set remote_final_lsn to in apply_handle_begin).
 */
static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
{
	if (am_tablesync_worker())
		return MyLogicalRepWorker->relid == rel->localreloid;
	else
		return (rel->state == SUBREL_STATE_READY ||
				(rel->state == SUBREL_STATE_SYNCDONE &&
				 rel->statelsn <= remote_final_lsn));
}

/*
 * Make sure that we are inside a local transaction, starting one if needed.
 * Returns true if a new transaction was started.
 *
 * Also switches to ApplyMessageContext as necessary.
 */
static bool
ensure_transaction(void)
{
	if (IsTransactionState())
	{
		SetCurrentStatementStartTimestamp();

		if (CurrentMemoryContext != ApplyMessageContext)
			MemoryContextSwitchTo(ApplyMessageContext);

		return false;
	}

	SetCurrentStatementStartTimestamp();
	StartTransactionCommand();

	maybe_reread_subscription();

	MemoryContextSwitchTo(ApplyMessageContext);
	return true;
}


/*
 * Executor state preparation for evaluation of constraint expressions,
 * indexes and triggers.
 *
 * This is based on similar code in copy.c
 */
static EState *
create_estate_for_relation(LogicalRepRelMapEntry *rel)
{
	EState	   *estate;
	ResultRelInfo *resultRelInfo;
	RangeTblEntry *rte;

	estate = CreateExecutorState();

	rte = makeNode(RangeTblEntry);
	rte->rtekind = RTE_RELATION;
	rte->relid = RelationGetRelid(rel->localrel);
	rte->relkind = rel->localrel->rd_rel->relkind;
	rte->rellockmode = AccessShareLock;
	ExecInitRangeTable(estate, list_make1(rte));

	resultRelInfo = makeNode(ResultRelInfo);
	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);

	estate->es_result_relations = resultRelInfo;
	estate->es_num_result_relations = 1;
	estate->es_result_relation_info = resultRelInfo;

	estate->es_output_cid = GetCurrentCommandId(true);

	/* Prepare to catch AFTER triggers. */
	AfterTriggerBeginQuery();

	return estate;
}

/*
 * Evaluates default values for columns for which we can't map to remote
 * relation columns.
 *
 * This allows us to support tables which have more columns on the downstream
 * than on the upstream.
 */
static void
slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
				   TupleTableSlot *slot)
{
	TupleDesc	desc = RelationGetDescr(rel->localrel);
	int			num_phys_attrs = desc->natts;
	int			i;
	int			attnum,
				num_defaults = 0;
	int		   *defmap;
	ExprState **defexprs;
	ExprContext *econtext;

	econtext = GetPerTupleExprContext(estate);

	/* We got all the data via replication, no need to evaluate anything. */
	if (num_phys_attrs == rel->remoterel.natts)
		return;

	defmap = (int *) palloc(num_phys_attrs * sizeof(int));
	defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));

	for (attnum = 0; attnum < num_phys_attrs; attnum++)
	{
		Expr	   *defexpr;

		if (TupleDescAttr(desc, attnum)->attisdropped ||
			TupleDescAttr(desc, attnum)->attgenerated)
			continue;

		if (rel->attrmap[attnum] >= 0)
			continue;

		defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);

		if (defexpr != NULL)
		{
			/* Run the expression through planner */
			defexpr = expression_planner(defexpr);

			/* Initialize executable expression */
			defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
			defmap[num_defaults] = attnum;
			num_defaults++;
		}
	}

	for (i = 0; i < num_defaults; i++)
		slot->tts_values[defmap[i]] =
			ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
}

/*
 * Error callback to give more context info about type conversion failure.
 */
static void
slot_store_error_callback(void *arg)
{
	SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
	LogicalRepRelMapEntry *rel;
	char	   *remotetypname;
	Oid			remotetypoid,
				localtypoid;

	/* Nothing to do if remote attribute number is not set */
	if (errarg->remote_attnum < 0)
		return;

	rel = errarg->rel;
	remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];

	/* Fetch remote type name from the LogicalRepTypMap cache */
	remotetypname = logicalrep_typmap_gettypname(remotetypoid);

	/* Fetch local type OID from the local sys cache */
	localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);

	errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
			   "remote type %s, local type %s",
			   rel->remoterel.nspname, rel->remoterel.relname,
			   rel->remoterel.attnames[errarg->remote_attnum],
			   remotetypname,
			   format_type_be(localtypoid));
}

/*
 * Store data in C string form into slot.
 * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
 * use better.
 */
static void
slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
					char **values)
{
	int			natts = slot->tts_tupleDescriptor->natts;
	int			i;
	SlotErrCallbackArg errarg;
	ErrorContextCallback errcallback;

	ExecClearTuple(slot);

	/* Push callback + info on the error context stack */
	errarg.rel = rel;
	errarg.local_attnum = -1;
	errarg.remote_attnum = -1;
	errcallback.callback = slot_store_error_callback;
	errcallback.arg = (void *) &errarg;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Call the "in" function for each non-dropped attribute */
	for (i = 0; i < natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
		int			remoteattnum = rel->attrmap[i];

		if (!att->attisdropped && remoteattnum >= 0 &&
			values[remoteattnum] != NULL)
		{
			Oid			typinput;
			Oid			typioparam;

			errarg.local_attnum = i;
			errarg.remote_attnum = remoteattnum;

			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
			slot->tts_values[i] =
				OidInputFunctionCall(typinput, values[remoteattnum],
									 typioparam, att->atttypmod);
			slot->tts_isnull[i] = false;

			errarg.local_attnum = -1;
			errarg.remote_attnum = -1;
		}
		else
		{
			/*
			 * We assign NULL to dropped attributes, NULL values, and missing
			 * values (missing values should be later filled using
			 * slot_fill_defaults).
			 */
			slot->tts_values[i] = (Datum) 0;
			slot->tts_isnull[i] = true;
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	ExecStoreVirtualTuple(slot);
}

/*
 * Modify slot with user data provided as C strings.
 * This is somewhat similar to heap_modify_tuple but also calls the type
 * input function on the user data as the input is the text representation
 * of the types.
 */
static void
slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
					 char **values, bool *replaces)
{
	int			natts = slot->tts_tupleDescriptor->natts;
	int			i;
	SlotErrCallbackArg errarg;
	ErrorContextCallback errcallback;

	slot_getallattrs(slot);
	ExecClearTuple(slot);

	/* Push callback + info on the error context stack */
	errarg.rel = rel;
	errarg.local_attnum = -1;
	errarg.remote_attnum = -1;
	errcallback.callback = slot_store_error_callback;
	errcallback.arg = (void *) &errarg;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Call the "in" function for each replaced attribute */
	for (i = 0; i < natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
		int			remoteattnum = rel->attrmap[i];

		if (remoteattnum < 0)
			continue;

		if (!replaces[remoteattnum])
			continue;

		if (values[remoteattnum] != NULL)
		{
			Oid			typinput;
			Oid			typioparam;

			errarg.local_attnum = i;
			errarg.remote_attnum = remoteattnum;

			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
			slot->tts_values[i] =
				OidInputFunctionCall(typinput, values[remoteattnum],
									 typioparam, att->atttypmod);
			slot->tts_isnull[i] = false;

			errarg.local_attnum = -1;
			errarg.remote_attnum = -1;
		}
		else
		{
			slot->tts_values[i] = (Datum) 0;
			slot->tts_isnull[i] = true;
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	ExecStoreVirtualTuple(slot);
}

/*
 * Handle BEGIN message.
 */
static void
apply_handle_begin(StringInfo s)
{
	LogicalRepBeginData begin_data;

	logicalrep_read_begin(s, &begin_data);

	remote_final_lsn = begin_data.final_lsn;

	in_remote_transaction = true;

	pgstat_report_activity(STATE_RUNNING, NULL);
}

/*
 * Handle COMMIT message.
 *
 * TODO, support tracking of multiple origins
 */
static void
apply_handle_commit(StringInfo s)
{
	LogicalRepCommitData commit_data;

	logicalrep_read_commit(s, &commit_data);

	Assert(commit_data.commit_lsn == remote_final_lsn);

	/* The synchronization worker runs in a single transaction. */
	if (IsTransactionState() && !am_tablesync_worker())
	{
		/*
		 * Update origin state so we can restart streaming from the correct
		 * position in case of crash.
		 */
		replorigin_session_origin_lsn = commit_data.end_lsn;
		replorigin_session_origin_timestamp = commit_data.committime;

		CommitTransactionCommand();
		pgstat_report_stat(false);

		store_flush_position(commit_data.end_lsn);
	}
	else
	{
		/* Process any invalidation messages that might have accumulated. */
		AcceptInvalidationMessages();
		maybe_reread_subscription();
	}

	in_remote_transaction = false;

	/* Process any tables that are being synchronized in parallel. */
	process_syncing_tables(commit_data.end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
}

/*
 * Handle ORIGIN message.
 *
 * TODO, support tracking of multiple origins
 */
static void
apply_handle_origin(StringInfo s)
{
	/*
	 * The ORIGIN message can only come inside a remote transaction and
	 * before any actual writes.
	 */
	if (!in_remote_transaction ||
		(IsTransactionState() && !am_tablesync_worker()))
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg("ORIGIN message sent out of order")));
}

/*
 * Handle RELATION message.
 *
 * Note we don't do validation against the local schema here.  That
 * validation is postponed until the first change for the given relation
 * arrives, as we only care about it when applying changes for it anyway,
 * and this way we do less locking.
 */
static void
apply_handle_relation(StringInfo s)
{
	LogicalRepRelation *rel;

	rel = logicalrep_read_rel(s);
	logicalrep_relmap_update(rel);
}

/*
 * Handle TYPE message.
 *
 * Note we don't do local mapping here, that's done when the type is
 * actually used.
 */
static void
apply_handle_type(StringInfo s)
{
	LogicalRepTyp typ;

	logicalrep_read_typ(s, &typ);
	logicalrep_typmap_update(&typ);
}

/*
 * Get the replica identity index, or if that is not defined, the primary
 * key.
 *
 * If neither is defined, returns InvalidOid.
 */
static Oid
GetRelationIdentityOrPK(Relation rel)
{
	Oid			idxoid;

	idxoid = RelationGetReplicaIndex(rel);

	if (!OidIsValid(idxoid))
		idxoid = RelationGetPrimaryKeyIndex(rel);

	return idxoid;
}

/*
 * Handle INSERT message.
 */
static void
apply_handle_insert(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepTupleData newtup;
	LogicalRepRelId relid;
	EState	   *estate;
	TupleTableSlot *remoteslot;
	MemoryContext oldctx;

	ensure_transaction();

	relid = logicalrep_read_insert(s, &newtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		return;
	}

	/* Initialize the executor state. */
	estate = create_estate_for_relation(rel);
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel),
										&TTSOpsVirtual);

	/* Input functions may need an active snapshot, so get one */
	PushActiveSnapshot(GetTransactionSnapshot());

	/* Process and store remote tuple in the slot */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_cstrings(remoteslot, rel, newtup.values);
	slot_fill_defaults(rel, estate, remoteslot);
	MemoryContextSwitchTo(oldctx);

	ExecOpenIndices(estate->es_result_relation_info, false);

	/* Do the insert. */
	ExecSimpleRelationInsert(estate, remoteslot);

	/* Cleanup. */
	ExecCloseIndices(estate->es_result_relation_info);
	PopActiveSnapshot();

	/* Handle queued AFTER triggers. */
	AfterTriggerEndQuery(estate);

	ExecResetTupleTable(estate->es_tupleTable, false);
	FreeExecutorState(estate);

	logicalrep_rel_close(rel, NoLock);

	CommandCounterIncrement();
}

/*
 * Check if the logical replication relation is updatable and throw
 * appropriate error if it isn't.
 */
static void
check_relation_updatable(LogicalRepRelMapEntry *rel)
{
	/* Updatable, no error. */
	if (rel->updatable)
		return;

	/*
	 * We are in the error path, so it's fine that this is somewhat slow;
	 * it's better to give the user a correct error.
	 */
	if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
	{
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("publisher did not send replica identity column "
						"expected by the logical replication target relation \"%s.%s\"",
						rel->remoterel.nspname, rel->remoterel.relname)));
	}

	ereport(ERROR,
			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
			 errmsg("logical replication target relation \"%s.%s\" has "
					"neither REPLICA IDENTITY index nor PRIMARY "
					"KEY and published relation does not have "
					"REPLICA IDENTITY FULL",
					rel->remoterel.nspname, rel->remoterel.relname)));
}

/*
 * Handle UPDATE message.
 *
 * TODO: FDW support
 */
static void
apply_handle_update(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepRelId relid;
	Oid			idxoid;
	EState	   *estate;
	EPQState	epqstate;
	LogicalRepTupleData oldtup;
	LogicalRepTupleData newtup;
	bool		has_oldtup;
	TupleTableSlot *localslot;
	TupleTableSlot *remoteslot;
	bool		found;
	MemoryContext oldctx;

	ensure_transaction();

	relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
								   &newtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		return;
	}

	/* Check if we can do the update. */
	check_relation_updatable(rel);

	/* Initialize the executor state. */
	estate = create_estate_for_relation(rel);
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel),
										&TTSOpsVirtual);
	localslot = table_slot_create(rel->localrel,
								  &estate->es_tupleTable);
	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);

	PushActiveSnapshot(GetTransactionSnapshot());
	ExecOpenIndices(estate->es_result_relation_info, false);

	/* Build the search tuple. */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_cstrings(remoteslot, rel,
						has_oldtup ? oldtup.values : newtup.values);
	MemoryContextSwitchTo(oldctx);

	/*
	 * Try to find the tuple using either the replica identity index, the
	 * primary key, or, if needed, a sequential scan.
	 */
	idxoid = GetRelationIdentityOrPK(rel->localrel);
	Assert(OidIsValid(idxoid) ||
		   (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));

	if (OidIsValid(idxoid))
		found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
											 LockTupleExclusive,
											 remoteslot, localslot);
	else
		found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
										 remoteslot, localslot);

	ExecClearTuple(remoteslot);

	/*
	 * Tuple found.
	 *
	 * Note this will fail if there are other conflicting unique indexes.
	 */
	if (found)
	{
		/* Process and store remote tuple in the slot */
		oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
		ExecCopySlot(remoteslot, localslot);
		slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
		MemoryContextSwitchTo(oldctx);

		EvalPlanQualSetSlot(&epqstate, remoteslot);

		/* Do the actual update. */
		ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
	}
	else
	{
		/*
		 * The tuple to be updated could not be found.
		 *
		 * TODO what to do here, change the log level to LOG perhaps?
		 */
		elog(DEBUG1,
			 "logical replication did not find row for update "
			 "in replication target relation \"%s\"",
			 RelationGetRelationName(rel->localrel));
	}

	/* Cleanup. */
	ExecCloseIndices(estate->es_result_relation_info);
	PopActiveSnapshot();

	/* Handle queued AFTER triggers. */
	AfterTriggerEndQuery(estate);

	EvalPlanQualEnd(&epqstate);
	ExecResetTupleTable(estate->es_tupleTable, false);
	FreeExecutorState(estate);

	logicalrep_rel_close(rel, NoLock);

	CommandCounterIncrement();
}

/*
 * Handle DELETE message.
 *
 * TODO: FDW support
 */
static void
apply_handle_delete(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepTupleData oldtup;
	LogicalRepRelId relid;
	Oid			idxoid;
	EState	   *estate;
	EPQState	epqstate;
	TupleTableSlot *remoteslot;
	TupleTableSlot *localslot;
	bool		found;
	MemoryContext oldctx;

	ensure_transaction();

	relid = logicalrep_read_delete(s, &oldtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		return;
	}

	/* Check if we can do the delete. */
	check_relation_updatable(rel);

	/* Initialize the executor state. */
	estate = create_estate_for_relation(rel);
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel),
										&TTSOpsVirtual);
	localslot = table_slot_create(rel->localrel,
								  &estate->es_tupleTable);
	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);

	PushActiveSnapshot(GetTransactionSnapshot());
	ExecOpenIndices(estate->es_result_relation_info, false);

	/* Build the search tuple. */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_cstrings(remoteslot, rel, oldtup.values);
	MemoryContextSwitchTo(oldctx);

	/*
	 * Try to find the tuple using either the replica identity index, the
	 * primary key, or, if needed, a sequential scan.
	 */
	idxoid = GetRelationIdentityOrPK(rel->localrel);
	Assert(OidIsValid(idxoid) ||
		   (rel->remoterel.replident == REPLICA_IDENTITY_FULL));

	if (OidIsValid(idxoid))
		found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
											 LockTupleExclusive,
											 remoteslot, localslot);
	else
		found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
										 remoteslot, localslot);

	/* If found, delete it. */
	if (found)
	{
		EvalPlanQualSetSlot(&epqstate, localslot);

		/* Do the actual delete. */
		ExecSimpleRelationDelete(estate, &epqstate, localslot);
	}
	else
	{
		/* The tuple to be deleted could not be found. */
		elog(DEBUG1,
			 "logical replication could not find row for delete "
			 "in replication target relation \"%s\"",
			 RelationGetRelationName(rel->localrel));
	}

	/* Cleanup. */
	ExecCloseIndices(estate->es_result_relation_info);
	PopActiveSnapshot();

	/* Handle queued AFTER triggers. */
	AfterTriggerEndQuery(estate);

	EvalPlanQualEnd(&epqstate);
	ExecResetTupleTable(estate->es_tupleTable, false);
	FreeExecutorState(estate);

	logicalrep_rel_close(rel, NoLock);

	CommandCounterIncrement();
}

/*
 * Handle TRUNCATE message.
 *
 * TODO: FDW support
 */
static void
apply_handle_truncate(StringInfo s)
{
	bool		cascade = false;
	bool		restart_seqs = false;
	List	   *remote_relids = NIL;
	List	   *remote_rels = NIL;
	List	   *rels = NIL;
	List	   *relids = NIL;
	List	   *relids_logged = NIL;
	ListCell   *lc;

	ensure_transaction();

	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);

	foreach(lc, remote_relids)
	{
		LogicalRepRelId relid = lfirst_oid(lc);
		LogicalRepRelMapEntry *rel;

		rel = logicalrep_rel_open(relid, RowExclusiveLock);
		if (!should_apply_changes_for_rel(rel))
		{
			/*
			 * The relation can't become interesting in the middle of the
			 * transaction so it's safe to unlock it.
			 */
			logicalrep_rel_close(rel, RowExclusiveLock);
			continue;
		}

		remote_rels = lappend(remote_rels, rel);
		rels = lappend(rels, rel->localrel);
		relids = lappend_oid(relids, rel->localreloid);
		if (RelationIsLogicallyLogged(rel->localrel))
			relids_logged = lappend_oid(relids_logged, rel->localreloid);
	}

	/*
	 * Even if we used CASCADE on the upstream master, we explicitly default
	 * to replaying changes without further cascading.  This might later be
	 * made configurable with a user-specified option.
	 */
	ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);

	foreach(lc, remote_rels)
	{
		LogicalRepRelMapEntry *rel = lfirst(lc);

		logicalrep_rel_close(rel, NoLock);
	}

	CommandCounterIncrement();
}


/*
 * Logical replication protocol message dispatcher.
 */
static void
apply_dispatch(StringInfo s)
{
	char		action = pq_getmsgbyte(s);

	switch (action)
	{
			/* BEGIN */
		case 'B':
			apply_handle_begin(s);
			break;
			/* COMMIT */
		case 'C':
			apply_handle_commit(s);
			break;
			/* INSERT */
		case 'I':
			apply_handle_insert(s);
			break;
			/* UPDATE */
		case 'U':
			apply_handle_update(s);
			break;
			/* DELETE */
		case 'D':
			apply_handle_delete(s);
			break;
			/* TRUNCATE */
		case 'T':
			apply_handle_truncate(s);
			break;
			/* RELATION */
		case 'R':
			apply_handle_relation(s);
			break;
			/* TYPE */
		case 'Y':
			apply_handle_type(s);
			break;
			/* ORIGIN */
		case 'O':
			apply_handle_origin(s);
			break;
		default:
			ereport(ERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("invalid logical replication message type \"%c\"", action)));
	}
}

/*
 * Figure out which write/flush positions to report to the walsender process.
 *
 * We can't simply report back the last LSN the walsender sent us because the
 * local transaction might not yet be flushed to disk locally.  Instead we
 * build a list that associates local with remote LSNs for every commit.  When
 * reporting back the flush position to the sender we iterate that list and
 * check which entries on it are already locally flushed.  Those we can report
 * as having been flushed.
 *
 * *have_pending_txes is set to true if there are outstanding transactions
 * that need to be flushed.
 */
static void
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
				   bool *have_pending_txes)
{
	dlist_mutable_iter iter;
	XLogRecPtr	local_flush = GetFlushRecPtr();

	*write = InvalidXLogRecPtr;
	*flush = InvalidXLogRecPtr;

	dlist_foreach_modify(iter, &lsn_mapping)
	{
		FlushPosition *pos =
		dlist_container(FlushPosition, node, iter.cur);

		*write = pos->remote_end;

		if (pos->local_end <= local_flush)
		{
			*flush = pos->remote_end;
			dlist_delete(iter.cur);
			pfree(pos);
		}
		else
		{
			/*
			 * Don't want to uselessly iterate over the rest of the list which
			 * could potentially be long.  Instead get the last element and
			 * grab the write position from there.
			 */
			pos = dlist_tail_element(FlushPosition, node,
									 &lsn_mapping);
			*write = pos->remote_end;
			*have_pending_txes = true;
			return;
		}
	}

	*have_pending_txes = !dlist_is_empty(&lsn_mapping);
}

/*
 * Store current remote/local lsn pair in the tracking list.
 */
static void
store_flush_position(XLogRecPtr remote_lsn)
{
	FlushPosition *flushpos;

	/* Need to do this in permanent context */
	MemoryContextSwitchTo(ApplyContext);

	/* Track commit lsn */
	flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
	flushpos->local_end = XactLastCommitEnd;
	flushpos->remote_end = remote_lsn;

	dlist_push_tail(&lsn_mapping, &flushpos->node);
	MemoryContextSwitchTo(ApplyMessageContext);
}


/* Update statistics of the worker. */
static void
UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
{
	MyLogicalRepWorker->last_lsn = last_lsn;
	MyLogicalRepWorker->last_send_time = send_time;
	MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
	if (reply)
	{
		MyLogicalRepWorker->reply_lsn = last_lsn;
		MyLogicalRepWorker->reply_time = send_time;
	}
}

/*
 * Apply main loop.
 */
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
	/*
	 * Init the ApplyMessageContext which we clean up after each replication
	 * protocol message.
	 */
	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
												"ApplyMessageContext",
												ALLOCSET_DEFAULT_SIZES);

	/* mark as idle, before starting to loop */
	pgstat_report_activity(STATE_IDLE, NULL);

	for (;;)
	{
		pgsocket	fd = PGINVALID_SOCKET;
		int			rc;
		int			len;
		char	   *buf = NULL;
		bool		endofstream = false;
		TimestampTz last_recv_timestamp = GetCurrentTimestamp();
		bool		ping_sent = false;
		long		wait_time;

		CHECK_FOR_INTERRUPTS();

		MemoryContextSwitchTo(ApplyMessageContext);

		len = walrcv_receive(wrconn, &buf, &fd);

		if (len != 0)
		{
			/* Process the data */
			for (;;)
			{
				CHECK_FOR_INTERRUPTS();

				if (len == 0)
				{
					break;
				}
				else if (len < 0)
				{
					ereport(LOG,
							(errmsg("data stream from publisher has ended")));
					endofstream = true;
					break;
				}
				else
				{
					int			c;
					StringInfoData s;

					/* Reset timeout. */
					last_recv_timestamp = GetCurrentTimestamp();
					ping_sent = false;

					/* Ensure we are reading the data into our memory context. */
					MemoryContextSwitchTo(ApplyMessageContext);

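					/*
					 * Wrap the received buffer in a StringInfo so the
					 * pq_getmsg* routines can parse it; the string is never
					 * enlarged, so maxlen just gets a dummy value.
					 */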
					s.data = buf;
					s.len = len;
					s.cursor = 0;
					s.maxlen = -1;

					c = pq_getmsgbyte(&s);

					if (c == 'w')
					{
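						/*
						 * XLogData ('w') message: the header carries the
						 * start LSN, end LSN and send timestamp, followed by
						 * the logical replication message itself.
						 */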
						XLogRecPtr	start_lsn;
						XLogRecPtr	end_lsn;
						TimestampTz send_time;

						start_lsn = pq_getmsgint64(&s);
						end_lsn = pq_getmsgint64(&s);
						send_time = pq_getmsgint64(&s);

						if (last_received < start_lsn)
							last_received = start_lsn;

						if (last_received < end_lsn)
							last_received = end_lsn;

						UpdateWorkerStats(last_received, send_time, false);

						apply_dispatch(&s);
					}
					else if (c == 'k')
					{
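						/*
						 * Primary keepalive ('k') message: end LSN, send
						 * timestamp, and a flag telling us whether the sender
						 * requests an immediate reply.
						 */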
						XLogRecPtr	end_lsn;
						TimestampTz timestamp;
						bool		reply_requested;

						end_lsn = pq_getmsgint64(&s);
						timestamp = pq_getmsgint64(&s);
						reply_requested = pq_getmsgbyte(&s);

						if (last_received < end_lsn)
							last_received = end_lsn;

						send_feedback(last_received, reply_requested, false);
						UpdateWorkerStats(last_received, timestamp, true);
					}
					/* other message types are purposefully ignored */

					MemoryContextReset(ApplyMessageContext);
				}

				len = walrcv_receive(wrconn, &buf, &fd);
			}
		}

		/* confirm all writes so far */
		send_feedback(last_received, false, false);

		if (!in_remote_transaction)
		{
			/*
			 * If we didn't get any transactions for a while there might be
			 * unconsumed invalidation messages in the queue, consume them
			 * now.
			 */
			AcceptInvalidationMessages();
			maybe_reread_subscription();

			/* Process any table synchronization changes. */
			process_syncing_tables(last_received);
		}

		/* Cleanup the memory. */
		MemoryContextResetAndDeleteChildren(ApplyMessageContext);
		MemoryContextSwitchTo(TopMemoryContext);

		/* Check if we need to exit the streaming loop. */
		if (endofstream)
		{
			TimeLineID	tli;

			walrcv_endstreaming(wrconn, &tli);
			break;
		}

		/*
		 * Wait for more data or latch.  If we have unflushed transactions,
		 * wake up after WalWriterDelay to see if they've been flushed yet (in
		 * which case we should send a feedback message).  Otherwise, there's
		 * no particular urgency about waking up unless we get data or a
		 * signal.
		 */
		if (!dlist_is_empty(&lsn_mapping))
			wait_time = WalWriterDelay;
		else
			wait_time = NAPTIME_PER_CYCLE;

		rc = WaitLatchOrSocket(MyLatch,
							   WL_SOCKET_READABLE | WL_LATCH_SET |
							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
							   fd, wait_time,
							   WAIT_EVENT_LOGICAL_APPLY_MAIN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (got_SIGHUP)
		{
			got_SIGHUP = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		if (rc & WL_TIMEOUT)
		{
			/*
			 * We didn't receive anything new.  If we haven't heard anything
			 * from the server for more than wal_receiver_timeout / 2, ping
			 * the server.  Also, if it's been longer than
			 * wal_receiver_status_interval since the last update we sent,
			 * send a status update to the publisher anyway, to report any
			 * progress in applying WAL.
			 */
			bool		requestReply = false;

			/*
			 * Check if the time since the last receive from the publisher
			 * has reached the configured limit.
			 */
			if (wal_receiver_timeout > 0)
			{
				TimestampTz now = GetCurrentTimestamp();
				TimestampTz timeout;

				timeout =
					TimestampTzPlusMilliseconds(last_recv_timestamp,
												wal_receiver_timeout);

				if (now >= timeout)
					ereport(ERROR,
							(errmsg("terminating logical replication worker due to timeout")));

				/*
				 * If we didn't receive anything new for half of the receiver
				 * replication timeout, ping the server.
				 */
				if (!ping_sent)
				{
					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
														  (wal_receiver_timeout / 2));
					if (now >= timeout)
					{
						requestReply = true;
						ping_sent = true;
					}
				}
			}

			send_feedback(last_received, requestReply, requestReply);
		}
	}
}

/*
 * Send a Standby Status Update message to the server.
 *
 * 'recvpos' is the latest LSN we've received data to; 'force' is set if we
 * need to send a response to avoid timeouts.
 */
static void
send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
{
	static StringInfo reply_message = NULL;
	static TimestampTz send_time = 0;

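	/* Last positions reported to the sender, used to suppress duplicates */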
	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
	static XLogRecPtr last_flushpos = InvalidXLogRecPtr;

	XLogRecPtr	writepos;
	XLogRecPtr	flushpos;
	TimestampTz now;
	bool		have_pending_txes;

	/*
	 * If the user doesn't want status to be reported to the publisher, be
	 * sure to exit before doing anything at all.
	 */
	if (!force && wal_receiver_status_interval <= 0)
		return;

	/* It's legal to not pass a recvpos */
	if (recvpos < last_recvpos)
		recvpos = last_recvpos;

	get_flush_position(&writepos, &flushpos, &have_pending_txes);

	/*
	 * No outstanding transactions to flush, we can report the latest received
	 * position.  This is important for synchronous replication.
	 */
	if (!have_pending_txes)
		flushpos = writepos = recvpos;

	if (writepos < last_writepos)
		writepos = last_writepos;

	if (flushpos < last_flushpos)
		flushpos = last_flushpos;

	now = GetCurrentTimestamp();

	/* if we've already reported everything we're good */
	if (!force &&
		writepos == last_writepos &&
		flushpos == last_flushpos &&
		!TimestampDifferenceExceeds(send_time, now,
									wal_receiver_status_interval * 1000))
		return;
	send_time = now;

	if (!reply_message)
	{
		MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);

		reply_message = makeStringInfo();
		MemoryContextSwitchTo(oldctx);
	}
	else
		resetStringInfo(reply_message);

	pq_sendbyte(reply_message, 'r');
	pq_sendint64(reply_message, recvpos);	/* write */
	pq_sendint64(reply_message, flushpos);	/* flush */
	pq_sendint64(reply_message, writepos);	/* apply */
	pq_sendint64(reply_message, now);	/* sendTime */
	pq_sendbyte(reply_message, requestReply);	/* replyRequested */

	elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
		 force,
		 (uint32) (recvpos >> 32), (uint32) recvpos,
		 (uint32) (writepos >> 32), (uint32) writepos,
		 (uint32) (flushpos >> 32), (uint32) flushpos
		 );

	walrcv_send(wrconn, reply_message->data, reply_message->len);

	if (recvpos > last_recvpos)
		last_recvpos = recvpos;
	if (writepos > last_writepos)
		last_writepos = writepos;
	if (flushpos > last_flushpos)
		last_flushpos = flushpos;
}

/*
 * Reread subscription info if needed.  Most kinds of change cause the
 * worker process to exit (and be restarted by the launcher as needed).
 */
static void
maybe_reread_subscription(void)
{
	MemoryContext oldctx;
	Subscription *newsub;
	bool		started_tx = false;

	/* When cache state is valid there is nothing to do here. */
	if (MySubscriptionValid)
		return;

	/* This function might be called inside or outside of transaction. */
	if (!IsTransactionState())
	{
		StartTransactionCommand();
		started_tx = true;
	}

	/* Ensure allocations in permanent context. */
	oldctx = MemoryContextSwitchTo(ApplyContext);

	newsub = GetSubscription(MyLogicalRepWorker->subid, true);

	/*
	 * Exit if the subscription was removed.  This normally should not happen
	 * as the worker gets killed during DROP SUBSCRIPTION.
	 */
	if (!newsub)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"stop because the subscription was removed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if the subscription was disabled.  This normally should not happen
	 * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
	 */
	if (!newsub->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"stop because the subscription was disabled",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if the connection string was changed.  The launcher will start a
	 * new worker.
	 */
	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because the connection information was changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if the subscription name was changed (it's used for
	 * fallback_application_name).  The launcher will start a new worker.
	 */
	if (strcmp(newsub->name, MySubscription->name) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because subscription was renamed",
						MySubscription->name)));

		proc_exit(0);
	}

	/* !slotname should never happen when enabled is true. */
	Assert(newsub->slotname);

	/*
	 * We need to make a new connection to a new slot if the slot name has
	 * changed, so exit here as well if that's the case.
	 */
	if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because the replication slot name was changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if the publication list was changed.  The launcher will start a
	 * new worker.
	 */
	if (!equal(newsub->publications, MySubscription->publications))
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because subscription's publications were changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/* Check for other changes that should never happen too. */
	if (newsub->dbid != MySubscription->dbid)
	{
		elog(ERROR, "subscription %u changed unexpectedly",
			 MyLogicalRepWorker->subid);
	}

	/* Clean old subscription info and switch to new one. */
	FreeSubscription(MySubscription);
	MySubscription = newsub;

	MemoryContextSwitchTo(oldctx);

	/* Change synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	if (started_tx)
		CommitTransactionCommand();

	MySubscriptionValid = true;
}

/*
 * Callback from subscription syscache invalidation.
 */
static void
subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
{
	MySubscriptionValid = false;
}

/* SIGHUP: set flag to reload configuration at next convenient time */
static void
logicalrep_worker_sighup(SIGNAL_ARGS)
{
	int			save_errno = errno;

	got_SIGHUP = true;

	/* Waken anything waiting on the process latch */
	SetLatch(MyLatch);

	errno = save_errno;
}

/* Logical Replication Apply worker entry point */
void
ApplyWorkerMain(Datum main_arg)
{
	int			worker_slot = DatumGetInt32(main_arg);
	MemoryContext oldctx;
	char		originname[NAMEDATALEN];
	XLogRecPtr	origin_startpos;
	char	   *myslotname;
	WalRcvStreamOptions options;

	/* Attach to slot */
	logicalrep_worker_attach(worker_slot);

	/* Setup signal handling */
	pqsignal(SIGHUP, logicalrep_worker_sighup);
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/*
	 * We don't currently need any ResourceOwner in a walreceiver process, but
	 * if we did, we could call CreateAuxProcessResourceOwner here.
	 */

	/* Initialise stats to a sane value */
	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();

	/* Load the libpq-specific functions */
	load_file("libpqwalreceiver", false);

	/* Run as replica session replication role. */
	SetConfigOption("session_replication_role", "replica",
					PGC_SUSET, PGC_S_OVERRIDE);

	/* Connect to our database. */
	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
											  MyLogicalRepWorker->userid,
											  0);

	/* Load the subscription into persistent memory context. */
	ApplyContext = AllocSetContextCreate(TopMemoryContext,
										 "ApplyContext",
										 ALLOCSET_DEFAULT_SIZES);
	StartTransactionCommand();
	oldctx = MemoryContextSwitchTo(ApplyContext);

	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
	if (!MySubscription)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription %u will not "
						"start because the subscription was removed during startup",
						MyLogicalRepWorker->subid)));
		proc_exit(0);
	}

	MySubscriptionValid = true;
	MemoryContextSwitchTo(oldctx);

	if (!MySubscription->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will not "
						"start because the subscription was disabled during startup",
						MySubscription->name)));

		proc_exit(0);
	}

	/* Setup synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/* Keep us informed about subscription changes. */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
								  subscription_change_cb,
								  (Datum) 0);

	if (am_tablesync_worker())
		ereport(LOG,
				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
						MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
	else
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" has started",
						MySubscription->name)));

	CommitTransactionCommand();

	/* Connect to the origin and start the replication. */
	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
		 MySubscription->conninfo);

	if (am_tablesync_worker())
	{
		char	   *syncslotname;

		/* This is a table synchronization worker; run the initial sync. */
		syncslotname = LogicalRepSyncTableStart(&origin_startpos);

		/* The slot name needs to be allocated in permanent memory context. */
		oldctx = MemoryContextSwitchTo(ApplyContext);
		myslotname = pstrdup(syncslotname);
		MemoryContextSwitchTo(oldctx);

		pfree(syncslotname);
	}
	else
	{
		/* This is the main apply worker */
		RepOriginId originid;
		TimeLineID	startpointTLI;
		char	   *err;

		myslotname = MySubscription->slotname;

		/*
		 * This shouldn't happen if the subscription is enabled, but guard
		 * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
		 * crash if slot is NULL.)
		 */
		if (!myslotname)
			ereport(ERROR,
					(errmsg("subscription has no replication slot set")));

		/* Setup replication origin tracking. */
		StartTransactionCommand();
		snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
		originid = replorigin_by_name(originname, true);
		if (!OidIsValid(originid))
			originid = replorigin_create(originname);
		replorigin_session_setup(originid);
		replorigin_session_origin = originid;
		origin_startpos = replorigin_session_get_progress(false);
		CommitTransactionCommand();

		wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
								&err);
		if (wrconn == NULL)
			ereport(ERROR,
					(errmsg("could not connect to the publisher: %s", err)));

		/*
		 * We don't really use the output of identify_system for anything,
		 * but it does some initialization on the upstream, so let's still
		 * call it.
		 */
		(void) walrcv_identify_system(wrconn, &startpointTLI);
	}

	/*
	 * Setup callback for syscache so that we know when something changes in
	 * the subscription relation state.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
								  invalidate_syncing_table_states,
								  (Datum) 0);

	/* Build logical replication streaming options. */
	options.logical = true;
	options.startpoint = origin_startpos;
	options.slotname = myslotname;
	options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
	options.proto.logical.publication_names = MySubscription->publications;

	/* Start normal logical streaming replication. */
	walrcv_startstreaming(wrconn, &options);

	/* Run the main loop. */
	LogicalRepApplyLoop(origin_startpos);

	proc_exit(0);
}

/*
 * Is current process a logical replication worker?
 */
bool
IsLogicalWorker(void)
{
	return MyLogicalRepWorker != NULL;
}