1/*-------------------------------------------------------------------------
2 * tablesync.c
3 * PostgreSQL logical replication
4 *
5 * Copyright (c) 2012-2019, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/tablesync.c
9 *
10 * NOTES
11 * This file contains code for initial table data synchronization for
12 * logical replication.
13 *
14 * The initial data synchronization is done separately for each table,
15 * in a separate apply worker that only fetches the initial snapshot data
16 * from the publisher and then synchronizes the position in the stream with
17 * the main apply worker.
18 *
19 * There are several reasons for doing the synchronization this way:
20 * - It allows us to parallelize the initial data synchronization
21 * which lowers the time needed for it to happen.
22 * - The initial synchronization does not have to hold the xid and LSN
23 * for the time it takes to copy data of all tables, causing less
24 * bloat and lower disk consumption compared to doing the
25 * synchronization in a single process for the whole database.
26 * - It allows us to synchronize any tables added after the initial
27 * synchronization has finished.
28 *
29 * The stream position synchronization works in multiple steps.
30 * - Sync finishes copy and sets worker state as SYNCWAIT and waits for
31 * state to change in a loop.
32 * - Apply periodically checks tables that are synchronizing for SYNCWAIT.
 *		When the desired state appears, it will set the worker state to
 *		CATCHUP and start loop-waiting until either the table state is set
 *		to SYNCDONE or the sync worker exits.
36 * - After the sync worker has seen the state change to CATCHUP, it will
37 * read the stream and apply changes (acting like an apply worker) until
38 * it catches up to the specified stream position. Then it sets the
39 * state to SYNCDONE. There might be zero changes applied between
40 * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
41 * apply worker.
 *	  - Once the state is set to SYNCDONE, the apply worker will continue
 *		tracking the table until it reaches the SYNCDONE stream position, at
 *		which point it sets the state to READY and stops tracking. Again,
 *		there might be zero changes in between.
46 *
47 * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP ->
48 * SYNCDONE -> READY.
49 *
50 * The catalog pg_subscription_rel is used to keep information about
51 * subscribed tables and their state. Some transient state during data
52 * synchronization is kept in shared memory. The states SYNCWAIT and
53 * CATCHUP only appear in memory.
54 *
55 * Example flows look like this:
56 * - Apply is in front:
57 * sync:8
58 * -> set in memory SYNCWAIT
59 * apply:10
60 * -> set in memory CATCHUP
61 * -> enter wait-loop
62 * sync:10
63 * -> set in catalog SYNCDONE
64 * -> exit
65 * apply:10
66 * -> exit wait-loop
67 * -> continue rep
68 * apply:11
69 * -> set in catalog READY
70 * - Sync in front:
71 * sync:10
72 * -> set in memory SYNCWAIT
73 * apply:8
74 * -> set in memory CATCHUP
75 * -> continue per-table filtering
76 * sync:10
77 * -> set in catalog SYNCDONE
78 * -> exit
79 * apply:10
80 * -> set in catalog READY
81 * -> stop per-table filtering
82 * -> continue rep
83 *-------------------------------------------------------------------------
84 */
85
86#include "postgres.h"
87
88#include "miscadmin.h"
89#include "pgstat.h"
90
91#include "access/table.h"
92#include "access/xact.h"
93
94#include "catalog/pg_subscription_rel.h"
95#include "catalog/pg_type.h"
96
97#include "commands/copy.h"
98
99#include "parser/parse_relation.h"
100
101#include "replication/logicallauncher.h"
102#include "replication/logicalrelation.h"
103#include "replication/walreceiver.h"
104#include "replication/worker_internal.h"
105
106#include "utils/snapmgr.h"
107#include "storage/ipc.h"
108
109#include "utils/builtins.h"
110#include "utils/lsyscache.h"
111#include "utils/memutils.h"
112
/*
 * Whether the cached list of not-ready subscription tables kept by
 * process_syncing_tables_for_apply() is current.  Cleared by the
 * invalidate_syncing_table_states() callback, which forces a refetch.
 */
