| 1 | /*------------------------------------------------------------------------- |
| 2 | * |
| 3 | * walreceiver.h |
| 4 | * Exports from replication/walreceiverfuncs.c. |
| 5 | * |
| 6 | * Portions Copyright (c) 2010-2019, PostgreSQL Global Development Group |
| 7 | * |
| 8 | * src/include/replication/walreceiver.h |
| 9 | * |
| 10 | *------------------------------------------------------------------------- |
| 11 | */ |
| 12 | #ifndef _WALRECEIVER_H |
| 13 | #define _WALRECEIVER_H |
| 14 | |
| 15 | #include "access/xlog.h" |
| 16 | #include "access/xlogdefs.h" |
| 17 | #include "fmgr.h" |
| 18 | #include "getaddrinfo.h" /* for NI_MAXHOST */ |
| 19 | #include "replication/logicalproto.h" |
| 20 | #include "replication/walsender.h" |
| 21 | #include "storage/latch.h" |
| 22 | #include "storage/spin.h" |
| 23 | #include "pgtime.h" |
| 24 | #include "utils/tuplestore.h" |
| 25 | |
/*
 * User-settable parameters.
 *
 * These are declarations only; the variables themselves are defined outside
 * this header.
 */
extern int wal_receiver_status_interval;
extern int wal_receiver_timeout;
extern bool hot_standby_feedback;
| 30 | |
/*
 * MAXCONNINFO: maximum size of a connection string.
 *
 * Within this header it is the size of the WalRcvData.conninfo buffer.
 *
 * XXX: Should this move to pg_config_manual.h?
 */
#define MAXCONNINFO 1024
| 37 | |
/*
 * Can we allow the standby to accept replication connections from another
 * standby?
 *
 * Evaluates to true only when EnableHotStandby is set and max_wal_senders is
 * greater than zero.  Neither name is declared in this file, so both must be
 * in scope wherever the macro is expanded.
 */
#define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
| 40 | |
/*
 * Values for WalRcv->walRcvState.
 *
 * The enumerators carry no explicit values, so they are numbered from zero
 * in the order listed.
 */
typedef enum
{
	WALRCV_STOPPED,				/* stopped and mustn't start up again */
	WALRCV_STARTING,			/* launched, but the process hasn't
								 * initialized yet */
	WALRCV_STREAMING,			/* walreceiver is streaming */
	WALRCV_WAITING,				/* stopped streaming, waiting for orders */
	WALRCV_RESTARTING,			/* asked to restart streaming */
	WALRCV_STOPPING				/* requested to stop, but still running */
} WalRcvState;
| 54 | |
/*
 * Shared memory area for management of walreceiver process.
 *
 * Every field declared before "mutex" is protected by that spinlock (see the
 * comment on the field itself).  force_reply, declared after it, is the one
 * exception and is explained at its declaration.
 */
typedef struct
{
	/*
	 * PID of currently active walreceiver process, its current state and
	 * start time (actually, the time at which it was requested to be
	 * started).
	 */
	pid_t		pid;
	WalRcvState walRcvState;
	pg_time_t	startTime;

	/*
	 * receiveStart and receiveStartTLI indicate the first byte position and
	 * timeline that will be received. When startup process starts the
	 * walreceiver, it sets these to the point where it wants the streaming to
	 * begin.
	 */
	XLogRecPtr	receiveStart;
	TimeLineID	receiveStartTLI;

	/*
	 * receivedUpto-1 is the last byte position that has already been
	 * received, and receivedTLI is the timeline it came from.  At the first
	 * startup of walreceiver, these are set to receiveStart and
	 * receiveStartTLI. After that, walreceiver updates these whenever it
	 * flushes the received WAL to disk.
	 */
	XLogRecPtr	receivedUpto;
	TimeLineID	receivedTLI;

	/*
	 * latestChunkStart is the starting byte position of the current "batch"
	 * of received WAL.  It's actually the same as the previous value of
	 * receivedUpto before the last flush to disk.  Startup process can use
	 * this to detect whether it's keeping up or not.
	 */
	XLogRecPtr	latestChunkStart;

	/*
	 * Time of send and receive of any message received.
	 */
	TimestampTz lastMsgSendTime;
	TimestampTz lastMsgReceiptTime;

	/*
	 * Latest reported end of WAL on the sender, and the timestamp stored
	 * alongside it.
	 */
	XLogRecPtr	latestWalEnd;
	TimestampTz latestWalEndTime;

	/*
	 * connection string; initially set to connect to the primary, and later
	 * clobbered to hide security-sensitive fields.  See ready_to_display
	 * below for when the contents may be shown.
	 */
	char		conninfo[MAXCONNINFO];

	/*
	 * Host name (this can be a host name, an IP address, or a directory path)
	 * and port number of the active replication connection.
	 */
	char		sender_host[NI_MAXHOST];
	int			sender_port;

	/*
	 * replication slot name; is also used for walreceiver to connect with the
	 * primary
	 */
	char		slotname[NAMEDATALEN];

	/* set true once conninfo is ready to display (obfuscated pwds etc) */
	bool		ready_to_display;

	/*
	 * Latch used by startup process to wake up walreceiver after telling it
	 * where to start streaming (after setting receiveStart and
	 * receiveStartTLI), and also to tell it to send apply feedback to the
	 * primary whenever specially marked commit records are applied.  This is
	 * normally mapped to procLatch when walreceiver is running.
	 */
	Latch	   *latch;

	slock_t		mutex;			/* locks shared variables shown above */

	/*
	 * force walreceiver reply?  This doesn't need to be locked; memory
	 * barriers for ordering are sufficient.  But we do need atomic fetch and
	 * store semantics, so use sig_atomic_t.
	 */
	sig_atomic_t force_reply;	/* used as a bool */
} WalRcvData;

