1/*-------------------------------------------------------------------------
2 *
3 * pqcomm.c
4 * Communication functions between the Frontend and the Backend
5 *
6 * These routines handle the low-level details of communication between
7 * frontend and backend. They just shove data across the communication
8 * channel, and are ignorant of the semantics of the data --- or would be,
9 * except for major brain damage in the design of the old COPY OUT protocol.
10 * Unfortunately, COPY OUT was designed to commandeer the communication
11 * channel (it just transfers data without wrapping it into messages).
12 * No other messages can be sent while COPY OUT is in progress; and if the
13 * copy is aborted by an ereport(ERROR), we need to close out the copy so that
14 * the frontend gets back into sync. Therefore, these routines have to be
15 * aware of COPY OUT state. (New COPY-OUT is message-based and does *not*
16 * set the DoingCopyOut flag.)
17 *
18 * NOTE: generally, it's a bad idea to emit outgoing messages directly with
19 * pq_putbytes(), especially if the message would require multiple calls
20 * to send. Instead, use the routines in pqformat.c to construct the message
21 * in a buffer and then emit it in one call to pq_putmessage. This ensures
22 * that the channel will not be clogged by an incomplete message if execution
23 * is aborted by ereport(ERROR) partway through the message. The only
24 * non-libpq code that should call pq_putbytes directly is old-style COPY OUT.
25 *
26 * At one time, libpq was shared between frontend and backend, but now
27 * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
28 * All that remains is similarities of names to trap the unwary...
29 *
30 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
31 * Portions Copyright (c) 1994, Regents of the University of California
32 *
33 * src/backend/libpq/pqcomm.c
34 *
35 *-------------------------------------------------------------------------
36 */
37
38/*------------------------
39 * INTERFACE ROUTINES
40 *
41 * setup/teardown:
42 * StreamServerPort - Open postmaster's server port
43 * StreamConnection - Create new connection with client
44 * StreamClose - Close a client/backend connection
 *		TouchSocketFiles - Protect socket files against /tmp cleaners
 *		RemoveSocketFiles - Unlink socket files at postmaster shutdown
46 * pq_init - initialize libpq at backend startup
47 * pq_comm_reset - reset libpq during error recovery
48 * pq_close - shutdown libpq at backend exit
49 *
50 * low-level I/O:
51 * pq_getbytes - get a known number of bytes from connection
52 * pq_getstring - get a null terminated string from connection
53 * pq_getmessage - get a message with length word from connection
54 * pq_getbyte - get next byte from connection
55 * pq_peekbyte - peek at next byte from connection
56 * pq_putbytes - send bytes to connection (not flushed until pq_flush)
57 * pq_flush - flush pending output
58 * pq_flush_if_writable - flush pending output if writable without blocking
59 * pq_getbyte_if_available - get a byte if available without blocking
60 *
61 * message-level I/O (and old-style-COPY-OUT cruft):
62 * pq_putmessage - send a normal message (suppressed in COPY OUT mode)
63 * pq_putmessage_noblock - buffer a normal message (suppressed in COPY OUT)
64 * pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
65 * pq_endcopyout - end a COPY OUT transfer
66 *
67 *------------------------
68 */
69#include "postgres.h"
70
71#include <signal.h>
72#include <fcntl.h>
73#include <grp.h>
74#include <unistd.h>
75#include <sys/file.h>
76#include <sys/socket.h>
77#include <sys/stat.h>
78#include <sys/time.h>
79#include <netdb.h>
80#include <netinet/in.h>
81#ifdef HAVE_NETINET_TCP_H
82#include <netinet/tcp.h>
83#endif
84#ifdef HAVE_UTIME_H
85#include <utime.h>
86#endif
87#ifdef _MSC_VER /* mstcpip.h is missing on mingw */
88#include <mstcpip.h>
89#endif
90
91#include "common/ip.h"
92#include "libpq/libpq.h"
93#include "miscadmin.h"
94#include "port/pg_bswap.h"
95#include "storage/ipc.h"
96#include "utils/guc.h"
97#include "utils/memutils.h"
98
/*
 * Cope with the various platform-specific ways to spell TCP keepalive socket
 * options.  This doesn't cover Windows, which as usual does its own thing.
 *
 * PG_TCP_KEEPALIVE_IDLE is the platform's option identifier and
 * PG_TCP_KEEPALIVE_IDLE_STR is the same name spelled as a string literal.
 * Neither macro is defined when none of the symbols tested below exists.
 */
#if defined(TCP_KEEPIDLE)
/* TCP_KEEPIDLE is the name of this option on Linux and *BSD */
#define PG_TCP_KEEPALIVE_IDLE TCP_KEEPIDLE
#define PG_TCP_KEEPALIVE_IDLE_STR "TCP_KEEPIDLE"
#elif defined(TCP_KEEPALIVE_THRESHOLD)
/* TCP_KEEPALIVE_THRESHOLD is the name of this option on Solaris >= 11 */
#define PG_TCP_KEEPALIVE_IDLE TCP_KEEPALIVE_THRESHOLD
#define PG_TCP_KEEPALIVE_IDLE_STR "TCP_KEEPALIVE_THRESHOLD"
#elif defined(TCP_KEEPALIVE) && defined(__darwin__)
/* TCP_KEEPALIVE is the name of this option on macOS */
/* Caution: Solaris has this symbol but it means something different */
#define PG_TCP_KEEPALIVE_IDLE TCP_KEEPALIVE
#define PG_TCP_KEEPALIVE_IDLE_STR "TCP_KEEPALIVE"
#endif
117
/*
 * Configuration options
 *
 * Both are consumed by Setup_AF_UNIX(): the permissions are handed to
 * chmod(), and the group (a name or a decimal id; empty string means "do
 * not change the group") is handed to chown().
 */
int			Unix_socket_permissions;
char	   *Unix_socket_group;

/* Where the Unix socket files are (list of palloc'd strings) */
static List *sock_paths = NIL;

/*
 * Buffers for low-level I/O.
 *
 * The receive buffer is fixed size. Send buffer is usually 8k, but can be
 * enlarged by pq_putmessage_noblock() if the message doesn't fit otherwise.
 */

#define PQ_SEND_BUFFER_SIZE 8192
#define PQ_RECV_BUFFER_SIZE 8192

static char *PqSendBuffer;		/* allocated by pq_init() */
static int	PqSendBufferSize;	/* Size of send buffer */
static int	PqSendPointer;		/* Next index to store a byte in PqSendBuffer */
static int	PqSendStart;		/* Next index to send a byte in PqSendBuffer */

static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
static int	PqRecvLength;		/* End of data available in PqRecvBuffer */

/*
 * Message status
 */
static bool PqCommBusy;			/* busy sending data to the client */
static bool PqCommReadingMsg;	/* in the middle of reading a message */
static bool DoingCopyOut;		/* in old-protocol COPY OUT processing */
153
/* Internal functions */
static void socket_comm_reset(void);
static void socket_close(int code, Datum arg);
static void socket_set_nonblocking(bool nonblocking);
static int	socket_flush(void);
static int	socket_flush_if_writable(void);
static bool socket_is_send_pending(void);
static int	socket_putmessage(char msgtype, const char *s, size_t len);
static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
static void socket_startcopyout(void);
static void socket_endcopyout(bool errorAbort);
static int	internal_putbytes(const char *s, size_t len);
static int	internal_flush(void);

#ifdef HAVE_UNIX_SOCKETS
static int	Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath);
static int	Setup_AF_UNIX(char *sock_path);
#endif							/* HAVE_UNIX_SOCKETS */

/*
 * Socket-based implementation of the communication method table.
 *
 * The initializer is positional, so the entries must stay in the same order
 * as the members of PQcommMethods (declared outside this file).
 */
static const PQcommMethods PqCommSocketMethods = {
	socket_comm_reset,
	socket_flush,
	socket_flush_if_writable,
	socket_is_send_pending,
	socket_putmessage,
	socket_putmessage_noblock,
	socket_startcopyout,
	socket_endcopyout
};

/* Method table currently in use; initially the socket implementation. */
const PQcommMethods *PqCommMethods = &PqCommSocketMethods;

