1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * libpqwalreceiver.c |
4 | * |
5 | * This file contains the libpq-specific parts of walreceiver. It's |
6 | * loaded as a dynamic module to avoid linking the main server binary with |
7 | * libpq. |
8 | * |
9 | * Portions Copyright (c) 2010-2019, PostgreSQL Global Development Group |
10 | * |
11 | * |
12 | * IDENTIFICATION |
13 | * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c |
14 | * |
15 | *------------------------------------------------------------------------- |
16 | */ |
17 | #include "postgres.h" |
18 | |
19 | #include <unistd.h> |
20 | #include <sys/time.h> |
21 | |
22 | #include "libpq-fe.h" |
23 | #include "pqexpbuffer.h" |
24 | #include "access/xlog.h" |
25 | #include "catalog/pg_type.h" |
26 | #include "funcapi.h" |
27 | #include "mb/pg_wchar.h" |
28 | #include "miscadmin.h" |
29 | #include "pgstat.h" |
30 | #include "replication/walreceiver.h" |
31 | #include "utils/builtins.h" |
32 | #include "utils/memutils.h" |
33 | #include "utils/pg_lsn.h" |
34 | #include "utils/tuplestore.h" |
35 | |
/* Module magic block; this file is built as a dynamically loaded module. */
PG_MODULE_MAGIC;