/* Pointer to the WalRcvData area; declared here, defined outside this header */
extern WalRcvData *WalRcv;
| 148 | |
/*
 * Options describing a replication stream to start.  A pointer to this
 * struct is the second argument of the walrcv_startstreaming hook.
 *
 * NOTE(review): the "logical" flag presumably selects which arm of the
 * "proto" union is meaningful (proto.logical when true, proto.physical when
 * false) -- confirm against the code that fills in and consumes this struct.
 */
typedef struct
{
	bool		logical;		/* True if this is logical replication stream,
								 * false if physical stream.  */
	char	   *slotname;		/* Name of the replication slot or NULL. */
	XLogRecPtr	startpoint;		/* LSN of starting point. */

	union
	{
		struct
		{
			TimeLineID	startpointTLI;	/* Starting timeline */
		}			physical;
		struct
		{
			uint32		proto_version;	/* Logical protocol version */
			List	   *publication_names;	/* String list of publications */
		}			logical;
	}			proto;
} WalRcvStreamOptions;
| 169 | |
/*
 * Connection handle passed to and returned by the walrcv_* hooks.  Only a
 * forward declaration is given here, so code including this header can hold
 * and pass WalReceiverConn pointers but cannot look inside the struct.
 */
struct WalReceiverConn;
typedef struct WalReceiverConn WalReceiverConn;
| 172 | |
/*
 * Status of walreceiver query execution, as stored in
 * WalRcvExecResult.status.
 *
 * We only define statuses that are currently used.
 */
typedef enum
{
	WALRCV_ERROR,				/* There was error when executing the query. */
	WALRCV_OK_COMMAND,			/* Query executed utility or replication
								 * command. */
	WALRCV_OK_TUPLES,			/* Query returned tuples. */
	WALRCV_OK_COPY_IN,			/* Query started COPY FROM. */
	WALRCV_OK_COPY_OUT,			/* Query started COPY TO. */
	WALRCV_OK_COPY_BOTH			/* Query started COPY BOTH replication
								 * protocol. */
} WalRcvExecStatus;
| 189 | |
/*
 * Return value for walrcv_exec, returns the status of the execution and
 * tuples if any.
 *
 * walrcv_clear_result() releases a result: it frees err, tuplestore and
 * tupledesc when they are non-NULL, and then the struct itself, so any of
 * those three members may be left NULL.
 */
typedef struct WalRcvExecResult
{
	WalRcvExecStatus status;	/* outcome of the query */
	char	   *err;			/* freed with pfree() if non-NULL */
	Tuplestorestate *tuplestore;	/* ended with tuplestore_end() if non-NULL */
	TupleDesc	tupledesc;		/* freed with FreeTupleDesc() if non-NULL */
} WalRcvExecResult;
| 201 | |
/*
 * libpqwalreceiver hooks
 *
 * One function-pointer type per operation.  WalReceiverFunctionsType holds a
 * member of each type, and the walrcv_*() macros further down call through
 * those members.  This header fixes only the signatures; what each hook
 * actually does is up to whatever code fills in WalReceiverFunctions.
 */
typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logical,
											   const char *appname,
											   char **err);
typedef void (*walrcv_check_conninfo_fn) (const char *conninfo);
typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
										  char **sender_host,
										  int *sender_port);
typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
											TimeLineID *primary_tli);
typedef int (*walrcv_server_version_fn) (WalReceiverConn *conn);
typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
												   TimeLineID tli,
												   char **filename,
												   char **content, int *size);
typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn,
										  const WalRcvStreamOptions *options);
typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
										TimeLineID *next_tli);
typedef int (*walrcv_receive_fn) (WalReceiverConn *conn, char **buffer,
								  pgsocket *wait_fd);
typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer,
								int nbytes);
typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
										const char *slotname, bool temporary,
										CRSSnapshotAction snapshot_action,
										XLogRecPtr *lsn);