/* Wait event set for the client socket; created by pq_init(). */
WaitEventSet *FeBeWaitSet;
187
188
189/* --------------------------------
190 * pq_init - initialize libpq at backend startup
191 * --------------------------------
192 */
193void
194pq_init(void)
195{
196 /* initialize state variables */
197 PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
198 PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
199 PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
200 PqCommBusy = false;
201 PqCommReadingMsg = false;
202 DoingCopyOut = false;
203
204 /* set up process-exit hook to close the socket */
205 on_proc_exit(socket_close, 0);
206
207 /*
208 * In backends (as soon as forked) we operate the underlying socket in
209 * nonblocking mode and use latches to implement blocking semantics if
210 * needed. That allows us to provide safely interruptible reads and
211 * writes.
212 *
213 * Use COMMERROR on failure, because ERROR would try to send the error to
214 * the client, which might require changing the mode again, leading to
215 * infinite recursion.
216 */
217#ifndef WIN32
218 if (!pg_set_noblock(MyProcPort->sock))
219 ereport(COMMERROR,
220 (errmsg("could not set socket to nonblocking mode: %m")));
221#endif
222
223 FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
224 AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
225 NULL, NULL);
226 AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
227 AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
228}
229
/* --------------------------------
 *		socket_comm_reset - reset libpq during error recovery
 *
 * This is called from error recovery at the outer idle loop.  It's
 * just to get us out of trouble if we somehow manage to elog() from
 * inside a pqcomm.c routine (which ideally will never happen, but...)
 *
 * Buffered send/receive data is left untouched; only the busy flag and any
 * old-style COPY OUT state are reset.
 * --------------------------------
 */
static void
socket_comm_reset(void)
{
	/* Do not throw away pending data, but do reset the busy flag */
	PqCommBusy = false;
	/*
	 * We can abort any old-style COPY OUT, too.
	 * NOTE(review): the argument presumably maps to the errorAbort parameter
	 * of socket_endcopyout() -- confirm against the pq_endcopyout definition.
	 */
	pq_endcopyout(true);
}
246
/* --------------------------------
 *		socket_close - shutdown libpq at backend exit
 *
 * This is the one pg_on_exit_callback in place during BackendInitialize().
 * That function's unusual signal handling constrains that this callback be
 * safe to run at any instant.
 *
 * Registered by pq_init() via on_proc_exit().  Neither argument is used by
 * the body; they exist to match the callback signature.
 * --------------------------------
 */
static void
socket_close(int code, Datum arg)
{
	/* Nothing to do in a standalone backend, where MyProcPort is NULL. */
	if (MyProcPort != NULL)
	{
#if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
#ifdef ENABLE_GSS
		OM_uint32	min_s;		/* minor status output; value is not examined */

		/*
		 * Shutdown GSSAPI layer.  This section does nothing when interrupting
		 * BackendInitialize(), because pg_GSS_recvauth() makes first use of
		 * "ctx" and "cred".
		 */
		if (MyProcPort->gss->ctx != GSS_C_NO_CONTEXT)
			gss_delete_sec_context(&min_s, &MyProcPort->gss->ctx, NULL);

		if (MyProcPort->gss->cred != GSS_C_NO_CREDENTIAL)
			gss_release_cred(&min_s, &MyProcPort->gss->cred);
#endif							/* ENABLE_GSS */

		/*
		 * GSS and SSPI share the port->gss struct.  Since nowhere else does a
		 * postmaster child free this, doing so is safe when interrupting
		 * BackendInitialize().
		 *
		 * NOTE(review): the pointer is not reset to NULL after free().
		 */
		free(MyProcPort->gss);
#endif							/* ENABLE_GSS || ENABLE_SSPI */

		/*
		 * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
		 * call this, so this is safe when interrupting BackendInitialize().
		 */
		secure_close(MyProcPort);

		/*
		 * Formerly we did an explicit close() here, but it seems better to
		 * leave the socket open until the process dies.  This allows clients
		 * to perform a "synchronous close" if they care --- wait till the
		 * transport layer reports connection closure, and you can be sure the
		 * backend has exited.
		 *
		 * We do set sock to PGINVALID_SOCKET to prevent any further I/O,
		 * though.
		 */
		MyProcPort->sock = PGINVALID_SOCKET;
	}
}
304
305
306
307/*
308 * Streams -- wrapper around Unix socket system calls
309 *
310 *
311 * Stream functions are used for vanilla TCP connection protocol.
312 */
313
314
/*
 * StreamServerPort -- open a "listening" port to accept connections.
 *
 * family should be AF_UNIX or AF_UNSPEC; portNumber is the port number.
 * For AF_UNIX ports, hostName should be NULL and unixSocketDir must be
 * specified.  For TCP ports, hostName is either NULL for all interfaces or
 * the interface to listen on, and unixSocketDir is ignored (can be NULL).
 *
 * Successfully opened sockets are added to the ListenSocket[] array (of
 * length MaxListen), at the first position that isn't PGINVALID_SOCKET.
 *
 * Every address returned by the lookup is tried on its own.  A failure to
 * create, configure, bind or listen on one address is logged and that
 * address is skipped.  Running out of ListenSocket[] slots, or a failure
 * in Setup_AF_UNIX(), stops processing of the remaining addresses.
 *
 * RETURNS: STATUS_OK if at least one socket was opened, else STATUS_ERROR
 */

int
StreamServerPort(int family, char *hostName, unsigned short portNumber,
				 char *unixSocketDir,
				 pgsocket ListenSocket[], int MaxListen)
{
	pgsocket	fd;
	int			err;
	int			maxconn;
	int			ret;
	char		portNumberStr[32];
	const char *familyDesc;
	char		familyDescBuf[64];
	const char *addrDesc;
	char		addrBuf[NI_MAXHOST];
	char	   *service;		/* port number string, or Unix socket path */
	struct addrinfo *addrs = NULL,
			   *addr;
	struct addrinfo hint;
	int			listen_index = 0;	/* next ListenSocket[] slot to examine */
	int			added = 0;		/* number of sockets opened by this call */

#ifdef HAVE_UNIX_SOCKETS
	char		unixSocketPath[MAXPGPATH];
#endif
#if !defined(WIN32) || defined(IPV6_V6ONLY)
	int			one = 1;		/* "enable" value for setsockopt() */
#endif

	/* Initialize hint structure */
	MemSet(&hint, 0, sizeof(hint));
	hint.ai_family = family;
	hint.ai_flags = AI_PASSIVE;
	hint.ai_socktype = SOCK_STREAM;

#ifdef HAVE_UNIX_SOCKETS
	if (family == AF_UNIX)
	{
		/*
		 * Create unixSocketPath from portNumber and unixSocketDir and lock
		 * that file path
		 */
		UNIXSOCK_PATH(unixSocketPath, portNumber, unixSocketDir);
		if (strlen(unixSocketPath) >= UNIXSOCK_PATH_BUFLEN)
		{
			ereport(LOG,
					(errmsg("Unix-domain socket path \"%s\" is too long (maximum %d bytes)",
							unixSocketPath,
							(int) (UNIXSOCK_PATH_BUFLEN - 1))));
			return STATUS_ERROR;
		}
		if (Lock_AF_UNIX(unixSocketDir, unixSocketPath) != STATUS_OK)
			return STATUS_ERROR;
		service = unixSocketPath;
	}
	else
#endif							/* HAVE_UNIX_SOCKETS */
	{
		snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
		service = portNumberStr;
	}

	/* Resolve the host/service pair into a list of candidate addresses */
	ret = pg_getaddrinfo_all(hostName, service, &hint, &addrs);
	if (ret || !addrs)
	{
		if (hostName)
			ereport(LOG,
					(errmsg("could not translate host name \"%s\", service \"%s\" to address: %s",
							hostName, service, gai_strerror(ret))));
		else
			ereport(LOG,
					(errmsg("could not translate service \"%s\" to address: %s",
							service, gai_strerror(ret))));
		if (addrs)
			pg_freeaddrinfo_all(hint.ai_family, addrs);
		return STATUS_ERROR;
	}

	for (addr = addrs; addr; addr = addr->ai_next)
	{
		if (!IS_AF_UNIX(family) && IS_AF_UNIX(addr->ai_family))
		{
			/*
			 * Only set up a unix domain socket when they really asked for it.
			 * The service/port is different in that case.
			 */
			continue;
		}

		/*
		 * See if there is still room to add 1 more socket.  listen_index is
		 * not reset between addresses, so the scan resumes where the
		 * previous one stopped.
		 */
		for (; listen_index < MaxListen; listen_index++)
		{
			if (ListenSocket[listen_index] == PGINVALID_SOCKET)
				break;
		}
		if (listen_index >= MaxListen)
		{
			ereport(LOG,
					(errmsg("could not bind to all requested addresses: MAXLISTEN (%d) exceeded",
							MaxListen)));
			break;
		}

		/* set up address family name for log messages */
		switch (addr->ai_family)
		{
			case AF_INET:
				familyDesc = _("IPv4");
				break;
#ifdef HAVE_IPV6
			case AF_INET6:
				familyDesc = _("IPv6");
				break;
#endif
#ifdef HAVE_UNIX_SOCKETS
			case AF_UNIX:
				familyDesc = _("Unix");
				break;
#endif
			default:
				snprintf(familyDescBuf, sizeof(familyDescBuf),
						 _("unrecognized address family %d"),
						 addr->ai_family);
				familyDesc = familyDescBuf;
				break;
		}

		/* set up text form of address for log messages */
#ifdef HAVE_UNIX_SOCKETS
		if (addr->ai_family == AF_UNIX)
			addrDesc = unixSocketPath;
		else
#endif
		{
			pg_getnameinfo_all((const struct sockaddr_storage *) addr->ai_addr,
							   addr->ai_addrlen,
							   addrBuf, sizeof(addrBuf),
							   NULL, 0,
							   NI_NUMERICHOST);
			addrDesc = addrBuf;
		}

		if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			/* translator: first %s is IPv4, IPv6, or Unix */
					 errmsg("could not create %s socket for address \"%s\": %m",
							familyDesc, addrDesc)));
			continue;
		}

