/*-------------------------------------------------------------------------
 * worker.c
 *	   PostgreSQL logical replication worker (apply)
 *
 * Copyright (c) 2016-2019, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/logical/worker.c
 *
 * NOTES
 *	  This file contains the worker which applies logical changes as they
 *	  come from the remote logical replication stream.
 *
 *	  The main apply worker is started by the logical replication worker
 *	  launcher for every enabled subscription in a database.  It uses the
 *	  walsender protocol to communicate with the publisher.
 *
 *	  This module includes server-facing code and shares the libpqwalreceiver
 *	  module with the walreceiver for the libpq-specific functionality.
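 *
 *	  Two kinds of workers run this code: the main apply worker and the
 *	  table synchronization workers (started from tablesync.c), which are
 *	  told apart at runtime via am_tablesync_worker().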
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/table.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "postmaster/walwriter.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/timeout.h"

#define NAPTIME_PER_CYCLE	1000	/* max sleep time between cycles (1s) */

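/*
 * FlushPosition pairs the end LSN of a remotely committed transaction with
 * the end LSN of the corresponding local commit.  store_flush_position()
 * appends an entry to lsn_mapping for every commit, and get_flush_position()
 * consumes the list when deciding which positions to report back to the
 * walsender.
 */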
typedef struct FlushPosition
{
    dlist_node  node;
    XLogRecPtr  local_end;
    XLogRecPtr  remote_end;
} FlushPosition;

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
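
/*
 * SlotErrCallbackArg carries the context used by slot_store_error_callback()
 * to report which relation and which local/remote column was being converted
 * when a data conversion error occurred.
 */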
typedef struct SlotErrCallbackArg
{
    LogicalRepRelMapEntry *rel;
    int         local_attnum;
    int         remote_attnum;
} SlotErrCallbackArg;

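/*
 * ApplyMessageContext is reset after each protocol message, while
 * ApplyContext lives for the whole lifetime of the worker and holds the
 * subscription information and other long-lived state.
 */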
static MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;

WalReceiverConn *wrconn = NULL;

Subscription *MySubscription = NULL;
bool        MySubscriptionValid = false;

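/*
 * State of the remote transaction currently being applied: whether we are
 * inside one at all, and the commit LSN the publisher announced for it in
 * the BEGIN message.
 */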
bool        in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);

static void store_flush_position(XLogRecPtr remote_lsn);

static void maybe_reread_subscription(void);

/* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false;

/*
 * Should this worker apply changes for the given relation?
 *
 * This is mainly needed for the initial relation data sync, which runs in a
 * separate worker process in parallel, so we need some way to skip changes
 * arriving at the main apply worker while a table is being synchronized.
 *
 * Note that we need a less-than-or-equal comparison for the SYNCDONE state
 * because statelsn might point just past the end of the initial slot's
 * consistent-point WAL record (i.e. at the start of the next record), and
 * that next record can be the COMMIT of the transaction we are currently
 * processing (which is what apply_handle_begin sets remote_final_lsn to).
 */
static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
{
    if (am_tablesync_worker())
        return MyLogicalRepWorker->relid == rel->localreloid;
    else
        return (rel->state == SUBREL_STATE_READY ||
                (rel->state == SUBREL_STATE_SYNCDONE &&
                 rel->statelsn <= remote_final_lsn));
}

/*
 * Make sure that we have started a local transaction.
 *
 * Also switches to ApplyMessageContext as necessary.
 */
static bool
ensure_transaction(void)
{
    if (IsTransactionState())
    {
        SetCurrentStatementStartTimestamp();

        if (CurrentMemoryContext != ApplyMessageContext)
            MemoryContextSwitchTo(ApplyMessageContext);

        return false;
    }

    SetCurrentStatementStartTimestamp();
    StartTransactionCommand();

    maybe_reread_subscription();

    MemoryContextSwitchTo(ApplyMessageContext);
    return true;
}

/*
 * Prepare executor state for the evaluation of constraint expressions,
 * indexes and triggers.
 *
 * This is based on similar code in copy.c.
 */
static EState *
create_estate_for_relation(LogicalRepRelMapEntry *rel)
{
    EState     *estate;
    ResultRelInfo *resultRelInfo;
    RangeTblEntry *rte;

    estate = CreateExecutorState();

    rte = makeNode(RangeTblEntry);
    rte->rtekind = RTE_RELATION;
    rte->relid = RelationGetRelid(rel->localrel);
    rte->relkind = rel->localrel->rd_rel->relkind;
    rte->rellockmode = AccessShareLock;
    ExecInitRangeTable(estate, list_make1(rte));

    resultRelInfo = makeNode(ResultRelInfo);
    InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);

    estate->es_result_relations = resultRelInfo;
    estate->es_num_result_relations = 1;
    estate->es_result_relation_info = resultRelInfo;

    estate->es_output_cid = GetCurrentCommandId(true);

    /* Prepare to catch AFTER triggers. */
    AfterTriggerBeginQuery();

    return estate;
}

/*
 * Evaluates default values for columns for which we can't map to remote
 * relation columns.
 *
 * This allows us to support tables which have more columns on the
 * downstream than on the upstream.
 */
static void
slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
                   TupleTableSlot *slot)
{
    TupleDesc   desc = RelationGetDescr(rel->localrel);
    int         num_phys_attrs = desc->natts;
    int         i;
    int         attnum,
                num_defaults = 0;
    int        *defmap;
    ExprState **defexprs;
    ExprContext *econtext;

    econtext = GetPerTupleExprContext(estate);

    /* We got all the data via replication, no need to evaluate anything. */
    if (num_phys_attrs == rel->remoterel.natts)
        return;

    defmap = (int *) palloc(num_phys_attrs * sizeof(int));
    defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));

    for (attnum = 0; attnum < num_phys_attrs; attnum++)
    {
        Expr       *defexpr;

        if (TupleDescAttr(desc, attnum)->attisdropped ||
            TupleDescAttr(desc, attnum)->attgenerated)
            continue;

        if (rel->attrmap[attnum] >= 0)
            continue;

        defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);

        if (defexpr != NULL)
        {
            /* Run the expression through planner */
            defexpr = expression_planner(defexpr);

            /* Initialize the executable expression */
            defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
            defmap[num_defaults] = attnum;
            num_defaults++;
        }
    }

    for (i = 0; i < num_defaults; i++)
        slot->tts_values[defmap[i]] =
            ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
}

/*
 * Error callback to give more context info about type conversion failure.
 */
static void
slot_store_error_callback(void *arg)
{
    SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
    LogicalRepRelMapEntry *rel;
    char       *remotetypname;
    Oid         remotetypoid,
                localtypoid;

    /* Nothing to do if remote attribute number is not set */
    if (errarg->remote_attnum < 0)
        return;

    rel = errarg->rel;
    remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];

    /* Fetch remote type name from the LogicalRepTypMap cache */
    remotetypname = logicalrep_typmap_gettypname(remotetypoid);

    /* Fetch local type OID from the local sys cache */
    localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);

    errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
               "remote type %s, local type %s",
               rel->remoterel.nspname, rel->remoterel.relname,
               rel->remoterel.attnames[errarg->remote_attnum],
               remotetypname,
               format_type_be(localtypoid));
}

/*
 * Store tuple data, given in C string form, into the slot.
 * This is similar to BuildTupleFromCStrings, but a TupleTableSlot fits our
 * use better.
 */
static void
slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
                    char **values)
{
    int         natts = slot->tts_tupleDescriptor->natts;
    int         i;
    SlotErrCallbackArg errarg;
    ErrorContextCallback errcallback;

    ExecClearTuple(slot);

    /* Push callback + info on the error context stack */
    errarg.rel = rel;
    errarg.local_attnum = -1;
    errarg.remote_attnum = -1;
    errcallback.callback = slot_store_error_callback;
    errcallback.arg = (void *) &errarg;
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;

    /* Call the "in" function for each non-dropped attribute */
    for (i = 0; i < natts; i++)
    {
        Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
        int         remoteattnum = rel->attrmap[i];

        if (!att->attisdropped && remoteattnum >= 0 &&
            values[remoteattnum] != NULL)
        {
            Oid         typinput;
            Oid         typioparam;

            errarg.local_attnum = i;
            errarg.remote_attnum = remoteattnum;

            getTypeInputInfo(att->atttypid, &typinput, &typioparam);
            slot->tts_values[i] =
                OidInputFunctionCall(typinput, values[remoteattnum],
                                     typioparam, att->atttypmod);
            slot->tts_isnull[i] = false;

            errarg.local_attnum = -1;
            errarg.remote_attnum = -1;
        }
        else
        {
            /*
             * We assign NULL to dropped attributes, NULL values, and missing
             * values (missing values should be filled later using
             * slot_fill_defaults).
             */
            slot->tts_values[i] = (Datum) 0;
            slot->tts_isnull[i] = true;
        }
    }

    /* Pop the error context stack */
    error_context_stack = errcallback.previous;

    ExecStoreVirtualTuple(slot);
}

/*
 * Modify the slot with user data provided as C strings.
 * This is somewhat similar to heap_modify_tuple, but it also calls the type
 * input functions on the user data, as the input is the text representation
 * of the types.
 */
static void
slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
                     char **values, bool *replaces)
{
    int         natts = slot->tts_tupleDescriptor->natts;
    int         i;
    SlotErrCallbackArg errarg;
    ErrorContextCallback errcallback;

    slot_getallattrs(slot);
    ExecClearTuple(slot);

    /* Push callback + info on the error context stack */
    errarg.rel = rel;
    errarg.local_attnum = -1;
    errarg.remote_attnum = -1;
    errcallback.callback = slot_store_error_callback;
    errcallback.arg = (void *) &errarg;
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;

    /* Call the "in" function for each replaced attribute */
    for (i = 0; i < natts; i++)
    {
        Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
        int         remoteattnum = rel->attrmap[i];

        if (remoteattnum < 0)
            continue;

        if (!replaces[remoteattnum])
            continue;

        if (values[remoteattnum] != NULL)
        {
            Oid         typinput;
            Oid         typioparam;

            errarg.local_attnum = i;
            errarg.remote_attnum = remoteattnum;

            getTypeInputInfo(att->atttypid, &typinput, &typioparam);
            slot->tts_values[i] =
                OidInputFunctionCall(typinput, values[remoteattnum],
                                     typioparam, att->atttypmod);
            slot->tts_isnull[i] = false;

            errarg.local_attnum = -1;
            errarg.remote_attnum = -1;
        }
        else
        {
            slot->tts_values[i] = (Datum) 0;
            slot->tts_isnull[i] = true;
        }
    }

    /* Pop the error context stack */
    error_context_stack = errcallback.previous;

    ExecStoreVirtualTuple(slot);
}

/*
 * Handle BEGIN message.
 */
static void
apply_handle_begin(StringInfo s)
{
    LogicalRepBeginData begin_data;

    logicalrep_read_begin(s, &begin_data);

    remote_final_lsn = begin_data.final_lsn;

    in_remote_transaction = true;

    pgstat_report_activity(STATE_RUNNING, NULL);
}

/*
 * Handle COMMIT message.
 *
 * TODO: support tracking of multiple origins.
 */
static void
apply_handle_commit(StringInfo s)
{
    LogicalRepCommitData commit_data;

    logicalrep_read_commit(s, &commit_data);

    Assert(commit_data.commit_lsn == remote_final_lsn);

    /* The synchronization worker runs in a single transaction. */
    if (IsTransactionState() && !am_tablesync_worker())
    {
        /*
         * Update the origin state so we can restart streaming from the
         * correct position in case of a crash.
         */
        replorigin_session_origin_lsn = commit_data.end_lsn;
        replorigin_session_origin_timestamp = commit_data.committime;

        CommitTransactionCommand();
        pgstat_report_stat(false);

        store_flush_position(commit_data.end_lsn);
    }
    else
    {
        /* Process any invalidation messages that might have accumulated. */
        AcceptInvalidationMessages();
        maybe_reread_subscription();
    }

    in_remote_transaction = false;

    /* Process any tables that are being synchronized in parallel. */
    process_syncing_tables(commit_data.end_lsn);

    pgstat_report_activity(STATE_IDLE, NULL);
}

/*
 * Handle ORIGIN message.
 *
 * TODO: support tracking of multiple origins.
 */
static void
apply_handle_origin(StringInfo s)
{
    /*
     * The ORIGIN message can only come inside a remote transaction and
     * before any actual writes.
     */
    if (!in_remote_transaction ||
        (IsTransactionState() && !am_tablesync_worker()))
        ereport(ERROR,
                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                 errmsg("ORIGIN message sent out of order")));
}

/*
 * Handle RELATION message.
 *
 * Note we don't do validation against the local schema here. That
 * validation is postponed until the first change for the relation arrives,
 * since we only care about it when applying changes anyway, and we do less
 * locking this way.
 */
static void
apply_handle_relation(StringInfo s)
{
    LogicalRepRelation *rel;

    rel = logicalrep_read_rel(s);
    logicalrep_relmap_update(rel);
}

/*
 * Handle TYPE message.
 *
 * Note we don't do local mapping here; that's done when the type is
 * actually used.
 */
static void
apply_handle_type(StringInfo s)
{
    LogicalRepTyp typ;

    logicalrep_read_typ(s, &typ);
    logicalrep_typmap_update(&typ);
}

/*
 * Get the replica identity index, or if it is not defined, the primary key.
 *
 * If neither is defined, returns InvalidOid.
 */
static Oid
GetRelationIdentityOrPK(Relation rel)
{
    Oid         idxoid;

    idxoid = RelationGetReplicaIndex(rel);

    if (!OidIsValid(idxoid))
        idxoid = RelationGetPrimaryKeyIndex(rel);

    return idxoid;
}

/*
 * Handle INSERT message.
 */
static void
apply_handle_insert(StringInfo s)
{
    LogicalRepRelMapEntry *rel;
    LogicalRepTupleData newtup;
    LogicalRepRelId relid;
    EState     *estate;
    TupleTableSlot *remoteslot;
    MemoryContext oldctx;

    ensure_transaction();

    relid = logicalrep_read_insert(s, &newtup);
    rel = logicalrep_rel_open(relid, RowExclusiveLock);
    if (!should_apply_changes_for_rel(rel))
    {
        /*
         * The relation can't become interesting in the middle of the
         * transaction, so it's safe to unlock it.
         */
        logicalrep_rel_close(rel, RowExclusiveLock);
        return;
    }

    /* Initialize the executor state. */
    estate = create_estate_for_relation(rel);
    remoteslot = ExecInitExtraTupleSlot(estate,
                                        RelationGetDescr(rel->localrel),
                                        &TTSOpsVirtual);

    /* Input functions may need an active snapshot, so get one */
    PushActiveSnapshot(GetTransactionSnapshot());

    /* Process and store remote tuple in the slot */
    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    slot_store_cstrings(remoteslot, rel, newtup.values);
    slot_fill_defaults(rel, estate, remoteslot);
    MemoryContextSwitchTo(oldctx);

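    /* Open the relation's indexes so the insert below can update them. */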
    ExecOpenIndices(estate->es_result_relation_info, false);

    /* Do the insert. */
    ExecSimpleRelationInsert(estate, remoteslot);

    /* Cleanup. */
    ExecCloseIndices(estate->es_result_relation_info);
    PopActiveSnapshot();

    /* Handle queued AFTER triggers. */
    AfterTriggerEndQuery(estate);

    ExecResetTupleTable(estate->es_tupleTable, false);
    FreeExecutorState(estate);

    logicalrep_rel_close(rel, NoLock);

    CommandCounterIncrement();
}

/*
 * Check if the logical replication relation is updatable and throw an
 * appropriate error if it isn't.
 */
static void
check_relation_updatable(LogicalRepRelMapEntry *rel)
{
    /* Updatable, no error. */
    if (rel->updatable)
        return;

    /*
     * We are about to raise an error anyway, so it's fine if this is
     * somewhat slow; it's better to give the user a correct error message.
     */
    if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
    {
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("publisher did not send replica identity column "
                        "expected by the logical replication target relation \"%s.%s\"",
                        rel->remoterel.nspname, rel->remoterel.relname)));
    }

    ereport(ERROR,
            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
             errmsg("logical replication target relation \"%s.%s\" has "
                    "neither REPLICA IDENTITY index nor PRIMARY "
                    "KEY and published relation does not have "
                    "REPLICA IDENTITY FULL",
                    rel->remoterel.nspname, rel->remoterel.relname)));
}

/*
 * Handle UPDATE message.
 *
 * TODO: FDW support
 */
static void
apply_handle_update(StringInfo s)
{
    LogicalRepRelMapEntry *rel;
    LogicalRepRelId relid;
    Oid         idxoid;
    EState     *estate;
    EPQState    epqstate;
    LogicalRepTupleData oldtup;
    LogicalRepTupleData newtup;
    bool        has_oldtup;
    TupleTableSlot *localslot;
    TupleTableSlot *remoteslot;
    bool        found;
    MemoryContext oldctx;

    ensure_transaction();

    relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
                                   &newtup);
    rel = logicalrep_rel_open(relid, RowExclusiveLock);
    if (!should_apply_changes_for_rel(rel))
    {
        /*
         * The relation can't become interesting in the middle of the
         * transaction, so it's safe to unlock it.
         */
        logicalrep_rel_close(rel, RowExclusiveLock);
        return;
    }

    /* Check if we can do the update. */
    check_relation_updatable(rel);

    /* Initialize the executor state. */
    estate = create_estate_for_relation(rel);
    remoteslot = ExecInitExtraTupleSlot(estate,
                                        RelationGetDescr(rel->localrel),
                                        &TTSOpsVirtual);
    localslot = table_slot_create(rel->localrel,
                                  &estate->es_tupleTable);
    EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);

    PushActiveSnapshot(GetTransactionSnapshot());
    ExecOpenIndices(estate->es_result_relation_info, false);

    /* Build the search tuple. */
    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    slot_store_cstrings(remoteslot, rel,
                        has_oldtup ? oldtup.values : newtup.values);
    MemoryContextSwitchTo(oldctx);

    /*
     * Try to find the tuple using either the replica identity index, the
     * primary key, or, if needed, a sequential scan.
     */
    idxoid = GetRelationIdentityOrPK(rel->localrel);
    Assert(OidIsValid(idxoid) ||
           (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));

    if (OidIsValid(idxoid))
        found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
                                             LockTupleExclusive,
                                             remoteslot, localslot);
    else
        found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
                                         remoteslot, localslot);

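    /*
     * The search tuple in remoteslot is no longer needed; clear it so the
     * slot can be reused to build the replacement tuple below.
     */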
    ExecClearTuple(remoteslot);

    /*
     * Tuple found.
     *
     * Note this will fail if there are other conflicting unique indexes.
     */
    if (found)
    {
        /* Process and store remote tuple in the slot */
        oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
        ExecCopySlot(remoteslot, localslot);
        slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
        MemoryContextSwitchTo(oldctx);

        EvalPlanQualSetSlot(&epqstate, remoteslot);

        /* Do the actual update. */
        ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
    }
    else
    {
        /*
         * The tuple to be updated could not be found.
         *
         * TODO: what to do here? Change the log level to LOG, perhaps?
         */
        elog(DEBUG1,
             "logical replication did not find row for update "
             "in replication target relation \"%s\"",
             RelationGetRelationName(rel->localrel));
    }

    /* Cleanup. */
    ExecCloseIndices(estate->es_result_relation_info);
    PopActiveSnapshot();

    /* Handle queued AFTER triggers. */
    AfterTriggerEndQuery(estate);

    EvalPlanQualEnd(&epqstate);
    ExecResetTupleTable(estate->es_tupleTable, false);
    FreeExecutorState(estate);

    logicalrep_rel_close(rel, NoLock);

    CommandCounterIncrement();
}

/*
 * Handle DELETE message.
 *
 * TODO: FDW support
 */
static void
apply_handle_delete(StringInfo s)
{
    LogicalRepRelMapEntry *rel;
    LogicalRepTupleData oldtup;
    LogicalRepRelId relid;
    Oid         idxoid;
    EState     *estate;
    EPQState    epqstate;
    TupleTableSlot *remoteslot;
    TupleTableSlot *localslot;
    bool        found;
    MemoryContext oldctx;

    ensure_transaction();

    relid = logicalrep_read_delete(s, &oldtup);
    rel = logicalrep_rel_open(relid, RowExclusiveLock);
    if (!should_apply_changes_for_rel(rel))
    {
        /*
         * The relation can't become interesting in the middle of the
         * transaction, so it's safe to unlock it.
         */
        logicalrep_rel_close(rel, RowExclusiveLock);
        return;
    }

    /* Check if we can do the delete. */
    check_relation_updatable(rel);

    /* Initialize the executor state. */
    estate = create_estate_for_relation(rel);
    remoteslot = ExecInitExtraTupleSlot(estate,
                                        RelationGetDescr(rel->localrel),
                                        &TTSOpsVirtual);
    localslot = table_slot_create(rel->localrel,
                                  &estate->es_tupleTable);
    EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);

    PushActiveSnapshot(GetTransactionSnapshot());
    ExecOpenIndices(estate->es_result_relation_info, false);

    /* Build the search tuple. */
    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    slot_store_cstrings(remoteslot, rel, oldtup.values);
    MemoryContextSwitchTo(oldctx);

    /*
     * Try to find the tuple using either the replica identity index, the
     * primary key, or, if needed, a sequential scan.
     */
    idxoid = GetRelationIdentityOrPK(rel->localrel);
    Assert(OidIsValid(idxoid) ||
           (rel->remoterel.replident == REPLICA_IDENTITY_FULL));

    if (OidIsValid(idxoid))
        found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
                                             LockTupleExclusive,
                                             remoteslot, localslot);
    else
        found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
                                         remoteslot, localslot);

    /* If found, delete it. */
    if (found)
    {
        EvalPlanQualSetSlot(&epqstate, localslot);

        /* Do the actual delete. */
        ExecSimpleRelationDelete(estate, &epqstate, localslot);
    }
    else
    {
        /* The tuple to be deleted could not be found. */
        elog(DEBUG1,
             "logical replication could not find row for delete "
             "in replication target relation \"%s\"",
             RelationGetRelationName(rel->localrel));
    }

    /* Cleanup. */
    ExecCloseIndices(estate->es_result_relation_info);
    PopActiveSnapshot();

    /* Handle queued AFTER triggers. */
    AfterTriggerEndQuery(estate);

    EvalPlanQualEnd(&epqstate);
    ExecResetTupleTable(estate->es_tupleTable, false);
    FreeExecutorState(estate);

    logicalrep_rel_close(rel, NoLock);

    CommandCounterIncrement();
}

/*
 * Handle TRUNCATE message.
 *
 * TODO: FDW support
 */
static void
apply_handle_truncate(StringInfo s)
{
    bool        cascade = false;
    bool        restart_seqs = false;
    List       *remote_relids = NIL;
    List       *remote_rels = NIL;
    List       *rels = NIL;
    List       *relids = NIL;
    List       *relids_logged = NIL;
    ListCell   *lc;

    ensure_transaction();

    remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);

    foreach(lc, remote_relids)
    {
        LogicalRepRelId relid = lfirst_oid(lc);
        LogicalRepRelMapEntry *rel;

        rel = logicalrep_rel_open(relid, RowExclusiveLock);
        if (!should_apply_changes_for_rel(rel))
        {
            /*
             * The relation can't become interesting in the middle of the
             * transaction, so it's safe to unlock it.
             */
            logicalrep_rel_close(rel, RowExclusiveLock);
            continue;
        }

        remote_rels = lappend(remote_rels, rel);
        rels = lappend(rels, rel->localrel);
        relids = lappend_oid(relids, rel->localreloid);
        if (RelationIsLogicallyLogged(rel->localrel))
            relids_logged = lappend_oid(relids_logged, rel->localreloid);
    }

    /*
     * Even if CASCADE was used on the upstream master, we explicitly default
     * to replaying changes without further cascading. This might later
     * become configurable with a user-specified option.
     */
    ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);

    foreach(lc, remote_rels)
    {
        LogicalRepRelMapEntry *rel = lfirst(lc);

        logicalrep_rel_close(rel, NoLock);
    }

    CommandCounterIncrement();
}

/*
 * Logical replication protocol message dispatcher.
 */
static void
apply_dispatch(StringInfo s)
{
    char        action = pq_getmsgbyte(s);

    switch (action)
    {
        /* BEGIN */
        case 'B':
            apply_handle_begin(s);
            break;
        /* COMMIT */
        case 'C':
            apply_handle_commit(s);
            break;
        /* INSERT */
        case 'I':
            apply_handle_insert(s);
            break;
        /* UPDATE */
        case 'U':
            apply_handle_update(s);
            break;
        /* DELETE */
        case 'D':
            apply_handle_delete(s);
            break;
        /* TRUNCATE */
        case 'T':
            apply_handle_truncate(s);
            break;
        /* RELATION */
        case 'R':
            apply_handle_relation(s);
            break;
        /* TYPE */
        case 'Y':
            apply_handle_type(s);
            break;
        /* ORIGIN */
        case 'O':
            apply_handle_origin(s);
            break;
        default:
            ereport(ERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("invalid logical replication message type \"%c\"", action)));
    }
}

/*
 * Figure out which write/flush positions to report to the walsender process.
 *
 * We can't simply report back the last LSN the walsender sent us because the
 * local transaction might not yet be flushed to disk locally. Instead we
 * build a list that associates local with remote LSNs for every commit. When
 * reporting back the flush position to the sender we iterate that list and
 * check which entries on it are already locally flushed. Those we can report
 * as having been flushed.
 *
 * *have_pending_txes is set to true if there are outstanding transactions
 * that still need to be flushed.
 */
static void
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
                   bool *have_pending_txes)
{
    dlist_mutable_iter iter;
    XLogRecPtr  local_flush = GetFlushRecPtr();

    *write = InvalidXLogRecPtr;
    *flush = InvalidXLogRecPtr;

    dlist_foreach_modify(iter, &lsn_mapping)
    {
        FlushPosition *pos =
            dlist_container(FlushPosition, node, iter.cur);

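        /*
         * Entries were appended in commit order, so each entry we visit
         * advances the remote write position we can report.
         */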
        *write = pos->remote_end;

        if (pos->local_end <= local_flush)
        {
            *flush = pos->remote_end;
            dlist_delete(iter.cur);
            pfree(pos);
        }
        else
        {
            /*
             * Don't uselessly iterate over the rest of the list, which could
             * potentially be long. Instead get the last element and grab the
             * write position from there.
             */
            pos = dlist_tail_element(FlushPosition, node,
                                     &lsn_mapping);
            *write = pos->remote_end;
            *have_pending_txes = true;
            return;
        }
    }

    *have_pending_txes = !dlist_is_empty(&lsn_mapping);
}

/*
 * Store current remote/local lsn pair in the tracking list.
 */
static void
store_flush_position(XLogRecPtr remote_lsn)
{
    FlushPosition *flushpos;

    /* Need to do this in permanent context */
    MemoryContextSwitchTo(ApplyContext);

    /* Track commit lsn */
    flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
    flushpos->local_end = XactLastCommitEnd;
    flushpos->remote_end = remote_lsn;

    dlist_push_tail(&lsn_mapping, &flushpos->node);
    MemoryContextSwitchTo(ApplyMessageContext);
}


/* Update statistics of the worker. */
static void
UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
{
    MyLogicalRepWorker->last_lsn = last_lsn;
    MyLogicalRepWorker->last_send_time = send_time;
    MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
    if (reply)
    {
        MyLogicalRepWorker->reply_lsn = last_lsn;
        MyLogicalRepWorker->reply_time = send_time;
    }
}

/*
 * Apply main loop.
 */
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
    /*
     * Init the ApplyMessageContext, which we clean up after each replication
     * protocol message.
     */
    ApplyMessageContext = AllocSetContextCreate(ApplyContext,
                                                "ApplyMessageContext",
                                                ALLOCSET_DEFAULT_SIZES);

    /* mark as idle, before starting to loop */
    pgstat_report_activity(STATE_IDLE, NULL);

    for (;;)
    {
        pgsocket    fd = PGINVALID_SOCKET;
        int         rc;
        int         len;
        char       *buf = NULL;
        bool        endofstream = false;
        TimestampTz last_recv_timestamp = GetCurrentTimestamp();
        bool        ping_sent = false;
        long        wait_time;

        CHECK_FOR_INTERRUPTS();

        MemoryContextSwitchTo(ApplyMessageContext);

        len = walrcv_receive(wrconn, &buf, &fd);

        if (len != 0)
        {
            /* Process the data */
            for (;;)
            {
                CHECK_FOR_INTERRUPTS();

                if (len == 0)
                {
                    break;
                }
                else if (len < 0)
                {
                    ereport(LOG,
                            (errmsg("data stream from publisher has ended")));
                    endofstream = true;
                    break;
                }
                else
                {
                    int         c;
                    StringInfoData s;

                    /* Reset timeout. */
                    last_recv_timestamp = GetCurrentTimestamp();
                    ping_sent = false;

                    /* Ensure we are reading the data into our memory context. */
                    MemoryContextSwitchTo(ApplyMessageContext);

                    s.data = buf;
                    s.len = len;
                    s.cursor = 0;
                    s.maxlen = -1;

                    c = pq_getmsgbyte(&s);

                    if (c == 'w')
                    {
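                        /*
                         * XLogData ('w') message: int64 start LSN, int64 end
                         * LSN and int64 server send time, followed by the
                         * logical replication change payload, which is
                         * handed to apply_dispatch().
                         */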
                        XLogRecPtr  start_lsn;
                        XLogRecPtr  end_lsn;
                        TimestampTz send_time;

                        start_lsn = pq_getmsgint64(&s);
                        end_lsn = pq_getmsgint64(&s);
                        send_time = pq_getmsgint64(&s);

                        if (last_received < start_lsn)
                            last_received = start_lsn;

                        if (last_received < end_lsn)
                            last_received = end_lsn;

                        UpdateWorkerStats(last_received, send_time, false);

                        apply_dispatch(&s);
                    }
                    else if (c == 'k')
                    {
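                        /*
                         * Keepalive ('k') message: the server's current
                         * end-of-WAL LSN, its send time, and a flag saying
                         * whether an immediate reply is requested.
                         */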
                        XLogRecPtr  end_lsn;
                        TimestampTz timestamp;
                        bool        reply_requested;

                        end_lsn = pq_getmsgint64(&s);
                        timestamp = pq_getmsgint64(&s);
                        reply_requested = pq_getmsgbyte(&s);

                        if (last_received < end_lsn)
                            last_received = end_lsn;

                        send_feedback(last_received, reply_requested, false);
                        UpdateWorkerStats(last_received, timestamp, true);
                    }
                    /* other message types are purposefully ignored */

                    MemoryContextReset(ApplyMessageContext);
                }

                len = walrcv_receive(wrconn, &buf, &fd);
            }
        }

        /* confirm all writes so far */
        send_feedback(last_received, false, false);

        if (!in_remote_transaction)
        {
            /*
             * If we didn't get any transactions for a while there might be
             * unconsumed invalidation messages in the queue, consume them
             * now.
             */
            AcceptInvalidationMessages();
            maybe_reread_subscription();

            /* Process any table synchronization changes. */
            process_syncing_tables(last_received);
        }

        /* Cleanup the memory. */
        MemoryContextResetAndDeleteChildren(ApplyMessageContext);
        MemoryContextSwitchTo(TopMemoryContext);

        /* Check if we need to exit the streaming loop. */
        if (endofstream)
        {
            TimeLineID  tli;

            walrcv_endstreaming(wrconn, &tli);
            break;
        }

        /*
         * Wait for more data or latch. If we have unflushed transactions,
         * wake up after WalWriterDelay to see if they've been flushed yet (in
         * which case we should send a feedback message). Otherwise, there's
         * no particular urgency about waking up unless we get data or a
         * signal.
         */
        if (!dlist_is_empty(&lsn_mapping))
            wait_time = WalWriterDelay;
        else
            wait_time = NAPTIME_PER_CYCLE;

        rc = WaitLatchOrSocket(MyLatch,
                               WL_SOCKET_READABLE | WL_LATCH_SET |
                               WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                               fd, wait_time,
                               WAIT_EVENT_LOGICAL_APPLY_MAIN);

        if (rc & WL_LATCH_SET)
        {
            ResetLatch(MyLatch);
            CHECK_FOR_INTERRUPTS();
        }

        if (got_SIGHUP)
        {
            got_SIGHUP = false;
            ProcessConfigFile(PGC_SIGHUP);
        }

        if (rc & WL_TIMEOUT)
        {
            /*
             * We didn't receive anything new. If we haven't heard anything
             * from the server for more than wal_receiver_timeout / 2, ping
             * the server. Also, if it's been longer than
             * wal_receiver_status_interval since the last update we sent,
             * send a status update to the master anyway, to report any
             * progress in applying WAL.
             */
            bool        requestReply = false;

            /*
             * Check if the time since the last receive from the publisher
             * has reached the configured limit.
             */
            if (wal_receiver_timeout > 0)
            {
                TimestampTz now = GetCurrentTimestamp();
                TimestampTz timeout;

                timeout =
                    TimestampTzPlusMilliseconds(last_recv_timestamp,
                                                wal_receiver_timeout);

                if (now >= timeout)
                    ereport(ERROR,
                            (errmsg("terminating logical replication worker due to timeout")));

                /*
                 * If we didn't receive anything new for half of the receiver
                 * replication timeout, ping the server.
                 */
                if (!ping_sent)
                {
                    timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
                                                          (wal_receiver_timeout / 2));
                    if (now >= timeout)
                    {
                        requestReply = true;
                        ping_sent = true;
                    }
                }
            }

            send_feedback(last_received, requestReply, requestReply);
        }
    }
}

/*
 * Send a Standby Status Update message to the server.
 *
 * 'recvpos' is the latest LSN we've received data up to; 'force' is set if
 * we need to send a response to avoid timeouts.
 */
static void
send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
{
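    /*
     * Static state: the reply buffer is reused across calls, and the last
     * reported positions let us suppress redundant feedback messages and
     * keep the reported LSNs from moving backwards.
     */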
    static StringInfo reply_message = NULL;
    static TimestampTz send_time = 0;

    static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
    static XLogRecPtr last_writepos = InvalidXLogRecPtr;
    static XLogRecPtr last_flushpos = InvalidXLogRecPtr;

    XLogRecPtr  writepos;
    XLogRecPtr  flushpos;
    TimestampTz now;
    bool        have_pending_txes;

    /*
     * If the user doesn't want status to be reported to the publisher, be
     * sure to exit before doing anything at all.
     */
    if (!force && wal_receiver_status_interval <= 0)
        return;

    /* It's legal to not pass a recvpos */
    if (recvpos < last_recvpos)
        recvpos = last_recvpos;

    get_flush_position(&writepos, &flushpos, &have_pending_txes);

    /*
     * No outstanding transactions to flush, we can report the latest
     * received position. This is important for synchronous replication.
     */
    if (!have_pending_txes)
        flushpos = writepos = recvpos;

    if (writepos < last_writepos)
        writepos = last_writepos;

    if (flushpos < last_flushpos)
        flushpos = last_flushpos;

    now = GetCurrentTimestamp();

    /* if we've already reported everything we're good */
    if (!force &&
        writepos == last_writepos &&
        flushpos == last_flushpos &&
        !TimestampDifferenceExceeds(send_time, now,
                                    wal_receiver_status_interval * 1000))
        return;
    send_time = now;

    if (!reply_message)
    {
        MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);

        reply_message = makeStringInfo();
        MemoryContextSwitchTo(oldctx);
    }
    else
        resetStringInfo(reply_message);

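    /*
     * Standby Status Update ('r') message: write, flush and apply LSNs, the
     * current timestamp, and the replyRequested flag.
     */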
    pq_sendbyte(reply_message, 'r');
    pq_sendint64(reply_message, recvpos);       /* write */
    pq_sendint64(reply_message, flushpos);      /* flush */
    pq_sendint64(reply_message, writepos);      /* apply */
    pq_sendint64(reply_message, now);           /* sendTime */
    pq_sendbyte(reply_message, requestReply);   /* replyRequested */

    elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
         force,
         (uint32) (recvpos >> 32), (uint32) recvpos,
         (uint32) (writepos >> 32), (uint32) writepos,
         (uint32) (flushpos >> 32), (uint32) flushpos);

    walrcv_send(wrconn, reply_message->data, reply_message->len);

    if (recvpos > last_recvpos)
        last_recvpos = recvpos;
    if (writepos > last_writepos)
        last_writepos = writepos;
    if (flushpos > last_flushpos)
        last_flushpos = flushpos;
}

/*
 * Reread subscription info if needed. Most changes will cause the worker to
 * exit; the launcher then starts a fresh one.
 */
static void
maybe_reread_subscription(void)
{
    MemoryContext oldctx;
    Subscription *newsub;
    bool        started_tx = false;

    /* When cache state is valid there is nothing to do here. */
    if (MySubscriptionValid)
        return;

    /* This function might be called inside or outside of a transaction. */
    if (!IsTransactionState())
    {
        StartTransactionCommand();
        started_tx = true;
    }

    /* Ensure allocations in permanent context. */
    oldctx = MemoryContextSwitchTo(ApplyContext);

    newsub = GetSubscription(MyLogicalRepWorker->subid, true);

    /*
     * Exit if the subscription was removed. This normally should not happen
     * as the worker gets killed during DROP SUBSCRIPTION.
     */
    if (!newsub)
    {
        ereport(LOG,
                (errmsg("logical replication apply worker for subscription \"%s\" will "
                        "stop because the subscription was removed",
                        MySubscription->name)));

        proc_exit(0);
    }

    /*
     * Exit if the subscription was disabled. This normally should not happen
     * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
     */
    if (!newsub->enabled)
    {
        ereport(LOG,
                (errmsg("logical replication apply worker for subscription \"%s\" will "
                        "stop because the subscription was disabled",
                        MySubscription->name)));

        proc_exit(0);
    }

    /*
     * Exit if the connection string was changed. The launcher will start a
     * new worker.
     */
    if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
    {
        ereport(LOG,
                (errmsg("logical replication apply worker for subscription \"%s\" will "
                        "restart because the connection information was changed",
                        MySubscription->name)));

        proc_exit(0);
    }

    /*
     * Exit if the subscription name was changed (it's used for
     * fallback_application_name). The launcher will start a new worker.
     */
    if (strcmp(newsub->name, MySubscription->name) != 0)
    {
        ereport(LOG,
                (errmsg("logical replication apply worker for subscription \"%s\" will "
                        "restart because the subscription was renamed",
                        MySubscription->name)));

        proc_exit(0);
    }

    /* !slotname should never happen when enabled is true. */
    Assert(newsub->slotname);

    /*
     * We need to make a new connection to the new slot if the slot name has
     * changed, so exit here as well in that case.
     */
    if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
    {
        ereport(LOG,
                (errmsg("logical replication apply worker for subscription \"%s\" will "
                        "restart because the replication slot name was changed",
                        MySubscription->name)));

        proc_exit(0);
    }

    /*
     * Exit if the publication list was changed. The launcher will start a
     * new worker.
     */
    if (!equal(newsub->publications, MySubscription->publications))
    {
        ereport(LOG,
                (errmsg("logical replication apply worker for subscription \"%s\" will "
                        "restart because the subscription's publications were changed",
                        MySubscription->name)));

        proc_exit(0);
    }

    /* Check for other changes that should never happen, too. */
    if (newsub->dbid != MySubscription->dbid)
    {
        elog(ERROR, "subscription %u changed unexpectedly",
             MyLogicalRepWorker->subid);
    }

    /* Clean old subscription info and switch to new one. */
    FreeSubscription(MySubscription);
    MySubscription = newsub;

    MemoryContextSwitchTo(oldctx);

    /* Change synchronous commit according to the user's wishes */
    SetConfigOption("synchronous_commit", MySubscription->synccommit,
                    PGC_BACKEND, PGC_S_OVERRIDE);

    if (started_tx)
        CommitTransactionCommand();

    MySubscriptionValid = true;
}

/*
 * Callback from subscription syscache invalidation.
 */
static void
subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
{
    MySubscriptionValid = false;
}

/* SIGHUP: set flag to reload configuration at next convenient time */
static void
logicalrep_worker_sighup(SIGNAL_ARGS)
{
    int         save_errno = errno;

    got_SIGHUP = true;

    /* Wake anything waiting on the process latch */
    SetLatch(MyLatch);

    errno = save_errno;
}

/* Logical Replication Apply worker entry point */
void
ApplyWorkerMain(Datum main_arg)
{
    int         worker_slot = DatumGetInt32(main_arg);
    MemoryContext oldctx;
    char        originname[NAMEDATALEN];
    XLogRecPtr  origin_startpos;
    char       *myslotname;
    WalRcvStreamOptions options;

    /* Attach to slot */
    logicalrep_worker_attach(worker_slot);

    /* Setup signal handling */
    pqsignal(SIGHUP, logicalrep_worker_sighup);
    pqsignal(SIGTERM, die);
    BackgroundWorkerUnblockSignals();

    /*
     * We don't currently need any ResourceOwner in a walreceiver process, but
     * if we did, we could call CreateAuxProcessResourceOwner here.
     */

    /* Initialize stats to a sane value */
    MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
        MyLogicalRepWorker->reply_time = GetCurrentTimestamp();

    /* Load the libpq-specific functions */
    load_file("libpqwalreceiver", false);

    /* Run as replica session replication role. */
    SetConfigOption("session_replication_role", "replica",
                    PGC_SUSET, PGC_S_OVERRIDE);

    /* Connect to our database. */
    BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
                                              MyLogicalRepWorker->userid,
                                              0);

    /* Load the subscription into persistent memory context. */
    ApplyContext = AllocSetContextCreate(TopMemoryContext,
                                         "ApplyContext",
                                         ALLOCSET_DEFAULT_SIZES);
    StartTransactionCommand();
    oldctx = MemoryContextSwitchTo(ApplyContext);

    MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
    if (!MySubscription)
    {
        ereport(LOG,
                (errmsg("logical replication apply worker for subscription %u will not "
                        "start because the subscription was removed during startup",
                        MyLogicalRepWorker->subid)));
        proc_exit(0);
    }

    MySubscriptionValid = true;
    MemoryContextSwitchTo(oldctx);

    if (!MySubscription->enabled)
    {
        ereport(LOG,
                (errmsg("logical replication apply worker for subscription \"%s\" will not "
                        "start because the subscription was disabled during startup",
                        MySubscription->name)));

        proc_exit(0);
    }

    /* Setup synchronous commit according to the user's wishes */
    SetConfigOption("synchronous_commit", MySubscription->synccommit,
                    PGC_BACKEND, PGC_S_OVERRIDE);

    /* Keep us informed about subscription changes. */
    CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
                                  subscription_change_cb,
                                  (Datum) 0);

    if (am_tablesync_worker())
        ereport(LOG,
                (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
                        MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
    else
        ereport(LOG,
                (errmsg("logical replication apply worker for subscription \"%s\" has started",
                        MySubscription->name)));

    CommitTransactionCommand();

    /* Connect to the origin and start the replication. */
    elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
         MySubscription->conninfo);

    if (am_tablesync_worker())
    {
        char       *syncslotname;

        /* This is a table synchronization worker; run the initial sync. */
        syncslotname = LogicalRepSyncTableStart(&origin_startpos);

        /* The slot name needs to be allocated in permanent memory context. */
        oldctx = MemoryContextSwitchTo(ApplyContext);
        myslotname = pstrdup(syncslotname);
        MemoryContextSwitchTo(oldctx);

        pfree(syncslotname);
    }
    else
    {
        /* This is the main apply worker */
        RepOriginId originid;
        TimeLineID  startpointTLI;
        char       *err;

        myslotname = MySubscription->slotname;

        /*
         * This shouldn't happen if the subscription is enabled, but guard
         * against DDL bugs or manual catalog changes. (libpqwalreceiver will
         * crash if slot is NULL.)
         */
        if (!myslotname)
            ereport(ERROR,
                    (errmsg("subscription has no replication slot set")));

        /* Setup replication origin tracking. */
        StartTransactionCommand();
        snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
        originid = replorigin_by_name(originname, true);
        if (!OidIsValid(originid))
            originid = replorigin_create(originname);
        replorigin_session_setup(originid);
        replorigin_session_origin = originid;
        origin_startpos = replorigin_session_get_progress(false);
        CommitTransactionCommand();

        wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
                                &err);
        if (wrconn == NULL)
            ereport(ERROR,
                    (errmsg("could not connect to the publisher: %s", err)));

        /*
         * We don't really use the output of identify_system for anything,
         * but it performs some initialization on the upstream, so let's
         * still call it.
         */
        (void) walrcv_identify_system(wrconn, &startpointTLI);
    }

    /*
     * Setup callback for syscache so that we know when something changes in
     * the subscription relation state.
     */
    CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
                                  invalidate_syncing_table_states,
                                  (Datum) 0);

    /* Build logical replication streaming options. */
    options.logical = true;
    options.startpoint = origin_startpos;
    options.slotname = myslotname;
    options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
    options.proto.logical.publication_names = MySubscription->publications;

    /* Start normal logical streaming replication. */
    walrcv_startstreaming(wrconn, &options);

    /* Run the main loop. */
    LogicalRepApplyLoop(origin_startpos);

    proc_exit(0);
}

/*
 * Is the current process a logical replication worker?
 */
bool
IsLogicalWorker(void)
{
    return MyLogicalRepWorker != NULL;
}