static bool table_states_valid = false;

/*
 * Buffer describing the most recently received chunk of COPY data from the
 * publisher.  copy_read_data() hands out bytes left over in it across calls;
 * copy_table() allocates it before starting the local COPY FROM.
 */
StringInfo copybuf = NULL;
116
117/*
118 * Exit routine for synchronization worker.
119 */
120static void
121pg_attribute_noreturn()
122finish_sync_worker(void)
123{
124 /*
125 * Commit any outstanding transaction. This is the usual case, unless
126 * there was nothing to do for the table.
127 */
128 if (IsTransactionState())
129 {
130 CommitTransactionCommand();
131 pgstat_report_stat(false);
132 }
133
134 /* And flush all writes. */
135 XLogFlush(GetXLogWriteRecPtr());
136
137 StartTransactionCommand();
138 ereport(LOG,
139 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
140 MySubscription->name,
141 get_rel_name(MyLogicalRepWorker->relid))));
142 CommitTransactionCommand();
143
144 /* Find the main apply worker and signal it. */
145 logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
146
147 /* Stop gracefully */
148 proc_exit(0);
149}
150
151/*
152 * Wait until the relation synchronization state is set in the catalog to the
153 * expected one.
154 *
155 * Used when transitioning from CATCHUP state to SYNCDONE.
156 *
157 * Returns false if the synchronization worker has disappeared or the table state
158 * has been reset.
159 */
160static bool
161wait_for_relation_state_change(Oid relid, char expected_state)
162{
163 char state;
164
165 for (;;)
166 {
167 LogicalRepWorker *worker;
168 XLogRecPtr statelsn;
169
170 CHECK_FOR_INTERRUPTS();
171
172 /* XXX use cache invalidation here to improve performance? */
173 PushActiveSnapshot(GetLatestSnapshot());
174 state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
175 relid, &statelsn, true);
176 PopActiveSnapshot();
177
178 if (state == SUBREL_STATE_UNKNOWN)
179 return false;
180
181 if (state == expected_state)
182 return true;
183
184 /* Check if the sync worker is still running and bail if not. */
185 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
186
187 /* Check if the opposite worker is still running and bail if not. */
188 worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
189 am_tablesync_worker() ? InvalidOid : relid,
190 false);
191 LWLockRelease(LogicalRepWorkerLock);
192 if (!worker)
193 return false;
194
195 (void) WaitLatch(MyLatch,
196 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
197 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
198
199 ResetLatch(MyLatch);
200 }
201
202 return false;
203}
204
205/*
206 * Wait until the apply worker changes the state of our synchronization
207 * worker to the expected one.
208 *
209 * Used when transitioning from SYNCWAIT state to CATCHUP.
210 *
211 * Returns false if the apply worker has disappeared.
212 */
213static bool
214wait_for_worker_state_change(char expected_state)
215{
216 int rc;
217
218 for (;;)
219 {
220 LogicalRepWorker *worker;
221
222 CHECK_FOR_INTERRUPTS();
223
224 /*
225 * Done if already in correct state. (We assume this fetch is atomic
226 * enough to not give a misleading answer if we do it with no lock.)
227 */
228 if (MyLogicalRepWorker->relstate == expected_state)
229 return true;
230
231 /*
232 * Bail out if the apply worker has died, else signal it we're
233 * waiting.
234 */
235 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
236 worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
237 InvalidOid, false);
238 if (worker && worker->proc)
239 logicalrep_worker_wakeup_ptr(worker);
240 LWLockRelease(LogicalRepWorkerLock);
241 if (!worker)
242 break;
243
244 /*
245 * Wait. We expect to get a latch signal back from the apply worker,
246 * but use a timeout in case it dies without sending one.
247 */
248 rc = WaitLatch(MyLatch,
249 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
250 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
251
252 if (rc & WL_LATCH_SET)
253 ResetLatch(MyLatch);
254 }
255
256 return false;
257}
258
259/*
260 * Callback from syscache invalidation.
261 */
262void
263invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
264{
265 table_states_valid = false;
266}
267
268/*
269 * Handle table synchronization cooperation from the synchronization
270 * worker.
271 *
272 * If the sync worker is in CATCHUP state and reached (or passed) the
273 * predetermined synchronization point in the WAL stream, mark the table as
274 * SYNCDONE and finish.
275 */
276static void
277process_syncing_tables_for_sync(XLogRecPtr current_lsn)
278{
279 Assert(IsTransactionState());
280
281 SpinLockAcquire(&MyLogicalRepWorker->relmutex);
282
283 if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
284 current_lsn >= MyLogicalRepWorker->relstate_lsn)
285 {
286 TimeLineID tli;
287
288 MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
289 MyLogicalRepWorker->relstate_lsn = current_lsn;
290
291 SpinLockRelease(&MyLogicalRepWorker->relmutex);
292
293 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
294 MyLogicalRepWorker->relid,
295 MyLogicalRepWorker->relstate,
296 MyLogicalRepWorker->relstate_lsn);
297
298 walrcv_endstreaming(wrconn, &tli);
299 finish_sync_worker();
300 }
301 else
302 SpinLockRelease(&MyLogicalRepWorker->relmutex);
303}
304
305/*
306 * Handle table synchronization cooperation from the apply worker.
307 *
308 * Walk over all subscription tables that are individually tracked by the
309 * apply process (currently, all that have state other than
310 * SUBREL_STATE_READY) and manage synchronization for them.
311 *
312 * If there are tables that need synchronizing and are not being synchronized
313 * yet, start sync workers for them (if there are free slots for sync
314 * workers). To prevent starting the sync worker for the same relation at a
315 * high frequency after a failure, we store its last start time with each sync
316 * state info. We start the sync worker for the same relation after waiting
317 * at least wal_retrieve_retry_interval.
318 *
319 * For tables that are being synchronized already, check if sync workers
320 * either need action from the apply worker or have finished. This is the
321 * SYNCWAIT to CATCHUP transition.
322 *
323 * If the synchronization position is reached (SYNCDONE), then the table can
324 * be marked as READY and is no longer tracked.
325 */
326static void
327process_syncing_tables_for_apply(XLogRecPtr current_lsn)
328{
329 struct tablesync_start_time_mapping
330 {
331 Oid relid;
332 TimestampTz last_start_time;
333 };
334 static List *table_states = NIL;
335 static HTAB *last_start_times = NULL;
336 ListCell *lc;
337 bool started_tx = false;
338
339 Assert(!IsTransactionState());
340
341 /* We need up-to-date sync state info for subscription tables here. */
342 if (!table_states_valid)
343 {
344 MemoryContext oldctx;
345 List *rstates;
346 ListCell *lc;
347 SubscriptionRelState *rstate;
348
349 /* Clean the old list. */
350 list_free_deep(table_states);
351 table_states = NIL;
352
353 StartTransactionCommand();
354 started_tx = true;
355
356 /* Fetch all non-ready tables. */
357 rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
358
359 /* Allocate the tracking info in a permanent memory context. */
360 oldctx = MemoryContextSwitchTo(CacheMemoryContext);
361 foreach(lc, rstates)
362 {
363 rstate = palloc(sizeof(SubscriptionRelState));
364 memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
365 table_states = lappend(table_states, rstate);
366 }
367 MemoryContextSwitchTo(oldctx);
368
369 table_states_valid = true;
370 }
371
372 /*
373 * Prepare a hash table for tracking last start times of workers, to avoid
374 * immediate restarts. We don't need it if there are no tables that need
375 * syncing.
376 */
377 if (table_states && !last_start_times)
378 {
379 HASHCTL ctl;
380
381 memset(&ctl, 0, sizeof(ctl));
382 ctl.keysize = sizeof(Oid);
383 ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
384 last_start_times = hash_create("Logical replication table sync worker start times",
385 256, &ctl, HASH_ELEM | HASH_BLOBS);
386 }
387
388 /*
389 * Clean up the hash table when we're done with all tables (just to
390 * release the bit of memory).
391 */
392 else if (!table_states && last_start_times)
393 {
394 hash_destroy(last_start_times);
395 last_start_times = NULL;
396 }
397
398 /*
399 * Process all tables that are being synchronized.
400 */
401 foreach(lc, table_states)
402 {
403 SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
404
405 if (rstate->state == SUBREL_STATE_SYNCDONE)
406 {
407 /*
408 * Apply has caught up to the position where the table sync has
409 * finished. Mark the table as ready so that the apply will just
410 * continue to replicate it normally.
411 */
412 if (current_lsn >= rstate->lsn)
413 {
414 rstate->state = SUBREL_STATE_READY;
415 rstate->lsn = current_lsn;
416 if (!started_tx)
417 {
418 StartTransactionCommand();
419 started_tx = true;
420 }
421
422 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
423 rstate->relid, rstate->state,
424 rstate->lsn);
425 }
426 }
427 else
428 {
429 LogicalRepWorker *syncworker;
430
431 /*
432 * Look for a sync worker for this relation.
433 */
434 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
435
436 syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
437 rstate->relid, false);
438
439 if (syncworker)
440 {
441 /* Found one, update our copy of its state */
442 SpinLockAcquire(&syncworker->relmutex);
443 rstate->state = syncworker->relstate;
444 rstate->lsn = syncworker->relstate_lsn;
445 if (rstate->state == SUBREL_STATE_SYNCWAIT)
446 {
447 /*
448 * Sync worker is waiting for apply. Tell sync worker it
449 * can catchup now.
450 */
451 syncworker->relstate = SUBREL_STATE_CATCHUP;
452 syncworker->relstate_lsn =
453 Max(syncworker->relstate_lsn, current_lsn);
454 }
455 SpinLockRelease(&syncworker->relmutex);
456
457 /* If we told worker to catch up, wait for it. */
458 if (rstate->state == SUBREL_STATE_SYNCWAIT)
459 {
460 /* Signal the sync worker, as it may be waiting for us. */
461 if (syncworker->proc)
462 logicalrep_worker_wakeup_ptr(syncworker);
463
464 /* Now safe to release the LWLock */
465 LWLockRelease(LogicalRepWorkerLock);
466
467 /*
468 * Enter busy loop and wait for synchronization worker to
469 * reach expected state (or die trying).
470 */
471 if (!started_tx)
472 {
473 StartTransactionCommand();
474 started_tx = true;
475 }
476
477 wait_for_relation_state_change(rstate->relid,
478 SUBREL_STATE_SYNCDONE);
479 }
480 else
481 LWLockRelease(LogicalRepWorkerLock);
482 }
483 else
484 {
485 /*
486 * If there is no sync worker for this table yet, count
487 * running sync workers for this subscription, while we have
488 * the lock.
489 */
490 int nsyncworkers =
491 logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
492
493 /* Now safe to release the LWLock */
494 LWLockRelease(LogicalRepWorkerLock);
495
496 /*
497 * If there are free sync worker slot(s), start a new sync
498 * worker for the table.
499 */
500 if (nsyncworkers < max_sync_workers_per_subscription)
501 {
502 TimestampTz now = GetCurrentTimestamp();
503 struct tablesync_start_time_mapping *hentry;
504 bool found;
505
506 hentry = hash_search(last_start_times, &rstate->relid,
507 HASH_ENTER, &found);
508
509 if (!found ||
510 TimestampDifferenceExceeds(hentry->last_start_time, now,
511 wal_retrieve_retry_interval))
512 {
513 logicalrep_worker_launch(MyLogicalRepWorker->dbid,
514 MySubscription->oid,
515 MySubscription->name,
516 MyLogicalRepWorker->userid,
517 rstate->relid);
518 hentry->last_start_time = now;
519 }
520 }
521 }
522 }
523 }
524
525 if (started_tx)
526 {
527 CommitTransactionCommand();
528 pgstat_report_stat(false);
529 }
530}
531
532/*
533 * Process possible state change(s) of tables that are being synchronized.
534 */
535void
536process_syncing_tables(XLogRecPtr current_lsn)
537{
538 if (am_tablesync_worker())
539 process_syncing_tables_for_sync(current_lsn);
540 else
541 process_syncing_tables_for_apply(current_lsn);
542}
543
544/*
545 * Create list of columns for COPY based on logical relation mapping.
546 */
547static List *
548make_copy_attnamelist(LogicalRepRelMapEntry *rel)
549{
550 List *attnamelist = NIL;
551 int i;
552
553 for (i = 0; i < rel->remoterel.natts; i++)
554 {
555 attnamelist = lappend(attnamelist,
556 makeString(rel->remoterel.attnames[i]));
557 }
558
559
560 return attnamelist;
561}
562
563/*
564 * Data source callback for the COPY FROM, which reads from the remote
565 * connection and passes the data back to our local COPY.
566 */
567static int
568copy_read_data(void *outbuf, int minread, int maxread)
569{
570 int bytesread = 0;
571 int avail;
572
573 /* If there are some leftover data from previous read, use it. */
574 avail = copybuf->len - copybuf->cursor;
575 if (avail)
576 {
577 if (avail > maxread)
578 avail = maxread;
579 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
580 copybuf->cursor += avail;
581 maxread -= avail;
582 bytesread += avail;
583 }
584
585 while (maxread > 0 && bytesread < minread)
586 {
587 pgsocket fd = PGINVALID_SOCKET;
588 int len;
589 char *buf = NULL;
590
591 for (;;)
592 {
593 /* Try read the data. */
594 len = walrcv_receive(wrconn, &buf, &fd);
595
596 CHECK_FOR_INTERRUPTS();
597
598 if (len == 0)
599 break;
600 else if (len < 0)
601 return bytesread;
602 else
603 {
604 /* Process the data */
605 copybuf->data = buf;
606 copybuf->len = len;
607 copybuf->cursor = 0;
608
609 avail = copybuf->len - copybuf->cursor;
610 if (avail > maxread)
611 avail = maxread;
612 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
613 outbuf = (void *) ((char *) outbuf + avail);
614 copybuf->cursor += avail;
615 maxread -= avail;
616 bytesread += avail;
617 }
618
619 if (maxread <= 0 || bytesread >= minread)
620 return bytesread;
621 }
622
623 /*
624 * Wait for more data or latch.
625 */
626 (void) WaitLatchOrSocket(MyLatch,
627 WL_SOCKET_READABLE | WL_LATCH_SET |
628 WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
629 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
630
631 ResetLatch(MyLatch);
632 }
633
634 return bytesread;
635}
636
637
/*
 * Get information about remote relation in similar fashion the RELATION
 * message provides during replication.
 *
 * Two queries are run on the publisher over wrconn.  The first, against
 * pg_class/pg_namespace, fetches the remote table's OID and its replica
 * identity setting (relreplident).  The second, against pg_attribute, fetches
 * the column names and type OIDs, and whether each column is part of the
 * replica identity index.
 *
 * nspname, relname: schema and table name to look up; the pointers are
 *		stored into lrel as-is, not copied.
 * lrel: output; remoteid, replident, natts, attnames, atttyps and attkeys
 *		are filled in (attnames/atttyps palloc'd in the current memory
 *		context).
 *
 * Raises ERROR if a query fails, if no ordinary table (relkind 'r') with
 * that name exists on the publisher, or if it reports too many columns.
 */
static void
fetch_remote_table_info(char *nspname, char *relname,
						LogicalRepRelation *lrel)
{
	WalRcvExecResult *res;
	StringInfoData cmd;
	TupleTableSlot *slot;
	/* Expected result column types of the two queries below. */
	Oid			tableRow[2] = {OIDOID, CHAROID};
	Oid			attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
	bool		isnull;
	int			natt;

	lrel->nspname = nspname;
	lrel->relname = relname;

	/* First fetch Oid and replica identity. */
	initStringInfo(&cmd);
	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident"
					 " FROM pg_catalog.pg_class c"
					 " INNER JOIN pg_catalog.pg_namespace n"
					 "   ON (c.relnamespace = n.oid)"
					 " WHERE n.nspname = %s"
					 "   AND c.relname = %s"
					 "   AND c.relkind = 'r'",
					 quote_literal_cstr(nspname),
					 quote_literal_cstr(relname));
	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);

	if (res->status != WALRCV_OK_TUPLES)
		ereport(ERROR,
				(errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
						nspname, relname, res->err)));

	/* Only the first row is examined; no row means the table is missing. */
	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
	if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
		ereport(ERROR,
				(errmsg("table \"%s.%s\" not found on publisher",
						nspname, relname)));

	lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
	Assert(!isnull);
	lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
	Assert(!isnull);

	ExecDropSingleTupleTableSlot(slot);
	walrcv_clear_result(res);

	/*
	 * Now fetch columns.  The generated-column filter is only added for
	 * publishers reporting server version 12.0 or later (attgenerated).
	 */
	resetStringInfo(&cmd);
	appendStringInfo(&cmd,
					 "SELECT a.attname,"
					 "       a.atttypid,"
					 "       a.atttypmod,"
					 "       a.attnum = ANY(i.indkey)"
					 "  FROM pg_catalog.pg_attribute a"
					 "  LEFT JOIN pg_catalog.pg_index i"
					 "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
					 " WHERE a.attnum > 0::pg_catalog.int2"
					 "   AND NOT a.attisdropped %s"
					 "   AND a.attrelid = %u"
					 " ORDER BY a.attnum",
					 lrel->remoteid,
					 (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
					 lrel->remoteid);
	res = walrcv_exec(wrconn, cmd.data, 4, attrRow);

	if (res->status != WALRCV_OK_TUPLES)
		ereport(ERROR,
				(errmsg("could not fetch table info for table \"%s.%s\": %s",
						nspname, relname, res->err)));

	/* We don't know the number of rows coming, so allocate enough space. */
	lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
	lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
	lrel->attkeys = NULL;

	natt = 0;
	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
	{
		lrel->attnames[natt] =
			TextDatumGetCString(slot_getattr(slot, 1, &isnull));
		Assert(!isnull);
		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
		Assert(!isnull);

		/*
		 * Column 3 (atttypmod) is fetched but not used here.  Column 4 comes
		 * from the LEFT JOIN, so it is presumably NULL when the table has no
		 * replica identity index; isnull is not checked, and the returned
		 * Datum is then expected to read as false (TODO confirm).
		 */
		if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
			lrel->attkeys = bms_add_member(lrel->attkeys, natt);

		/* Should never happen. */
		if (++natt >= MaxTupleAttributeNumber)
			elog(ERROR, "too many columns in remote table \"%s.%s\"",
				 nspname, relname);

		ExecClearTuple(slot);
	}
	ExecDropSingleTupleTableSlot(slot);

	lrel->natts = natt;

	walrcv_clear_result(res);
	pfree(cmd.data);
}
744
745/*
746 * Copy existing data of a table from publisher.
747 *
748 * Caller is responsible for locking the local relation.
749 */
750static void
751copy_table(Relation rel)
752{
753 LogicalRepRelMapEntry *relmapentry;
754 LogicalRepRelation lrel;
755 WalRcvExecResult *res;
756 StringInfoData cmd;
757 CopyState cstate;
758 List *attnamelist;
759 ParseState *pstate;
760
761 /* Get the publisher relation info. */
762 fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
763 RelationGetRelationName(rel), &lrel);
764
765 /* Put the relation into relmap. */
766 logicalrep_relmap_update(&lrel);
767
768 /* Map the publisher relation to local one. */
769 relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
770 Assert(rel == relmapentry->localrel);
771
772 /* Start copy on the publisher. */
773 initStringInfo(&cmd);
774 appendStringInfo(&cmd, "COPY %s TO STDOUT",
775 quote_qualified_identifier(lrel.nspname, lrel.relname));
776 res = walrcv_exec(wrconn, cmd.data, 0, NULL);
777 pfree(cmd.data);
778 if (res->status != WALRCV_OK_COPY_OUT)
779 ereport(ERROR,
780 (errmsg("could not start initial contents copy for table \"%s.%s\": %s",
781 lrel.nspname, lrel.relname, res->err)));
782 walrcv_clear_result(res);
783
784 copybuf = makeStringInfo();
785
786 pstate = make_parsestate(NULL);
787 addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
788 NULL, false, false);
789
790 attnamelist = make_copy_attnamelist(relmapentry);
791 cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);
792
793 /* Do the copy */
794 (void) CopyFrom(cstate);
795
796 logicalrep_rel_close(relmapentry, NoLock);
797}
798
/*
 * Start syncing the table in the sync worker.
 *
 * Reads the table's state from pg_subscription_rel, mirrors it into our
 * worker slot and connects to the publisher (setting wrconn).  Then, based on
 * that state:
 *
 * - INIT / DATASYNC:
 *   - Record DATASYNC in the catalog.
 *   - Inside a single local transaction, create a temporary slot on the
 *     publisher and copy the table using the slot's snapshot.
 *   - Switch to SYNCWAIT and wait for the apply worker to move us to CATCHUP.
 *   - If the slot's start position is already at or past the position the
 *     apply worker asked for, mark the table SYNCDONE and exit the process.
 *     Otherwise return, so the caller can stream changes up to that
 *     position.
 * - SYNCDONE / READY / UNKNOWN: nothing to do; exit the process.
 *
 * origin_startpos: output; in the INIT/DATASYNC path it is set by
 *		walrcv_create_slot() to the slot's start position.
 *
 * The returned slot name is palloc'ed in current memory context.
 */