#ifndef WIN32

		/*
		 * Without the SO_REUSEADDR flag, a new postmaster can't be started
		 * right away after a stop or crash, giving "address already in use"
		 * error on TCP ports.
		 *
		 * On win32, however, this behavior only happens if the
		 * SO_EXCLUSIVEADDRUSE is set. With SO_REUSEADDR, win32 allows multiple
		 * servers to listen on the same address, resulting in unpredictable
		 * behavior. With no flags at all, win32 behaves as Unix with
		 * SO_REUSEADDR.
		 */
		if (!IS_AF_UNIX(addr->ai_family))
		{
			if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
							(char *) &one, sizeof(one))) == -1)
			{
				ereport(LOG,
						(errcode_for_socket_access(),
				/* translator: first %s is IPv4, IPv6, or Unix */
						 errmsg("setsockopt(SO_REUSEADDR) failed for %s address \"%s\": %m",
								familyDesc, addrDesc)));
				closesocket(fd);
				continue;
			}
		}
#endif

#ifdef IPV6_V6ONLY
		/* Request IPV6_V6ONLY on every IPv6 listening socket */
		if (addr->ai_family == AF_INET6)
		{
			if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY,
						   (char *) &one, sizeof(one)) == -1)
			{
				ereport(LOG,
						(errcode_for_socket_access(),
				/* translator: first %s is IPv4, IPv6, or Unix */
						 errmsg("setsockopt(IPV6_V6ONLY) failed for %s address \"%s\": %m",
								familyDesc, addrDesc)));
				closesocket(fd);
				continue;
			}
		}
#endif

		/*
		 * Note: This might fail on some OS's, like Linux older than
		 * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and map
		 * ipv4 addresses to ipv6.  It will show ::ffff:ipv4 for all ipv4
		 * connections.
		 */
		err = bind(fd, addr->ai_addr, addr->ai_addrlen);
		if (err < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			/* translator: first %s is IPv4, IPv6, or Unix */
					 errmsg("could not bind %s address \"%s\": %m",
							familyDesc, addrDesc),
					 (IS_AF_UNIX(addr->ai_family)) ?
					 errhint("Is another postmaster already running on port %d?"
							 " If not, remove socket file \"%s\" and retry.",
							 (int) portNumber, service) :
					 errhint("Is another postmaster already running on port %d?"
							 " If not, wait a few seconds and retry.",
							 (int) portNumber)));
			closesocket(fd);
			continue;
		}

#ifdef HAVE_UNIX_SOCKETS
		if (addr->ai_family == AF_UNIX)
		{
			/*
			 * Apply group/permission settings to the socket file before
			 * listen(); on failure give up on all remaining addresses.
			 */
			if (Setup_AF_UNIX(service) != STATUS_OK)
			{
				closesocket(fd);
				break;
			}
		}
#endif

		/*
		 * Select appropriate accept-queue length limit.  PG_SOMAXCONN is only
		 * intended to provide a clamp on the request on platforms where an
		 * overly large request provokes a kernel error (are there any?).
		 */
		maxconn = MaxBackends * 2;
		if (maxconn > PG_SOMAXCONN)
			maxconn = PG_SOMAXCONN;

		err = listen(fd, maxconn);
		if (err < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			/* translator: first %s is IPv4, IPv6, or Unix */
					 errmsg("could not listen on %s address \"%s\": %m",
							familyDesc, addrDesc)));
			closesocket(fd);
			continue;
		}

#ifdef HAVE_UNIX_SOCKETS
		if (addr->ai_family == AF_UNIX)
			ereport(LOG,
					(errmsg("listening on Unix socket \"%s\"",
							addrDesc)));
		else
#endif
			ereport(LOG,
			/* translator: first %s is IPv4 or IPv6 */
					(errmsg("listening on %s address \"%s\", port %d",
							familyDesc, addrDesc, (int) portNumber)));

		/* Record the socket in the free slot found above */
		ListenSocket[listen_index] = fd;
		added++;
	}

	pg_freeaddrinfo_all(hint.ai_family, addrs);

	if (!added)
		return STATUS_ERROR;

	return STATUS_OK;
}
606
607
608#ifdef HAVE_UNIX_SOCKETS
609
610/*
611 * Lock_AF_UNIX -- configure unix socket file path
612 */
613static int
614Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath)
615{
616 /*
617 * Grab an interlock file associated with the socket file.
618 *
619 * Note: there are two reasons for using a socket lock file, rather than
620 * trying to interlock directly on the socket itself. First, it's a lot
621 * more portable, and second, it lets us remove any pre-existing socket
622 * file without race conditions.
623 */
624 CreateSocketLockFile(unixSocketPath, true, unixSocketDir);
625
626 /*
627 * Once we have the interlock, we can safely delete any pre-existing
628 * socket file to avoid failure at bind() time.
629 */
630 (void) unlink(unixSocketPath);
631
632 /*
633 * Remember socket file pathnames for later maintenance.
634 */
635 sock_paths = lappend(sock_paths, pstrdup(unixSocketPath));
636
637 return STATUS_OK;
638}
639
640
641/*
642 * Setup_AF_UNIX -- configure unix socket permissions
643 */
644static int
645Setup_AF_UNIX(char *sock_path)
646{
647 /*
648 * Fix socket ownership/permission if requested. Note we must do this
649 * before we listen() to avoid a window where unwanted connections could
650 * get accepted.
651 */
652 Assert(Unix_socket_group);
653 if (Unix_socket_group[0] != '\0')
654 {
655#ifdef WIN32
656 elog(WARNING, "configuration item unix_socket_group is not supported on this platform");
657#else
658 char *endptr;
659 unsigned long val;
660 gid_t gid;
661
662 val = strtoul(Unix_socket_group, &endptr, 10);
663 if (*endptr == '\0')
664 { /* numeric group id */
665 gid = val;
666 }
667 else
668 { /* convert group name to id */
669 struct group *gr;
670
671 gr = getgrnam(Unix_socket_group);
672 if (!gr)
673 {
674 ereport(LOG,
675 (errmsg("group \"%s\" does not exist",
676 Unix_socket_group)));
677 return STATUS_ERROR;
678 }
679 gid = gr->gr_gid;
680 }
681 if (chown(sock_path, -1, gid) == -1)
682 {
683 ereport(LOG,
684 (errcode_for_file_access(),
685 errmsg("could not set group of file \"%s\": %m",
686 sock_path)));
687 return STATUS_ERROR;
688 }
689#endif
690 }
691
692 if (chmod(sock_path, Unix_socket_permissions) == -1)
693 {
694 ereport(LOG,
695 (errcode_for_file_access(),
696 errmsg("could not set permissions of file \"%s\": %m",
697 sock_path)));
698 return STATUS_ERROR;
699 }
700 return STATUS_OK;
701}
702#endif /* HAVE_UNIX_SOCKETS */
703
704
/*
 * StreamConnection -- create a new connection with client using
 *		server port.  Set port->sock to the FD of the new connection.
 *
 * server_fd is the listening socket to accept() from.  On success,
 * port->sock holds the new socket, port->raddr the client (remote) address
 * and port->laddr the server (local) address.  For non-Unix-domain
 * connections the socket options TCP_NODELAY (where defined) and
 * SO_KEEPALIVE are set, and the current keepalive / user-timeout settings
 * are applied on a best-effort basis.
 *
 * ASSUME: that this doesn't need to be non-blocking because
 *		the Postmaster uses select() to tell when the server master
 *		socket is ready for accept().
 *
 * When an error occurs after accept() has succeeded, port->sock is left
 * open by this function.
 * NOTE(review): the caller is presumably responsible for closing it --
 * confirm against the call site.
 *
 * RETURNS: STATUS_OK or STATUS_ERROR
 */
