1 | /*------------------------------------------------------------------------- |
2 | * launcher.c |
3 | * PostgreSQL logical replication worker launcher process |
4 | * |
5 | * Copyright (c) 2016-2019, PostgreSQL Global Development Group |
6 | * |
7 | * IDENTIFICATION |
8 | * src/backend/replication/logical/launcher.c |
9 | * |
10 | * NOTES |
11 | * This module contains the logical replication worker launcher which |
12 | * uses the background worker infrastructure to start the logical |
13 | * replication workers for every enabled subscription. |
14 | * |
15 | *------------------------------------------------------------------------- |
16 | */ |
17 | |
18 | #include "postgres.h" |
19 | |
20 | #include "funcapi.h" |
21 | #include "miscadmin.h" |
22 | #include "pgstat.h" |
23 | |
24 | #include "access/heapam.h" |
25 | #include "access/htup.h" |
26 | #include "access/htup_details.h" |
27 | #include "access/tableam.h" |
28 | #include "access/xact.h" |
29 | |
30 | #include "catalog/pg_subscription.h" |
31 | #include "catalog/pg_subscription_rel.h" |
32 | |
33 | #include "libpq/pqsignal.h" |
34 | |
35 | #include "postmaster/bgworker.h" |
36 | #include "postmaster/fork_process.h" |
37 | #include "postmaster/postmaster.h" |
38 | |
39 | #include "replication/logicallauncher.h" |
40 | #include "replication/logicalworker.h" |
41 | #include "replication/slot.h" |
42 | #include "replication/walreceiver.h" |
43 | #include "replication/worker_internal.h" |
44 | |
45 | #include "storage/ipc.h" |
46 | #include "storage/proc.h" |
47 | #include "storage/procarray.h" |
48 | #include "storage/procsignal.h" |
49 | |
50 | #include "tcop/tcopprot.h" |
51 | |
52 | #include "utils/memutils.h" |
53 | #include "utils/pg_lsn.h" |
54 | #include "utils/ps_status.h" |
55 | #include "utils/timeout.h" |
56 | #include "utils/snapmgr.h" |
57 | |
58 | /* max sleep time between cycles (3min) */ |
59 | #define DEFAULT_NAPTIME_PER_CYCLE 180000L |
60 | |
61 | int max_logical_replication_workers = 4; |
62 | int max_sync_workers_per_subscription = 2; |
63 | |
64 | LogicalRepWorker *MyLogicalRepWorker = NULL; |
65 | |
66 | typedef struct LogicalRepCtxStruct |
67 | { |
68 | /* Supervisor process. */ |
69 | pid_t launcher_pid; |
70 | |
71 | /* Background workers. */ |
72 | LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]; |
73 | } LogicalRepCtxStruct; |
74 | |
75 | LogicalRepCtxStruct *LogicalRepCtx; |
76 | |
77 | typedef struct LogicalRepWorkerId |
78 | { |
79 | Oid subid; |
80 | Oid relid; |
81 | } LogicalRepWorkerId; |
82 | |
83 | typedef struct StopWorkersData |
84 | { |
85 | int nestDepth; /* Sub-transaction nest level */ |
86 | List *workers; /* List of LogicalRepWorkerId */ |
87 | struct StopWorkersData *parent; /* This need not be an immediate |
88 | * subtransaction parent */ |
89 | } StopWorkersData; |
90 | |
91 | /* |
92 | * Stack of StopWorkersData elements. Each stack element contains the workers |
93 | * to be stopped for that subtransaction. |
94 | */ |
95 | static StopWorkersData *on_commit_stop_workers = NULL; |
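/*
 * Illustrative sketch (not executed anywhere) of how the stack tracks
 * subtransaction nesting; "stop(X)" stands for a call to
 * logicalrep_worker_stop_at_commit() for some worker X:
 *
 *   depth 1: stop(A)         -> stack: [depth 1: {A}]
 *   depth 2: stop(B)         -> stack: [depth 2: {B}] -> [depth 1: {A}]
 *   subcommit at depth 2     -> stack: [depth 1: {A, B}]
 *   top-level commit         -> AtEOXact_ApplyLauncher() stops A and B
 *
 * A subtransaction abort at depth 2 would instead discard {B}.
 */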
96 | |
97 | static void ApplyLauncherWakeup(void); |
98 | static void logicalrep_launcher_onexit(int code, Datum arg); |
99 | static void logicalrep_worker_onexit(int code, Datum arg); |
100 | static void logicalrep_worker_detach(void); |
101 | static void logicalrep_worker_cleanup(LogicalRepWorker *worker); |
102 | |
103 | /* Flags set by signal handlers */ |
104 | static volatile sig_atomic_t got_SIGHUP = false; |
105 | |
106 | static bool on_commit_launcher_wakeup = false; |
107 | |
108 | Datum pg_stat_get_subscription(PG_FUNCTION_ARGS); |
109 | |
110 | |
111 | /* |
112 | * Load the list of subscriptions. |
113 | * |
114 | * Only the fields interesting for worker start/stop functions are filled for |
115 | * each subscription. |
116 | */ |
117 | static List * |
118 | get_subscription_list(void) |
119 | { |
120 | List *res = NIL; |
121 | Relation rel; |
122 | TableScanDesc scan; |
123 | HeapTuple tup; |
124 | MemoryContext resultcxt; |
125 | |
126 | /* This is the context that we will allocate our output data in */ |
127 | resultcxt = CurrentMemoryContext; |
128 | |
129 | /* |
130 | * Start a transaction so we can access pg_subscription, and get a snapshot. |
131 | * We don't have a use for the snapshot itself, but we're interested in |
132 | * the secondary effect that it sets RecentGlobalXmin. (This is critical |
133 | * for anything that reads heap pages, because HOT may decide to prune |
134 | * them even if the process doesn't attempt to modify any tuples.) |
135 | */ |
136 | StartTransactionCommand(); |
137 | (void) GetTransactionSnapshot(); |
138 | |
139 | rel = table_open(SubscriptionRelationId, AccessShareLock); |
140 | scan = table_beginscan_catalog(rel, 0, NULL); |
141 | |
142 | while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) |
143 | { |
144 | Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup); |
145 | Subscription *sub; |
146 | MemoryContext oldcxt; |
147 | |
148 | /* |
149 | * Allocate our results in the caller's context, not the |
150 | * transaction's. We do this inside the loop, and restore the original |
151 | * context at the end, so that leaky things like heap_getnext() are |
152 | * not called in a potentially long-lived context. |
153 | */ |
154 | oldcxt = MemoryContextSwitchTo(resultcxt); |
155 | |
156 | sub = (Subscription *) palloc0(sizeof(Subscription)); |
157 | sub->oid = subform->oid; |
158 | sub->dbid = subform->subdbid; |
159 | sub->owner = subform->subowner; |
160 | sub->enabled = subform->subenabled; |
161 | sub->name = pstrdup(NameStr(subform->subname)); |
162 | /* We don't fill fields we are not interested in. */ |
163 | |
164 | res = lappend(res, sub); |
165 | MemoryContextSwitchTo(oldcxt); |
166 | } |
167 | |
168 | table_endscan(scan); |
169 | table_close(rel, AccessShareLock); |
170 | |
171 | CommitTransactionCommand(); |
172 | |
173 | return res; |
174 | } |
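/*
 * Note for callers: the returned Subscription structs live in the memory
 * context that was current at call time, not in the transaction started
 * above.  ApplyLauncherMain() relies on this by switching to a short-lived
 * per-cycle context first, so the whole list can be dropped with a single
 * MemoryContextDelete().
 */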
175 | |
176 | /* |
177 | * Wait for a background worker to start up and attach to the shmem context. |
178 | * |
179 | * This is only needed for cleaning up the shared memory in case the worker |
180 | * fails to attach. |
181 | */ |
182 | static void |
183 | WaitForReplicationWorkerAttach(LogicalRepWorker *worker, |
184 | uint16 generation, |
185 | BackgroundWorkerHandle *handle) |
186 | { |
187 | BgwHandleStatus status; |
188 | int rc; |
189 | |
190 | for (;;) |
191 | { |
192 | pid_t pid; |
193 | |
194 | CHECK_FOR_INTERRUPTS(); |
195 | |
196 | LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); |
197 | |
198 | /* Worker either died or has started; no need to do anything. */ |
199 | if (!worker->in_use || worker->proc) |
200 | { |
201 | LWLockRelease(LogicalRepWorkerLock); |
202 | return; |
203 | } |
204 | |
205 | LWLockRelease(LogicalRepWorkerLock); |
206 | |
207 | /* Check if worker has died before attaching, and clean up after it. */ |
208 | status = GetBackgroundWorkerPid(handle, &pid); |
209 | |
210 | if (status == BGWH_STOPPED) |
211 | { |
212 | LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); |
213 | /* Ensure that this was indeed the worker we waited for. */ |
214 | if (generation == worker->generation) |
215 | logicalrep_worker_cleanup(worker); |
216 | LWLockRelease(LogicalRepWorkerLock); |
217 | return; |
218 | } |
219 | |
220 | /* |
221 | * We need a timeout because we generally don't get notified via latch |
222 | * about the worker attach. But we don't expect to have to wait long. |
223 | */ |
224 | rc = WaitLatch(MyLatch, |
225 | WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, |
226 | 10L, WAIT_EVENT_BGWORKER_STARTUP); |
227 | |
228 | if (rc & WL_LATCH_SET) |
229 | { |
230 | ResetLatch(MyLatch); |
231 | CHECK_FOR_INTERRUPTS(); |
232 | } |
233 | } |
234 | |
235 | return; |
236 | } |
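/*
 * Note on the generation check above: logicalrep_worker_launch() bumps
 * worker->generation every time a slot is handed out, so if the worker we
 * were waiting for died and its slot was already recycled for a different
 * worker, the generations no longer match and we leave the slot alone.
 */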
237 | |
238 | /* |
239 | * Walks the workers array and searches for one that matches given |
240 | * subscription id and relid. |
241 | */ |
242 | LogicalRepWorker * |
243 | logicalrep_worker_find(Oid subid, Oid relid, bool only_running) |
244 | { |
245 | int i; |
246 | LogicalRepWorker *res = NULL; |
247 | |
248 | Assert(LWLockHeldByMe(LogicalRepWorkerLock)); |
249 | |
250 | /* Search for attached worker for a given subscription id. */ |
251 | for (i = 0; i < max_logical_replication_workers; i++) |
252 | { |
253 | LogicalRepWorker *w = &LogicalRepCtx->workers[i]; |
254 | |
255 | if (w->in_use && w->subid == subid && w->relid == relid && |
256 | (!only_running || w->proc)) |
257 | { |
258 | res = w; |
259 | break; |
260 | } |
261 | } |
262 | |
263 | return res; |
264 | } |
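/*
 * A typical caller pattern (essentially what logicalrep_worker_wakeup()
 * below does); the lock must be held for as long as the returned pointer is
 * used, since the slot contents can change once it is released:
 *
 *		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 *		worker = logicalrep_worker_find(subid, relid, true);
 *		if (worker)
 *			logicalrep_worker_wakeup_ptr(worker);
 *		LWLockRelease(LogicalRepWorkerLock);
 */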
265 | |
266 | /* |
267 | * Similar to logicalrep_worker_find(), but returns list of all workers for |
268 | * the subscription, instead of just one. |
269 | */ |
270 | List * |
271 | logicalrep_workers_find(Oid subid, bool only_running) |
272 | { |
273 | int i; |
274 | List *res = NIL; |
275 | |
276 | Assert(LWLockHeldByMe(LogicalRepWorkerLock)); |
277 | |
278 | /* Search for attached worker for a given subscription id. */ |
279 | for (i = 0; i < max_logical_replication_workers; i++) |
280 | { |
281 | LogicalRepWorker *w = &LogicalRepCtx->workers[i]; |
282 | |
283 | if (w->in_use && w->subid == subid && (!only_running || w->proc)) |
284 | res = lappend(res, w); |
285 | } |
286 | |
287 | return res; |
288 | } |
289 | |
290 | /* |
291 | * Start new apply background worker, if possible. |
292 | */ |
293 | void |
294 | logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, |
295 | Oid relid) |
296 | { |
297 | BackgroundWorker bgw; |
298 | BackgroundWorkerHandle *bgw_handle; |
299 | uint16 generation; |
300 | int i; |
301 | int slot = 0; |
302 | LogicalRepWorker *worker = NULL; |
303 | int nsyncworkers; |
304 | TimestampTz now; |
305 | |
306 | ereport(DEBUG1, |
307 | (errmsg("starting logical replication worker for subscription \"%s\"" , |
308 | subname))); |
309 | |
310 | /* Report this after the initial starting message for consistency. */ |
311 | if (max_replication_slots == 0) |
312 | ereport(ERROR, |
313 | (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), |
314 | errmsg("cannot start logical replication workers when max_replication_slots = 0" ))); |
315 | |
316 | /* |
317 | * We need to modify the shared memory under the lock so that we have a |
318 | * consistent view. |
319 | */ |
320 | LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); |
321 | |
322 | retry: |
323 | /* Find unused worker slot. */ |
324 | for (i = 0; i < max_logical_replication_workers; i++) |
325 | { |
326 | LogicalRepWorker *w = &LogicalRepCtx->workers[i]; |
327 | |
328 | if (!w->in_use) |
329 | { |
330 | worker = w; |
331 | slot = i; |
332 | break; |
333 | } |
334 | } |
335 | |
336 | nsyncworkers = logicalrep_sync_worker_count(subid); |
337 | |
338 | now = GetCurrentTimestamp(); |
339 | |
340 | /* |
341 | * If we didn't find a free slot, try to do garbage collection. We do |
342 | * this because if some worker failed to start up and its parent crashed |
343 | * while waiting, the in_use state was never cleared. |
344 | */ |
345 | if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription) |
346 | { |
347 | bool did_cleanup = false; |
348 | |
349 | for (i = 0; i < max_logical_replication_workers; i++) |
350 | { |
351 | LogicalRepWorker *w = &LogicalRepCtx->workers[i]; |
352 | |
353 | /* |
354 | * If the worker was marked in use but didn't manage to attach in |
355 | * time, clean it up. |
356 | */ |
357 | if (w->in_use && !w->proc && |
358 | TimestampDifferenceExceeds(w->launch_time, now, |
359 | wal_receiver_timeout)) |
360 | { |
361 | elog(WARNING, |
362 | "logical replication worker for subscription %u took too long to start; canceled" , |
363 | w->subid); |
364 | |
365 | logicalrep_worker_cleanup(w); |
366 | did_cleanup = true; |
367 | } |
368 | } |
369 | |
370 | if (did_cleanup) |
371 | goto retry; |
372 | } |
373 | |
374 | /* |
375 | * If we reached the sync worker limit per subscription, just exit |
376 | * silently as we might get here because of an otherwise harmless race |
377 | * condition. |
378 | */ |
379 | if (nsyncworkers >= max_sync_workers_per_subscription) |
380 | { |
381 | LWLockRelease(LogicalRepWorkerLock); |
382 | return; |
383 | } |
384 | |
385 | /* |
386 | * However, if there are no free worker slots, inform the user about it |
387 | * before exiting. |
388 | */ |
389 | if (worker == NULL) |
390 | { |
391 | LWLockRelease(LogicalRepWorkerLock); |
392 | ereport(WARNING, |
393 | (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), |
394 | errmsg("out of logical replication worker slots" ), |
395 | errhint("You might need to increase max_logical_replication_workers." ))); |
396 | return; |
397 | } |
398 | |
399 | /* Prepare the worker slot. */ |
400 | worker->launch_time = now; |
401 | worker->in_use = true; |
402 | worker->generation++; |
403 | worker->proc = NULL; |
404 | worker->dbid = dbid; |
405 | worker->userid = userid; |
406 | worker->subid = subid; |
407 | worker->relid = relid; |
408 | worker->relstate = SUBREL_STATE_UNKNOWN; |
409 | worker->relstate_lsn = InvalidXLogRecPtr; |
410 | worker->last_lsn = InvalidXLogRecPtr; |
411 | TIMESTAMP_NOBEGIN(worker->last_send_time); |
412 | TIMESTAMP_NOBEGIN(worker->last_recv_time); |
413 | worker->reply_lsn = InvalidXLogRecPtr; |
414 | TIMESTAMP_NOBEGIN(worker->reply_time); |
415 | |
416 | /* Before releasing lock, remember generation for future identification. */ |
417 | generation = worker->generation; |
418 | |
419 | LWLockRelease(LogicalRepWorkerLock); |
420 | |
421 | /* Register the new dynamic worker. */ |
422 | memset(&bgw, 0, sizeof(bgw)); |
423 | bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | |
424 | BGWORKER_BACKEND_DATABASE_CONNECTION; |
425 | bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; |
426 | snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); |
427 | snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); |
428 | if (OidIsValid(relid)) |
429 | snprintf(bgw.bgw_name, BGW_MAXLEN, |
430 | "logical replication worker for subscription %u sync %u" , subid, relid); |
431 | else |
432 | snprintf(bgw.bgw_name, BGW_MAXLEN, |
433 | "logical replication worker for subscription %u" , subid); |
434 | snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker" ); |
435 | |
436 | bgw.bgw_restart_time = BGW_NEVER_RESTART; |
437 | bgw.bgw_notify_pid = MyProcPid; |
438 | bgw.bgw_main_arg = Int32GetDatum(slot); |
439 | |
440 | if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) |
441 | { |
442 | /* Failed to start worker, so clean up the worker slot. */ |
443 | LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); |
444 | Assert(generation == worker->generation); |
445 | logicalrep_worker_cleanup(worker); |
446 | LWLockRelease(LogicalRepWorkerLock); |
447 | |
448 | ereport(WARNING, |
449 | (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), |
450 | errmsg("out of background worker slots" ), |
451 | errhint("You might need to increase max_worker_processes." ))); |
452 | return; |
453 | } |
454 | |
455 | /* Now wait until it attaches. */ |
456 | WaitForReplicationWorkerAttach(worker, generation, bgw_handle); |
457 | } |
458 | |
459 | /* |
460 | * Stop the logical replication worker for subid/relid, if any, and wait until |
461 | * it detaches from the slot. |
462 | */ |
463 | void |
464 | logicalrep_worker_stop(Oid subid, Oid relid) |
465 | { |
466 | LogicalRepWorker *worker; |
467 | uint16 generation; |
468 | |
469 | LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); |
470 | |
471 | worker = logicalrep_worker_find(subid, relid, false); |
472 | |
473 | /* No worker, nothing to do. */ |
474 | if (!worker) |
475 | { |
476 | LWLockRelease(LogicalRepWorkerLock); |
477 | return; |
478 | } |
479 | |
480 | /* |
481 | * Remember which generation our worker was, so we can check whether what |
482 | * we see later is still the same worker. |
483 | */ |
484 | generation = worker->generation; |
485 | |
486 | /* |
487 | * If we found a worker but it does not have proc set then it is still |
488 | * starting up; wait for it to finish starting and then kill it. |
489 | */ |
490 | while (worker->in_use && !worker->proc) |
491 | { |
492 | int rc; |
493 | |
494 | LWLockRelease(LogicalRepWorkerLock); |
495 | |
496 | /* Wait a bit --- we don't expect to have to wait long. */ |
497 | rc = WaitLatch(MyLatch, |
498 | WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, |
499 | 10L, WAIT_EVENT_BGWORKER_STARTUP); |
500 | |
501 | if (rc & WL_LATCH_SET) |
502 | { |
503 | ResetLatch(MyLatch); |
504 | CHECK_FOR_INTERRUPTS(); |
505 | } |
506 | |
507 | /* Recheck worker status. */ |
508 | LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); |
509 | |
510 | /* |
511 | * Check whether the worker slot is no longer used, which would mean |
512 | * that the worker has exited, or whether the worker generation is |
513 | * different, meaning that a different worker has taken the slot. |
514 | */ |
515 | if (!worker->in_use || worker->generation != generation) |
516 | { |
517 | LWLockRelease(LogicalRepWorkerLock); |
518 | return; |
519 | } |
520 | |
521 | /* Worker has assigned proc, so it has started. */ |
522 | if (worker->proc) |
523 | break; |
524 | } |
525 | |
526 | /* Now terminate the worker ... */ |
527 | kill(worker->proc->pid, SIGTERM); |
528 | |
529 | /* ... and wait for it to die. */ |
530 | for (;;) |
531 | { |
532 | int rc; |
533 | |
534 | /* is it gone? */ |
535 | if (!worker->proc || worker->generation != generation) |
536 | break; |
537 | |
538 | LWLockRelease(LogicalRepWorkerLock); |
539 | |
540 | /* Wait a bit --- we don't expect to have to wait long. */ |
541 | rc = WaitLatch(MyLatch, |
542 | WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, |
543 | 10L, WAIT_EVENT_BGWORKER_SHUTDOWN); |
544 | |
545 | if (rc & WL_LATCH_SET) |
546 | { |
547 | ResetLatch(MyLatch); |
548 | CHECK_FOR_INTERRUPTS(); |
549 | } |
550 | |
551 | LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); |
552 | } |
553 | |
554 | LWLockRelease(LogicalRepWorkerLock); |
555 | } |
556 | |
557 | /* |
558 | * Request worker for specified sub/rel to be stopped on commit. |
559 | */ |
560 | void |
561 | logicalrep_worker_stop_at_commit(Oid subid, Oid relid) |
562 | { |
563 | int nestDepth = GetCurrentTransactionNestLevel(); |
564 | LogicalRepWorkerId *wid; |
565 | MemoryContext oldctx; |
566 | |
567 | /* Make sure we store the info in context that survives until commit. */ |
568 | oldctx = MemoryContextSwitchTo(TopTransactionContext); |
569 | |
570 | /* Check that previous transactions were properly cleaned up. */ |
571 | Assert(on_commit_stop_workers == NULL || |
572 | nestDepth >= on_commit_stop_workers->nestDepth); |
573 | |
574 | /* |
575 | * Push a new stack element if we don't already have one for the current |
576 | * nestDepth. |
577 | */ |
578 | if (on_commit_stop_workers == NULL || |
579 | nestDepth > on_commit_stop_workers->nestDepth) |
580 | { |
581 | StopWorkersData *newdata = palloc(sizeof(StopWorkersData)); |
582 | |
583 | newdata->nestDepth = nestDepth; |
584 | newdata->workers = NIL; |
585 | newdata->parent = on_commit_stop_workers; |
586 | on_commit_stop_workers = newdata; |
587 | } |
588 | |
589 | /* |
590 | * Finally add a new worker into the worker list of the current |
591 | * subtransaction. |
592 | */ |
593 | wid = palloc(sizeof(LogicalRepWorkerId)); |
594 | wid->subid = subid; |
595 | wid->relid = relid; |
596 | on_commit_stop_workers->workers = |
597 | lappend(on_commit_stop_workers->workers, wid); |
598 | |
599 | MemoryContextSwitchTo(oldctx); |
600 | } |
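/*
 * Note that nothing is stopped here: the recorded (subid, relid) pairs are
 * only acted on by AtEOXact_ApplyLauncher() when the transaction commits.
 * On abort the pending list is discarded instead (see AtEOXact_ApplyLauncher
 * and AtEOSubXact_ApplyLauncher below).
 */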
601 | |
602 | /* |
603 | * Wake up (using latch) any logical replication worker for specified sub/rel. |
604 | */ |
605 | void |
606 | logicalrep_worker_wakeup(Oid subid, Oid relid) |
607 | { |
608 | LogicalRepWorker *worker; |
609 | |
610 | LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); |
611 | |
612 | worker = logicalrep_worker_find(subid, relid, true); |
613 | |
614 | if (worker) |
615 | logicalrep_worker_wakeup_ptr(worker); |
616 | |
617 | LWLockRelease(LogicalRepWorkerLock); |
618 | } |
619 | |
620 | /* |
621 | * Wake up (using latch) the specified logical replication worker. |
622 | * |
623 | * Caller must hold lock, else worker->proc could change under us. |
624 | */ |
625 | void |
626 | logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker) |
627 | { |
628 | Assert(LWLockHeldByMe(LogicalRepWorkerLock)); |
629 | |
630 | SetLatch(&worker->proc->procLatch); |
631 | } |
632 | |
633 | /* |
634 | * Attach to a slot. |
635 | */ |
636 | void |
637 | logicalrep_worker_attach(int slot) |
638 | { |
639 | /* Block concurrent access. */ |
640 | LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); |
641 | |
642 | Assert(slot >= 0 && slot < max_logical_replication_workers); |
643 | MyLogicalRepWorker = &LogicalRepCtx->workers[slot]; |
644 | |
645 | if (!MyLogicalRepWorker->in_use) |
646 | { |
647 | LWLockRelease(LogicalRepWorkerLock); |
648 | ereport(ERROR, |
649 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
650 | errmsg("logical replication worker slot %d is empty, cannot attach" , |
651 | slot))); |
652 | } |
653 | |
654 | if (MyLogicalRepWorker->proc) |
655 | { |
656 | LWLockRelease(LogicalRepWorkerLock); |
657 | ereport(ERROR, |
658 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
659 | errmsg("logical replication worker slot %d is already used by " |
660 | "another worker, cannot attach" , slot))); |
661 | } |
662 | |
663 | MyLogicalRepWorker->proc = MyProc; |
664 | before_shmem_exit(logicalrep_worker_onexit, (Datum) 0); |
665 | |
666 | LWLockRelease(LogicalRepWorkerLock); |
667 | } |
668 | |
669 | /* |
670 | * Detach the worker (cleans up the worker info). |
671 | */ |
672 | static void |
673 | logicalrep_worker_detach(void) |
674 | { |
675 | /* Block concurrent access. */ |
676 | LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); |
677 | |
678 | logicalrep_worker_cleanup(MyLogicalRepWorker); |
679 | |
680 | LWLockRelease(LogicalRepWorkerLock); |
681 | } |
682 | |
683 | /* |
684 | * Clean up worker info. |
685 | */ |
686 | static void |
687 | logicalrep_worker_cleanup(LogicalRepWorker *worker) |
688 | { |
689 | Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE)); |
690 | |
691 | worker->in_use = false; |
692 | worker->proc = NULL; |
693 | worker->dbid = InvalidOid; |
694 | worker->userid = InvalidOid; |
695 | worker->subid = InvalidOid; |
696 | worker->relid = InvalidOid; |
697 | } |
698 | |
699 | /* |
700 | * Cleanup function for logical replication launcher. |
701 | * |
702 | * Called on logical replication launcher exit. |
703 | */ |
704 | static void |
705 | logicalrep_launcher_onexit(int code, Datum arg) |
706 | { |
707 | LogicalRepCtx->launcher_pid = 0; |
708 | } |
709 | |
710 | /* |
711 | * Cleanup function. |
712 | * |
713 | * Called on logical replication worker exit. |
714 | */ |
715 | static void |
716 | logicalrep_worker_onexit(int code, Datum arg) |
717 | { |
718 | /* Disconnect gracefully from the remote side. */ |
719 | if (wrconn) |
720 | walrcv_disconnect(wrconn); |
721 | |
722 | logicalrep_worker_detach(); |
723 | |
724 | ApplyLauncherWakeup(); |
725 | } |
726 | |
727 | /* SIGHUP: set flag to reload configuration at next convenient time */ |
728 | static void |
729 | logicalrep_launcher_sighup(SIGNAL_ARGS) |
730 | { |
731 | int save_errno = errno; |
732 | |
733 | got_SIGHUP = true; |
734 | |
735 | /* Waken anything waiting on the process latch */ |
736 | SetLatch(MyLatch); |
737 | |
738 | errno = save_errno; |
739 | } |
740 | |
741 | /* |
742 | * Count the number of registered (not necessarily running) sync workers |
743 | * for a subscription. |
744 | */ |
745 | int |
746 | logicalrep_sync_worker_count(Oid subid) |
747 | { |
748 | int i; |
749 | int res = 0; |
750 | |
751 | Assert(LWLockHeldByMe(LogicalRepWorkerLock)); |
752 | |
753 | /* Search for attached worker for a given subscription id. */ |
754 | for (i = 0; i < max_logical_replication_workers; i++) |
755 | { |
756 | LogicalRepWorker *w = &LogicalRepCtx->workers[i]; |
757 | |
758 | if (w->subid == subid && OidIsValid(w->relid)) |
759 | res++; |
760 | } |
761 | |
762 | return res; |
763 | } |
764 | |
765 | /* |
766 | * ApplyLauncherShmemSize |
767 | * Compute space needed for replication launcher shared memory |
768 | */ |
769 | Size |
770 | ApplyLauncherShmemSize(void) |
771 | { |
772 | Size size; |
773 | |
774 | /* |
775 | * Need the fixed struct and the array of LogicalRepWorker. |
776 | */ |
777 | size = sizeof(LogicalRepCtxStruct); |
778 | size = MAXALIGN(size); |
779 | size = add_size(size, mul_size(max_logical_replication_workers, |
780 | sizeof(LogicalRepWorker))); |
781 | return size; |
782 | } |
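/*
 * For instance, with the default max_logical_replication_workers = 4 this
 * works out to MAXALIGN(sizeof(LogicalRepCtxStruct)) plus
 * 4 * sizeof(LogicalRepWorker); add_size() and mul_size() are used only to
 * error out cleanly on Size overflow.
 */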
783 | |
784 | /* |
785 | * ApplyLauncherRegister |
786 | * Register a background worker running the logical replication launcher. |
787 | */ |
788 | void |
789 | ApplyLauncherRegister(void) |
790 | { |
791 | BackgroundWorker bgw; |
792 | |
793 | if (max_logical_replication_workers == 0) |
794 | return; |
795 | |
796 | memset(&bgw, 0, sizeof(bgw)); |
797 | bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | |
798 | BGWORKER_BACKEND_DATABASE_CONNECTION; |
799 | bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; |
800 | snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); |
801 | snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain"); |
802 | snprintf(bgw.bgw_name, BGW_MAXLEN, |
803 | "logical replication launcher"); |
804 | snprintf(bgw.bgw_type, BGW_MAXLEN, |
805 | "logical replication launcher"); |
806 | bgw.bgw_restart_time = 5; |
807 | bgw.bgw_notify_pid = 0; |
808 | bgw.bgw_main_arg = (Datum) 0; |
809 | |
810 | RegisterBackgroundWorker(&bgw); |
811 | } |
812 | |
813 | /* |
814 | * ApplyLauncherShmemInit |
815 | * Allocate and initialize replication launcher shared memory |
816 | */ |
817 | void |
818 | ApplyLauncherShmemInit(void) |
819 | { |
820 | bool found; |
821 | |
822 | LogicalRepCtx = (LogicalRepCtxStruct *) |
823 | ShmemInitStruct("Logical Replication Launcher Data" , |
824 | ApplyLauncherShmemSize(), |
825 | &found); |
826 | |
827 | if (!found) |
828 | { |
829 | int slot; |
830 | |
831 | memset(LogicalRepCtx, 0, ApplyLauncherShmemSize()); |
832 | |
833 | /* Initialize memory and spin locks for each worker slot. */ |
834 | for (slot = 0; slot < max_logical_replication_workers; slot++) |
835 | { |
836 | LogicalRepWorker *worker = &LogicalRepCtx->workers[slot]; |
837 | |
838 | memset(worker, 0, sizeof(LogicalRepWorker)); |
839 | SpinLockInit(&worker->relmutex); |
840 | } |
841 | } |
842 | } |
843 | |
844 | /* |
845 | * Check whether current transaction has manipulated logical replication |
846 | * workers. |
847 | */ |
848 | bool |
849 | XactManipulatesLogicalReplicationWorkers(void) |
850 | { |
851 | return (on_commit_stop_workers != NULL); |
852 | } |
853 | |
854 | /* |
855 | * Wakeup the launcher on commit if requested. |
856 | */ |
857 | void |
858 | AtEOXact_ApplyLauncher(bool isCommit) |
859 | { |
860 | |
861 | Assert(on_commit_stop_workers == NULL || |
862 | (on_commit_stop_workers->nestDepth == 1 && |
863 | on_commit_stop_workers->parent == NULL)); |
864 | |
865 | if (isCommit) |
866 | { |
867 | ListCell *lc; |
868 | |
869 | if (on_commit_stop_workers != NULL) |
870 | { |
871 | List *workers = on_commit_stop_workers->workers; |
872 | |
873 | foreach(lc, workers) |
874 | { |
875 | LogicalRepWorkerId *wid = lfirst(lc); |
876 | |
877 | logicalrep_worker_stop(wid->subid, wid->relid); |
878 | } |
879 | } |
880 | |
881 | if (on_commit_launcher_wakeup) |
882 | ApplyLauncherWakeup(); |
883 | } |
884 | |
885 | /* |
886 | * No need to pfree on_commit_stop_workers. It was allocated in |
887 | * transaction memory context, which is going to be cleaned soon. |
888 | */ |
889 | on_commit_stop_workers = NULL; |
890 | on_commit_launcher_wakeup = false; |
891 | } |
892 | |
893 | /* |
894 | * On commit, merge the current on_commit_stop_workers list into the |
895 | * immediate parent, if present. |
896 | * On rollback, discard the current on_commit_stop_workers list. |
897 | * Pop out the stack. |
898 | */ |
899 | void |
900 | AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth) |
901 | { |
902 | StopWorkersData *parent; |
903 | |
904 | /* Exit immediately if there's no work to do at this level. */ |
905 | if (on_commit_stop_workers == NULL || |
906 | on_commit_stop_workers->nestDepth < nestDepth) |
907 | return; |
908 | |
909 | Assert(on_commit_stop_workers->nestDepth == nestDepth); |
910 | |
911 | parent = on_commit_stop_workers->parent; |
912 | |
913 | if (isCommit) |
914 | { |
915 | /* |
916 | * If the upper stack element is not an immediate parent |
917 | * subtransaction, just decrement the notional nesting depth without |
918 | * doing any real work. Else, we need to merge the current workers |
919 | * list into the parent. |
920 | */ |
921 | if (!parent || parent->nestDepth < nestDepth - 1) |
922 | { |
923 | on_commit_stop_workers->nestDepth--; |
924 | return; |
925 | } |
926 | |
927 | parent->workers = |
928 | list_concat(parent->workers, on_commit_stop_workers->workers); |
929 | } |
930 | else |
931 | { |
932 | /* |
933 | * Abandon everything that was done at this nesting level. Explicitly |
934 | * free memory to avoid a transaction-lifespan leak. |
935 | */ |
936 | list_free_deep(on_commit_stop_workers->workers); |
937 | } |
938 | |
939 | /* |
940 | * We have taken care of the current subtransaction's workers list for |
941 | * both abort and commit. So we are ready to pop the stack. |
942 | */ |
943 | pfree(on_commit_stop_workers); |
944 | on_commit_stop_workers = parent; |
945 | } |
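/*
 * Example of the "not an immediate parent" case handled above: if stops were
 * requested only at nest depths 2 and 4, committing the depth-4
 * subtransaction just relabels that stack element as depth 3 (the depth-2
 * element is not its immediate parent); committing again at depth 3 then
 * merges it into the depth-2 element as usual.
 */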
946 | |
947 | /* |
948 | * Request wakeup of the launcher on commit of the transaction. |
949 | * |
950 | * This is used to signal the launcher to stop sleeping and process the |
951 | * subscriptions when the current transaction commits. It should be used |
952 | * when a new tuple has been added to the pg_subscription catalog. |
953 | */ |
954 | void |
955 | ApplyLauncherWakeupAtCommit(void) |
956 | { |
957 | if (!on_commit_launcher_wakeup) |
958 | on_commit_launcher_wakeup = true; |
959 | } |
960 | |
961 | static void |
962 | ApplyLauncherWakeup(void) |
963 | { |
964 | if (LogicalRepCtx->launcher_pid != 0) |
965 | kill(LogicalRepCtx->launcher_pid, SIGUSR1); |
966 | } |
967 | |
968 | /* |
969 | * Main loop for the apply launcher process. |
970 | */ |
971 | void |
972 | ApplyLauncherMain(Datum main_arg) |
973 | { |
974 | TimestampTz last_start_time = 0; |
975 | |
976 | ereport(DEBUG1, |
977 | (errmsg("logical replication launcher started" ))); |
978 | |
979 | before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0); |
980 | |
981 | Assert(LogicalRepCtx->launcher_pid == 0); |
982 | LogicalRepCtx->launcher_pid = MyProcPid; |
983 | |
984 | /* Establish signal handlers. */ |
985 | pqsignal(SIGHUP, logicalrep_launcher_sighup); |
986 | pqsignal(SIGTERM, die); |
987 | BackgroundWorkerUnblockSignals(); |
988 | |
989 | /* |
990 | * Establish connection to nailed catalogs (we only ever access |
991 | * pg_subscription). |
992 | */ |
993 | BackgroundWorkerInitializeConnection(NULL, NULL, 0); |
994 | |
995 | /* Enter main loop */ |
996 | for (;;) |
997 | { |
998 | int rc; |
999 | List *sublist; |
1000 | ListCell *lc; |
1001 | MemoryContext subctx; |
1002 | MemoryContext oldctx; |
1003 | TimestampTz now; |
1004 | long wait_time = DEFAULT_NAPTIME_PER_CYCLE; |
1005 | |
1006 | CHECK_FOR_INTERRUPTS(); |
1007 | |
1008 | now = GetCurrentTimestamp(); |
1009 | |
1010 | /* Limit the start retry to once per wal_retrieve_retry_interval */ |
1011 | if (TimestampDifferenceExceeds(last_start_time, now, |
1012 | wal_retrieve_retry_interval)) |
1013 | { |
1014 | /* Use temporary context for the database list and worker info. */ |
1015 | subctx = AllocSetContextCreate(TopMemoryContext, |
1016 | "Logical Replication Launcher sublist" , |
1017 | ALLOCSET_DEFAULT_SIZES); |
1018 | oldctx = MemoryContextSwitchTo(subctx); |
1019 | |
1020 | /* search for subscriptions to start or stop. */ |
1021 | sublist = get_subscription_list(); |
1022 | |
1023 | /* Start the missing workers for enabled subscriptions. */ |
1024 | foreach(lc, sublist) |
1025 | { |
1026 | Subscription *sub = (Subscription *) lfirst(lc); |
1027 | LogicalRepWorker *w; |
1028 | |
1029 | if (!sub->enabled) |
1030 | continue; |
1031 | |
1032 | LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); |
1033 | w = logicalrep_worker_find(sub->oid, InvalidOid, false); |
1034 | LWLockRelease(LogicalRepWorkerLock); |
1035 | |
1036 | if (w == NULL) |
1037 | { |
1038 | last_start_time = now; |
1039 | wait_time = wal_retrieve_retry_interval; |
1040 | |
1041 | logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, |
1042 | sub->owner, InvalidOid); |
1043 | } |
1044 | } |
1045 | |
1046 | /* Switch back to original memory context. */ |
1047 | MemoryContextSwitchTo(oldctx); |
1048 | /* Clean the temporary memory. */ |
1049 | MemoryContextDelete(subctx); |
1050 | } |
1051 | else |
1052 | { |
1053 | /* |
1054 | * The wait in the previous cycle was interrupted in less than |
1055 | * wal_retrieve_retry_interval since the last worker was started; this |
1056 | * usually means the worker crashed, so we should retry after |
1057 | * wal_retrieve_retry_interval again. |
1058 | */ |
1059 | wait_time = wal_retrieve_retry_interval; |
1060 | } |
1061 | |
1062 | /* Wait for more work. */ |
1063 | rc = WaitLatch(MyLatch, |
1064 | WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, |
1065 | wait_time, |
1066 | WAIT_EVENT_LOGICAL_LAUNCHER_MAIN); |
1067 | |
1068 | if (rc & WL_LATCH_SET) |
1069 | { |
1070 | ResetLatch(MyLatch); |
1071 | CHECK_FOR_INTERRUPTS(); |
1072 | } |
1073 | |
1074 | if (got_SIGHUP) |
1075 | { |
1076 | got_SIGHUP = false; |
1077 | ProcessConfigFile(PGC_SIGHUP); |
1078 | } |
1079 | } |
1080 | |
1081 | /* Not reachable */ |
1082 | } |
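/*
 * If the launcher exits abnormally, the postmaster restarts it after
 * bgw_restart_time, which ApplyLauncherRegister() above sets to 5 seconds.
 */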
1083 | |
1084 | /* |
1085 | * Is current process the logical replication launcher? |
1086 | */ |
1087 | bool |
1088 | IsLogicalLauncher(void) |
1089 | { |
1090 | return LogicalRepCtx->launcher_pid == MyProcPid; |
1091 | } |
1092 | |
1093 | /* |
1094 | * Returns the state of the subscription workers. |
1095 | */ |
1096 | Datum |
1097 | pg_stat_get_subscription(PG_FUNCTION_ARGS) |
1098 | { |
1099 | #define PG_STAT_GET_SUBSCRIPTION_COLS 8 |
1100 | Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); |
1101 | int i; |
1102 | ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; |
1103 | TupleDesc tupdesc; |
1104 | Tuplestorestate *tupstore; |
1105 | MemoryContext per_query_ctx; |
1106 | MemoryContext oldcontext; |
1107 | |
1108 | /* check to see if caller supports us returning a tuplestore */ |
1109 | if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) |
1110 | ereport(ERROR, |
1111 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
1112 | errmsg("set-valued function called in context that cannot accept a set" ))); |
1113 | if (!(rsinfo->allowedModes & SFRM_Materialize)) |
1114 | ereport(ERROR, |
1115 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
1116 | errmsg("materialize mode required, but it is not " \ |
1117 | "allowed in this context" ))); |
1118 | |
1119 | /* Build a tuple descriptor for our result type */ |
1120 | if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) |
1121 | elog(ERROR, "return type must be a row type" ); |
1122 | |
1123 | per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; |
1124 | oldcontext = MemoryContextSwitchTo(per_query_ctx); |
1125 | |
1126 | tupstore = tuplestore_begin_heap(true, false, work_mem); |
1127 | rsinfo->returnMode = SFRM_Materialize; |
1128 | rsinfo->setResult = tupstore; |
1129 | rsinfo->setDesc = tupdesc; |
1130 | |
1131 | MemoryContextSwitchTo(oldcontext); |
1132 | |
1133 | /* Make sure we get consistent view of the workers. */ |
1134 | LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); |
1135 | |
1136 | for (i = 0; i <= max_logical_replication_workers; i++) |
1137 | { |
1138 | /* for each row */ |
1139 | Datum values[PG_STAT_GET_SUBSCRIPTION_COLS]; |
1140 | bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS]; |
1141 | int worker_pid; |
1142 | LogicalRepWorker worker; |
1143 | |
1144 | memcpy(&worker, &LogicalRepCtx->workers[i], |
1145 | sizeof(LogicalRepWorker)); |
1146 | if (!worker.proc || !IsBackendPid(worker.proc->pid)) |
1147 | continue; |
1148 | |
1149 | if (OidIsValid(subid) && worker.subid != subid) |
1150 | continue; |
1151 | |
1152 | worker_pid = worker.proc->pid; |
1153 | |
1154 | MemSet(values, 0, sizeof(values)); |
1155 | MemSet(nulls, 0, sizeof(nulls)); |
1156 | |
1157 | values[0] = ObjectIdGetDatum(worker.subid); |
1158 | if (OidIsValid(worker.relid)) |
1159 | values[1] = ObjectIdGetDatum(worker.relid); |
1160 | else |
1161 | nulls[1] = true; |
1162 | values[2] = Int32GetDatum(worker_pid); |
1163 | if (XLogRecPtrIsInvalid(worker.last_lsn)) |
1164 | nulls[3] = true; |
1165 | else |
1166 | values[3] = LSNGetDatum(worker.last_lsn); |
1167 | if (worker.last_send_time == 0) |
1168 | nulls[4] = true; |
1169 | else |
1170 | values[4] = TimestampTzGetDatum(worker.last_send_time); |
1171 | if (worker.last_recv_time == 0) |
1172 | nulls[5] = true; |
1173 | else |
1174 | values[5] = TimestampTzGetDatum(worker.last_recv_time); |
1175 | if (XLogRecPtrIsInvalid(worker.reply_lsn)) |
1176 | nulls[6] = true; |
1177 | else |
1178 | values[6] = LSNGetDatum(worker.reply_lsn); |
1179 | if (worker.reply_time == 0) |
1180 | nulls[7] = true; |
1181 | else |
1182 | values[7] = TimestampTzGetDatum(worker.reply_time); |
1183 | |
1184 | tuplestore_putvalues(tupstore, tupdesc, values, nulls); |
1185 | |
1186 | /* |
1187 | * If only a single subscription was requested, and we found it, |
1188 | * break. |
1189 | */ |
1190 | if (OidIsValid(subid)) |
1191 | break; |
1192 | } |
1193 | |
1194 | LWLockRelease(LogicalRepWorkerLock); |
1195 | |
1196 | /* clean up and return the tuplestore */ |
1197 | tuplestore_donestoring(tupstore); |
1198 | |
1199 | return (Datum) 0; |
1200 | } |
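/*
 * This function backs the pg_stat_subscription system view, so, for example,
 *
 *		SELECT subid, relid, pid, latest_end_lsn FROM pg_stat_subscription;
 *
 * shows one row per running apply or table synchronization worker.
 */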
1201 | |