/*-------------------------------------------------------------------------
 * tablesync.c
 *	  PostgreSQL logical replication
 *
 * Copyright (c) 2012-2019, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/logical/tablesync.c
 *
 * NOTES
 *	  This file contains code for initial table data synchronization for
 *	  logical replication.
 *
 *	  The initial data synchronization is done separately for each table,
 *	  in a separate apply worker that only fetches the initial snapshot data
 *	  from the publisher and then synchronizes the position in the stream
 *	  with the main apply worker.
 *
 *	  There are several reasons for doing the synchronization this way:
 *	   - It allows us to parallelize the initial data synchronization,
 *	     which lowers the time needed for it to happen.
 *	   - The initial synchronization does not have to hold the xid and LSN
 *	     for the time it takes to copy data of all tables, causing less
 *	     bloat and lower disk consumption compared to doing the
 *	     synchronization in a single process for the whole database.
 *	   - It allows us to synchronize any tables added after the initial
 *	     synchronization has finished.
 *
 *	  The stream position synchronization works in multiple steps:
 *	   - Sync finishes copy and sets worker state to SYNCWAIT and waits for
 *	     the state to change in a loop.
 *	   - Apply periodically checks tables that are synchronizing for SYNCWAIT.
 *	     When the desired state appears, it sets the worker state to CATCHUP
 *	     and starts loop-waiting until either the table state is set to
 *	     SYNCDONE or the sync worker exits.
 *	   - After the sync worker has seen the state change to CATCHUP, it will
 *	     read the stream and apply changes (acting like an apply worker) until
 *	     it catches up to the specified stream position.  Then it sets the
 *	     state to SYNCDONE.  There might be zero changes applied between
 *	     CATCHUP and SYNCDONE, because the sync worker might be ahead of the
 *	     apply worker.
 *	   - Once the state has been set to SYNCDONE, the apply will continue
 *	     tracking the table until it reaches the SYNCDONE stream position, at
 *	     which point it sets the state to READY and stops tracking.  Again,
 *	     there might be zero changes in between.
 *
 *	  So the state progression is always: INIT -> DATASYNC -> SYNCWAIT ->
 *	  CATCHUP -> SYNCDONE -> READY.
 *
 *	  The catalog pg_subscription_rel is used to keep information about
 *	  subscribed tables and their state.  Some transient state during data
 *	  synchronization is kept in shared memory.  The states SYNCWAIT and
 *	  CATCHUP only appear in memory.
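 *
 *	  (In the catalog these states are stored as single characters: INIT is
 *	  'i', DATASYNC is 'd', SYNCDONE is 's' and READY is 'r'; the in-memory
 *	  states SYNCWAIT and CATCHUP are 'w' and 'c'.  So the progress of a
 *	  subscription's tables can be watched with, for example,
 *
 *	      SELECT srrelid::regclass, srsubstate FROM pg_subscription_rel;
 *
 *	  in which 'w' and 'c' never appear, since they are never written to
 *	  the catalog.)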
 *
 *	  Example flows look like this:
 *	   - Apply is in front:
 *	      sync:8
 *	        -> set in memory SYNCWAIT
 *	      apply:10
 *	        -> set in memory CATCHUP
 *	        -> enter wait-loop
 *	      sync:10
 *	        -> set in catalog SYNCDONE
 *	        -> exit
 *	      apply:10
 *	        -> exit wait-loop
 *	        -> continue rep
 *	      apply:11
 *	        -> set in catalog READY
 *	   - Sync in front:
 *	      sync:10
 *	        -> set in memory SYNCWAIT
 *	      apply:8
 *	        -> set in memory CATCHUP
 *	        -> continue per-table filtering
 *	      sync:10
 *	        -> set in catalog SYNCDONE
 *	        -> exit
 *	      apply:10
 *	        -> set in catalog READY
 *	        -> stop per-table filtering
 *	        -> continue rep
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "miscadmin.h"
#include "pgstat.h"

#include "access/table.h"
#include "access/xact.h"

#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"

#include "commands/copy.h"

#include "parser/parse_relation.h"

#include "replication/logicallauncher.h"
#include "replication/logicalrelation.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"

#include "storage/ipc.h"

#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"

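/*
 * Set to false by the syscache callback below whenever pg_subscription_rel
 * changes; that tells process_syncing_tables_for_apply() to rebuild its
 * cached list of not-ready tables.
 */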
static bool table_states_valid = false;

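/*
 * Buffer holding the data most recently received from the publisher during
 * the initial table copy; copy_read_data() hands it out incrementally.
 */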
StringInfo	copybuf = NULL;

/*
 * Exit routine for synchronization worker.
 */
static void
pg_attribute_noreturn()
finish_sync_worker(void)
{
	/*
	 * Commit any outstanding transaction.  This is the usual case, unless
	 * there was nothing to do for the table.
	 */
	if (IsTransactionState())
	{
		CommitTransactionCommand();
		pgstat_report_stat(false);
	}

	/* And flush all writes. */
	XLogFlush(GetXLogWriteRecPtr());

	StartTransactionCommand();
	ereport(LOG,
			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
					MySubscription->name,
					get_rel_name(MyLogicalRepWorker->relid))));
	CommitTransactionCommand();

	/* Find the main apply worker and signal it. */
	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);

	/* Stop gracefully */
	proc_exit(0);
}

/*
 * Wait until the relation synchronization state is set in the catalog to the
 * expected one.
 *
 * Used when transitioning from CATCHUP state to SYNCDONE.
 *
 * Returns false if the synchronization worker has disappeared or the table
 * state has been reset.
 */
static bool
wait_for_relation_state_change(Oid relid, char expected_state)
{
	char		state;

	for (;;)
	{
		LogicalRepWorker *worker;
		XLogRecPtr	statelsn;

		CHECK_FOR_INTERRUPTS();

		/* XXX use cache invalidation here to improve performance? */
		PushActiveSnapshot(GetLatestSnapshot());
		state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
										relid, &statelsn, true);
		PopActiveSnapshot();

		if (state == SUBREL_STATE_UNKNOWN)
			return false;

		if (state == expected_state)
			return true;

		/* Check if the opposite worker is still running and bail if not. */
		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
										am_tablesync_worker() ? InvalidOid : relid,
										false);
		LWLockRelease(LogicalRepWorkerLock);
		if (!worker)
			return false;

		(void) WaitLatch(MyLatch,
						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
						 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);

		ResetLatch(MyLatch);
	}

	return false;
}

/*
 * Wait until the apply worker changes the state of our synchronization
 * worker to the expected one.
 *
 * Used when transitioning from SYNCWAIT state to CATCHUP.
 *
 * Returns false if the apply worker has disappeared.
 */
static bool
wait_for_worker_state_change(char expected_state)
{
	int			rc;

	for (;;)
	{
		LogicalRepWorker *worker;

		CHECK_FOR_INTERRUPTS();

		/*
		 * Done if already in correct state.  (We assume this fetch is atomic
		 * enough to not give a misleading answer if we do it with no lock.)
		 */
		if (MyLogicalRepWorker->relstate == expected_state)
			return true;

		/*
		 * Bail out if the apply worker has died, else signal it we're
		 * waiting.
		 */
		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
										InvalidOid, false);
		if (worker && worker->proc)
			logicalrep_worker_wakeup_ptr(worker);
		LWLockRelease(LogicalRepWorkerLock);
		if (!worker)
			break;

		/*
		 * Wait.  We expect to get a latch signal back from the apply worker,
		 * but use a timeout in case it dies without sending one.
		 */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);

		if (rc & WL_LATCH_SET)
			ResetLatch(MyLatch);
	}

	return false;
}

/*
 * Callback from syscache invalidation.
 */
void
invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
{
	table_states_valid = false;
}

/*
 * Handle table synchronization cooperation from the synchronization
 * worker.
 *
 * If the sync worker is in CATCHUP state and reached (or passed) the
 * predetermined synchronization point in the WAL stream, mark the table as
 * SYNCDONE and finish.
 */
static void
process_syncing_tables_for_sync(XLogRecPtr current_lsn)
{
	Assert(IsTransactionState());

	SpinLockAcquire(&MyLogicalRepWorker->relmutex);

	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
		current_lsn >= MyLogicalRepWorker->relstate_lsn)
	{
		TimeLineID	tli;

		MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
		MyLogicalRepWorker->relstate_lsn = current_lsn;

		SpinLockRelease(&MyLogicalRepWorker->relmutex);

		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
								   MyLogicalRepWorker->relid,
								   MyLogicalRepWorker->relstate,
								   MyLogicalRepWorker->relstate_lsn);

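		/*
		 * End streaming cleanly before exiting.  The temporary replication
		 * slot this worker created is dropped automatically when the
		 * connection is closed.
		 */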
		walrcv_endstreaming(wrconn, &tli);
		finish_sync_worker();
	}
	else
		SpinLockRelease(&MyLogicalRepWorker->relmutex);
}

/*
 * Handle table synchronization cooperation from the apply worker.
 *
 * Walk over all subscription tables that are individually tracked by the
 * apply process (currently, all that have state other than
 * SUBREL_STATE_READY) and manage synchronization for them.
 *
 * If there are tables that need synchronizing and are not being synchronized
 * yet, start sync workers for them (if there are free slots for sync
 * workers).  To prevent starting the sync worker for the same relation at a
 * high frequency after a failure, we store its last start time with each
 * sync state info.  We start the sync worker for the same relation after
 * waiting at least wal_retrieve_retry_interval.
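 * (With the default wal_retrieve_retry_interval of 5 seconds, a sync worker
 * that keeps failing for a given relation is therefore relaunched at most
 * once every 5 seconds.)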
 *
 * For tables that are being synchronized already, check if sync workers
 * either need action from the apply worker or have finished.  This is the
 * SYNCWAIT to CATCHUP transition.
 *
 * If the synchronization position is reached (SYNCDONE), then the table can
 * be marked as READY and is no longer tracked.
 */
static void
process_syncing_tables_for_apply(XLogRecPtr current_lsn)
{
	struct tablesync_start_time_mapping
	{
		Oid			relid;
		TimestampTz last_start_time;
	};
	static List *table_states = NIL;
	static HTAB *last_start_times = NULL;
	ListCell   *lc;
	bool		started_tx = false;

	Assert(!IsTransactionState());

	/* We need up-to-date sync state info for subscription tables here. */
	if (!table_states_valid)
	{
		MemoryContext oldctx;
		List	   *rstates;
		ListCell   *lc;
		SubscriptionRelState *rstate;

		/* Clean the old list. */
		list_free_deep(table_states);
		table_states = NIL;

		StartTransactionCommand();
		started_tx = true;

		/* Fetch all non-ready tables. */
		rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);

		/* Allocate the tracking info in a permanent memory context. */
		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
		foreach(lc, rstates)
		{
			rstate = palloc(sizeof(SubscriptionRelState));
			memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
			table_states = lappend(table_states, rstate);
		}
		MemoryContextSwitchTo(oldctx);

		table_states_valid = true;
	}

	/*
	 * Prepare a hash table for tracking last start times of workers, to
	 * avoid immediate restarts.  We don't need it if there are no tables
	 * that need syncing.
	 */
	if (table_states && !last_start_times)
	{
		HASHCTL		ctl;

		memset(&ctl, 0, sizeof(ctl));
		ctl.keysize = sizeof(Oid);
		ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
		last_start_times = hash_create("Logical replication table sync worker start times",
									   256, &ctl, HASH_ELEM | HASH_BLOBS);
	}

	/*
	 * Clean up the hash table when we're done with all tables (just to
	 * release the bit of memory).
	 */
	else if (!table_states && last_start_times)
	{
		hash_destroy(last_start_times);
		last_start_times = NULL;
	}

	/*
	 * Process all tables that are being synchronized.
	 */
	foreach(lc, table_states)
	{
		SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);

		if (rstate->state == SUBREL_STATE_SYNCDONE)
		{
			/*
			 * Apply has caught up to the position where the table sync has
			 * finished.  Mark the table as ready so that the apply will just
			 * continue to replicate it normally.
			 */
			if (current_lsn >= rstate->lsn)
			{
				rstate->state = SUBREL_STATE_READY;
				rstate->lsn = current_lsn;
				if (!started_tx)
				{
					StartTransactionCommand();
					started_tx = true;
				}

				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
										   rstate->relid, rstate->state,
										   rstate->lsn);
			}
		}
		else
		{
			LogicalRepWorker *syncworker;

			/*
			 * Look for a sync worker for this relation.
			 */
			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

			syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
												rstate->relid, false);

			if (syncworker)
			{
				/* Found one, update our copy of its state */
				SpinLockAcquire(&syncworker->relmutex);
				rstate->state = syncworker->relstate;
				rstate->lsn = syncworker->relstate_lsn;
				if (rstate->state == SUBREL_STATE_SYNCWAIT)
				{
					/*
					 * Sync worker is waiting for apply.  Tell sync worker it
					 * can catchup now.
					 */
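					/*
					 * Pick the later of the two positions: the sync worker
					 * may already be ahead of us, in which case it has
					 * nothing left to replay before declaring SYNCDONE.
					 */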
					syncworker->relstate = SUBREL_STATE_CATCHUP;
					syncworker->relstate_lsn =
						Max(syncworker->relstate_lsn, current_lsn);
				}
				SpinLockRelease(&syncworker->relmutex);

				/* If we told worker to catch up, wait for it. */
				if (rstate->state == SUBREL_STATE_SYNCWAIT)
				{
					/* Signal the sync worker, as it may be waiting for us. */
					if (syncworker->proc)
						logicalrep_worker_wakeup_ptr(syncworker);

					/* Now safe to release the LWLock */
					LWLockRelease(LogicalRepWorkerLock);

					/*
					 * Enter busy loop and wait for synchronization worker to
					 * reach expected state (or die trying).
					 */
					if (!started_tx)
					{
						StartTransactionCommand();
						started_tx = true;
					}

					wait_for_relation_state_change(rstate->relid,
												   SUBREL_STATE_SYNCDONE);
				}
				else
					LWLockRelease(LogicalRepWorkerLock);
			}
			else
			{
				/*
				 * If there is no sync worker for this table yet, count
				 * running sync workers for this subscription, while we have
				 * the lock.
				 */
				int			nsyncworkers =
				logicalrep_sync_worker_count(MyLogicalRepWorker->subid);

				/* Now safe to release the LWLock */
				LWLockRelease(LogicalRepWorkerLock);

				/*
				 * If there are free sync worker slot(s), start a new sync
				 * worker for the table.
				 */
				if (nsyncworkers < max_sync_workers_per_subscription)
				{
					TimestampTz now = GetCurrentTimestamp();
					struct tablesync_start_time_mapping *hentry;
					bool		found;

					hentry = hash_search(last_start_times, &rstate->relid,
										 HASH_ENTER, &found);

					if (!found ||
						TimestampDifferenceExceeds(hentry->last_start_time, now,
												   wal_retrieve_retry_interval))
					{
						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
												 MySubscription->oid,
												 MySubscription->name,
												 MyLogicalRepWorker->userid,
												 rstate->relid);
						hentry->last_start_time = now;
					}
				}
			}
		}
	}

	if (started_tx)
	{
		CommitTransactionCommand();
		pgstat_report_stat(false);
	}
}

/*
 * Process possible state change(s) of tables that are being synchronized.
 */
void
process_syncing_tables(XLogRecPtr current_lsn)
{
	if (am_tablesync_worker())
		process_syncing_tables_for_sync(current_lsn);
	else
		process_syncing_tables_for_apply(current_lsn);
}

/*
 * Create list of columns for COPY based on logical relation mapping.
 */
static List *
make_copy_attnamelist(LogicalRepRelMapEntry *rel)
{
	List	   *attnamelist = NIL;
	int			i;

	for (i = 0; i < rel->remoterel.natts; i++)
	{
		attnamelist = lappend(attnamelist,
							  makeString(rel->remoterel.attnames[i]));
	}

	return attnamelist;
}

/*
 * Data source callback for the COPY FROM, which reads from the remote
 * connection and passes the data back to our local COPY.
 */
static int
copy_read_data(void *outbuf, int minread, int maxread)
{
	int			bytesread = 0;
	int			avail;

	/* If there is some data left over from the previous read, use it. */
	avail = copybuf->len - copybuf->cursor;
	if (avail)
	{
		if (avail > maxread)
			avail = maxread;
		memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
		copybuf->cursor += avail;
		maxread -= avail;
		bytesread += avail;
	}

	while (maxread > 0 && bytesread < minread)
	{
		pgsocket	fd = PGINVALID_SOCKET;
		int			len;
		char	   *buf = NULL;

		for (;;)
		{
			/* Try to read the data. */
			len = walrcv_receive(wrconn, &buf, &fd);

			CHECK_FOR_INTERRUPTS();

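			/*
			 * Per the walreceiver API, len == 0 means no data is available
			 * right now (so wait on the socket below), while a negative
			 * result means the end of the COPY stream.
			 */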
			if (len == 0)
				break;
			else if (len < 0)
				return bytesread;
			else
			{
				/* Process the data */
				copybuf->data = buf;
				copybuf->len = len;
				copybuf->cursor = 0;

				avail = copybuf->len - copybuf->cursor;
				if (avail > maxread)
					avail = maxread;
				memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
				outbuf = (void *) ((char *) outbuf + avail);
				copybuf->cursor += avail;
				maxread -= avail;
				bytesread += avail;
			}

			if (maxread <= 0 || bytesread >= minread)
				return bytesread;
		}

		/*
		 * Wait for more data or latch.
		 */
		(void) WaitLatchOrSocket(MyLatch,
								 WL_SOCKET_READABLE | WL_LATCH_SET |
								 WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
								 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);

		ResetLatch(MyLatch);
	}

	return bytesread;
}

/*
 * Get information about a remote relation, in a similar fashion to how the
 * RELATION message provides information during replication.
 */
static void
fetch_remote_table_info(char *nspname, char *relname,
						LogicalRepRelation *lrel)
{
	WalRcvExecResult *res;
	StringInfoData cmd;
	TupleTableSlot *slot;
	Oid			tableRow[2] = {OIDOID, CHAROID};
	Oid			attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
	bool		isnull;
	int			natt;

	lrel->nspname = nspname;
	lrel->relname = relname;

	/* First fetch Oid and replica identity. */
	initStringInfo(&cmd);
	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident"
					 "  FROM pg_catalog.pg_class c"
					 "  INNER JOIN pg_catalog.pg_namespace n"
					 "        ON (c.relnamespace = n.oid)"
					 " WHERE n.nspname = %s"
					 "   AND c.relname = %s"
					 "   AND c.relkind = 'r'",
					 quote_literal_cstr(nspname),
					 quote_literal_cstr(relname));
	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);

	if (res->status != WALRCV_OK_TUPLES)
		ereport(ERROR,
				(errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
						nspname, relname, res->err)));

	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
	if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
		ereport(ERROR,
				(errmsg("table \"%s.%s\" not found on publisher",
						nspname, relname)));

	lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
	Assert(!isnull);
	lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
	Assert(!isnull);

	ExecDropSingleTupleTableSlot(slot);
	walrcv_clear_result(res);

	/* Now fetch columns. */
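	/*
	 * (Generated columns are excluded below because their contents are not
	 * replicated; pg_attribute.attgenerated only exists on v12-and-later
	 * publishers, hence the server-version check.)
	 */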
	resetStringInfo(&cmd);
	appendStringInfo(&cmd,
					 "SELECT a.attname,"
					 "       a.atttypid,"
					 "       a.atttypmod,"
					 "       a.attnum = ANY(i.indkey)"
					 "  FROM pg_catalog.pg_attribute a"
					 "  LEFT JOIN pg_catalog.pg_index i"
					 "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
					 " WHERE a.attnum > 0::pg_catalog.int2"
					 "   AND NOT a.attisdropped %s"
					 "   AND a.attrelid = %u"
					 " ORDER BY a.attnum",
					 lrel->remoteid,
					 (walrcv_server_version(wrconn) >= 120000 ?
					  "AND a.attgenerated = ''" : ""),
					 lrel->remoteid);
	res = walrcv_exec(wrconn, cmd.data, 4, attrRow);

	if (res->status != WALRCV_OK_TUPLES)
		ereport(ERROR,
				(errmsg("could not fetch table info for table \"%s.%s\": %s",
						nspname, relname, res->err)));

	/* We don't know the number of rows coming, so allocate enough space. */
	lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
	lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
	lrel->attkeys = NULL;

	natt = 0;
	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
	{
		lrel->attnames[natt] =
			TextDatumGetCString(slot_getattr(slot, 1, &isnull));
		Assert(!isnull);
		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
		Assert(!isnull);
		if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
			lrel->attkeys = bms_add_member(lrel->attkeys, natt);

		/* Should never happen. */
		if (++natt >= MaxTupleAttributeNumber)
			elog(ERROR, "too many columns in remote table \"%s.%s\"",
				 nspname, relname);

		ExecClearTuple(slot);
	}
	ExecDropSingleTupleTableSlot(slot);

	lrel->natts = natt;

	walrcv_clear_result(res);
	pfree(cmd.data);
}

/*
 * Copy existing data of a table from publisher.
 *
 * Caller is responsible for locking the local relation.
 */
static void
copy_table(Relation rel)
{
	LogicalRepRelMapEntry *relmapentry;
	LogicalRepRelation lrel;
	WalRcvExecResult *res;
	StringInfoData cmd;
	CopyState	cstate;
	List	   *attnamelist;
	ParseState *pstate;

	/* Get the publisher relation info. */
	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
							RelationGetRelationName(rel), &lrel);

	/* Put the relation into relmap. */
	logicalrep_relmap_update(&lrel);

	/* Map the publisher relation to local one. */
	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
	Assert(rel == relmapentry->localrel);

	/* Start copy on the publisher. */
	initStringInfo(&cmd);
	appendStringInfo(&cmd, "COPY %s TO STDOUT",
					 quote_qualified_identifier(lrel.nspname, lrel.relname));
	res = walrcv_exec(wrconn, cmd.data, 0, NULL);
	pfree(cmd.data);
	if (res->status != WALRCV_OK_COPY_OUT)
		ereport(ERROR,
				(errmsg("could not start initial contents copy for table \"%s.%s\": %s",
						lrel.nspname, lrel.relname, res->err)));
	walrcv_clear_result(res);

	copybuf = makeStringInfo();

	pstate = make_parsestate(NULL);
	addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
								  NULL, false, false);

	attnamelist = make_copy_attnamelist(relmapentry);
	cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data,
						   attnamelist, NIL);

	/* Do the copy */
	(void) CopyFrom(cstate);

	logicalrep_rel_close(relmapentry, NoLock);
}

/*
 * Start syncing the table in the sync worker.
 *
 * The returned slot name is palloc'ed in current memory context.
 */
char *
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
	char	   *slotname;
	char	   *err;
	char		relstate;
	XLogRecPtr	relstate_lsn;

	/* Check the state of the table synchronization. */
	StartTransactionCommand();
	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
									   MyLogicalRepWorker->relid,
									   &relstate_lsn, true);
	CommitTransactionCommand();

	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->relstate = relstate;
	MyLogicalRepWorker->relstate_lsn = relstate_lsn;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	/*
	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
	 * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
	 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
	 * NAMEDATALEN on the remote that matters, but this scheme will also work
	 * reasonably if that is different.)
	 */
	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");	/* for sanity */
	slotname = psprintf("%.*s_%u_sync_%u",
						NAMEDATALEN - 28,
						MySubscription->slotname,
						MySubscription->oid,
						MyLogicalRepWorker->relid);

	/*
	 * Here we use the slot name instead of the subscription name as the
	 * application_name, so that it is different from the main apply worker's
	 * and synchronous replication can distinguish the two.
	 */
	wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
	if (wrconn == NULL)
		ereport(ERROR,
				(errmsg("could not connect to the publisher: %s", err)));

	switch (MyLogicalRepWorker->relstate)
	{
		case SUBREL_STATE_INIT:
		case SUBREL_STATE_DATASYNC:
			{
				Relation	rel;
				WalRcvExecResult *res;

				SpinLockAcquire(&MyLogicalRepWorker->relmutex);
				MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
				MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
				SpinLockRelease(&MyLogicalRepWorker->relmutex);

				/* Update the state and make it visible to others. */
				StartTransactionCommand();
				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
										   MyLogicalRepWorker->relid,
										   MyLogicalRepWorker->relstate,
										   MyLogicalRepWorker->relstate_lsn);
				CommitTransactionCommand();
				pgstat_report_stat(false);

				/*
				 * We want to do the table data sync in a single transaction.
				 */
				StartTransactionCommand();

				/*
				 * Use a standard write lock here.  It might be better to
				 * disallow access to the table while it's being synchronized.
				 * But we don't want to block the main apply process from
				 * working and it has to open the relation in RowExclusiveLock
				 * when remapping remote relation id to local one.
				 */
				rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);

				/*
				 * Create a temporary slot for the sync process.  We do this
				 * inside the transaction so that we can use the snapshot made
				 * by the slot to get existing data.
				 */
				res = walrcv_exec(wrconn,
								  "BEGIN READ ONLY ISOLATION LEVEL "
								  "REPEATABLE READ", 0, NULL);
				if (res->status != WALRCV_OK_COMMAND)
					ereport(ERROR,
							(errmsg("table copy could not start transaction on publisher"),
							 errdetail("The error was: %s", res->err)));
				walrcv_clear_result(res);

				/*
				 * Create new temporary logical decoding slot.
				 *
				 * We'll use slot for data copy so make sure the snapshot is
				 * used for the transaction; that way the COPY will get data
				 * that is consistent with the lsn used by the slot to start
				 * decoding.
				 */
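				/*
				 * (With the libpq-based walreceiver, this roughly corresponds
				 * to running
				 *
				 *     CREATE_REPLICATION_SLOT "<slotname>" TEMPORARY LOGICAL
				 *         pgoutput USE_SNAPSHOT
				 *
				 * on the publisher's replication connection; USE_SNAPSHOT
				 * makes the slot's snapshot the transaction snapshot, which
				 * is what lets the COPY below see exactly the data up to the
				 * slot's start point.)
				 */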
				walrcv_create_slot(wrconn, slotname, true,
								   CRS_USE_SNAPSHOT, origin_startpos);

				PushActiveSnapshot(GetTransactionSnapshot());
				copy_table(rel);
				PopActiveSnapshot();

				res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
				if (res->status != WALRCV_OK_COMMAND)
					ereport(ERROR,
							(errmsg("table copy could not finish transaction on publisher"),
							 errdetail("The error was: %s", res->err)));
				walrcv_clear_result(res);

				table_close(rel, NoLock);

				/* Make the copy visible. */
				CommandCounterIncrement();

				/*
				 * We are done with the initial data synchronization, update
				 * the state.
				 */
				SpinLockAcquire(&MyLogicalRepWorker->relmutex);
				MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
				MyLogicalRepWorker->relstate_lsn = *origin_startpos;
				SpinLockRelease(&MyLogicalRepWorker->relmutex);

				/* Wait for main apply worker to tell us to catchup. */
				wait_for_worker_state_change(SUBREL_STATE_CATCHUP);

				/*----------
				 * There are now two possible states here:
				 * a) Sync is behind the apply.  If that's the case we need to
				 *    catch up with it by consuming the logical replication
				 *    stream up to the relstate_lsn.  For that, we exit this
				 *    function and continue in ApplyWorkerMain().
				 * b) Sync is caught up with the apply.  So it can just set
				 *    the state to SYNCDONE and finish.
				 *----------
				 */
				if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
				{
					/*
					 * Update the new state in catalog.  No need to bother
					 * with the shmem state as we are exiting for good.
					 */
					UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
											   MyLogicalRepWorker->relid,
											   SUBREL_STATE_SYNCDONE,
											   *origin_startpos);
					finish_sync_worker();
				}
				break;
			}
		case SUBREL_STATE_SYNCDONE:
		case SUBREL_STATE_READY:
		case SUBREL_STATE_UNKNOWN:

			/*
			 * Nothing to do here but finish.  (UNKNOWN means the relation was
			 * removed from pg_subscription_rel before the sync worker could
			 * start.)
			 */
			finish_sync_worker();
			break;
		default:
			elog(ERROR, "unknown relation state \"%c\"",
				 MyLogicalRepWorker->relstate);
	}

	return slotname;
}