char *
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
	char	   *slotname;
	char	   *err;
	char		relstate;
	XLogRecPtr	relstate_lsn;

	/* Check the state of the table synchronization. */
	StartTransactionCommand();
	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
									   MyLogicalRepWorker->relid,
									   &relstate_lsn, true);
	CommitTransactionCommand();

	/* Mirror the catalog state into our worker slot (read by the apply worker). */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->relstate = relstate;
	MyLogicalRepWorker->relstate_lsn = relstate_lsn;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	/*
	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
	 * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
	 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
	 * NAMEDATALEN on the remote that matters, but this scheme will also work
	 * reasonably if that is different.)
	 */
	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");	/* for sanity */
	slotname = psprintf("%.*s_%u_sync_%u",
						NAMEDATALEN - 28,
						MySubscription->slotname,
						MySubscription->oid,
						MyLogicalRepWorker->relid);

	/*
	 * Here we use the slot name instead of the subscription name as the
	 * application_name, so that it is different from the main apply worker,
	 * so that synchronous replication can distinguish them.
	 */
	wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
	if (wrconn == NULL)
		ereport(ERROR,
				(errmsg("could not connect to the publisher: %s", err)));

	switch (MyLogicalRepWorker->relstate)
	{
		case SUBREL_STATE_INIT:
		case SUBREL_STATE_DATASYNC:
			{
				Relation	rel;
				WalRcvExecResult *res;

				SpinLockAcquire(&MyLogicalRepWorker->relmutex);
				MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
				MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
				SpinLockRelease(&MyLogicalRepWorker->relmutex);

				/* Update the state and make it visible to others. */
				StartTransactionCommand();
				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
										   MyLogicalRepWorker->relid,
										   MyLogicalRepWorker->relstate,
										   MyLogicalRepWorker->relstate_lsn);
				CommitTransactionCommand();
				pgstat_report_stat(false);

				/*
				 * We want to do the table data sync in a single transaction.
				 */
				StartTransactionCommand();

				/*
				 * Use a standard write lock here. It might be better to
				 * disallow access to the table while it's being synchronized.
				 * But we don't want to block the main apply process from
				 * working and it has to open the relation in RowExclusiveLock
				 * when remapping remote relation id to local one.
				 */
				rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);

				/*
				 * Create a temporary slot for the sync process. We do this
				 * inside the transaction so that we can use the snapshot made
				 * by the slot to get existing data.
				 */
				res = walrcv_exec(wrconn,
								  "BEGIN READ ONLY ISOLATION LEVEL "
								  "REPEATABLE READ", 0, NULL);
				if (res->status != WALRCV_OK_COMMAND)
					ereport(ERROR,
							(errmsg("table copy could not start transaction on publisher"),
							 errdetail("The error was: %s", res->err)));
				walrcv_clear_result(res);

				/*
				 * Create new temporary logical decoding slot.
				 *
				 * We'll use slot for data copy so make sure the snapshot is
				 * used for the transaction; that way the COPY will get data
				 * that is consistent with the lsn used by the slot to start
				 * decoding.
				 */
				walrcv_create_slot(wrconn, slotname, true,
								   CRS_USE_SNAPSHOT, origin_startpos);

				PushActiveSnapshot(GetTransactionSnapshot());
				copy_table(rel);
				PopActiveSnapshot();

				res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
				if (res->status != WALRCV_OK_COMMAND)
					ereport(ERROR,
							(errmsg("table copy could not finish transaction on publisher"),
							 errdetail("The error was: %s", res->err)));
				walrcv_clear_result(res);

				table_close(rel, NoLock);

				/* Make the copy visible. */
				CommandCounterIncrement();

				/*
				 * We are done with the initial data synchronization, update
				 * the state.
				 */
				SpinLockAcquire(&MyLogicalRepWorker->relmutex);
				MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
				MyLogicalRepWorker->relstate_lsn = *origin_startpos;
				SpinLockRelease(&MyLogicalRepWorker->relmutex);

				/*
				 * Wait for main apply worker to tell us to catchup.  The
				 * return value is ignored.  If the apply worker is gone and
				 * never updated relstate_lsn, it still equals
				 * *origin_startpos (set just above), so the check below
				 * takes the SYNCDONE path.
				 */
				wait_for_worker_state_change(SUBREL_STATE_CATCHUP);

				/*----------
				 * There are now two possible states here:
				 * a) Sync is behind the apply.  If that's the case we need to
				 *	  catch up with it by consuming the logical replication
				 *	  stream up to the relstate_lsn.  For that, we exit this
				 *	  function and continue in ApplyWorkerMain().
				 * b) Sync is caught up with the apply.  So it can just set
				 *	  the state to SYNCDONE and finish.
				 *
				 * NOTE(review): relstate_lsn is read here without relmutex,
				 * although the apply worker writes it under that spinlock;
				 * confirm this unlocked read is safe.
				 *----------
				 */
				if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
				{
					/*
					 * Update the new state in catalog.  No need to bother
					 * with the shmem state as we are exiting for good.
					 */
					UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
											   MyLogicalRepWorker->relid,
											   SUBREL_STATE_SYNCDONE,
											   *origin_startpos);
					finish_sync_worker();
				}
				break;
			}
		case SUBREL_STATE_SYNCDONE:
		case SUBREL_STATE_READY:
		case SUBREL_STATE_UNKNOWN:

			/*
			 * Nothing to do here but finish.  (UNKNOWN means the relation was
			 * removed from pg_subscription_rel before the sync worker could
			 * start.)
			 */
			finish_sync_worker();
			break;
		default:
			elog(ERROR, "unknown relation state \"%c\"",
				 MyLogicalRepWorker->relstate);
	}

	return slotname;
}
979