int
StreamConnection(pgsocket server_fd, Port *port)
{
	/* accept connection and fill in the client (remote) address */
	port->raddr.salen = sizeof(port->raddr.addr);
	if ((port->sock = accept(server_fd,
							 (struct sockaddr *) &port->raddr.addr,
							 &port->raddr.salen)) == PGINVALID_SOCKET)
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not accept new connection: %m")));

		/*
		 * If accept() fails then postmaster.c will still see the server
		 * socket as read-ready, and will immediately try again.  To avoid
		 * uselessly sucking lots of CPU, delay a bit before trying again.
		 * (The most likely reason for failure is being out of kernel file
		 * table slots; we can do little except hope some will get freed up.)
		 */
		pg_usleep(100000L);		/* wait 0.1 sec */
		return STATUS_ERROR;
	}

	/* fill in the server (local) address */
	port->laddr.salen = sizeof(port->laddr.addr);
	if (getsockname(port->sock,
					(struct sockaddr *) &port->laddr.addr,
					&port->laddr.salen) < 0)
	{
		elog(LOG, "getsockname() failed: %m");
		return STATUS_ERROR;
	}

	/* select NODELAY and KEEPALIVE options if it's a TCP connection */
	if (!IS_AF_UNIX(port->laddr.addr.ss_family))
	{
		int			on;			/* "enable" value for setsockopt() */
#ifdef WIN32
		int			oldopt;		/* current SO_SNDBUF size */
		int			optlen;
		int			newopt;		/* minimum SO_SNDBUF size we want */
#endif

#ifdef TCP_NODELAY
		on = 1;
		if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
					   (char *) &on, sizeof(on)) < 0)
		{
			elog(LOG, "setsockopt(%s) failed: %m", "TCP_NODELAY");
			return STATUS_ERROR;
		}
#endif
		on = 1;
		if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
					   (char *) &on, sizeof(on)) < 0)
		{
			elog(LOG, "setsockopt(%s) failed: %m", "SO_KEEPALIVE");
			return STATUS_ERROR;
		}

#ifdef WIN32

		/*
		 * This is a Win32 socket optimization.  The OS send buffer should be
		 * large enough to send the whole Postgres send buffer in one go, or
		 * performance suffers.  The Postgres send buffer can be enlarged if a
		 * very large message needs to be sent, but we won't attempt to
		 * enlarge the OS buffer if that happens, so somewhat arbitrarily
		 * ensure that the OS buffer is at least PQ_SEND_BUFFER_SIZE * 4.
		 * (That's 32kB with the current default).
		 *
		 * The default OS buffer size used to be 8kB in earlier Windows
		 * versions, but was raised to 64kB in Windows 2012.  So it shouldn't
		 * be necessary to change it in later versions anymore.  Changing it
		 * unnecessarily can even reduce performance, because setting
		 * SO_SNDBUF in the application disables the "dynamic send buffering"
		 * feature that was introduced in Windows 7.  So before fiddling with
		 * SO_SNDBUF, check if the current buffer size is already large enough
		 * and only increase it if necessary.
		 *
		 * See https://support.microsoft.com/kb/823764/EN-US/ and
		 * https://msdn.microsoft.com/en-us/library/bb736549%28v=vs.85%29.aspx
		 */
		optlen = sizeof(oldopt);
		if (getsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &oldopt,
					   &optlen) < 0)
		{
			elog(LOG, "getsockopt(%s) failed: %m", "SO_SNDBUF");
			return STATUS_ERROR;
		}
		newopt = PQ_SEND_BUFFER_SIZE * 4;
		if (oldopt < newopt)
		{
			if (setsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &newopt,
						   sizeof(newopt)) < 0)
			{
				elog(LOG, "setsockopt(%s) failed: %m", "SO_SNDBUF");
				return STATUS_ERROR;
			}
		}
#endif

		/*
		 * Also apply the current keepalive parameters.  If we fail to set a
		 * parameter, don't error out, because these aren't universally
		 * supported.  (Note: you might think we need to reset the GUC
		 * variables to 0 in such a case, but it's not necessary because the
		 * show hooks for these variables report the truth anyway.)
		 */
		(void) pq_setkeepalivesidle(tcp_keepalives_idle, port);
		(void) pq_setkeepalivesinterval(tcp_keepalives_interval, port);
		(void) pq_setkeepalivescount(tcp_keepalives_count, port);
		(void) pq_settcpusertimeout(tcp_user_timeout, port);
	}

	return STATUS_OK;
}
833
/*
 * StreamClose -- close a client/backend connection
 *
 * NOTE: this is NOT used to terminate a session; it is just used to release
 * the file descriptor in a process that should no longer have the socket
 * open.  (For example, the postmaster calls this after passing ownership
 * of the connection to a child process.)  It is expected that someone else
 * still has the socket open.  So, we only want to close the descriptor,
 * we do NOT want to send anything to the far end.
 *
 * sock is the descriptor to release; the result of closesocket() is not
 * checked.
 */
void
StreamClose(pgsocket sock)
{
	closesocket(sock);
}
849
850/*
851 * TouchSocketFiles -- mark socket files as recently accessed
852 *
853 * This routine should be called every so often to ensure that the socket
854 * files have a recent mod date (ordinary operations on sockets usually won't
855 * change the mod date). That saves them from being removed by
856 * overenthusiastic /tmp-directory-cleaner daemons. (Another reason we should
857 * never have put the socket file in /tmp...)
858 */
859void
860TouchSocketFiles(void)
861{
862 ListCell *l;
863
864 /* Loop through all created sockets... */
865 foreach(l, sock_paths)
866 {
867 char *sock_path = (char *) lfirst(l);
868
869 /*
870 * utime() is POSIX standard, utimes() is a common alternative. If we
871 * have neither, there's no way to affect the mod or access time of
872 * the socket :-(
873 *
874 * In either path, we ignore errors; there's no point in complaining.
875 */
876#ifdef HAVE_UTIME
877 utime(sock_path, NULL);
878#else /* !HAVE_UTIME */
879#ifdef HAVE_UTIMES
880 utimes(sock_path, NULL);
881#endif /* HAVE_UTIMES */
882#endif /* HAVE_UTIME */
883 }
884}
885
886/*
887 * RemoveSocketFiles -- unlink socket files at postmaster shutdown
888 */
889void
890RemoveSocketFiles(void)
891{
892 ListCell *l;
893
894 /* Loop through all created sockets... */
895 foreach(l, sock_paths)
896 {
897 char *sock_path = (char *) lfirst(l);
898
899 /* Ignore any error. */
900 (void) unlink(sock_path);
901 }
902 /* Since we're about to exit, no need to reclaim storage */
903 sock_paths = NIL;
904}
905
906
907/* --------------------------------
908 * Low-level I/O routines begin here.
909 *
910 * These routines communicate with a frontend client across a connection
911 * already established by the preceding routines.
912 * --------------------------------
913 */
914
/* --------------------------------
 *			  socket_set_nonblocking - set socket blocking/non-blocking
 *
 * Records the requested mode in MyProcPort->noblock: true for non-blocking,
 * false for blocking.  This function makes no system call itself; it only
 * stores the flag.
 * NOTE(review): the flag is presumably consulted by the secure_read/
 * secure_write layer -- confirm there.
 *
 * Raises ERROR (ERRCODE_CONNECTION_DOES_NOT_EXIST) if there is no client
 * connection, i.e. MyProcPort is NULL.
 * --------------------------------
 */
