/*-------------------------------------------------------------------------
 * worker.c
 *	   PostgreSQL logical replication worker (apply)
 *
 * Copyright (c) 2016-2019, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/logical/worker.c
 *
 * NOTES
 *	  This file contains the worker which applies logical changes as they
 *	  arrive from the remote logical replication stream.
 *
 *	  The main worker (apply) is started by the logical replication worker
 *	  launcher for every enabled subscription in a database. It uses the
 *	  walsender protocol to communicate with the publisher.
 *
 *	  This module contains the server-facing code and shares the
 *	  libpqwalreceiver module with the walreceiver for the libpq-specific
 *	  functionality.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/table.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "postmaster/walwriter.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/timeout.h"

#define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */

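/*
 * An entry in the list that pairs the end LSN of a remotely committed
 * transaction with the end LSN of the corresponding local commit record.
 * get_flush_position() walks this list to determine which remote LSNs can
 * be reported back to the publisher as flushed.
 */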
typedef struct FlushPosition
{
	dlist_node	node;
	XLogRecPtr	local_end;
	XLogRecPtr	remote_end;
} FlushPosition;

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);

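/*
 * State for slot_store_error_callback(), identifying the column being
 * converted if a type input function raises an error.
 */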
typedef struct SlotErrCallbackArg
{
	LogicalRepRelMapEntry *rel;
	int			local_attnum;
	int			remote_attnum;
} SlotErrCallbackArg;

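/*
 * ApplyMessageContext holds allocations for a single replication protocol
 * message and is reset after each one; ApplyContext holds allocations that
 * must survive for the lifetime of the apply worker.
 */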
static MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;

WalReceiverConn *wrconn = NULL;

Subscription *MySubscription = NULL;
bool		MySubscriptionValid = false;

bool		in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);

static void store_flush_position(XLogRecPtr remote_lsn);

static void maybe_reread_subscription(void);

/* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false;

/*
 * Should this worker apply changes for the given relation?
 *
 * This is mainly needed for the initial relation data sync, which runs in a
 * separate worker process in parallel, and we need some way to skip changes
 * coming to the main apply worker while a table is being synchronized.
 *
 * Note that we need a less-than-or-equal comparison for the SYNCDONE state
 * because statelsn might hold the position of the end of the initial-sync
 * consistent point WAL record + 1 (i.e. the start of the next record), and
 * the next record can be the COMMIT of the transaction we are currently
 * processing (which is what remote_final_lsn is set to in
 * apply_handle_begin).
 */
static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
{
	if (am_tablesync_worker())
		return MyLogicalRepWorker->relid == rel->localreloid;
	else
		return (rel->state == SUBREL_STATE_READY ||
				(rel->state == SUBREL_STATE_SYNCDONE &&
				 rel->statelsn <= remote_final_lsn));
}

/*
 * Make sure that we have started a local transaction.
 *
 * Returns true if a new transaction was started. Also switches to
 * ApplyMessageContext as necessary.
 */
static bool
ensure_transaction(void)
{
	if (IsTransactionState())
	{
		SetCurrentStatementStartTimestamp();

		if (CurrentMemoryContext != ApplyMessageContext)
			MemoryContextSwitchTo(ApplyMessageContext);

		return false;
	}

	SetCurrentStatementStartTimestamp();
	StartTransactionCommand();

	maybe_reread_subscription();

	MemoryContextSwitchTo(ApplyMessageContext);
	return true;
}


/*
 * Executor state preparation for evaluation of constraint expressions,
 * indexes and triggers.
 *
 * This is based on similar code in copy.c.
 *
 * Note that this calls AfterTriggerBeginQuery(); the matching
 * AfterTriggerEndQuery() is run by the apply handlers once the change has
 * been applied.
 */
static EState *
create_estate_for_relation(LogicalRepRelMapEntry *rel)
{
	EState	   *estate;
	ResultRelInfo *resultRelInfo;
	RangeTblEntry *rte;

	estate = CreateExecutorState();

	rte = makeNode(RangeTblEntry);
	rte->rtekind = RTE_RELATION;
	rte->relid = RelationGetRelid(rel->localrel);
	rte->relkind = rel->localrel->rd_rel->relkind;
	rte->rellockmode = AccessShareLock;
	ExecInitRangeTable(estate, list_make1(rte));

	resultRelInfo = makeNode(ResultRelInfo);
	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);

	estate->es_result_relations = resultRelInfo;
	estate->es_num_result_relations = 1;
	estate->es_result_relation_info = resultRelInfo;

	estate->es_output_cid = GetCurrentCommandId(true);

	/* Prepare to catch AFTER triggers. */
	AfterTriggerBeginQuery();

	return estate;
}

/*
 * Evaluate default values for columns for which we can't map to the remote
 * relation's columns.
 *
 * This allows us to support tables which have more columns on the downstream
 * than on the upstream.
 */
static void
slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
				   TupleTableSlot *slot)
{
	TupleDesc	desc = RelationGetDescr(rel->localrel);
	int			num_phys_attrs = desc->natts;
	int			i;
	int			attnum,
				num_defaults = 0;
	int		   *defmap;
	ExprState **defexprs;
	ExprContext *econtext;

	econtext = GetPerTupleExprContext(estate);

	/* We got all the data via replication, no need to evaluate anything. */
	if (num_phys_attrs == rel->remoterel.natts)
		return;

	defmap = (int *) palloc(num_phys_attrs * sizeof(int));
	defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));

	for (attnum = 0; attnum < num_phys_attrs; attnum++)
	{
		Expr	   *defexpr;

		if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
			continue;

		if (rel->attrmap[attnum] >= 0)
			continue;

		defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);

		if (defexpr != NULL)
		{
			/* Run the expression through planner */
			defexpr = expression_planner(defexpr);

			/* Initialize the executable expression */
			defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
			defmap[num_defaults] = attnum;
			num_defaults++;
		}
	}

	for (i = 0; i < num_defaults; i++)
		slot->tts_values[defmap[i]] =
			ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
}

/*
 * Error callback to give more context info about type conversion failure.
 */
static void
slot_store_error_callback(void *arg)
{
	SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
	LogicalRepRelMapEntry *rel;
	char	   *remotetypname;
	Oid			remotetypoid,
				localtypoid;

	/* Nothing to do if remote attribute number is not set */
	if (errarg->remote_attnum < 0)
		return;

	rel = errarg->rel;
	remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];

	/* Fetch remote type name from the LogicalRepTypMap cache */
	remotetypname = logicalrep_typmap_gettypname(remotetypoid);

	/* Fetch local type OID from the local sys cache */
	localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);

	errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
			   "remote type %s, local type %s",
			   rel->remoterel.nspname, rel->remoterel.relname,
			   rel->remoterel.attnames[errarg->remote_attnum],
			   remotetypname,
			   format_type_be(localtypoid));
}

/*
 * Store data in C string form into the slot.
 * This is similar to BuildTupleFromCStrings, but a TupleTableSlot fits our
 * use better.
 */
static void
slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
					char **values)
{
	int			natts = slot->tts_tupleDescriptor->natts;
	int			i;
	SlotErrCallbackArg errarg;
	ErrorContextCallback errcallback;

	ExecClearTuple(slot);

	/* Push callback + info on the error context stack */
	errarg.rel = rel;
	errarg.local_attnum = -1;
	errarg.remote_attnum = -1;
	errcallback.callback = slot_store_error_callback;
	errcallback.arg = (void *) &errarg;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Call the "in" function for each non-dropped attribute */
	for (i = 0; i < natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
		int			remoteattnum = rel->attrmap[i];

		if (!att->attisdropped && remoteattnum >= 0 &&
			values[remoteattnum] != NULL)
		{
			Oid			typinput;
			Oid			typioparam;

			errarg.local_attnum = i;
			errarg.remote_attnum = remoteattnum;

			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
			slot->tts_values[i] =
				OidInputFunctionCall(typinput, values[remoteattnum],
									 typioparam, att->atttypmod);
			slot->tts_isnull[i] = false;

			errarg.local_attnum = -1;
			errarg.remote_attnum = -1;
		}
		else
		{
			/*
			 * We assign NULL to dropped attributes, NULL values, and missing
			 * values (missing values should be later filled using
			 * slot_fill_defaults).
			 */
			slot->tts_values[i] = (Datum) 0;
			slot->tts_isnull[i] = true;
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	ExecStoreVirtualTuple(slot);
}

/*
 * Modify the slot with user data provided as C strings.
 * This is somewhat similar to heap_modify_tuple, but it also calls the type
 * input function on the user data, since the input is the text
 * representation of the types.
 */
static void
slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
					 char **values, bool *replaces)
{
	int			natts = slot->tts_tupleDescriptor->natts;
	int			i;
	SlotErrCallbackArg errarg;
	ErrorContextCallback errcallback;

	slot_getallattrs(slot);
	ExecClearTuple(slot);

	/* Push callback + info on the error context stack */
	errarg.rel = rel;
	errarg.local_attnum = -1;
	errarg.remote_attnum = -1;
	errcallback.callback = slot_store_error_callback;
	errcallback.arg = (void *) &errarg;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Call the "in" function for each replaced attribute */
	for (i = 0; i < natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
		int			remoteattnum = rel->attrmap[i];

		if (remoteattnum < 0)
			continue;

		if (!replaces[remoteattnum])
			continue;

		if (values[remoteattnum] != NULL)
		{
			Oid			typinput;
			Oid			typioparam;

			errarg.local_attnum = i;
			errarg.remote_attnum = remoteattnum;

			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
			slot->tts_values[i] =
				OidInputFunctionCall(typinput, values[remoteattnum],
									 typioparam, att->atttypmod);
			slot->tts_isnull[i] = false;

			errarg.local_attnum = -1;
			errarg.remote_attnum = -1;
		}
		else
		{
			slot->tts_values[i] = (Datum) 0;
			slot->tts_isnull[i] = true;
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	ExecStoreVirtualTuple(slot);
}

/*
 * Handle BEGIN message.
 */
static void
apply_handle_begin(StringInfo s)
{
	LogicalRepBeginData begin_data;

	logicalrep_read_begin(s, &begin_data);

	remote_final_lsn = begin_data.final_lsn;

	in_remote_transaction = true;

	pgstat_report_activity(STATE_RUNNING, NULL);
}

/*
 * Handle COMMIT message.
 *
 * TODO, support tracking of multiple origins
 */
static void
apply_handle_commit(StringInfo s)
{
	LogicalRepCommitData commit_data;

	logicalrep_read_commit(s, &commit_data);

	Assert(commit_data.commit_lsn == remote_final_lsn);

	/* The synchronization worker runs in a single transaction. */
	if (IsTransactionState() && !am_tablesync_worker())
	{
		/*
		 * Update origin state so we can restart streaming from the correct
		 * position in case of a crash.
		 */
		replorigin_session_origin_lsn = commit_data.end_lsn;
		replorigin_session_origin_timestamp = commit_data.committime;

		CommitTransactionCommand();
		pgstat_report_stat(false);

		store_flush_position(commit_data.end_lsn);
	}
	else
	{
		/* Process any invalidation messages that might have accumulated. */
		AcceptInvalidationMessages();
		maybe_reread_subscription();
	}

	in_remote_transaction = false;

	/* Process any tables that are being synchronized in parallel. */
	process_syncing_tables(commit_data.end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
}

/*
 * Handle ORIGIN message.
 *
 * TODO, support tracking of multiple origins
 */
static void
apply_handle_origin(StringInfo s)
{
	/*
	 * The ORIGIN message can only come inside a remote transaction and
	 * before any actual writes.
	 */
	if (!in_remote_transaction ||
		(IsTransactionState() && !am_tablesync_worker()))
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg("ORIGIN message sent out of order")));
}

/*
 * Handle RELATION message.
 *
 * Note we don't do validation against the local schema here. That validation
 * is postponed until the first change for the given relation arrives, since
 * we only care about the schema when applying changes, and we do less
 * locking this way.
 */
static void
apply_handle_relation(StringInfo s)
{
	LogicalRepRelation *rel;

	rel = logicalrep_read_rel(s);
	logicalrep_relmap_update(rel);
}

/*
 * Handle TYPE message.
 *
 * Note we don't do local mapping here; that's done when the type is
 * actually used.
 */
static void
apply_handle_type(StringInfo s)
{
	LogicalRepTyp typ;

	logicalrep_read_typ(s, &typ);
	logicalrep_typmap_update(&typ);
}

/*
 * Get the replica identity index, or if that is not defined, the primary
 * key index.
 *
 * If neither is defined, returns InvalidOid.
 */
static Oid
GetRelationIdentityOrPK(Relation rel)
{
	Oid			idxoid;

	idxoid = RelationGetReplicaIndex(rel);

	if (!OidIsValid(idxoid))
		idxoid = RelationGetPrimaryKeyIndex(rel);

	return idxoid;
}

/*
 * Handle INSERT message.
 */
static void
apply_handle_insert(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepTupleData newtup;
	LogicalRepRelId relid;
	EState	   *estate;
	TupleTableSlot *remoteslot;
	MemoryContext oldctx;

	ensure_transaction();

	relid = logicalrep_read_insert(s, &newtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction, so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		return;
	}

	/* Initialize the executor state. */
	estate = create_estate_for_relation(rel);
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel),
										&TTSOpsVirtual);

	/* Input functions may need an active snapshot, so get one */
	PushActiveSnapshot(GetTransactionSnapshot());

	/* Process and store remote tuple in the slot */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_cstrings(remoteslot, rel, newtup.values);
	slot_fill_defaults(rel, estate, remoteslot);
	MemoryContextSwitchTo(oldctx);

	ExecOpenIndices(estate->es_result_relation_info, false);

	/* Do the insert. */
	ExecSimpleRelationInsert(estate, remoteslot);

	/* Cleanup. */
	ExecCloseIndices(estate->es_result_relation_info);
	PopActiveSnapshot();

	/* Handle queued AFTER triggers. */
	AfterTriggerEndQuery(estate);

	ExecResetTupleTable(estate->es_tupleTable, false);
	FreeExecutorState(estate);

	logicalrep_rel_close(rel, NoLock);

	CommandCounterIncrement();
}

/*
 * Check if the logical replication relation is updatable and throw
 * an appropriate error if it isn't.
 */
static void
check_relation_updatable(LogicalRepRelMapEntry *rel)
{
	/* Updatable, no error. */
	if (rel->updatable)
		return;

	/*
	 * We are in error mode, so it's fine that this is somewhat slow. It's
	 * better to give the user a correct error.
	 */
	if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
	{
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("publisher did not send replica identity column "
						"expected by the logical replication target relation \"%s.%s\"",
						rel->remoterel.nspname, rel->remoterel.relname)));
	}

	ereport(ERROR,
			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
			 errmsg("logical replication target relation \"%s.%s\" has "
					"neither REPLICA IDENTITY index nor PRIMARY "
					"KEY and published relation does not have "
					"REPLICA IDENTITY FULL",
					rel->remoterel.nspname, rel->remoterel.relname)));
}

/*
 * Handle UPDATE message.
 *
 * TODO: FDW support
 */
static void
apply_handle_update(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepRelId relid;
	Oid			idxoid;
	EState	   *estate;
	EPQState	epqstate;
	LogicalRepTupleData oldtup;
	LogicalRepTupleData newtup;
	bool		has_oldtup;
	TupleTableSlot *localslot;
	TupleTableSlot *remoteslot;
	bool		found;
	MemoryContext oldctx;

	ensure_transaction();

	relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
								   &newtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction, so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		return;
	}

	/* Check if we can do the update. */
	check_relation_updatable(rel);

	/* Initialize the executor state. */
	estate = create_estate_for_relation(rel);
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel),
										&TTSOpsVirtual);
	localslot = table_slot_create(rel->localrel,
								  &estate->es_tupleTable);
	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);

	PushActiveSnapshot(GetTransactionSnapshot());
	ExecOpenIndices(estate->es_result_relation_info, false);

	/* Build the search tuple. */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_cstrings(remoteslot, rel,
						has_oldtup ? oldtup.values : newtup.values);
	MemoryContextSwitchTo(oldctx);

	/*
	 * Try to find the tuple using either the replica identity index or the
	 * primary key, or, if needed, a sequential scan.
	 */
	idxoid = GetRelationIdentityOrPK(rel->localrel);
	Assert(OidIsValid(idxoid) ||
		   (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));

	if (OidIsValid(idxoid))
		found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
											 LockTupleExclusive,
											 remoteslot, localslot);
	else
		found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
										 remoteslot, localslot);

	ExecClearTuple(remoteslot);

	/*
	 * Tuple found.
	 *
	 * Note this will fail if there are other conflicting unique indexes.
	 */
	if (found)
	{
		/* Process and store remote tuple in the slot */
		oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
		ExecCopySlot(remoteslot, localslot);
		slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
		MemoryContextSwitchTo(oldctx);

		EvalPlanQualSetSlot(&epqstate, remoteslot);

		/* Do the actual update. */
		ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
	}
	else
	{
		/*
		 * The tuple to be updated could not be found.
		 *
		 * TODO what to do here, change the log level to LOG perhaps?
		 */
		elog(DEBUG1,
			 "logical replication did not find row for update "
			 "in replication target relation \"%s\"",
			 RelationGetRelationName(rel->localrel));
	}

	/* Cleanup. */
	ExecCloseIndices(estate->es_result_relation_info);
	PopActiveSnapshot();

	/* Handle queued AFTER triggers. */
	AfterTriggerEndQuery(estate);

	EvalPlanQualEnd(&epqstate);
	ExecResetTupleTable(estate->es_tupleTable, false);
	FreeExecutorState(estate);

	logicalrep_rel_close(rel, NoLock);

	CommandCounterIncrement();
}

/*
 * Handle DELETE message.
 *
 * TODO: FDW support
 */
static void
apply_handle_delete(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepTupleData oldtup;
	LogicalRepRelId relid;
	Oid			idxoid;
	EState	   *estate;
	EPQState	epqstate;
	TupleTableSlot *remoteslot;
	TupleTableSlot *localslot;
	bool		found;
	MemoryContext oldctx;

	ensure_transaction();

	relid = logicalrep_read_delete(s, &oldtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction, so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		return;
	}

	/* Check if we can do the delete. */
	check_relation_updatable(rel);

	/* Initialize the executor state. */
	estate = create_estate_for_relation(rel);
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel),
										&TTSOpsVirtual);
	localslot = table_slot_create(rel->localrel,
								  &estate->es_tupleTable);
	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);

	PushActiveSnapshot(GetTransactionSnapshot());
	ExecOpenIndices(estate->es_result_relation_info, false);

	/* Build the search tuple. */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_cstrings(remoteslot, rel, oldtup.values);
	MemoryContextSwitchTo(oldctx);

	/*
	 * Try to find the tuple using either the replica identity index or the
	 * primary key, or, if needed, a sequential scan.
	 */
	idxoid = GetRelationIdentityOrPK(rel->localrel);
	Assert(OidIsValid(idxoid) ||
		   (rel->remoterel.replident == REPLICA_IDENTITY_FULL));

	if (OidIsValid(idxoid))
		found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
											 LockTupleExclusive,
											 remoteslot, localslot);
	else
		found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
										 remoteslot, localslot);
	/* If found, delete it. */
	if (found)
	{
		EvalPlanQualSetSlot(&epqstate, localslot);

		/* Do the actual delete. */
		ExecSimpleRelationDelete(estate, &epqstate, localslot);
	}
	else
	{
		/* The tuple to be deleted could not be found. */
		elog(DEBUG1,
			 "logical replication could not find row for delete "
			 "in replication target relation \"%s\"",
			 RelationGetRelationName(rel->localrel));
	}

	/* Cleanup. */
	ExecCloseIndices(estate->es_result_relation_info);
	PopActiveSnapshot();

	/* Handle queued AFTER triggers. */
	AfterTriggerEndQuery(estate);

	EvalPlanQualEnd(&epqstate);
	ExecResetTupleTable(estate->es_tupleTable, false);
	FreeExecutorState(estate);

	logicalrep_rel_close(rel, NoLock);

	CommandCounterIncrement();
}

/*
 * Handle TRUNCATE message.
 *
 * TODO: FDW support
 */
static void
apply_handle_truncate(StringInfo s)
{
	bool		cascade = false;
	bool		restart_seqs = false;
	List	   *remote_relids = NIL;
	List	   *remote_rels = NIL;
	List	   *rels = NIL;
	List	   *relids = NIL;
	List	   *relids_logged = NIL;
	ListCell   *lc;

	ensure_transaction();

	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);

	foreach(lc, remote_relids)
	{
		LogicalRepRelId relid = lfirst_oid(lc);
		LogicalRepRelMapEntry *rel;

		rel = logicalrep_rel_open(relid, RowExclusiveLock);
		if (!should_apply_changes_for_rel(rel))
		{
			/*
			 * The relation can't become interesting in the middle of the
			 * transaction, so it's safe to unlock it.
			 */
			logicalrep_rel_close(rel, RowExclusiveLock);
			continue;
		}

		remote_rels = lappend(remote_rels, rel);
		rels = lappend(rels, rel->localrel);
		relids = lappend_oid(relids, rel->localreloid);
		if (RelationIsLogicallyLogged(rel->localrel))
			relids_logged = lappend_oid(relids_logged, rel->localreloid);
	}

	/*
	 * Even if we used CASCADE on the upstream master, we explicitly default
	 * to replaying changes without further cascading. This might later be
	 * made configurable with a user-specified option.
	 */
	ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);

	foreach(lc, remote_rels)
	{
		LogicalRepRelMapEntry *rel = lfirst(lc);

		logicalrep_rel_close(rel, NoLock);
	}

	CommandCounterIncrement();
}


/*
 * Logical replication protocol message dispatcher.
 */
static void
apply_dispatch(StringInfo s)
{
	char		action = pq_getmsgbyte(s);

	switch (action)
	{
			/* BEGIN */
		case 'B':
			apply_handle_begin(s);
			break;
			/* COMMIT */
		case 'C':
			apply_handle_commit(s);
			break;
			/* INSERT */
		case 'I':
			apply_handle_insert(s);
			break;
			/* UPDATE */
		case 'U':
			apply_handle_update(s);
			break;
			/* DELETE */
		case 'D':
			apply_handle_delete(s);
			break;
			/* TRUNCATE */
		case 'T':
			apply_handle_truncate(s);
			break;
			/* RELATION */
		case 'R':
			apply_handle_relation(s);
			break;
			/* TYPE */
		case 'Y':
			apply_handle_type(s);
			break;
			/* ORIGIN */
		case 'O':
			apply_handle_origin(s);
			break;
		default:
			ereport(ERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("invalid logical replication message type \"%c\"", action)));
	}
}

/*
 * Figure out which write/flush positions to report to the walsender process.
 *
 * We can't simply report back the last LSN the walsender sent us because the
 * local transaction might not yet be flushed to disk locally. Instead we
 * build a list that associates local with remote LSNs for every commit. When
 * reporting back the flush position to the sender we iterate that list and
 * check which entries on it are already locally flushed. Those we can report
 * as having been flushed.
 *
 * *have_pending_txes is set to true if there are outstanding transactions
 * that still need to be flushed.
 */
static void
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
				   bool *have_pending_txes)
{
	dlist_mutable_iter iter;
	XLogRecPtr	local_flush = GetFlushRecPtr();

	*write = InvalidXLogRecPtr;
	*flush = InvalidXLogRecPtr;

	dlist_foreach_modify(iter, &lsn_mapping)
	{
		FlushPosition *pos =
		dlist_container(FlushPosition, node, iter.cur);

		*write = pos->remote_end;

		if (pos->local_end <= local_flush)
		{
			*flush = pos->remote_end;
			dlist_delete(iter.cur);
			pfree(pos);
		}
		else
		{
			/*
			 * Don't want to uselessly iterate over the rest of the list,
			 * which could potentially be long. Instead get the last element
			 * and grab the write position from there.
			 */
			pos = dlist_tail_element(FlushPosition, node,
									 &lsn_mapping);
			*write = pos->remote_end;
			*have_pending_txes = true;
			return;
		}
	}

	*have_pending_txes = !dlist_is_empty(&lsn_mapping);
}

/*
 * Store the current remote/local LSN pair in the tracking list.
 */
static void
store_flush_position(XLogRecPtr remote_lsn)
{
	FlushPosition *flushpos;

	/* Need to do this in permanent context */
	MemoryContextSwitchTo(ApplyContext);

	/* Track the commit LSN */
	flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
	flushpos->local_end = XactLastCommitEnd;
	flushpos->remote_end = remote_lsn;

	dlist_push_tail(&lsn_mapping, &flushpos->node);
	MemoryContextSwitchTo(ApplyMessageContext);
}


/* Update statistics of the worker. */
static void
UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
{
	MyLogicalRepWorker->last_lsn = last_lsn;
	MyLogicalRepWorker->last_send_time = send_time;
	MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
	if (reply)
	{
		MyLogicalRepWorker->reply_lsn = last_lsn;
		MyLogicalRepWorker->reply_time = send_time;
	}
}

/*
 * Apply main loop.
 */
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
	/*
	 * Init the ApplyMessageContext which we clean up after each replication
	 * protocol message.
	 */
	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
												"ApplyMessageContext",
												ALLOCSET_DEFAULT_SIZES);

	/* mark as idle, before starting to loop */
	pgstat_report_activity(STATE_IDLE, NULL);

	for (;;)
	{
		pgsocket	fd = PGINVALID_SOCKET;
		int			rc;
		int			len;
		char	   *buf = NULL;
		bool		endofstream = false;
		TimestampTz last_recv_timestamp = GetCurrentTimestamp();
		bool		ping_sent = false;
		long		wait_time;

		CHECK_FOR_INTERRUPTS();

		MemoryContextSwitchTo(ApplyMessageContext);

		len = walrcv_receive(wrconn, &buf, &fd);

		if (len != 0)
		{
			/* Process the data */
			for (;;)
			{
				CHECK_FOR_INTERRUPTS();

				if (len == 0)
				{
					break;
				}
				else if (len < 0)
				{
					ereport(LOG,
							(errmsg("data stream from publisher has ended")));
					endofstream = true;
					break;
				}
				else
				{
					int			c;
					StringInfoData s;

					/* Reset timeout. */
					last_recv_timestamp = GetCurrentTimestamp();
					ping_sent = false;

					/* Ensure we are reading the data into our memory context. */
					MemoryContextSwitchTo(ApplyMessageContext);

					s.data = buf;
					s.len = len;
					s.cursor = 0;
					s.maxlen = -1;

					c = pq_getmsgbyte(&s);

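					/*
					 * 'w' messages carry WAL data: a header with the start
					 * LSN, end LSN and send timestamp, followed by a logical
					 * replication message that apply_dispatch() consumes.
					 * 'k' messages are keepalives carrying the sender's
					 * current WAL end and an optional request for a reply.
					 */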
					if (c == 'w')
					{
						XLogRecPtr	start_lsn;
						XLogRecPtr	end_lsn;
						TimestampTz send_time;

						start_lsn = pq_getmsgint64(&s);
						end_lsn = pq_getmsgint64(&s);
						send_time = pq_getmsgint64(&s);

						if (last_received < start_lsn)
							last_received = start_lsn;

						if (last_received < end_lsn)
							last_received = end_lsn;

						UpdateWorkerStats(last_received, send_time, false);

						apply_dispatch(&s);
					}
					else if (c == 'k')
					{
						XLogRecPtr	end_lsn;
						TimestampTz timestamp;
						bool		reply_requested;

						end_lsn = pq_getmsgint64(&s);
						timestamp = pq_getmsgint64(&s);
						reply_requested = pq_getmsgbyte(&s);

						if (last_received < end_lsn)
							last_received = end_lsn;

						send_feedback(last_received, reply_requested, false);
						UpdateWorkerStats(last_received, timestamp, true);
					}
					/* other message types are purposefully ignored */

					MemoryContextReset(ApplyMessageContext);
				}

				len = walrcv_receive(wrconn, &buf, &fd);
			}
		}

		/* confirm all writes so far */
		send_feedback(last_received, false, false);

		if (!in_remote_transaction)
		{
			/*
			 * If we didn't get any transactions for a while there might be
			 * unconsumed invalidation messages in the queue; consume them
			 * now.
			 */
			AcceptInvalidationMessages();
			maybe_reread_subscription();

			/* Process any table synchronization changes. */
			process_syncing_tables(last_received);
		}

		/* Cleanup the memory. */
		MemoryContextResetAndDeleteChildren(ApplyMessageContext);
		MemoryContextSwitchTo(TopMemoryContext);

		/* Check if we need to exit the streaming loop. */
		if (endofstream)
		{
			TimeLineID	tli;

			walrcv_endstreaming(wrconn, &tli);
			break;
		}

		/*
		 * Wait for more data or latch.  If we have unflushed transactions,
		 * wake up after WalWriterDelay to see if they've been flushed yet (in
		 * which case we should send a feedback message).  Otherwise, there's
		 * no particular urgency about waking up unless we get data or a
		 * signal.
		 */
		if (!dlist_is_empty(&lsn_mapping))
			wait_time = WalWriterDelay;
		else
			wait_time = NAPTIME_PER_CYCLE;

		rc = WaitLatchOrSocket(MyLatch,
							   WL_SOCKET_READABLE | WL_LATCH_SET |
							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
							   fd, wait_time,
							   WAIT_EVENT_LOGICAL_APPLY_MAIN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (got_SIGHUP)
		{
			got_SIGHUP = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		if (rc & WL_TIMEOUT)
		{
			/*
			 * We didn't receive anything new. If we haven't heard anything
			 * from the server for more than wal_receiver_timeout / 2, ping
			 * the server. Also, if it's been longer than
			 * wal_receiver_status_interval since the last update we sent,
			 * send a status update to the master anyway, to report any
			 * progress in applying WAL.
			 */
			bool		requestReply = false;

			/*
			 * Check if the time since the last receive from the server has
			 * reached the configured limit.
			 */
			if (wal_receiver_timeout > 0)
			{
				TimestampTz now = GetCurrentTimestamp();
				TimestampTz timeout;

				timeout =
					TimestampTzPlusMilliseconds(last_recv_timestamp,
												wal_receiver_timeout);

				if (now >= timeout)
					ereport(ERROR,
							(errmsg("terminating logical replication worker due to timeout")));

				/*
				 * If we haven't received anything new for half of the
				 * receiver replication timeout, ping the server.
				 */
				if (!ping_sent)
				{
					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
														  (wal_receiver_timeout / 2));
					if (now >= timeout)
					{
						requestReply = true;
						ping_sent = true;
					}
				}
			}

			send_feedback(last_received, requestReply, requestReply);
		}
	}
}

/*
 * Send a Standby Status Update message to the server.
 *
 * 'recvpos' is the latest LSN we've received data up to; 'force' is set if
 * we need to send a response to avoid timeouts.
 */
static void
send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
{
	static StringInfo reply_message = NULL;
	static TimestampTz send_time = 0;

	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
	static XLogRecPtr last_flushpos = InvalidXLogRecPtr;

	XLogRecPtr	writepos;
	XLogRecPtr	flushpos;
	TimestampTz now;
	bool		have_pending_txes;

	/*
	 * If the user doesn't want status to be reported to the publisher, be
	 * sure to exit before doing anything at all.
	 */
	if (!force && wal_receiver_status_interval <= 0)
		return;

	/* It's legal to not pass a recvpos */
	if (recvpos < last_recvpos)
		recvpos = last_recvpos;

	get_flush_position(&writepos, &flushpos, &have_pending_txes);

	/*
	 * If there are no outstanding transactions to flush, we can report the
	 * latest received position. This is important for synchronous
	 * replication.
	 */
	if (!have_pending_txes)
		flushpos = writepos = recvpos;

	if (writepos < last_writepos)
		writepos = last_writepos;

	if (flushpos < last_flushpos)
		flushpos = last_flushpos;

	now = GetCurrentTimestamp();

	/* If we've already reported everything, we're good. */
	if (!force &&
		writepos == last_writepos &&
		flushpos == last_flushpos &&
		!TimestampDifferenceExceeds(send_time, now,
									wal_receiver_status_interval * 1000))
		return;
	send_time = now;

	if (!reply_message)
	{
		MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);

		reply_message = makeStringInfo();
		MemoryContextSwitchTo(oldctx);
	}
	else
		resetStringInfo(reply_message);

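	/*
	 * Build a standby status update ('r') message: the LSN received so far
	 * is reported as the write position, the newest remote commit known to
	 * be locally flushed as the flush position, and the newest locally
	 * applied remote commit as the apply position.
	 */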
	pq_sendbyte(reply_message, 'r');
	pq_sendint64(reply_message, recvpos);	/* write */
	pq_sendint64(reply_message, flushpos);	/* flush */
	pq_sendint64(reply_message, writepos);	/* apply */
	pq_sendint64(reply_message, now);	/* sendTime */
	pq_sendbyte(reply_message, requestReply);	/* replyRequested */

	elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
		 force,
		 (uint32) (recvpos >> 32), (uint32) recvpos,
		 (uint32) (writepos >> 32), (uint32) writepos,
		 (uint32) (flushpos >> 32), (uint32) flushpos
		);

	walrcv_send(wrconn, reply_message->data, reply_message->len);

	if (recvpos > last_recvpos)
		last_recvpos = recvpos;
	if (writepos > last_writepos)
		last_writepos = writepos;
	if (flushpos > last_flushpos)
		last_flushpos = flushpos;
}

/*
 * Reread subscription info if needed. For most kinds of change the worker
 * simply exits and is restarted.
 */
static void
maybe_reread_subscription(void)
{
	MemoryContext oldctx;
	Subscription *newsub;
	bool		started_tx = false;

	/* When the cache state is valid there is nothing to do here. */
	if (MySubscriptionValid)
		return;

	/* This function might be called inside or outside of a transaction. */
	if (!IsTransactionState())
	{
		StartTransactionCommand();
		started_tx = true;
	}

	/* Ensure allocations in permanent context. */
	oldctx = MemoryContextSwitchTo(ApplyContext);

	newsub = GetSubscription(MyLogicalRepWorker->subid, true);

	/*
	 * Exit if the subscription was removed. This normally should not happen
	 * as the worker gets killed during DROP SUBSCRIPTION.
	 */
	if (!newsub)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"stop because the subscription was removed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if the subscription was disabled. This normally should not happen
	 * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
	 */
	if (!newsub->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"stop because the subscription was disabled",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if the connection string was changed. The launcher will start a
	 * new worker.
	 */
	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because the connection information was changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if the subscription name was changed (it's used for
	 * fallback_application_name). The launcher will start a new worker.
	 */
	if (strcmp(newsub->name, MySubscription->name) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because the subscription was renamed",
						MySubscription->name)));

		proc_exit(0);
	}

	/* !slotname should never happen when enabled is true. */
	Assert(newsub->slotname);

	/*
	 * If the slot name has changed we need to make a new connection to the
	 * new slot, so exit here as well in that case.
	 */
	if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because the replication slot name was changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if the publication list was changed. The launcher will start a
	 * new worker.
	 */
	if (!equal(newsub->publications, MySubscription->publications))
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because the subscription's publications were changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/* Check for other changes that should never happen too. */
	if (newsub->dbid != MySubscription->dbid)
	{
		elog(ERROR, "subscription %u changed unexpectedly",
			 MyLogicalRepWorker->subid);
	}

	/* Clean old subscription info and switch to new one. */
	FreeSubscription(MySubscription);
	MySubscription = newsub;

	MemoryContextSwitchTo(oldctx);

	/* Change synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	if (started_tx)
		CommitTransactionCommand();

	MySubscriptionValid = true;
}
| 1539 |  | 
| 1540 | /* | 
| 1541 |  * Callback from subscription syscache invalidation. | 
| 1542 |  */ | 
| 1543 | static void | 
| 1544 | subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue) | 
| 1545 | { | 
| 1546 | 	MySubscriptionValid = false; | 
| 1547 | } | 
| 1548 |  | 
| 1549 | /* SIGHUP: set flag to reload configuration at next convenient time */ | 
| 1550 | static void | 
| 1551 | logicalrep_worker_sighup(SIGNAL_ARGS) | 
| 1552 | { | 
| 1553 | 	int			save_errno = errno; | 
| 1554 |  | 
| 1555 | 	got_SIGHUP = true; | 
| 1556 |  | 
| 1557 | 	/* Waken anything waiting on the process latch */ | 
| 1558 | 	SetLatch(MyLatch); | 
| 1559 |  | 
| 1560 | 	errno = save_errno; | 
| 1561 | } | 
| 1562 |  | 
| 1563 | /* Logical Replication Apply worker entry point */ | 
| 1564 | void | 
| 1565 | ApplyWorkerMain(Datum main_arg) | 
| 1566 | { | 
| 1567 | 	int			worker_slot = DatumGetInt32(main_arg); | 
| 1568 | 	MemoryContext oldctx; | 
| 1569 | 	char		originname[NAMEDATALEN]; | 
| 1570 | 	XLogRecPtr	origin_startpos; | 
| 1571 | 	char	   *myslotname; | 
| 1572 | 	WalRcvStreamOptions options; | 
| 1573 |  | 
| 1574 | 	/* Attach to slot */ | 
| 1575 | 	logicalrep_worker_attach(worker_slot); | 
| 1576 |  | 
| 1577 | 	/* Setup signal handling */ | 
| 1578 | 	pqsignal(SIGHUP, logicalrep_worker_sighup); | 
| 1579 | 	pqsignal(SIGTERM, die); | 
| 1580 | 	BackgroundWorkerUnblockSignals(); | 
| 1581 |  | 
| 1582 | 	/* | 
| 1583 | 	 * We don't currently need any ResourceOwner in a walreceiver process, but | 
| 1584 | 	 * if we did, we could call CreateAuxProcessResourceOwner here. | 
| 1585 | 	 */ | 
| 1586 |  | 
	/* Initialise stats to a reasonably sane value */
| 1588 | 	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time = | 
| 1589 | 		MyLogicalRepWorker->reply_time = GetCurrentTimestamp(); | 
| 1590 |  | 
| 1591 | 	/* Load the libpq-specific functions */ | 
| 1592 | 	load_file("libpqwalreceiver" , false); | 
| 1593 |  | 
| 1594 | 	/* Run as replica session replication role. */ | 
| 1595 | 	SetConfigOption("session_replication_role" , "replica" , | 
| 1596 | 					PGC_SUSET, PGC_S_OVERRIDE); | 
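
	/*
	 * In "replica" mode ordinary triggers and rules do not fire during
	 * apply; only triggers marked ENABLE REPLICA or ENABLE ALWAYS run.  For
	 * example, given a hypothetical trigger on the subscriber,
	 *
	 *		ALTER TABLE mytab ENABLE REPLICA TRIGGER mytrig;
	 *
	 * mytrig will fire when this worker applies changes to mytab, while a
	 * normally-enabled trigger would be skipped.
	 */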
| 1597 |  | 
| 1598 | 	/* Connect to our database. */ | 
| 1599 | 	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid, | 
| 1600 | 											  MyLogicalRepWorker->userid, | 
| 1601 | 											  0); | 
| 1602 |  | 
	/* Load the subscription into a persistent memory context. */
	ApplyContext = AllocSetContextCreate(TopMemoryContext,
										 "ApplyContext",
| 1606 | 										 ALLOCSET_DEFAULT_SIZES); | 
| 1607 | 	StartTransactionCommand(); | 
| 1608 | 	oldctx = MemoryContextSwitchTo(ApplyContext); | 
| 1609 |  | 
| 1610 | 	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true); | 
| 1611 | 	if (!MySubscription) | 
| 1612 | 	{ | 
| 1613 | 		ereport(LOG, | 
| 1614 | 				(errmsg("logical replication apply worker for subscription %u will not "  | 
| 1615 | 						"start because the subscription was removed during startup" , | 
| 1616 | 						MyLogicalRepWorker->subid))); | 
| 1617 | 		proc_exit(0); | 
| 1618 | 	} | 
| 1619 |  | 
| 1620 | 	MySubscriptionValid = true; | 
| 1621 | 	MemoryContextSwitchTo(oldctx); | 
| 1622 |  | 
| 1623 | 	if (!MySubscription->enabled) | 
| 1624 | 	{ | 
| 1625 | 		ereport(LOG, | 
| 1626 | 				(errmsg("logical replication apply worker for subscription \"%s\" will not "  | 
| 1627 | 						"start because the subscription was disabled during startup" , | 
| 1628 | 						MySubscription->name))); | 
| 1629 |  | 
| 1630 | 		proc_exit(0); | 
| 1631 | 	} | 
| 1632 |  | 
| 1633 | 	/* Setup synchronous commit according to the user's wishes */ | 
| 1634 | 	SetConfigOption("synchronous_commit" , MySubscription->synccommit, | 
| 1635 | 					PGC_BACKEND, PGC_S_OVERRIDE); | 
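
	/*
	 * MySubscription->synccommit is the subscription's synchronous_commit
	 * parameter stored as a string (e.g. "off", the default for
	 * subscriptions), so this behaves like SET synchronous_commit = ...
	 * scoped to the lifetime of this worker.
	 */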
| 1636 |  | 
| 1637 | 	/* Keep us informed about subscription changes. */ | 
| 1638 | 	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID, | 
| 1639 | 								  subscription_change_cb, | 
| 1640 | 								  (Datum) 0); | 
| 1641 |  | 
| 1642 | 	if (am_tablesync_worker()) | 
| 1643 | 		ereport(LOG, | 
| 1644 | 				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started" , | 
| 1645 | 						MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); | 
| 1646 | 	else | 
| 1647 | 		ereport(LOG, | 
| 1648 | 				(errmsg("logical replication apply worker for subscription \"%s\" has started" , | 
| 1649 | 						MySubscription->name))); | 
| 1650 |  | 
| 1651 | 	CommitTransactionCommand(); | 
| 1652 |  | 
| 1653 | 	/* Connect to the origin and start the replication. */ | 
| 1654 | 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"" , | 
| 1655 | 		 MySubscription->conninfo); | 
| 1656 |  | 
| 1657 | 	if (am_tablesync_worker()) | 
| 1658 | 	{ | 
| 1659 | 		char	   *syncslotname; | 
| 1660 |  | 
		/* This is a table synchronization worker; run the initial sync. */
| 1662 | 		syncslotname = LogicalRepSyncTableStart(&origin_startpos); | 
| 1663 |  | 
		/* The slot name needs to be allocated in a permanent memory context. */
| 1665 | 		oldctx = MemoryContextSwitchTo(ApplyContext); | 
| 1666 | 		myslotname = pstrdup(syncslotname); | 
| 1667 | 		MemoryContextSwitchTo(oldctx); | 
| 1668 |  | 
| 1669 | 		pfree(syncslotname); | 
| 1670 | 	} | 
| 1671 | 	else | 
| 1672 | 	{ | 
		/* This is the main apply worker */
| 1674 | 		RepOriginId originid; | 
| 1675 | 		TimeLineID	startpointTLI; | 
| 1676 | 		char	   *err; | 
| 1677 |  | 
| 1678 | 		myslotname = MySubscription->slotname; | 
| 1679 |  | 
| 1680 | 		/* | 
		 * This shouldn't happen if the subscription is enabled, but guard
		 * against DDL bugs or manual catalog changes.  (libpqwalreceiver
		 * would crash if the slot name were NULL.)
| 1684 | 		 */ | 
| 1685 | 		if (!myslotname) | 
| 1686 | 			ereport(ERROR, | 
| 1687 | 					(errmsg("subscription has no replication slot set" ))); | 
| 1688 |  | 
| 1689 | 		/* Setup replication origin tracking. */ | 
| 1690 | 		StartTransactionCommand(); | 
| 1691 | 		snprintf(originname, sizeof(originname), "pg_%u" , MySubscription->oid); | 
| 1692 | 		originid = replorigin_by_name(originname, true); | 
| 1693 | 		if (!OidIsValid(originid)) | 
| 1694 | 			originid = replorigin_create(originname); | 
| 1695 | 		replorigin_session_setup(originid); | 
| 1696 | 		replorigin_session_origin = originid; | 
| 1697 | 		origin_startpos = replorigin_session_get_progress(false); | 
| 1698 | 		CommitTransactionCommand(); | 
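
		/*
		 * For example, a subscription with OID 16384 (an illustrative
		 * value) tracks its progress under the origin name "pg_16384";
		 * origin_startpos is then the last remote LSN already applied, so
		 * streaming resumes from that point instead of from the beginning
		 * of the slot.
		 */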
| 1699 |  | 
| 1700 | 		wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name, | 
| 1701 | 								&err); | 
| 1702 | 		if (wrconn == NULL) | 
| 1703 | 			ereport(ERROR, | 
| 1704 | 					(errmsg("could not connect to the publisher: %s" , err))); | 
| 1705 |  | 
| 1706 | 		/* | 
		 * We don't really use the output of IDENTIFY_SYSTEM for anything,
		 * but it performs some initialization on the upstream, so we still
		 * call it.
| 1709 | 		 */ | 
| 1710 | 		(void) walrcv_identify_system(wrconn, &startpointTLI); | 
| 1711 |  | 
| 1712 | 	} | 
| 1713 |  | 
| 1714 | 	/* | 
| 1715 | 	 * Setup callback for syscache so that we know when something changes in | 
| 1716 | 	 * the subscription relation state. | 
| 1717 | 	 */ | 
| 1718 | 	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, | 
| 1719 | 								  invalidate_syncing_table_states, | 
| 1720 | 								  (Datum) 0); | 
| 1721 |  | 
| 1722 | 	/* Build logical replication streaming options. */ | 
| 1723 | 	options.logical = true; | 
| 1724 | 	options.startpoint = origin_startpos; | 
| 1725 | 	options.slotname = myslotname; | 
| 1726 | 	options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM; | 
| 1727 | 	options.proto.logical.publication_names = MySubscription->publications; | 
| 1728 |  | 
| 1729 | 	/* Start normal logical streaming replication. */ | 
| 1730 | 	walrcv_startstreaming(wrconn, &options); | 
| 1731 |  | 
| 1732 | 	/* Run the main loop. */ | 
| 1733 | 	LogicalRepApplyLoop(origin_startpos); | 
| 1734 |  | 
| 1735 | 	proc_exit(0); | 
| 1736 | } | 
| 1737 |  | 
| 1738 | /* | 
| 1739 |  * Is current process a logical replication worker? | 
| 1740 |  */ | 
| 1741 | bool | 
| 1742 | IsLogicalWorker(void) | 
| 1743 | { | 
| 1744 | 	return MyLogicalRepWorker != NULL; | 
| 1745 | } | 
| 1746 |  |