/* Module initialization entry point, defined later in this file. */
extern void _PG_init(void);
39 | |
40 | struct WalReceiverConn |
41 | { |
42 | /* Current connection to the primary, if any */ |
43 | PGconn *streamConn; |
44 | /* Used to remember if the connection is logical or physical */ |
45 | bool logical; |
46 | /* Buffer for currently read records */ |
47 | char *recvBuf; |
48 | }; |
49 | |
50 | /* Prototypes for interface functions */ |
51 | static WalReceiverConn *libpqrcv_connect(const char *conninfo, |
52 | bool logical, const char *appname, |
53 | char **err); |
54 | static void libpqrcv_check_conninfo(const char *conninfo); |
55 | static char *libpqrcv_get_conninfo(WalReceiverConn *conn); |
56 | static void libpqrcv_get_senderinfo(WalReceiverConn *conn, |
57 | char **sender_host, int *sender_port); |
58 | static char *libpqrcv_identify_system(WalReceiverConn *conn, |
59 | TimeLineID *primary_tli); |
60 | static int libpqrcv_server_version(WalReceiverConn *conn); |
61 | static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, |
62 | TimeLineID tli, char **filename, |
63 | char **content, int *len); |
64 | static bool libpqrcv_startstreaming(WalReceiverConn *conn, |
65 | const WalRcvStreamOptions *options); |
66 | static void libpqrcv_endstreaming(WalReceiverConn *conn, |
67 | TimeLineID *next_tli); |
68 | static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, |
69 | pgsocket *wait_fd); |
70 | static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, |
71 | int nbytes); |
72 | static char *libpqrcv_create_slot(WalReceiverConn *conn, |
73 | const char *slotname, |
74 | bool temporary, |
75 | CRSSnapshotAction snapshot_action, |
76 | XLogRecPtr *lsn); |
77 | static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, |
78 | const char *query, |
79 | const int nRetTypes, |
80 | const Oid *retTypes); |
81 | static void libpqrcv_disconnect(WalReceiverConn *conn); |
82 | |
83 | static WalReceiverFunctionsType PQWalReceiverFunctions = { |
84 | libpqrcv_connect, |
85 | libpqrcv_check_conninfo, |
86 | libpqrcv_get_conninfo, |
87 | libpqrcv_get_senderinfo, |
88 | libpqrcv_identify_system, |
89 | libpqrcv_server_version, |
90 | libpqrcv_readtimelinehistoryfile, |
91 | libpqrcv_startstreaming, |
92 | libpqrcv_endstreaming, |
93 | libpqrcv_receive, |
94 | libpqrcv_send, |
95 | libpqrcv_create_slot, |
96 | libpqrcv_exec, |
97 | libpqrcv_disconnect |
98 | }; |
99 | |
100 | /* Prototypes for private functions */ |
101 | static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query); |
102 | static PGresult *libpqrcv_PQgetResult(PGconn *streamConn); |
103 | static char *stringlist_to_identifierstr(PGconn *conn, List *strings); |
104 | |
105 | /* |
106 | * Module initialization function |
107 | */ |
108 | void |
109 | _PG_init(void) |
110 | { |
111 | if (WalReceiverFunctions != NULL) |
112 | elog(ERROR, "libpqwalreceiver already loaded" ); |
113 | WalReceiverFunctions = &PQWalReceiverFunctions; |
114 | } |
115 | |
116 | /* |
117 | * Establish the connection to the primary server for XLOG streaming |
118 | * |
119 | * Returns NULL on error and fills the err with palloc'ed error message. |
120 | */ |
121 | static WalReceiverConn * |
122 | libpqrcv_connect(const char *conninfo, bool logical, const char *appname, |
123 | char **err) |
124 | { |
125 | WalReceiverConn *conn; |
126 | PostgresPollingStatusType status; |
127 | const char *keys[5]; |
128 | const char *vals[5]; |
129 | int i = 0; |
130 | |
131 | /* |
132 | * We use the expand_dbname parameter to process the connection string (or |
133 | * URI), and pass some extra options. |
134 | */ |
135 | keys[i] = "dbname" ; |
136 | vals[i] = conninfo; |
137 | keys[++i] = "replication" ; |
138 | vals[i] = logical ? "database" : "true" ; |
139 | if (!logical) |
140 | { |
141 | /* |
142 | * The database name is ignored by the server in replication mode, but |
143 | * specify "replication" for .pgpass lookup. |
144 | */ |
145 | keys[++i] = "dbname" ; |
146 | vals[i] = "replication" ; |
147 | } |
148 | keys[++i] = "fallback_application_name" ; |
149 | vals[i] = appname; |
150 | if (logical) |
151 | { |
152 | keys[++i] = "client_encoding" ; |
153 | vals[i] = GetDatabaseEncodingName(); |
154 | } |
155 | keys[++i] = NULL; |
156 | vals[i] = NULL; |
157 | |
158 | Assert(i < sizeof(keys)); |
159 | |
160 | conn = palloc0(sizeof(WalReceiverConn)); |
161 | conn->streamConn = PQconnectStartParams(keys, vals, |
162 | /* expand_dbname = */ true); |
163 | if (PQstatus(conn->streamConn) == CONNECTION_BAD) |
164 | { |
165 | *err = pchomp(PQerrorMessage(conn->streamConn)); |
166 | return NULL; |
167 | } |
168 | |
169 | /* |
170 | * Poll connection until we have OK or FAILED status. |
171 | * |
172 | * Per spec for PQconnectPoll, first wait till socket is write-ready. |
173 | */ |
174 | status = PGRES_POLLING_WRITING; |
175 | do |
176 | { |
177 | int io_flag; |
178 | int rc; |
179 | |
180 | if (status == PGRES_POLLING_READING) |
181 | io_flag = WL_SOCKET_READABLE; |
182 | #ifdef WIN32 |
183 | /* Windows needs a different test while waiting for connection-made */ |
184 | else if (PQstatus(conn->streamConn) == CONNECTION_STARTED) |
185 | io_flag = WL_SOCKET_CONNECTED; |
186 | #endif |
187 | else |
188 | io_flag = WL_SOCKET_WRITEABLE; |
189 | |
190 | rc = WaitLatchOrSocket(MyLatch, |
191 | WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag, |
192 | PQsocket(conn->streamConn), |
193 | 0, |
194 | WAIT_EVENT_LIBPQWALRECEIVER_CONNECT); |
195 | |
196 | /* Interrupted? */ |
197 | if (rc & WL_LATCH_SET) |
198 | { |
199 | ResetLatch(MyLatch); |
200 | ProcessWalRcvInterrupts(); |
201 | } |
202 | |
203 | /* If socket is ready, advance the libpq state machine */ |
204 | if (rc & io_flag) |
205 | status = PQconnectPoll(conn->streamConn); |
206 | } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED); |
207 | |
208 | if (PQstatus(conn->streamConn) != CONNECTION_OK) |
209 | { |
210 | *err = pchomp(PQerrorMessage(conn->streamConn)); |
211 | return NULL; |
212 | } |
213 | |
214 | conn->logical = logical; |
215 | |
216 | return conn; |
217 | } |
218 | |
219 | /* |
220 | * Validate connection info string (just try to parse it) |
221 | */ |
222 | static void |
223 | libpqrcv_check_conninfo(const char *conninfo) |
224 | { |
225 | PQconninfoOption *opts = NULL; |
226 | char *err = NULL; |
227 | |
228 | opts = PQconninfoParse(conninfo, &err); |
229 | if (opts == NULL) |
230 | ereport(ERROR, |
231 | (errcode(ERRCODE_SYNTAX_ERROR), |
232 | errmsg("invalid connection string syntax: %s" , err))); |
233 | |
234 | PQconninfoFree(opts); |
235 | } |
236 | |
237 | /* |
238 | * Return a user-displayable conninfo string. Any security-sensitive fields |
239 | * are obfuscated. |
240 | */ |
241 | static char * |
242 | libpqrcv_get_conninfo(WalReceiverConn *conn) |
243 | { |
244 | PQconninfoOption *conn_opts; |
245 | PQconninfoOption *conn_opt; |
246 | PQExpBufferData buf; |
247 | char *retval; |
248 | |
249 | Assert(conn->streamConn != NULL); |
250 | |
251 | initPQExpBuffer(&buf); |
252 | conn_opts = PQconninfo(conn->streamConn); |
253 | |
254 | if (conn_opts == NULL) |
255 | ereport(ERROR, |
256 | (errmsg("could not parse connection string: %s" , |
257 | _("out of memory" )))); |
258 | |
259 | /* build a clean connection string from pieces */ |
260 | for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++) |
261 | { |
262 | bool obfuscate; |
263 | |
264 | /* Skip debug and empty options */ |
265 | if (strchr(conn_opt->dispchar, 'D') || |
266 | conn_opt->val == NULL || |
267 | conn_opt->val[0] == '\0') |
268 | continue; |
269 | |
270 | /* Obfuscate security-sensitive options */ |
271 | obfuscate = strchr(conn_opt->dispchar, '*') != NULL; |
272 | |
273 | appendPQExpBuffer(&buf, "%s%s=%s" , |
274 | buf.len == 0 ? "" : " " , |
275 | conn_opt->keyword, |
276 | obfuscate ? "********" : conn_opt->val); |
277 | } |
278 | |
279 | PQconninfoFree(conn_opts); |
280 | |
281 | retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data); |
282 | termPQExpBuffer(&buf); |
283 | return retval; |
284 | } |
285 | |
286 | /* |
287 | * Provides information of sender this WAL receiver is connected to. |
288 | */ |
289 | static void |
290 | libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, |
291 | int *sender_port) |
292 | { |
293 | char *ret = NULL; |
294 | |
295 | *sender_host = NULL; |
296 | *sender_port = 0; |
297 | |
298 | Assert(conn->streamConn != NULL); |
299 | |
300 | ret = PQhost(conn->streamConn); |
301 | if (ret && strlen(ret) != 0) |
302 | *sender_host = pstrdup(ret); |
303 | |
304 | ret = PQport(conn->streamConn); |
305 | if (ret && strlen(ret) != 0) |
306 | *sender_port = atoi(ret); |
307 | } |
308 | |
309 | /* |
310 | * Check that primary's system identifier matches ours, and fetch the current |
311 | * timeline ID of the primary. |
312 | */ |
313 | static char * |
314 | libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) |
315 | { |
316 | PGresult *res; |
317 | char *primary_sysid; |
318 | |
319 | /* |
320 | * Get the system identifier and timeline ID as a DataRow message from the |
321 | * primary server. |
322 | */ |
323 | res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM" ); |
324 | if (PQresultStatus(res) != PGRES_TUPLES_OK) |
325 | { |
326 | PQclear(res); |
327 | ereport(ERROR, |
328 | (errmsg("could not receive database system identifier and timeline ID from " |
329 | "the primary server: %s" , |
330 | pchomp(PQerrorMessage(conn->streamConn))))); |
331 | } |
332 | if (PQnfields(res) < 3 || PQntuples(res) != 1) |
333 | { |
334 | int ntuples = PQntuples(res); |
335 | int nfields = PQnfields(res); |
336 | |
337 | PQclear(res); |
338 | ereport(ERROR, |
339 | (errmsg("invalid response from primary server" ), |
340 | errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields." , |
341 | ntuples, nfields, 3, 1))); |
342 | } |
343 | primary_sysid = pstrdup(PQgetvalue(res, 0, 0)); |
344 | *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1)); |
345 | PQclear(res); |
346 | |
347 | return primary_sysid; |
348 | } |
349 | |
350 | /* |
351 | * Thin wrapper around libpq to obtain server version. |
352 | */ |
353 | static int |
354 | libpqrcv_server_version(WalReceiverConn *conn) |
355 | { |
356 | return PQserverVersion(conn->streamConn); |
357 | } |
358 | |
359 | /* |
360 | * Start streaming WAL data from given streaming options. |
361 | * |
362 | * Returns true if we switched successfully to copy-both mode. False |
363 | * means the server received the command and executed it successfully, but |
364 | * didn't switch to copy-mode. That means that there was no WAL on the |
365 | * requested timeline and starting point, because the server switched to |
366 | * another timeline at or before the requested starting point. On failure, |
367 | * throws an ERROR. |
368 | */ |
369 | static bool |
370 | libpqrcv_startstreaming(WalReceiverConn *conn, |
371 | const WalRcvStreamOptions *options) |
372 | { |
373 | StringInfoData cmd; |
374 | PGresult *res; |
375 | |
376 | Assert(options->logical == conn->logical); |
377 | Assert(options->slotname || !options->logical); |
378 | |
379 | initStringInfo(&cmd); |
380 | |
381 | /* Build the command. */ |
382 | appendStringInfoString(&cmd, "START_REPLICATION" ); |
383 | if (options->slotname != NULL) |
384 | appendStringInfo(&cmd, " SLOT \"%s\"" , |
385 | options->slotname); |
386 | |
387 | if (options->logical) |
388 | appendStringInfoString(&cmd, " LOGICAL" ); |
389 | |
390 | appendStringInfo(&cmd, " %X/%X" , |
391 | (uint32) (options->startpoint >> 32), |
392 | (uint32) options->startpoint); |
393 | |
394 | /* |
395 | * Additional options are different depending on if we are doing logical |
396 | * or physical replication. |
397 | */ |
398 | if (options->logical) |
399 | { |
400 | char *pubnames_str; |
401 | List *pubnames; |
402 | char *pubnames_literal; |
403 | |
404 | appendStringInfoString(&cmd, " (" ); |
405 | |
406 | appendStringInfo(&cmd, "proto_version '%u'" , |
407 | options->proto.logical.proto_version); |
408 | |
409 | pubnames = options->proto.logical.publication_names; |
410 | pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); |
411 | if (!pubnames_str) |
412 | ereport(ERROR, |
413 | (errmsg("could not start WAL streaming: %s" , |
414 | pchomp(PQerrorMessage(conn->streamConn))))); |
415 | pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str, |
416 | strlen(pubnames_str)); |
417 | if (!pubnames_literal) |
418 | ereport(ERROR, |
419 | (errmsg("could not start WAL streaming: %s" , |
420 | pchomp(PQerrorMessage(conn->streamConn))))); |
421 | appendStringInfo(&cmd, ", publication_names %s" , pubnames_literal); |
422 | PQfreemem(pubnames_literal); |
423 | pfree(pubnames_str); |
424 | |
425 | appendStringInfoChar(&cmd, ')'); |
426 | } |
427 | else |
428 | appendStringInfo(&cmd, " TIMELINE %u" , |
429 | options->proto.physical.startpointTLI); |
430 | |
431 | /* Start streaming. */ |
432 | res = libpqrcv_PQexec(conn->streamConn, cmd.data); |
433 | pfree(cmd.data); |
434 | |
435 | if (PQresultStatus(res) == PGRES_COMMAND_OK) |
436 | { |
437 | PQclear(res); |
438 | return false; |
439 | } |
440 | else if (PQresultStatus(res) != PGRES_COPY_BOTH) |
441 | { |
442 | PQclear(res); |
443 | ereport(ERROR, |
444 | (errmsg("could not start WAL streaming: %s" , |
445 | pchomp(PQerrorMessage(conn->streamConn))))); |
446 | } |
447 | PQclear(res); |
448 | return true; |
449 | } |
450 | |
451 | /* |
452 | * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as |
453 | * reported by the server, or 0 if it did not report it. |
454 | */ |
455 | static void |
456 | libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) |
457 | { |
458 | PGresult *res; |
459 | |
460 | /* |
461 | * Send copy-end message. As in libpqrcv_PQexec, this could theoretically |
462 | * block, but the risk seems small. |
463 | */ |
464 | if (PQputCopyEnd(conn->streamConn, NULL) <= 0 || |
465 | PQflush(conn->streamConn)) |
466 | ereport(ERROR, |
467 | (errmsg("could not send end-of-streaming message to primary: %s" , |
468 | pchomp(PQerrorMessage(conn->streamConn))))); |
469 | |
470 | *next_tli = 0; |
471 | |
472 | /* |
473 | * After COPY is finished, we should receive a result set indicating the |
474 | * next timeline's ID, or just CommandComplete if the server was shut |
475 | * down. |
476 | * |
477 | * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is |
478 | * also possible in case we aborted the copy in mid-stream. |
479 | */ |
480 | res = libpqrcv_PQgetResult(conn->streamConn); |
481 | if (PQresultStatus(res) == PGRES_TUPLES_OK) |
482 | { |
483 | /* |
484 | * Read the next timeline's ID. The server also sends the timeline's |
485 | * starting point, but it is ignored. |
486 | */ |
487 | if (PQnfields(res) < 2 || PQntuples(res) != 1) |
488 | ereport(ERROR, |
489 | (errmsg("unexpected result set after end-of-streaming" ))); |
490 | *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0)); |
491 | PQclear(res); |
492 | |
493 | /* the result set should be followed by CommandComplete */ |
494 | res = libpqrcv_PQgetResult(conn->streamConn); |
495 | } |
496 | else if (PQresultStatus(res) == PGRES_COPY_OUT) |
497 | { |
498 | PQclear(res); |
499 | |
500 | /* End the copy */ |
501 | if (PQendcopy(conn->streamConn)) |
502 | ereport(ERROR, |
503 | (errmsg("error while shutting down streaming COPY: %s" , |
504 | pchomp(PQerrorMessage(conn->streamConn))))); |
505 | |
506 | /* CommandComplete should follow */ |
507 | res = libpqrcv_PQgetResult(conn->streamConn); |
508 | } |
509 | |
510 | if (PQresultStatus(res) != PGRES_COMMAND_OK) |
511 | ereport(ERROR, |
512 | (errmsg("error reading result of streaming command: %s" , |
513 | pchomp(PQerrorMessage(conn->streamConn))))); |
514 | PQclear(res); |
515 | |
516 | /* Verify that there are no more results */ |
517 | res = libpqrcv_PQgetResult(conn->streamConn); |
518 | if (res != NULL) |
519 | ereport(ERROR, |
520 | (errmsg("unexpected result after CommandComplete: %s" , |
521 | pchomp(PQerrorMessage(conn->streamConn))))); |
522 | } |
523 | |
524 | /* |
525 | * Fetch the timeline history file for 'tli' from primary. |
526 | */ |
527 | static void |
528 | libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, |
529 | TimeLineID tli, char **filename, |
530 | char **content, int *len) |
531 | { |
532 | PGresult *res; |
533 | char cmd[64]; |
534 | |
535 | Assert(!conn->logical); |
536 | |
537 | /* |
538 | * Request the primary to send over the history file for given timeline. |
539 | */ |
540 | snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u" , tli); |
541 | res = libpqrcv_PQexec(conn->streamConn, cmd); |
542 | if (PQresultStatus(res) != PGRES_TUPLES_OK) |
543 | { |
544 | PQclear(res); |
545 | ereport(ERROR, |
546 | (errmsg("could not receive timeline history file from " |
547 | "the primary server: %s" , |
548 | pchomp(PQerrorMessage(conn->streamConn))))); |
549 | } |
550 | if (PQnfields(res) != 2 || PQntuples(res) != 1) |
551 | { |
552 | int ntuples = PQntuples(res); |
553 | int nfields = PQnfields(res); |
554 | |
555 | PQclear(res); |
556 | ereport(ERROR, |
557 | (errmsg("invalid response from primary server" ), |
558 | errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields." , |
559 | ntuples, nfields))); |
560 | } |
561 | *filename = pstrdup(PQgetvalue(res, 0, 0)); |
562 | |
563 | *len = PQgetlength(res, 0, 1); |
564 | *content = palloc(*len); |
565 | memcpy(*content, PQgetvalue(res, 0, 1), *len); |
566 | PQclear(res); |
567 | } |
568 | |
569 | /* |
570 | * Send a query and wait for the results by using the asynchronous libpq |
571 | * functions and socket readiness events. |
572 | * |
573 | * We must not use the regular blocking libpq functions like PQexec() |
574 | * since they are uninterruptible by signals on some platforms, such as |
575 | * Windows. |
576 | * |
577 | * The function is modeled on PQexec() in libpq, but only implements |
578 | * those parts that are in use in the walreceiver api. |
579 | * |
580 | * May return NULL, rather than an error result, on failure. |
581 | */ |
582 | static PGresult * |
583 | libpqrcv_PQexec(PGconn *streamConn, const char *query) |
584 | { |
585 | PGresult *lastResult = NULL; |
586 | |
587 | /* |
588 | * PQexec() silently discards any prior query results on the connection. |
589 | * This is not required for this function as it's expected that the caller |
590 | * (which is this library in all cases) will behave correctly and we don't |
591 | * have to be backwards compatible with old libpq. |
592 | */ |
593 | |
594 | /* |
595 | * Submit the query. Since we don't use non-blocking mode, this could |
596 | * theoretically block. In practice, since we don't send very long query |
597 | * strings, the risk seems negligible. |
598 | */ |
599 | if (!PQsendQuery(streamConn, query)) |
600 | return NULL; |
601 | |
602 | for (;;) |
603 | { |
604 | /* Wait for, and collect, the next PGresult. */ |
605 | PGresult *result; |
606 | |
607 | result = libpqrcv_PQgetResult(streamConn); |
608 | if (result == NULL) |
609 | break; /* query is complete, or failure */ |
610 | |
611 | /* |
612 | * Emulate PQexec()'s behavior of returning the last result when there |
613 | * are many. We are fine with returning just last error message. |
614 | */ |
615 | PQclear(lastResult); |
616 | lastResult = result; |
617 | |
618 | if (PQresultStatus(lastResult) == PGRES_COPY_IN || |
619 | PQresultStatus(lastResult) == PGRES_COPY_OUT || |
620 | PQresultStatus(lastResult) == PGRES_COPY_BOTH || |
621 | PQstatus(streamConn) == CONNECTION_BAD) |
622 | break; |
623 | } |
624 | |
625 | return lastResult; |
626 | } |
627 | |
628 | /* |
629 | * Perform the equivalent of PQgetResult(), but watch for interrupts. |
630 | */ |
631 | static PGresult * |
632 | libpqrcv_PQgetResult(PGconn *streamConn) |
633 | { |
634 | /* |
635 | * Collect data until PQgetResult is ready to get the result without |
636 | * blocking. |
637 | */ |
638 | while (PQisBusy(streamConn)) |
639 | { |
640 | int rc; |
641 | |
642 | /* |
643 | * We don't need to break down the sleep into smaller increments, |
644 | * since we'll get interrupted by signals and can handle any |
645 | * interrupts here. |
646 | */ |
647 | rc = WaitLatchOrSocket(MyLatch, |
648 | WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | |
649 | WL_LATCH_SET, |
650 | PQsocket(streamConn), |
651 | 0, |
652 | WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); |
653 | |
654 | /* Interrupted? */ |
655 | if (rc & WL_LATCH_SET) |
656 | { |
657 | ResetLatch(MyLatch); |
658 | ProcessWalRcvInterrupts(); |
659 | } |
660 | |
661 | /* Consume whatever data is available from the socket */ |
662 | if (PQconsumeInput(streamConn) == 0) |
663 | { |
664 | /* trouble; return NULL */ |
665 | return NULL; |
666 | } |
667 | } |
668 | |
669 | /* Now we can collect and return the next PGresult */ |
670 | return PQgetResult(streamConn); |
671 | } |
672 | |
673 | /* |
674 | * Disconnect connection to primary, if any. |
675 | */ |
676 | static void |
677 | libpqrcv_disconnect(WalReceiverConn *conn) |
678 | { |
679 | PQfinish(conn->streamConn); |
680 | if (conn->recvBuf != NULL) |
681 | PQfreemem(conn->recvBuf); |
682 | pfree(conn); |
683 | } |
684 | |
685 | /* |
686 | * Receive a message available from XLOG stream. |
687 | * |
688 | * Returns: |
689 | * |
690 | * If data was received, returns the length of the data. *buffer is set to |
691 | * point to a buffer holding the received message. The buffer is only valid |
692 | * until the next libpqrcv_* call. |
693 | * |
694 | * If no data was available immediately, returns 0, and *wait_fd is set to a |
695 | * socket descriptor which can be waited on before trying again. |
696 | * |
697 | * -1 if the server ended the COPY. |
698 | * |
699 | * ereports on error. |
700 | */ |
701 | static int |
702 | libpqrcv_receive(WalReceiverConn *conn, char **buffer, |
703 | pgsocket *wait_fd) |
704 | { |
705 | int rawlen; |
706 | |
707 | if (conn->recvBuf != NULL) |
708 | PQfreemem(conn->recvBuf); |
709 | conn->recvBuf = NULL; |
710 | |
711 | /* Try to receive a CopyData message */ |
712 | rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1); |
713 | if (rawlen == 0) |
714 | { |
715 | /* Try consuming some data. */ |
716 | if (PQconsumeInput(conn->streamConn) == 0) |
717 | ereport(ERROR, |
718 | (errmsg("could not receive data from WAL stream: %s" , |
719 | pchomp(PQerrorMessage(conn->streamConn))))); |
720 | |
721 | /* Now that we've consumed some input, try again */ |
722 | rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1); |
723 | if (rawlen == 0) |
724 | { |
725 | /* Tell caller to try again when our socket is ready. */ |
726 | *wait_fd = PQsocket(conn->streamConn); |
727 | return 0; |
728 | } |
729 | } |
730 | if (rawlen == -1) /* end-of-streaming or error */ |
731 | { |
732 | PGresult *res; |
733 | |
734 | res = libpqrcv_PQgetResult(conn->streamConn); |
735 | if (PQresultStatus(res) == PGRES_COMMAND_OK) |
736 | { |
737 | PQclear(res); |
738 | |
739 | /* Verify that there are no more results. */ |
740 | res = libpqrcv_PQgetResult(conn->streamConn); |
741 | if (res != NULL) |
742 | { |
743 | PQclear(res); |
744 | |
745 | /* |
746 | * If the other side closed the connection orderly (otherwise |
747 | * we'd seen an error, or PGRES_COPY_IN) don't report an error |
748 | * here, but let callers deal with it. |
749 | */ |
750 | if (PQstatus(conn->streamConn) == CONNECTION_BAD) |
751 | return -1; |
752 | |
753 | ereport(ERROR, |
754 | (errmsg("unexpected result after CommandComplete: %s" , |
755 | PQerrorMessage(conn->streamConn)))); |
756 | } |
757 | |
758 | return -1; |
759 | } |
760 | else if (PQresultStatus(res) == PGRES_COPY_IN) |
761 | { |
762 | PQclear(res); |
763 | return -1; |
764 | } |
765 | else |
766 | { |
767 | PQclear(res); |
768 | ereport(ERROR, |
769 | (errmsg("could not receive data from WAL stream: %s" , |
770 | pchomp(PQerrorMessage(conn->streamConn))))); |
771 | } |
772 | } |
773 | if (rawlen < -1) |
774 | ereport(ERROR, |
775 | (errmsg("could not receive data from WAL stream: %s" , |
776 | pchomp(PQerrorMessage(conn->streamConn))))); |
777 | |
778 | /* Return received messages to caller */ |
779 | *buffer = conn->recvBuf; |
780 | return rawlen; |
781 | } |
782 | |
783 | /* |
784 | * Send a message to XLOG stream. |
785 | * |
786 | * ereports on error. |
787 | */ |
788 | static void |
789 | libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) |
790 | { |
791 | if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 || |
792 | PQflush(conn->streamConn)) |
793 | ereport(ERROR, |
794 | (errmsg("could not send data to WAL stream: %s" , |
795 | pchomp(PQerrorMessage(conn->streamConn))))); |
796 | } |
797 | |
798 | /* |
799 | * Create new replication slot. |
800 | * Returns the name of the exported snapshot for logical slot or NULL for |
801 | * physical slot. |
802 | */ |
803 | static char * |
804 | libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, |
805 | bool temporary, CRSSnapshotAction snapshot_action, |
806 | XLogRecPtr *lsn) |
807 | { |
808 | PGresult *res; |
809 | StringInfoData cmd; |
810 | char *snapshot; |
811 | |
812 | initStringInfo(&cmd); |
813 | |
814 | appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"" , slotname); |
815 | |
816 | if (temporary) |
817 | appendStringInfoString(&cmd, " TEMPORARY" ); |
818 | |
819 | if (conn->logical) |
820 | { |
821 | appendStringInfoString(&cmd, " LOGICAL pgoutput" ); |
822 | switch (snapshot_action) |
823 | { |
824 | case CRS_EXPORT_SNAPSHOT: |
825 | appendStringInfoString(&cmd, " EXPORT_SNAPSHOT" ); |
826 | break; |
827 | case CRS_NOEXPORT_SNAPSHOT: |
828 | appendStringInfoString(&cmd, " NOEXPORT_SNAPSHOT" ); |
829 | break; |
830 | case CRS_USE_SNAPSHOT: |
831 | appendStringInfoString(&cmd, " USE_SNAPSHOT" ); |
832 | break; |
833 | } |
834 | } |
835 | |
836 | res = libpqrcv_PQexec(conn->streamConn, cmd.data); |
837 | pfree(cmd.data); |
838 | |
839 | if (PQresultStatus(res) != PGRES_TUPLES_OK) |
840 | { |
841 | PQclear(res); |
842 | ereport(ERROR, |
843 | (errmsg("could not create replication slot \"%s\": %s" , |
844 | slotname, pchomp(PQerrorMessage(conn->streamConn))))); |
845 | } |
846 | |
847 | *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid, |
848 | CStringGetDatum(PQgetvalue(res, 0, 1)))); |
849 | if (!PQgetisnull(res, 0, 2)) |
850 | snapshot = pstrdup(PQgetvalue(res, 0, 2)); |
851 | else |
852 | snapshot = NULL; |
853 | |
854 | PQclear(res); |
855 | |
856 | return snapshot; |
857 | } |
858 | |
859 | /* |
860 | * Convert tuple query result to tuplestore. |
861 | */ |
862 | static void |
863 | libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, |
864 | const int nRetTypes, const Oid *retTypes) |
865 | { |
866 | int tupn; |
867 | int coln; |
868 | int nfields = PQnfields(pgres); |
869 | HeapTuple tuple; |
870 | AttInMetadata *attinmeta; |
871 | MemoryContext rowcontext; |
872 | MemoryContext oldcontext; |
873 | |
874 | /* Make sure we got expected number of fields. */ |
875 | if (nfields != nRetTypes) |
876 | ereport(ERROR, |
877 | (errmsg("invalid query response" ), |
878 | errdetail("Expected %d fields, got %d fields." , |
879 | nRetTypes, nfields))); |
880 | |
881 | walres->tuplestore = tuplestore_begin_heap(true, false, work_mem); |
882 | |
883 | /* Create tuple descriptor corresponding to expected result. */ |
884 | walres->tupledesc = CreateTemplateTupleDesc(nRetTypes); |
885 | for (coln = 0; coln < nRetTypes; coln++) |
886 | TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1, |
887 | PQfname(pgres, coln), retTypes[coln], -1, 0); |
888 | attinmeta = TupleDescGetAttInMetadata(walres->tupledesc); |
889 | |
890 | /* No point in doing more here if there were no tuples returned. */ |
891 | if (PQntuples(pgres) == 0) |
892 | return; |
893 | |
894 | /* Create temporary context for local allocations. */ |
895 | rowcontext = AllocSetContextCreate(CurrentMemoryContext, |
896 | "libpqrcv query result context" , |
897 | ALLOCSET_DEFAULT_SIZES); |
898 | |
899 | /* Process returned rows. */ |
900 | for (tupn = 0; tupn < PQntuples(pgres); tupn++) |
901 | { |
902 | char *cstrs[MaxTupleAttributeNumber]; |
903 | |
904 | ProcessWalRcvInterrupts(); |
905 | |
906 | /* Do the allocations in temporary context. */ |
907 | oldcontext = MemoryContextSwitchTo(rowcontext); |
908 | |
909 | /* |
910 | * Fill cstrs with null-terminated strings of column values. |
911 | */ |
912 | for (coln = 0; coln < nfields; coln++) |
913 | { |
914 | if (PQgetisnull(pgres, tupn, coln)) |
915 | cstrs[coln] = NULL; |
916 | else |
917 | cstrs[coln] = PQgetvalue(pgres, tupn, coln); |
918 | } |
919 | |
920 | /* Convert row to a tuple, and add it to the tuplestore */ |
921 | tuple = BuildTupleFromCStrings(attinmeta, cstrs); |
922 | tuplestore_puttuple(walres->tuplestore, tuple); |
923 | |
924 | /* Clean up */ |
925 | MemoryContextSwitchTo(oldcontext); |
926 | MemoryContextReset(rowcontext); |
927 | } |
928 | |
929 | MemoryContextDelete(rowcontext); |
930 | } |
931 | |
932 | /* |
933 | * Public interface for sending generic queries (and commands). |
934 | * |
935 | * This can only be called from process connected to database. |
936 | */ |
937 | static WalRcvExecResult * |
938 | libpqrcv_exec(WalReceiverConn *conn, const char *query, |
939 | const int nRetTypes, const Oid *retTypes) |
940 | { |
941 | PGresult *pgres = NULL; |
942 | WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult)); |
943 | |
944 | if (MyDatabaseId == InvalidOid) |
945 | ereport(ERROR, |
946 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
947 | errmsg("the query interface requires a database connection" ))); |
948 | |
949 | pgres = libpqrcv_PQexec(conn->streamConn, query); |
950 | |
951 | switch (PQresultStatus(pgres)) |
952 | { |
953 | case PGRES_SINGLE_TUPLE: |
954 | case PGRES_TUPLES_OK: |
955 | walres->status = WALRCV_OK_TUPLES; |
956 | libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes); |
957 | break; |
958 | |
959 | case PGRES_COPY_IN: |
960 | walres->status = WALRCV_OK_COPY_IN; |
961 | break; |
962 | |
963 | case PGRES_COPY_OUT: |
964 | walres->status = WALRCV_OK_COPY_OUT; |
965 | break; |
966 | |
967 | case PGRES_COPY_BOTH: |
968 | walres->status = WALRCV_OK_COPY_BOTH; |
969 | break; |
970 | |
971 | case PGRES_COMMAND_OK: |
972 | walres->status = WALRCV_OK_COMMAND; |
973 | break; |
974 | |
975 | /* Empty query is considered error. */ |
976 | case PGRES_EMPTY_QUERY: |
977 | walres->status = WALRCV_ERROR; |
978 | walres->err = _("empty query" ); |
979 | break; |
980 | |
981 | case PGRES_NONFATAL_ERROR: |
982 | case PGRES_FATAL_ERROR: |
983 | case PGRES_BAD_RESPONSE: |
984 | walres->status = WALRCV_ERROR; |
985 | walres->err = pchomp(PQerrorMessage(conn->streamConn)); |
986 | break; |
987 | } |
988 | |
989 | PQclear(pgres); |
990 | |
991 | return walres; |
992 | } |
993 | |
994 | /* |
995 | * Given a List of strings, return it as single comma separated |
996 | * string, quoting identifiers as needed. |
997 | * |
998 | * This is essentially the reverse of SplitIdentifierString. |
999 | * |
1000 | * The caller should free the result. |
1001 | */ |
1002 | static char * |
1003 | stringlist_to_identifierstr(PGconn *conn, List *strings) |
1004 | { |
1005 | ListCell *lc; |
1006 | StringInfoData res; |
1007 | bool first = true; |
1008 | |
1009 | initStringInfo(&res); |
1010 | |
1011 | foreach(lc, strings) |
1012 | { |
1013 | char *val = strVal(lfirst(lc)); |
1014 | char *val_escaped; |
1015 | |
1016 | if (first) |
1017 | first = false; |
1018 | else |
1019 | appendStringInfoChar(&res, ','); |
1020 | |
1021 | val_escaped = PQescapeIdentifier(conn, val, strlen(val)); |
1022 | if (!val_escaped) |
1023 | { |
1024 | free(res.data); |
1025 | return NULL; |
1026 | } |
1027 | appendStringInfoString(&res, val_escaped); |
1028 | PQfreemem(val_escaped); |
1029 | } |
1030 | |
1031 | return res.data; |
1032 | } |
1033 | |