static void
socket_set_nonblocking(bool nonblocking)
{
	if (MyProcPort == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
				 errmsg("there is no client connection")));

	MyProcPort->noblock = nonblocking;
}
932
933/* --------------------------------
934 * pq_recvbuf - load some bytes into the input buffer
935 *
936 * returns 0 if OK, EOF if trouble
937 * --------------------------------
938 */
939static int
940pq_recvbuf(void)
941{
942 if (PqRecvPointer > 0)
943 {
944 if (PqRecvLength > PqRecvPointer)
945 {
946 /* still some unread data, left-justify it in the buffer */
947 memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
948 PqRecvLength - PqRecvPointer);
949 PqRecvLength -= PqRecvPointer;
950 PqRecvPointer = 0;
951 }
952 else
953 PqRecvLength = PqRecvPointer = 0;
954 }
955
956 /* Ensure that we're in blocking mode */
957 socket_set_nonblocking(false);
958
959 /* Can fill buffer from PqRecvLength and upwards */
960 for (;;)
961 {
962 int r;
963
964 r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
965 PQ_RECV_BUFFER_SIZE - PqRecvLength);
966
967 if (r < 0)
968 {
969 if (errno == EINTR)
970 continue; /* Ok if interrupted */
971
972 /*
973 * Careful: an ereport() that tries to write to the client would
974 * cause recursion to here, leading to stack overflow and core
975 * dump! This message must go *only* to the postmaster log.
976 */
977 ereport(COMMERROR,
978 (errcode_for_socket_access(),
979 errmsg("could not receive data from client: %m")));
980 return EOF;
981 }
982 if (r == 0)
983 {
984 /*
985 * EOF detected. We used to write a log message here, but it's
986 * better to expect the ultimate caller to do that.
987 */
988 return EOF;
989 }
990 /* r contains number of bytes read, so just incr length */
991 PqRecvLength += r;
992 return 0;
993 }
994}
995
996/* --------------------------------
997 * pq_getbyte - get a single byte from connection, or return EOF
998 * --------------------------------
999 */
1000int
1001pq_getbyte(void)
1002{
1003 Assert(PqCommReadingMsg);
1004
1005 while (PqRecvPointer >= PqRecvLength)
1006 {
1007 if (pq_recvbuf()) /* If nothing in buffer, then recv some */
1008 return EOF; /* Failed to recv data */
1009 }
1010 return (unsigned char) PqRecvBuffer[PqRecvPointer++];
1011}
1012
1013/* --------------------------------
1014 * pq_peekbyte - peek at next byte from connection
1015 *
1016 * Same as pq_getbyte() except we don't advance the pointer.
1017 * --------------------------------
1018 */
1019int
1020pq_peekbyte(void)
1021{
1022 Assert(PqCommReadingMsg);
1023
1024 while (PqRecvPointer >= PqRecvLength)
1025 {
1026 if (pq_recvbuf()) /* If nothing in buffer, then recv some */
1027 return EOF; /* Failed to recv data */
1028 }
1029 return (unsigned char) PqRecvBuffer[PqRecvPointer];
1030}
1031
1032/* --------------------------------
1033 * pq_getbyte_if_available - get a single byte from connection,
1034 * if available
1035 *
1036 * The received byte is stored in *c. Returns 1 if a byte was read,
1037 * 0 if no data was available, or EOF if trouble.
1038 * --------------------------------
1039 */
1040int
1041pq_getbyte_if_available(unsigned char *c)
1042{
1043 int r;
1044
1045 Assert(PqCommReadingMsg);
1046
1047 if (PqRecvPointer < PqRecvLength)
1048 {
1049 *c = PqRecvBuffer[PqRecvPointer++];
1050 return 1;
1051 }
1052
1053 /* Put the socket into non-blocking mode */
1054 socket_set_nonblocking(true);
1055
1056 r = secure_read(MyProcPort, c, 1);
1057 if (r < 0)
1058 {
1059 /*
1060 * Ok if no data available without blocking or interrupted (though
1061 * EINTR really shouldn't happen with a non-blocking socket). Report
1062 * other errors.
1063 */
1064 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
1065 r = 0;
1066 else
1067 {
1068 /*
1069 * Careful: an ereport() that tries to write to the client would
1070 * cause recursion to here, leading to stack overflow and core
1071 * dump! This message must go *only* to the postmaster log.
1072 */
1073 ereport(COMMERROR,
1074 (errcode_for_socket_access(),
1075 errmsg("could not receive data from client: %m")));
1076 r = EOF;
1077 }
1078 }
1079 else if (r == 0)
1080 {
1081 /* EOF detected */
1082 r = EOF;
1083 }
1084
1085 return r;
1086}
1087
1088/* --------------------------------
1089 * pq_getbytes - get a known number of bytes from connection
1090 *
1091 * returns 0 if OK, EOF if trouble
1092 * --------------------------------
1093 */
1094int
1095pq_getbytes(char *s, size_t len)
1096{
1097 size_t amount;
1098
1099 Assert(PqCommReadingMsg);
1100
1101 while (len > 0)
1102 {
1103 while (PqRecvPointer >= PqRecvLength)
1104 {
1105 if (pq_recvbuf()) /* If nothing in buffer, then recv some */
1106 return EOF; /* Failed to recv data */
1107 }
1108 amount = PqRecvLength - PqRecvPointer;
1109 if (amount > len)
1110 amount = len;
1111 memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
1112 PqRecvPointer += amount;
1113 s += amount;
1114 len -= amount;
1115 }
1116 return 0;
1117}
1118
1119/* --------------------------------
1120 * pq_discardbytes - throw away a known number of bytes
1121 *
1122 * same as pq_getbytes except we do not copy the data to anyplace.
1123 * this is used for resynchronizing after read errors.
1124 *
1125 * returns 0 if OK, EOF if trouble
1126 * --------------------------------
1127 */
1128static int
1129pq_discardbytes(size_t len)
1130{
1131 size_t amount;
1132
1133 Assert(PqCommReadingMsg);
1134
1135 while (len > 0)
1136 {
1137 while (PqRecvPointer >= PqRecvLength)
1138 {
1139 if (pq_recvbuf()) /* If nothing in buffer, then recv some */
1140 return EOF; /* Failed to recv data */
1141 }
1142 amount = PqRecvLength - PqRecvPointer;
1143 if (amount > len)
1144 amount = len;
1145 PqRecvPointer += amount;
1146 len -= amount;
1147 }
1148 return 0;
1149}
1150
1151/* --------------------------------
1152 * pq_getstring - get a null terminated string from connection
1153 *
1154 * The return value is placed in an expansible StringInfo, which has
1155 * already been initialized by the caller.
1156 *
1157 * This is used only for dealing with old-protocol clients. The idea
1158 * is to produce a StringInfo that looks the same as we would get from
1159 * pq_getmessage() with a newer client; we will then process it with
1160 * pq_getmsgstring. Therefore, no character set conversion is done here,
1161 * even though this is presumably useful only for text.
1162 *
1163 * returns 0 if OK, EOF if trouble
1164 * --------------------------------
1165 */
1166int
1167pq_getstring(StringInfo s)
1168{
1169 int i;
1170
1171 Assert(PqCommReadingMsg);
1172
1173 resetStringInfo(s);
1174
1175 /* Read until we get the terminating '\0' */
1176 for (;;)
1177 {
1178 while (PqRecvPointer >= PqRecvLength)
1179 {
1180 if (pq_recvbuf()) /* If nothing in buffer, then recv some */
1181 return EOF; /* Failed to recv data */
1182 }
1183
1184 for (i = PqRecvPointer; i < PqRecvLength; i++)
1185 {
1186 if (PqRecvBuffer[i] == '\0')
1187 {
1188 /* include the '\0' in the copy */
1189 appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
1190 i - PqRecvPointer + 1);
1191 PqRecvPointer = i + 1; /* advance past \0 */
1192 return 0;
1193 }
1194 }
1195
1196 /* If we're here we haven't got the \0 in the buffer yet. */
1197 appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
1198 PqRecvLength - PqRecvPointer);
1199 PqRecvPointer = PqRecvLength;
1200 }
1201}
1202
1203
1204/* --------------------------------
1205 * pq_startmsgread - begin reading a message from the client.
1206 *
1207 * This must be called before any of the pq_get* functions.
1208 * --------------------------------
1209 */
1210void
1211pq_startmsgread(void)
1212{
1213 /*
1214 * There shouldn't be a read active already, but let's check just to be
1215 * sure.
1216 */
1217 if (PqCommReadingMsg)
1218 ereport(FATAL,
1219 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1220 errmsg("terminating connection because protocol synchronization was lost")));
1221
1222 PqCommReadingMsg = true;
1223}
1224
1225
/* --------------------------------
 *		pq_endmsgread	- finish reading message.
 *
 *		This must be called after reading a V2 protocol message with
 *		pq_getstring() and friends, to indicate that we have read the whole
 *		message. In V3 protocol, pq_getmessage() does this implicitly.
 *
 *		Clears the PqCommReadingMsg flag; a read must currently be in
 *		progress (checked by assertion only).
 * --------------------------------
 */
