/*-------------------------------------------------------------------------
 * launcher.c
 *	   PostgreSQL logical replication worker launcher process
 *
 * Copyright (c) 2016-2019, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/logical/launcher.c
 *
 * NOTES
 *	  This module contains the logical replication worker launcher which
 *	  uses the background worker infrastructure to start the logical
 *	  replication workers for every enabled subscription.
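 *
 *	  The launcher itself is registered as a background worker via
 *	  ApplyLauncherRegister() during postmaster startup and runs
 *	  ApplyLauncherMain() as its entry point.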
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "funcapi.h"
#include "miscadmin.h"
#include "pgstat.h"

#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "access/tableam.h"
#include "access/xact.h"

#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"

#include "libpq/pqsignal.h"

#include "postmaster/bgworker.h"
#include "postmaster/fork_process.h"
#include "postmaster/postmaster.h"

#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"

#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"

#include "tcop/tcopprot.h"

#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/snapmgr.h"

/* max sleep time between cycles (3min) */
#define DEFAULT_NAPTIME_PER_CYCLE 180000L

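/* GUC variables */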
int			max_logical_replication_workers = 4;
int			max_sync_workers_per_subscription = 2;

LogicalRepWorker *MyLogicalRepWorker = NULL;

typedef struct LogicalRepCtxStruct
{
	/* Supervisor process. */
	pid_t		launcher_pid;

	/* Background workers. */
	LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
} LogicalRepCtxStruct;

LogicalRepCtxStruct *LogicalRepCtx;

typedef struct LogicalRepWorkerId
{
	Oid			subid;
	Oid			relid;
} LogicalRepWorkerId;

typedef struct StopWorkersData
{
	int			nestDepth;		/* Sub-transaction nest level */
	List	   *workers;		/* List of LogicalRepWorkerId */
	struct StopWorkersData *parent; /* This need not be an immediate
									 * subtransaction parent */
} StopWorkersData;

/*
 * Stack of StopWorkersData elements. Each stack element contains the workers
 * to be stopped for that subtransaction.
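 *
 * For example, if a worker stop is requested at top level (nestDepth 1) and
 * another inside a savepoint (nestDepth 2), the stack has two elements; on
 * RELEASE SAVEPOINT the inner list is merged into the outer one, on ROLLBACK
 * TO SAVEPOINT it is discarded, and at commit of the top-level transaction
 * the remaining workers are stopped (see AtEOSubXact_ApplyLauncher and
 * AtEOXact_ApplyLauncher below).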
 */
static StopWorkersData *on_commit_stop_workers = NULL;

static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);

/* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false;

static bool on_commit_launcher_wakeup = false;

Datum		pg_stat_get_subscription(PG_FUNCTION_ARGS);


/*
 * Load the list of subscriptions.
 *
 * Only the fields interesting for worker start/stop functions are filled for
 * each subscription.
 */
static List *
get_subscription_list(void)
{
	List	   *res = NIL;
	Relation	rel;
	TableScanDesc scan;
	HeapTuple	tup;
	MemoryContext resultcxt;

	/* This is the context that we will allocate our output data in */
	resultcxt = CurrentMemoryContext;

	/*
	 * Start a transaction so we can access pg_subscription, and get a
	 * snapshot.  We don't have a use for the snapshot itself, but we're
	 * interested in the secondary effect that it sets RecentGlobalXmin.
	 * (This is critical for anything that reads heap pages, because HOT may
	 * decide to prune them even if the process doesn't attempt to modify any
	 * tuples.)
	 */
	StartTransactionCommand();
	(void) GetTransactionSnapshot();

	rel = table_open(SubscriptionRelationId, AccessShareLock);
	scan = table_beginscan_catalog(rel, 0, NULL);

	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
	{
		Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
		Subscription *sub;
		MemoryContext oldcxt;

		/*
		 * Allocate our results in the caller's context, not the
		 * transaction's.  We do this inside the loop, and restore the
		 * original context at the end, so that leaky things like
		 * heap_getnext() are not called in a potentially long-lived context.
		 */
		oldcxt = MemoryContextSwitchTo(resultcxt);

		sub = (Subscription *) palloc0(sizeof(Subscription));
		sub->oid = subform->oid;
		sub->dbid = subform->subdbid;
		sub->owner = subform->subowner;
		sub->enabled = subform->subenabled;
		sub->name = pstrdup(NameStr(subform->subname));
		/* We don't fill fields we are not interested in. */

		res = lappend(res, sub);
		MemoryContextSwitchTo(oldcxt);
	}

	table_endscan(scan);
	table_close(rel, AccessShareLock);

	CommitTransactionCommand();

	return res;
}

/*
 * Wait for a background worker to start up and attach to the shmem context.
 *
 * This is only needed for cleaning up the shared memory in case the worker
 * fails to attach.
 */
static void
WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
							   uint16 generation,
							   BackgroundWorkerHandle *handle)
{
	BgwHandleStatus status;
	int			rc;

	for (;;)
	{
		pid_t		pid;

		CHECK_FOR_INTERRUPTS();

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

		/* Worker either died or has started; no need to do anything. */
		if (!worker->in_use || worker->proc)
		{
			LWLockRelease(LogicalRepWorkerLock);
			return;
		}

		LWLockRelease(LogicalRepWorkerLock);

		/* Check if worker has died before attaching, and clean up after it. */
		status = GetBackgroundWorkerPid(handle, &pid);

		if (status == BGWH_STOPPED)
		{
			LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
			/* Ensure that this was indeed the worker we waited for. */
			if (generation == worker->generation)
				logicalrep_worker_cleanup(worker);
			LWLockRelease(LogicalRepWorkerLock);
			return;
		}

		/*
		 * We need timeout because we generally don't get notified via latch
		 * about the worker attach.  But we don't expect to have to wait long.
		 */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   10L, WAIT_EVENT_BGWORKER_STARTUP);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}
	}

	return;
}

/*
 * Walks the workers array and searches for one that matches given
 * subscription id and relid.
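 *
 * The caller must hold LogicalRepWorkerLock (as the Assert below checks).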
 */
LogicalRepWorker *
logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
{
	int			i;
	LogicalRepWorker *res = NULL;

	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	/* Search for attached worker for a given subscription id. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (w->in_use && w->subid == subid && w->relid == relid &&
			(!only_running || w->proc))
		{
			res = w;
			break;
		}
	}

	return res;
}

/*
 * Similar to logicalrep_worker_find(), but returns a list of all workers for
 * the subscription, instead of just one.
 */
List *
logicalrep_workers_find(Oid subid, bool only_running)
{
	int			i;
	List	   *res = NIL;

	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	/* Search for attached worker for a given subscription id. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (w->in_use && w->subid == subid && (!only_running || w->proc))
			res = lappend(res, w);
	}

	return res;
}

/*
 * Start new apply background worker, if possible.
 */
void
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
						 Oid relid)
{
	BackgroundWorker bgw;
	BackgroundWorkerHandle *bgw_handle;
	uint16		generation;
	int			i;
	int			slot = 0;
	LogicalRepWorker *worker = NULL;
	int			nsyncworkers;
	TimestampTz now;

	ereport(DEBUG1,
			(errmsg("starting logical replication worker for subscription \"%s\"",
					subname)));

	/* Report this after the initial starting message for consistency. */
	if (max_replication_slots == 0)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("cannot start logical replication workers when max_replication_slots = 0")));

	/*
	 * We need to do the modification of the shared memory under lock so that
	 * we have consistent view.
	 */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

retry:
	/* Find unused worker slot. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (!w->in_use)
		{
			worker = w;
			slot = i;
			break;
		}
	}

	nsyncworkers = logicalrep_sync_worker_count(subid);

	now = GetCurrentTimestamp();

	/*
	 * If we didn't find a free slot, try to do garbage collection.  The
	 * reason we do this is because if some worker failed to start up and its
	 * parent has crashed while waiting, the in_use state was never cleared.
	 */
	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
	{
		bool		did_cleanup = false;

		for (i = 0; i < max_logical_replication_workers; i++)
		{
			LogicalRepWorker *w = &LogicalRepCtx->workers[i];

			/*
			 * If the worker was marked in use but didn't manage to attach in
			 * time, clean it up.
			 */
			if (w->in_use && !w->proc &&
				TimestampDifferenceExceeds(w->launch_time, now,
										   wal_receiver_timeout))
			{
				elog(WARNING,
					 "logical replication worker for subscription %u took too long to start; canceled",
					 w->subid);

				logicalrep_worker_cleanup(w);
				did_cleanup = true;
			}
		}

		if (did_cleanup)
			goto retry;
	}

	/*
	 * If we reached the sync worker limit per subscription, just exit
	 * silently as we might get here because of an otherwise harmless race
	 * condition.
	 */
	if (nsyncworkers >= max_sync_workers_per_subscription)
	{
		LWLockRelease(LogicalRepWorkerLock);
		return;
	}

	/*
	 * However if there are no more free worker slots, inform user about it
	 * before exiting.
	 */
	if (worker == NULL)
	{
		LWLockRelease(LogicalRepWorkerLock);
		ereport(WARNING,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("out of logical replication worker slots"),
				 errhint("You might need to increase max_logical_replication_workers.")));
		return;
	}

	/* Prepare the worker slot. */
	worker->launch_time = now;
	worker->in_use = true;
	worker->generation++;
	worker->proc = NULL;
	worker->dbid = dbid;
	worker->userid = userid;
	worker->subid = subid;
	worker->relid = relid;
	worker->relstate = SUBREL_STATE_UNKNOWN;
	worker->relstate_lsn = InvalidXLogRecPtr;
	worker->last_lsn = InvalidXLogRecPtr;
	TIMESTAMP_NOBEGIN(worker->last_send_time);
	TIMESTAMP_NOBEGIN(worker->last_recv_time);
	worker->reply_lsn = InvalidXLogRecPtr;
	TIMESTAMP_NOBEGIN(worker->reply_time);

	/* Before releasing lock, remember generation for future identification. */
	generation = worker->generation;

	LWLockRelease(LogicalRepWorkerLock);

	/* Register the new dynamic worker. */
	memset(&bgw, 0, sizeof(bgw));
	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
		BGWORKER_BACKEND_DATABASE_CONNECTION;
	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
	snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
	if (OidIsValid(relid))
		snprintf(bgw.bgw_name, BGW_MAXLEN,
				 "logical replication worker for subscription %u sync %u", subid, relid);
	else
		snprintf(bgw.bgw_name, BGW_MAXLEN,
				 "logical replication worker for subscription %u", subid);
	snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");

	bgw.bgw_restart_time = BGW_NEVER_RESTART;
	bgw.bgw_notify_pid = MyProcPid;
	bgw.bgw_main_arg = Int32GetDatum(slot);

	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
	{
		/* Failed to start worker, so clean up the worker slot. */
		LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
		Assert(generation == worker->generation);
		logicalrep_worker_cleanup(worker);
		LWLockRelease(LogicalRepWorkerLock);

		ereport(WARNING,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("out of background worker slots"),
				 errhint("You might need to increase max_worker_processes.")));
		return;
	}

	/* Now wait until it attaches. */
	WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
}

/*
 * Stop the logical replication worker for subid/relid, if any, and wait until
 * it detaches from the slot.
 */
void
logicalrep_worker_stop(Oid subid, Oid relid)
{
	LogicalRepWorker *worker;
	uint16		generation;

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	worker = logicalrep_worker_find(subid, relid, false);

	/* No worker, nothing to do. */
	if (!worker)
	{
		LWLockRelease(LogicalRepWorkerLock);
		return;
	}

	/*
	 * Remember which generation was our worker so we can check if what we
	 * see is still the same one.
	 */
	generation = worker->generation;

	/*
	 * If we found a worker but it does not have proc set then it is still
	 * starting up; wait for it to finish starting and then kill it.
	 */
	while (worker->in_use && !worker->proc)
	{
		int			rc;

		LWLockRelease(LogicalRepWorkerLock);

		/* Wait a bit --- we don't expect to have to wait long. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   10L, WAIT_EVENT_BGWORKER_STARTUP);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		/* Recheck worker status. */
		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

		/*
		 * Check whether the worker slot is no longer used, which would mean
		 * that the worker has exited, or whether the worker generation is
		 * different, meaning that a different worker has taken the slot.
		 */
		if (!worker->in_use || worker->generation != generation)
		{
			LWLockRelease(LogicalRepWorkerLock);
			return;
		}

		/* Worker has assigned proc, so it has started. */
		if (worker->proc)
			break;
	}

	/* Now terminate the worker ... */
	kill(worker->proc->pid, SIGTERM);

	/* ... and wait for it to die. */
	for (;;)
	{
		int			rc;

		/* is it gone? */
		if (!worker->proc || worker->generation != generation)
			break;

		LWLockRelease(LogicalRepWorkerLock);

		/* Wait a bit --- we don't expect to have to wait long. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   10L, WAIT_EVENT_BGWORKER_SHUTDOWN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
	}

	LWLockRelease(LogicalRepWorkerLock);
}

/*
 * Request worker for specified sub/rel to be stopped on commit.
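 *
 * (This is used, for example, when ALTER SUBSCRIPTION ... REFRESH PUBLICATION
 * removes a relation from the subscription and its table-sync worker should
 * be stopped once the change commits.)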
 */
void
logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
{
	int			nestDepth = GetCurrentTransactionNestLevel();
	LogicalRepWorkerId *wid;
	MemoryContext oldctx;

	/* Make sure we store the info in context that survives until commit. */
	oldctx = MemoryContextSwitchTo(TopTransactionContext);

	/* Check that previous transactions were properly cleaned up. */
	Assert(on_commit_stop_workers == NULL ||
		   nestDepth >= on_commit_stop_workers->nestDepth);

	/*
	 * Push a new stack element if we don't already have one for the current
	 * nestDepth.
	 */
	if (on_commit_stop_workers == NULL ||
		nestDepth > on_commit_stop_workers->nestDepth)
	{
		StopWorkersData *newdata = palloc(sizeof(StopWorkersData));

		newdata->nestDepth = nestDepth;
		newdata->workers = NIL;
		newdata->parent = on_commit_stop_workers;
		on_commit_stop_workers = newdata;
	}

	/*
	 * Finally add a new worker into the worker list of the current
	 * subtransaction.
	 */
	wid = palloc(sizeof(LogicalRepWorkerId));
	wid->subid = subid;
	wid->relid = relid;
	on_commit_stop_workers->workers =
		lappend(on_commit_stop_workers->workers, wid);

	MemoryContextSwitchTo(oldctx);
}

/*
 * Wake up (using latch) any logical replication worker for specified sub/rel.
 */
void
logicalrep_worker_wakeup(Oid subid, Oid relid)
{
	LogicalRepWorker *worker;

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	worker = logicalrep_worker_find(subid, relid, true);

	if (worker)
		logicalrep_worker_wakeup_ptr(worker);

	LWLockRelease(LogicalRepWorkerLock);
}

/*
 * Wake up (using latch) the specified logical replication worker.
 *
 * Caller must hold lock, else worker->proc could change under us.
 */
void
logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
{
	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	SetLatch(&worker->proc->procLatch);
}

/*
 * Attach to a slot.
 */
void
logicalrep_worker_attach(int slot)
{
	/* Block concurrent access. */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

	Assert(slot >= 0 && slot < max_logical_replication_workers);
	MyLogicalRepWorker = &LogicalRepCtx->workers[slot];

	if (!MyLogicalRepWorker->in_use)
	{
		LWLockRelease(LogicalRepWorkerLock);
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("logical replication worker slot %d is empty, cannot attach",
						slot)));
	}

	if (MyLogicalRepWorker->proc)
	{
		LWLockRelease(LogicalRepWorkerLock);
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("logical replication worker slot %d is already used by "
						"another worker, cannot attach", slot)));
	}

	MyLogicalRepWorker->proc = MyProc;
	before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);

	LWLockRelease(LogicalRepWorkerLock);
}

/*
 * Detach the worker (cleans up the worker info).
 */
static void
logicalrep_worker_detach(void)
{
	/* Block concurrent access. */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

	logicalrep_worker_cleanup(MyLogicalRepWorker);

	LWLockRelease(LogicalRepWorkerLock);
}

/*
 * Clean up worker info.
 */
static void
logicalrep_worker_cleanup(LogicalRepWorker *worker)
{
	Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));

	worker->in_use = false;
	worker->proc = NULL;
	worker->dbid = InvalidOid;
	worker->userid = InvalidOid;
	worker->subid = InvalidOid;
	worker->relid = InvalidOid;
}

/*
 * Cleanup function for logical replication launcher.
 *
 * Called on logical replication launcher exit.
 */
static void
logicalrep_launcher_onexit(int code, Datum arg)
{
	LogicalRepCtx->launcher_pid = 0;
}

/*
 * Cleanup function.
 *
 * Called on logical replication worker exit.
 */
static void
logicalrep_worker_onexit(int code, Datum arg)
{
	/* Disconnect gracefully from the remote side. */
	if (wrconn)
		walrcv_disconnect(wrconn);

	logicalrep_worker_detach();

	ApplyLauncherWakeup();
}

/* SIGHUP: set flag to reload configuration at next convenient time */
static void
logicalrep_launcher_sighup(SIGNAL_ARGS)
{
	int			save_errno = errno;

	got_SIGHUP = true;

	/* Waken anything waiting on the process latch */
	SetLatch(MyLatch);

	errno = save_errno;
}

/*
 * Count the number of registered (not necessarily running) sync workers
 * for a subscription.
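 *
 * The caller must hold LogicalRepWorkerLock.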
 */
int
logicalrep_sync_worker_count(Oid subid)
{
	int			i;
	int			res = 0;

	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	/* Search for attached worker for a given subscription id. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (w->subid == subid && OidIsValid(w->relid))
			res++;
	}

	return res;
}

/*
 * ApplyLauncherShmemSize
 *		Compute space needed for replication launcher shared memory
 */
Size
ApplyLauncherShmemSize(void)
{
	Size		size;

	/*
	 * Need the fixed struct and the array of LogicalRepWorker.
	 */
	size = sizeof(LogicalRepCtxStruct);
	size = MAXALIGN(size);
	size = add_size(size, mul_size(max_logical_replication_workers,
								   sizeof(LogicalRepWorker)));
	return size;
}

/*
 * ApplyLauncherRegister
 *		Register a background worker running the logical replication launcher.
 */
void
ApplyLauncherRegister(void)
{
	BackgroundWorker bgw;

	if (max_logical_replication_workers == 0)
		return;

	memset(&bgw, 0, sizeof(bgw));
	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
		BGWORKER_BACKEND_DATABASE_CONNECTION;
	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
	snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
	snprintf(bgw.bgw_name, BGW_MAXLEN,
			 "logical replication launcher");
	snprintf(bgw.bgw_type, BGW_MAXLEN,
			 "logical replication launcher");
	bgw.bgw_restart_time = 5;
	bgw.bgw_notify_pid = 0;
	bgw.bgw_main_arg = (Datum) 0;

	RegisterBackgroundWorker(&bgw);
}

/*
 * ApplyLauncherShmemInit
 *		Allocate and initialize replication launcher shared memory
 */
void
ApplyLauncherShmemInit(void)
{
	bool		found;

	LogicalRepCtx = (LogicalRepCtxStruct *)
		ShmemInitStruct("Logical Replication Launcher Data",
						ApplyLauncherShmemSize(),
						&found);

	if (!found)
	{
		int			slot;

		memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());

		/* Initialize memory and spin locks for each worker slot. */
		for (slot = 0; slot < max_logical_replication_workers; slot++)
		{
			LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];

			memset(worker, 0, sizeof(LogicalRepWorker));
			SpinLockInit(&worker->relmutex);
		}
	}
}

/*
 * Check whether current transaction has manipulated logical replication
 * workers.
 */
bool
XactManipulatesLogicalReplicationWorkers(void)
{
	return (on_commit_stop_workers != NULL);
}

/*
 * Invoked at transaction end: on commit, stop the workers that were requested
 * to be stopped at commit and wake up the launcher if requested; on abort,
 * simply discard those requests.
 */
void
AtEOXact_ApplyLauncher(bool isCommit)
{
	Assert(on_commit_stop_workers == NULL ||
		   (on_commit_stop_workers->nestDepth == 1 &&
			on_commit_stop_workers->parent == NULL));

	if (isCommit)
	{
		ListCell   *lc;

		if (on_commit_stop_workers != NULL)
		{
			List	   *workers = on_commit_stop_workers->workers;

			foreach(lc, workers)
			{
				LogicalRepWorkerId *wid = lfirst(lc);

				logicalrep_worker_stop(wid->subid, wid->relid);
			}
		}

		if (on_commit_launcher_wakeup)
			ApplyLauncherWakeup();
	}

	/*
	 * No need to pfree on_commit_stop_workers.  It was allocated in
	 * transaction memory context, which is going to be cleaned soon.
	 */
	on_commit_stop_workers = NULL;
	on_commit_launcher_wakeup = false;
}

/*
 * On commit, merge the current on_commit_stop_workers list into the
 * immediate parent, if present.
 * On rollback, discard the current on_commit_stop_workers list.
 * Pop out the stack.
 */
void
AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
{
	StopWorkersData *parent;

	/* Exit immediately if there's no work to do at this level. */
	if (on_commit_stop_workers == NULL ||
		on_commit_stop_workers->nestDepth < nestDepth)
		return;

	Assert(on_commit_stop_workers->nestDepth == nestDepth);

	parent = on_commit_stop_workers->parent;

	if (isCommit)
	{
		/*
		 * If the upper stack element is not an immediate parent
		 * subtransaction, just decrement the notional nesting depth without
		 * doing any real work.  Else, we need to merge the current workers
		 * list into the parent.
		 */
		if (!parent || parent->nestDepth < nestDepth - 1)
		{
			on_commit_stop_workers->nestDepth--;
			return;
		}

		parent->workers =
			list_concat(parent->workers, on_commit_stop_workers->workers);
	}
	else
	{
		/*
		 * Abandon everything that was done at this nesting level.  Explicitly
		 * free memory to avoid a transaction-lifespan leak.
		 */
		list_free_deep(on_commit_stop_workers->workers);
	}

	/*
	 * We have taken care of the current subtransaction workers list for both
	 * abort or commit.  So we are ready to pop the stack.
	 */
	pfree(on_commit_stop_workers);
	on_commit_stop_workers = parent;
}

/*
 * Request wakeup of the launcher on commit of the transaction.
 *
 * This is used to signal the launcher to stop sleeping and to process the
 * subscriptions when the current transaction commits.  It should be used
 * when a new tuple has been added to the pg_subscription catalog.
 */
void
ApplyLauncherWakeupAtCommit(void)
{
	if (!on_commit_launcher_wakeup)
		on_commit_launcher_wakeup = true;
}

static void
ApplyLauncherWakeup(void)
{
	if (LogicalRepCtx->launcher_pid != 0)
		kill(LogicalRepCtx->launcher_pid, SIGUSR1);
}

/*
 * Main loop for the apply launcher process.
 */
void
ApplyLauncherMain(Datum main_arg)
{
	TimestampTz last_start_time = 0;

	ereport(DEBUG1,
			(errmsg("logical replication launcher started")));

	before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);

	Assert(LogicalRepCtx->launcher_pid == 0);
	LogicalRepCtx->launcher_pid = MyProcPid;

	/* Establish signal handlers. */
	pqsignal(SIGHUP, logicalrep_launcher_sighup);
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/*
	 * Establish connection to nailed catalogs (we only ever access
	 * pg_subscription).
	 */
	BackgroundWorkerInitializeConnection(NULL, NULL, 0);

	/* Enter main loop */
	for (;;)
	{
		int			rc;
		List	   *sublist;
		ListCell   *lc;
		MemoryContext subctx;
		MemoryContext oldctx;
		TimestampTz now;
		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;

		CHECK_FOR_INTERRUPTS();

		now = GetCurrentTimestamp();

		/* Limit the start retry to once per wal_retrieve_retry_interval */
		if (TimestampDifferenceExceeds(last_start_time, now,
									   wal_retrieve_retry_interval))
		{
			/* Use temporary context for the database list and worker info. */
			subctx = AllocSetContextCreate(TopMemoryContext,
										   "Logical Replication Launcher sublist",
										   ALLOCSET_DEFAULT_SIZES);
			oldctx = MemoryContextSwitchTo(subctx);

			/* Search for subscriptions to start or stop. */
			sublist = get_subscription_list();

			/* Start the missing workers for enabled subscriptions. */
			foreach(lc, sublist)
			{
				Subscription *sub = (Subscription *) lfirst(lc);
				LogicalRepWorker *w;

				if (!sub->enabled)
					continue;

				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
				w = logicalrep_worker_find(sub->oid, InvalidOid, false);
				LWLockRelease(LogicalRepWorkerLock);

				if (w == NULL)
				{
					last_start_time = now;
					wait_time = wal_retrieve_retry_interval;

					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
											 sub->owner, InvalidOid);
				}
			}

			/* Switch back to original memory context. */
			MemoryContextSwitchTo(oldctx);
			/* Clean the temporary memory. */
			MemoryContextDelete(subctx);
		}
		else
		{
			/*
			 * The wait in the previous cycle was interrupted less than
			 * wal_retrieve_retry_interval after the last worker was started.
			 * That usually means a worker crashed, so retry again after
			 * wal_retrieve_retry_interval.
			 */
			wait_time = wal_retrieve_retry_interval;
		}

		/* Wait for more work. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   wait_time,
					   WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (got_SIGHUP)
		{
			got_SIGHUP = false;
			ProcessConfigFile(PGC_SIGHUP);
		}
	}

	/* Not reachable */
}

/*
 * Is current process the logical replication launcher?
 */
bool
IsLogicalLauncher(void)
{
	return LogicalRepCtx->launcher_pid == MyProcPid;
}

/*
 * Returns state of the subscriptions.
 */
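/*
 * For each running logical replication worker (optionally filtered by
 * subscription OID), this set-returning function reports the subscription
 * and relation OIDs, the worker PID, the last received LSN, the send and
 * receive timestamps of the last message, and the LSN and timestamp of the
 * last reply, as materialized below.  (It is presumably what backs the
 * pg_stat_subscription system view.)
 */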
Datum
pg_stat_get_subscription(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_SUBSCRIPTION_COLS	8
	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
	int			i;
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	TupleDesc	tupdesc;
	Tuplestorestate *tupstore;
	MemoryContext per_query_ctx;
	MemoryContext oldcontext;

	/* check to see if caller supports us returning a tuplestore */
	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("set-valued function called in context that cannot accept a set")));
	if (!(rsinfo->allowedModes & SFRM_Materialize))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("materialize mode required, but it is not " \
						"allowed in this context")));

	/* Build a tuple descriptor for our result type */
	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
		elog(ERROR, "return type must be a row type");

	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
	oldcontext = MemoryContextSwitchTo(per_query_ctx);

	tupstore = tuplestore_begin_heap(true, false, work_mem);
	rsinfo->returnMode = SFRM_Materialize;
	rsinfo->setResult = tupstore;
	rsinfo->setDesc = tupdesc;

	MemoryContextSwitchTo(oldcontext);

	/* Make sure we get consistent view of the workers. */
	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	for (i = 0; i < max_logical_replication_workers; i++)
	{
		/* for each row */
		Datum		values[PG_STAT_GET_SUBSCRIPTION_COLS];
		bool		nulls[PG_STAT_GET_SUBSCRIPTION_COLS];
		int			worker_pid;
		LogicalRepWorker worker;

		memcpy(&worker, &LogicalRepCtx->workers[i],
			   sizeof(LogicalRepWorker));
		if (!worker.proc || !IsBackendPid(worker.proc->pid))
			continue;

		if (OidIsValid(subid) && worker.subid != subid)
			continue;

		worker_pid = worker.proc->pid;

		MemSet(values, 0, sizeof(values));
		MemSet(nulls, 0, sizeof(nulls));

		values[0] = ObjectIdGetDatum(worker.subid);
		if (OidIsValid(worker.relid))
			values[1] = ObjectIdGetDatum(worker.relid);
		else
			nulls[1] = true;
		values[2] = Int32GetDatum(worker_pid);
		if (XLogRecPtrIsInvalid(worker.last_lsn))
			nulls[3] = true;
		else
			values[3] = LSNGetDatum(worker.last_lsn);
		if (worker.last_send_time == 0)
			nulls[4] = true;
		else
			values[4] = TimestampTzGetDatum(worker.last_send_time);
		if (worker.last_recv_time == 0)
			nulls[5] = true;
		else
			values[5] = TimestampTzGetDatum(worker.last_recv_time);
		if (XLogRecPtrIsInvalid(worker.reply_lsn))
			nulls[6] = true;
		else
			values[6] = LSNGetDatum(worker.reply_lsn);
		if (worker.reply_time == 0)
			nulls[7] = true;
		else
			values[7] = TimestampTzGetDatum(worker.reply_time);

		tuplestore_putvalues(tupstore, tupdesc, values, nulls);

		/*
		 * If only a single subscription was requested, and we found it,
		 * break.
		 */
		if (OidIsValid(subid))
			break;
	}

	LWLockRelease(LogicalRepWorkerLock);

	/* clean up and return the tuplestore */
	tuplestore_donestoring(tupstore);

	return (Datum) 0;
}