/* Returns a WalRcvExecResult; release it with walrcv_clear_result(). */
typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
											 const char *query,
											 const int nRetTypes,
											 const Oid *retTypes);
typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
| 235 | |
/*
 * Table of walreceiver hook functions: one member per walrcv_*_fn type, each
 * named after the walrcv_*() macro that calls through it.
 */
typedef struct WalReceiverFunctionsType
{
	walrcv_connect_fn walrcv_connect;
	walrcv_check_conninfo_fn walrcv_check_conninfo;
	walrcv_get_conninfo_fn walrcv_get_conninfo;
	walrcv_get_senderinfo_fn walrcv_get_senderinfo;
	walrcv_identify_system_fn walrcv_identify_system;
	walrcv_server_version_fn walrcv_server_version;
	walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
	walrcv_startstreaming_fn walrcv_startstreaming;
	walrcv_endstreaming_fn walrcv_endstreaming;
	walrcv_receive_fn walrcv_receive;
	walrcv_send_fn walrcv_send;
	walrcv_create_slot_fn walrcv_create_slot;
	walrcv_exec_fn walrcv_exec;
	walrcv_disconnect_fn walrcv_disconnect;
} WalReceiverFunctionsType;

/*
 * Pointer to the active hook table.  The walrcv_*() macros dereference it
 * without a NULL check, so it must point at a filled-in table before any of
 * them is used.
 */
extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
| 255 | |
/*
 * Call-through wrappers for the hook table.
 *
 * Each walrcv_xxx(...) expands to WalReceiverFunctions->walrcv_xxx(...) with
 * the arguments forwarded unchanged and in the same order.  No NULL check is
 * made on WalReceiverFunctions or on the individual function pointers.
 */
#define walrcv_connect(conninfo, logical, appname, err) \
	WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err)

#define walrcv_check_conninfo(conninfo) \
	WalReceiverFunctions->walrcv_check_conninfo(conninfo)

#define walrcv_get_conninfo(conn) \
	WalReceiverFunctions->walrcv_get_conninfo(conn)

#define walrcv_get_senderinfo(conn, sender_host, sender_port) \
	WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)

#define walrcv_identify_system(conn, primary_tli) \
	WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)

#define walrcv_server_version(conn) \
	WalReceiverFunctions->walrcv_server_version(conn)

#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
	WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)

#define walrcv_startstreaming(conn, options) \
	WalReceiverFunctions->walrcv_startstreaming(conn, options)

#define walrcv_endstreaming(conn, next_tli) \
	WalReceiverFunctions->walrcv_endstreaming(conn, next_tli)

#define walrcv_receive(conn, buffer, wait_fd) \
	WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)

#define walrcv_send(conn, buffer, nbytes) \
	WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)

#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) \
	WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)

/* Second argument is the query string, matching walrcv_exec_fn. */
#define walrcv_exec(conn, query, nRetTypes, retTypes) \
	WalReceiverFunctions->walrcv_exec(conn, query, nRetTypes, retTypes)

#define walrcv_disconnect(conn) \
	WalReceiverFunctions->walrcv_disconnect(conn)
| 284 | |
| 285 | static inline void |
| 286 | walrcv_clear_result(WalRcvExecResult *walres) |
| 287 | { |
| 288 | if (!walres) |
| 289 | return; |
| 290 | |
| 291 | if (walres->err) |
| 292 | pfree(walres->err); |
| 293 | |
| 294 | if (walres->tuplestore) |
| 295 | tuplestore_end(walres->tuplestore); |
| 296 | |
| 297 | if (walres->tupledesc) |
| 298 | FreeTupleDesc(walres->tupledesc); |
| 299 | |
| 300 | pfree(walres); |
| 301 | } |
| 302 | |
/*
 * prototypes for functions in walreceiver.c
 *
 * WalReceiverMain is marked as never returning to its caller.
 */
extern void WalReceiverMain(void) pg_attribute_noreturn();
extern void ProcessWalRcvInterrupts(void);

/*
 * prototypes for functions in walreceiverfuncs.c
 *
 * GetWalRcvWriteRecPtr takes two pointer parameters, latestChunkStart and
 * receiveTLI, in addition to its XLogRecPtr return value.
 * NOTE(review): these are presumably output parameters mirroring the
 * same-named WalRcvData fields -- confirm in walreceiverfuncs.c, including
 * whether NULL may be passed.
 */
extern Size WalRcvShmemSize(void);
extern void WalRcvShmemInit(void);
extern void ShutdownWalRcv(void);
extern bool WalRcvStreaming(void);
extern bool WalRcvRunning(void);
extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
								 const char *conninfo, const char *slotname);
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
extern int GetReplicationApplyDelay(void);
extern int GetReplicationTransferLatency(void);
extern void WalRcvForceReply(void);
| 319 | |
| 320 | #endif /* _WALRECEIVER_H */ |
| 321 | |