void
pq_endmsgread(void)
{
	/* Ending a read that was never started indicates a caller bug */
	Assert(PqCommReadingMsg);

	PqCommReadingMsg = false;
}
1241
/* --------------------------------
 *		pq_is_reading_msg - are we currently reading a message?
 *
 * This is used in error recovery at the outer idle loop to detect if we have
 * lost protocol sync, and need to terminate the connection. pq_startmsgread()
 * will check for that too, but it's nicer to detect it earlier.
 *
 * Returns the current value of the PqCommReadingMsg flag, without changing
 * it.
 * --------------------------------
 */
bool
pq_is_reading_msg(void)
{
	return PqCommReadingMsg;
}
1255
1256/* --------------------------------
1257 * pq_getmessage - get a message with length word from connection
1258 *
1259 * The return value is placed in an expansible StringInfo, which has
1260 * already been initialized by the caller.
1261 * Only the message body is placed in the StringInfo; the length word
1262 * is removed. Also, s->cursor is initialized to zero for convenience
1263 * in scanning the message contents.
1264 *
1265 * If maxlen is not zero, it is an upper limit on the length of the
1266 * message we are willing to accept. We abort the connection (by
1267 * returning EOF) if client tries to send more than that.
1268 *
1269 * returns 0 if OK, EOF if trouble
1270 * --------------------------------
1271 */
1272int
1273pq_getmessage(StringInfo s, int maxlen)
1274{
1275 int32 len;
1276
1277 Assert(PqCommReadingMsg);
1278
1279 resetStringInfo(s);
1280
1281 /* Read message length word */
1282 if (pq_getbytes((char *) &len, 4) == EOF)
1283 {
1284 ereport(COMMERROR,
1285 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1286 errmsg("unexpected EOF within message length word")));
1287 return EOF;
1288 }
1289
1290 len = pg_ntoh32(len);
1291
1292 if (len < 4 ||
1293 (maxlen > 0 && len > maxlen))
1294 {
1295 ereport(COMMERROR,
1296 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1297 errmsg("invalid message length")));
1298 return EOF;
1299 }
1300
1301 len -= 4; /* discount length itself */
1302
1303 if (len > 0)
1304 {
1305 /*
1306 * Allocate space for message. If we run out of room (ridiculously
1307 * large message), we will elog(ERROR), but we want to discard the
1308 * message body so as not to lose communication sync.
1309 */
1310 PG_TRY();
1311 {
1312 enlargeStringInfo(s, len);
1313 }
1314 PG_CATCH();
1315 {
1316 if (pq_discardbytes(len) == EOF)
1317 ereport(COMMERROR,
1318 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1319 errmsg("incomplete message from client")));
1320
1321 /* we discarded the rest of the message so we're back in sync. */
1322 PqCommReadingMsg = false;
1323 PG_RE_THROW();
1324 }
1325 PG_END_TRY();
1326
1327 /* And grab the message */
1328 if (pq_getbytes(s->data, len) == EOF)
1329 {
1330 ereport(COMMERROR,
1331 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1332 errmsg("incomplete message from client")));
1333 return EOF;
1334 }
1335 s->len = len;
1336 /* Place a trailing null per StringInfo convention */
1337 s->data[len] = '\0';
1338 }
1339
1340 /* finished reading the message. */
1341 PqCommReadingMsg = false;
1342
1343 return 0;
1344}
1345
1346
1347/* --------------------------------
1348 * pq_putbytes - send bytes to connection (not flushed until pq_flush)
1349 *
1350 * returns 0 if OK, EOF if trouble
1351 * --------------------------------
1352 */
1353int
1354pq_putbytes(const char *s, size_t len)
1355{
1356 int res;
1357
1358 /* Should only be called by old-style COPY OUT */
1359 Assert(DoingCopyOut);
1360 /* No-op if reentrant call */
1361 if (PqCommBusy)
1362 return 0;
1363 PqCommBusy = true;
1364 res = internal_putbytes(s, len);
1365 PqCommBusy = false;
1366 return res;
1367}
1368
1369static int
1370internal_putbytes(const char *s, size_t len)
1371{
1372 size_t amount;
1373
1374 while (len > 0)
1375 {
1376 /* If buffer is full, then flush it out */
1377 if (PqSendPointer >= PqSendBufferSize)
1378 {
1379 socket_set_nonblocking(false);
1380 if (internal_flush())
1381 return EOF;
1382 }
1383 amount = PqSendBufferSize - PqSendPointer;
1384 if (amount > len)
1385 amount = len;
1386 memcpy(PqSendBuffer + PqSendPointer, s, amount);
1387 PqSendPointer += amount;
1388 s += amount;
1389 len -= amount;
1390 }
1391 return 0;
1392}
1393
1394/* --------------------------------
1395 * socket_flush - flush pending output
1396 *
1397 * returns 0 if OK, EOF if trouble
1398 * --------------------------------
1399 */
1400static int
1401socket_flush(void)
1402{
1403 int res;
1404
1405 /* No-op if reentrant call */
1406 if (PqCommBusy)
1407 return 0;
1408 PqCommBusy = true;
1409 socket_set_nonblocking(false);
1410 res = internal_flush();
1411 PqCommBusy = false;
1412 return res;
1413}
1414
/* --------------------------------
 *		internal_flush - flush pending output
 *
 * Writes the bytes of PqSendBuffer lying between PqSendStart and
 * PqSendPointer to the client with secure_write(), advancing PqSendStart
 * as data is accepted so that a later call resumes where this one stopped.
 *
 * Returns 0 if OK (meaning everything was sent, or operation would block
 * and the socket is in non-blocking mode), or EOF if trouble.  On EOF the
 * buffered data is dropped and ClientConnectionLost and InterruptPending
 * are set.
 *
 * This routine neither checks PqCommBusy nor changes the socket's blocking
 * mode; its callers in this file arrange both before calling it.
 * --------------------------------
 */
static int
internal_flush(void)
{
	/* errno of the most recent send failure we logged (0 = none) */
	static int	last_reported_send_errno = 0;

	/* Unsent data is the range [bufptr, bufend) within PqSendBuffer */
	char	   *bufptr = PqSendBuffer + PqSendStart;
	char	   *bufend = PqSendBuffer + PqSendPointer;

	while (bufptr < bufend)
	{
		int			r;

		r = secure_write(MyProcPort, bufptr, bufend - bufptr);

		/* Note that errno is consulted for r == 0 as well as r < 0 */
		if (r <= 0)
		{
			if (errno == EINTR)
				continue;		/* Ok if we were interrupted */

			/*
			 * Ok if no data writable without blocking, and the socket is in
			 * non-blocking mode.  PqSendStart already reflects what was sent
			 * so far, so the remainder stays queued for a later flush.
			 */
			if (errno == EAGAIN ||
				errno == EWOULDBLOCK)
			{
				return 0;
			}

			/*
			 * Careful: an ereport() that tries to write to the client would
			 * cause recursion to here, leading to stack overflow and core
			 * dump!  This message must go *only* to the postmaster log.
			 *
			 * If a client disconnects while we're in the midst of output, we
			 * might write quite a bit of data before we get to a safe query
			 * abort point.  So, suppress duplicate log messages.
			 */
			if (errno != last_reported_send_errno)
			{
				last_reported_send_errno = errno;
				ereport(COMMERROR,
						(errcode_for_socket_access(),
						 errmsg("could not send data to client: %m")));
			}

			/*
			 * We drop the buffered data anyway so that processing can
			 * continue, even though we'll probably quit soon. We also set a
			 * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
			 * the connection.
			 */
			PqSendStart = PqSendPointer = 0;
			ClientConnectionLost = 1;
			InterruptPending = 1;
			return EOF;
		}

		last_reported_send_errno = 0;	/* reset after any successful send */
		bufptr += r;
		PqSendStart += r;		/* remember progress across calls */
	}

	/* Everything was sent: mark the send buffer empty */
	PqSendStart = PqSendPointer = 0;
	return 0;
}
1488
1489/* --------------------------------
1490 * pq_flush_if_writable - flush pending output if writable without blocking
1491 *
1492 * Returns 0 if OK, or EOF if trouble.
1493 * --------------------------------
1494 */
1495static int
1496socket_flush_if_writable(void)
1497{
1498 int res;
1499
1500 /* Quick exit if nothing to do */
1501 if (PqSendPointer == PqSendStart)
1502 return 0;
1503
1504 /* No-op if reentrant call */
1505 if (PqCommBusy)
1506 return 0;
1507
1508 /* Temporarily put the socket into non-blocking mode */
1509 socket_set_nonblocking(true);
1510
1511 PqCommBusy = true;
1512 res = internal_flush();
1513 PqCommBusy = false;
1514 return res;
1515}
1516
1517/* --------------------------------
1518 * socket_is_send_pending - is there any pending data in the output buffer?
1519 * --------------------------------
1520 */
1521static bool
1522socket_is_send_pending(void)
1523{
1524 return (PqSendStart < PqSendPointer);
1525}
1526
1527/* --------------------------------
1528 * Message-level I/O routines begin here.
1529 *
1530 * These routines understand about the old-style COPY OUT protocol.
1531 * --------------------------------
1532 */
1533
1534
1535/* --------------------------------
1536 * socket_putmessage - send a normal message (suppressed in COPY OUT mode)
1537 *
1538 * If msgtype is not '\0', it is a message type code to place before
1539 * the message body. If msgtype is '\0', then the message has no type
1540 * code (this is only valid in pre-3.0 protocols).
1541 *
1542 * len is the length of the message body data at *s. In protocol 3.0
1543 * and later, a message length word (equal to len+4 because it counts
1544 * itself too) is inserted by this routine.
1545 *
1546 * All normal messages are suppressed while old-style COPY OUT is in
1547 * progress. (In practice only a few notice messages might get emitted
1548 * then; dropping them is annoying, but at least they will still appear
1549 * in the postmaster log.)
1550 *
1551 * We also suppress messages generated while pqcomm.c is busy. This
1552 * avoids any possibility of messages being inserted within other
1553 * messages. The only known trouble case arises if SIGQUIT occurs
1554 * during a pqcomm.c routine --- quickdie() will try to send a warning
1555 * message, and the most reasonable approach seems to be to drop it.
1556 *
1557 * returns 0 if OK, EOF if trouble
1558 * --------------------------------
1559 */
1560static int
1561socket_putmessage(char msgtype, const char *s, size_t len)
1562{
1563 if (DoingCopyOut || PqCommBusy)
1564 return 0;
1565 PqCommBusy = true;
1566 if (msgtype)
1567 if (internal_putbytes(&msgtype, 1))
1568 goto fail;
1569 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
1570 {
1571 uint32 n32;
1572
1573 n32 = pg_hton32((uint32) (len + 4));
1574 if (internal_putbytes((char *) &n32, 4))
1575 goto fail;
1576 }
1577 if (internal_putbytes(s, len))
1578 goto fail;
1579 PqCommBusy = false;
1580 return 0;
1581
1582fail:
1583 PqCommBusy = false;
1584 return EOF;
1585}
1586
1587/* --------------------------------
1588 * pq_putmessage_noblock - like pq_putmessage, but never blocks
1589 *
1590 * If the output buffer is too small to hold the message, the buffer
1591 * is enlarged.
1592 */
1593static void
1594socket_putmessage_noblock(char msgtype, const char *s, size_t len)
1595{
1596 int res PG_USED_FOR_ASSERTS_ONLY;
1597 int required;
1598
1599 /*
1600 * Ensure we have enough space in the output buffer for the message header
1601 * as well as the message itself.
1602 */
1603 required = PqSendPointer + 1 + 4 + len;
1604 if (required > PqSendBufferSize)
1605 {
1606 PqSendBuffer = repalloc(PqSendBuffer, required);
1607 PqSendBufferSize = required;
1608 }
1609 res = pq_putmessage(msgtype, s, len);
1610 Assert(res == 0); /* should not fail when the message fits in
1611 * buffer */
1612}
1613
1614
/* --------------------------------
 *		socket_startcopyout - inform libpq that an old-style COPY OUT transfer
 *			is beginning
 *
 *		Sets the DoingCopyOut flag.  While the flag is set,
 *		socket_putmessage() drops messages instead of queuing them, and
 *		pq_putbytes() may be called.
 * --------------------------------
 */
static void
socket_startcopyout(void)
{
	DoingCopyOut = true;
}
1625
1626/* --------------------------------
1627 * socket_endcopyout - end an old-style COPY OUT transfer
1628 *
1629 * If errorAbort is indicated, we are aborting a COPY OUT due to an error,
1630 * and must send a terminator line. Since a partial data line might have
1631 * been emitted, send a couple of newlines first (the first one could
1632 * get absorbed by a backslash...) Note that old-style COPY OUT does
1633 * not allow binary transfers, so a textual terminator is always correct.
1634 * --------------------------------
1635 */
1636static void
1637socket_endcopyout(bool errorAbort)
1638{
1639 if (!DoingCopyOut)
1640 return;
1641 if (errorAbort)
1642 pq_putbytes("\n\n\\.\n", 5);
1643 /* in non-error case, copy.c will have emitted the terminator line */
1644 DoingCopyOut = false;
1645}
1646
1647/*
1648 * Support for TCP Keepalive parameters
1649 */
1650
/*
 * pq_setkeepaliveswin32 - apply keepalive idle time and interval on Windows
 *
 * On Windows, we need to set both idle and interval at the same time.
 * We also cannot reset them to the default (setting to zero will
 * actually set them to zero, not default), therefore we fallback to
 * the out-of-the-box default instead.
 *
 * idle and interval are given in seconds; a value <= 0 selects the fallback
 * default (2 hours for idle, 1 second for interval).
 *
 * Returns STATUS_OK after recording the values actually applied in *port,
 * or STATUS_ERROR (after logging the Winsock error code) if WSAIoctl()
 * fails, in which case *port is left unchanged.
 */
#if defined(WIN32) && defined(SIO_KEEPALIVE_VALS)
static int
pq_setkeepaliveswin32(Port *port, int idle, int interval)
{
	struct tcp_keepalive ka;
	DWORD		retsize;

	if (idle <= 0)
		idle = 2 * 60 * 60;		/* default = 2 hours */
	if (interval <= 0)
		interval = 1;			/* default = 1 second */

	/* The structure is filled with the second counts scaled by 1000 */
	ka.onoff = 1;
	ka.keepalivetime = idle * 1000;
	ka.keepaliveinterval = interval * 1000;

	if (WSAIoctl(port->sock,
				 SIO_KEEPALIVE_VALS,
				 (LPVOID) &ka,
				 sizeof(ka),
				 NULL,
				 0,
				 &retsize,
				 NULL,
				 NULL)
		!= 0)
	{
		/* WSAGetLastError() yields an int error code, so print it with %d */
		elog(LOG, "WSAIoctl(SIO_KEEPALIVE_VALS) failed: %d",
			 WSAGetLastError());
		return STATUS_ERROR;
	}

	/* Remember what is now in effect on the socket */
	port->keepalives_idle = idle;
	port->keepalives_interval = interval;

	return STATUS_OK;
}
#endif
1695
1696int
1697pq_getkeepalivesidle(Port *port)
1698{
1699#if defined(PG_TCP_KEEPALIVE_IDLE) || defined(SIO_KEEPALIVE_VALS)
1700 if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1701 return 0;
1702
1703 if (port->keepalives_idle != 0)
1704 return port->keepalives_idle;
1705
1706 if (port->default_keepalives_idle == 0)
1707 {
1708#ifndef WIN32
1709 ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_idle);
1710
1711 if (getsockopt(port->sock, IPPROTO_TCP, PG_TCP_KEEPALIVE_IDLE,
1712 (char *) &port->default_keepalives_idle,
1713 &size) < 0)
1714 {
1715 elog(LOG, "getsockopt(%s) failed: %m", PG_TCP_KEEPALIVE_IDLE_STR);
1716 port->default_keepalives_idle = -1; /* don't know */
1717 }
1718#else /* WIN32 */
1719 /* We can't get the defaults on Windows, so return "don't know" */
1720 port->default_keepalives_idle = -1;
1721#endif /* WIN32 */
1722 }
1723
1724 return port->default_keepalives_idle;
1725#else
1726 return 0;
1727#endif
1728}
1729
1730int
1731pq_setkeepalivesidle(int idle, Port *port)
1732{
1733 if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1734 return STATUS_OK;
1735
1736/* check SIO_KEEPALIVE_VALS here, not just WIN32, as some toolchains lack it */
1737#if defined(PG_TCP_KEEPALIVE_IDLE) || defined(SIO_KEEPALIVE_VALS)
1738 if (idle == port->keepalives_idle)
1739 return STATUS_OK;
1740
1741#ifndef WIN32
1742 if (port->default_keepalives_idle <= 0)
1743 {
1744 if (pq_getkeepalivesidle(port) < 0)
1745 {
1746 if (idle == 0)
1747 return STATUS_OK; /* default is set but unknown */
1748 else
1749 return STATUS_ERROR;
1750 }
1751 }
1752
1753 if (idle == 0)
1754 idle = port->default_keepalives_idle;
1755
1756 if (setsockopt(port->sock, IPPROTO_TCP, PG_TCP_KEEPALIVE_IDLE,
1757 (char *) &idle, sizeof(idle)) < 0)
1758 {
1759 elog(LOG, "setsockopt(%s) failed: %m", PG_TCP_KEEPALIVE_IDLE_STR);
1760 return STATUS_ERROR;
1761 }
1762
1763 port->keepalives_idle = idle;
1764#else /* WIN32 */
1765 return pq_setkeepaliveswin32(port, idle, port->keepalives_interval);
1766#endif
1767#else
1768 if (idle != 0)
1769 {
1770 elog(LOG, "setting the keepalive idle time is not supported");
1771 return STATUS_ERROR;
1772 }
1773#endif
1774
1775 return STATUS_OK;
1776}
1777
1778int
1779pq_getkeepalivesinterval(Port *port)
1780{
1781#if defined(TCP_KEEPINTVL) || defined(SIO_KEEPALIVE_VALS)
1782 if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1783 return 0;
1784
1785 if (port->keepalives_interval != 0)
1786 return port->keepalives_interval;
1787
1788 if (port->default_keepalives_interval == 0)
1789 {
1790#ifndef WIN32
1791 ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_interval);
1792
1793 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
1794 (char *) &port->default_keepalives_interval,
1795 &size) < 0)
1796 {
1797 elog(LOG, "getsockopt(%s) failed: %m", "TCP_KEEPINTVL");
1798 port->default_keepalives_interval = -1; /* don't know */
1799 }
1800#else
1801 /* We can't get the defaults on Windows, so return "don't know" */
1802 port->default_keepalives_interval = -1;
1803#endif /* WIN32 */
1804 }
1805
1806 return port->default_keepalives_interval;
1807#else
1808 return 0;
1809#endif
1810}
1811
1812int
1813pq_setkeepalivesinterval(int interval, Port *port)
1814{
1815 if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1816 return STATUS_OK;
1817
1818#if defined(TCP_KEEPINTVL) || defined(SIO_KEEPALIVE_VALS)
1819 if (interval == port->keepalives_interval)
1820 return STATUS_OK;
1821
1822#ifndef WIN32
1823 if (port->default_keepalives_interval <= 0)
1824 {
1825 if (pq_getkeepalivesinterval(port) < 0)
1826 {
1827 if (interval == 0)
1828 return STATUS_OK; /* default is set but unknown */
1829 else
1830 return STATUS_ERROR;
1831 }
1832 }
1833
1834 if (interval == 0)
1835 interval = port->default_keepalives_interval;
1836
1837 if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
1838 (char *) &interval, sizeof(interval)) < 0)
1839 {
1840 elog(LOG, "setsockopt(%s) failed: %m", "TCP_KEEPINTVL");
1841 return STATUS_ERROR;
1842 }
1843
1844 port->keepalives_interval = interval;
1845#else /* WIN32 */
1846 return pq_setkeepaliveswin32(port, port->keepalives_idle, interval);
1847#endif
1848#else
1849 if (interval != 0)
1850 {
1851 elog(LOG, "setsockopt(%s) not supported", "TCP_KEEPINTVL");
1852 return STATUS_ERROR;
1853 }
1854#endif
1855
1856 return STATUS_OK;
1857}
1858
1859int
1860pq_getkeepalivescount(Port *port)
1861{
1862#ifdef TCP_KEEPCNT
1863 if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1864 return 0;
1865
1866 if (port->keepalives_count != 0)
1867 return port->keepalives_count;
1868
1869 if (port->default_keepalives_count == 0)
1870 {
1871 ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_count);
1872
1873 if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
1874 (char *) &port->default_keepalives_count,
1875 &size) < 0)
1876 {
1877 elog(LOG, "getsockopt(%s) failed: %m", "TCP_KEEPCNT");
1878 port->default_keepalives_count = -1; /* don't know */
1879 }
1880 }
1881
1882 return port->default_keepalives_count;
1883#else
1884 return 0;
1885#endif
1886}
1887
1888int
1889pq_setkeepalivescount(int count, Port *port)
1890{
1891 if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1892 return STATUS_OK;
1893
1894#ifdef TCP_KEEPCNT
1895 if (count == port->keepalives_count)
1896 return STATUS_OK;
1897
1898 if (port->default_keepalives_count <= 0)
1899 {
1900 if (pq_getkeepalivescount(port) < 0)
1901 {
1902 if (count == 0)
1903 return STATUS_OK; /* default is set but unknown */
1904 else
1905 return STATUS_ERROR;
1906 }
1907 }
1908
1909 if (count == 0)
1910 count = port->default_keepalives_count;
1911
1912 if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
1913 (char *) &count, sizeof(count)) < 0)
1914 {
1915 elog(LOG, "setsockopt(%s) failed: %m", "TCP_KEEPCNT");
1916 return STATUS_ERROR;
1917 }
1918
1919 port->keepalives_count = count;
1920#else
1921 if (count != 0)
1922 {
1923 elog(LOG, "setsockopt(%s) not supported", "TCP_KEEPCNT");
1924 return STATUS_ERROR;
1925 }
1926#endif
1927
1928 return STATUS_OK;
1929}
1930
1931int
1932pq_gettcpusertimeout(Port *port)
1933{
1934#ifdef TCP_USER_TIMEOUT
1935 if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1936 return 0;
1937
1938 if (port->tcp_user_timeout != 0)
1939 return port->tcp_user_timeout;
1940
1941 if (port->default_tcp_user_timeout == 0)
1942 {
1943 ACCEPT_TYPE_ARG3 size = sizeof(port->default_tcp_user_timeout);
1944
1945 if (getsockopt(port->sock, IPPROTO_TCP, TCP_USER_TIMEOUT,
1946 (char *) &port->default_tcp_user_timeout,
1947 &size) < 0)
1948 {
1949 elog(LOG, "getsockopt(%s) failed: %m", "TCP_USER_TIMEOUT");
1950 port->default_tcp_user_timeout = -1; /* don't know */
1951 }
1952 }
1953
1954 return port->default_tcp_user_timeout;
1955#else
1956 return 0;
1957#endif
1958}
1959
1960int
1961pq_settcpusertimeout(int timeout, Port *port)
1962{
1963 if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1964 return STATUS_OK;
1965
1966#ifdef TCP_USER_TIMEOUT
1967 if (timeout == port->tcp_user_timeout)
1968 return STATUS_OK;
1969
1970 if (port->default_tcp_user_timeout <= 0)
1971 {
1972 if (pq_gettcpusertimeout(port) < 0)
1973 {
1974 if (timeout == 0)
1975 return STATUS_OK; /* default is set but unknown */
1976 else
1977 return STATUS_ERROR;
1978 }
1979 }
1980
1981 if (timeout == 0)
1982 timeout = port->default_tcp_user_timeout;
1983
1984 if (setsockopt(port->sock, IPPROTO_TCP, TCP_USER_TIMEOUT,
1985 (char *) &timeout, sizeof(timeout)) < 0)
1986 {
1987 elog(LOG, "setsockopt(%s) failed: %m", "TCP_USER_TIMEOUT");
1988 return STATUS_ERROR;
1989 }
1990
1991 port->tcp_user_timeout = timeout;
1992#else
1993 if (timeout != 0)
1994 {
1995 elog(LOG, "setsockopt(%s) not supported", "TCP_USER_TIMEOUT");
1996 return STATUS_ERROR;
1997 }
1998#endif
1999
2000 return STATUS_OK;
2001}
2002