1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9#include "monetdb_config.h"
10#include <sys/types.h>
11#include <sys/stat.h> /* stat */
12#include <sys/wait.h> /* wait */
13#include <sys/socket.h>
14#include <sys/un.h>
15#include <netdb.h>
16#include <netinet/in.h>
17#include <fcntl.h>
18#include <string.h> /* strerror */
19#ifdef HAVE_SYS_UIO_H
20# include <sys/uio.h>
21#endif
22
23#include "mstring.h"
24#include "stream.h"
25#include "stream_socket.h"
26
27#include "merovingian.h"
28#include "proxy.h"
29
30typedef struct _merovingian_proxy {
31 stream *in; /* the input to read from and to dispatch to out */
32 stream *out; /* where to write the read input to */
33 stream *co_in; /* the input stream of the co-thread,
34 don't read from this stream! close only */
35 stream *co_out; /* the output stream of the co-thread,
36 don't write to this stream! close only */
37 char *name; /* a description to log when this thread ends */
38 pthread_t co_thr;/* the other proxyThread */
39} merovingian_proxy;
40
41static void *
42proxyThread(void *d)
43{
44 merovingian_proxy *p = (merovingian_proxy *)d;
45 int len;
46 char data[8 * 1024];
47
48 /* pass everything from in to out, until either reading from in,
49 * or writing to out fails, then close the other proxyThread's in
50 * and out streams so that it stops as well (it will, or already
51 * has, closed the streams we read from/write to) */
52 while ((len = mnstr_read(p->in, data, 1, sizeof(data))) >= 0) {
53 if (len > 0 && mnstr_write(p->out, data, len, 1) != 1)
54 break;
55 if (len == 0 && mnstr_flush(p->out) == -1)
56 break;
57 }
58
59 mnstr_close(p->co_out); /* out towards target B */
60 mnstr_close(p->co_in); /* related in from target A */
61
62 if (p->name != NULL) {
63 /* name is only set on the client-to-server thread */
64 if (len <= 0) {
65 Mfprintf(stdout, "client %s has disconnected from proxy\n",
66 p->name);
67 } else {
68 Mfprintf(stdout, "server has terminated proxy connection, "
69 "disconnecting client %s\n", p->name);
70 }
71 free(p->name);
72
73 /* wait for the other thread to finish, after which we can
74 * finally destroy the streams (all four, since we're the only
75 * one doing it) */
76 pthread_join(p->co_thr, NULL);
77 mnstr_destroy(p->co_out);
78 mnstr_destroy(p->in);
79 mnstr_destroy(p->out);
80 mnstr_destroy(p->co_in);
81 }
82
83 free(p);
84 return NULL;
85}
86
87err
88startProxy(int psock, stream *cfdin, stream *cfout, char *url, char *client)
89{
90 int ssock = -1;
91 char *port, *t, *conn, *endipv6;
92 struct stat statbuf;
93 stream *sfdin, *sfout;
94 merovingian_proxy *pctos, *pstoc;
95 pthread_t ptid;
96 pthread_attr_t detachattr;
97 int thret;
98
99 /* quick 'n' dirty parsing */
100 if (strncmp(url, "mapi:monetdb://", sizeof("mapi:monetdb://") - 1) == 0) {
101 conn = strdup(url + sizeof("mapi:monetdb://") - 1);
102
103 if (*conn == '[') { /* check for an IPv6 address */
104 if ((endipv6 = strchr(conn, ']')) != NULL) {
105 if ((port = strchr(endipv6, ':')) != NULL) {
106 *port = '\0';
107 port++;
108 if ((t = strchr(port, '/')) != NULL)
109 *t = '\0';
110 } else {
111 return(newErr("can't find a port in redirect: %s", url));
112 }
113 } else {
114 return(newErr("invalid IPv6 address in redirect: %s", url));
115 }
116 } else if ((port = strchr(conn, ':')) != NULL) { /* drop anything off after the hostname */
117 *port = '\0';
118 port++;
119 if ((t = strchr(port, '/')) != NULL)
120 *t = '\0';
121 } else if (stat(conn, &statbuf) != -1) {
122 ssock = 0;
123 } else {
124 free(conn);
125 return(newErr("can't find a port in redirect, "
126 "or is not a UNIX socket file: %s", url));
127 }
128 } else {
129 return(newErr("unsupported protocol/scheme in redirect: %s", url));
130 }
131
132 if (ssock != -1) {
133 /* UNIX socket connect, don't proxy, but pass socket fd */
134 struct sockaddr_un server;
135 struct msghdr msg;
136 char ccmsg[CMSG_SPACE(sizeof(ssock))];
137 struct cmsghdr *cmsg;
138 struct iovec vec;
139 char buf[1];
140 int *c_d;
141
142 server = (struct sockaddr_un) {
143 .sun_family = AF_UNIX,
144 };
145 strcpy_len(server.sun_path, conn, sizeof(server.sun_path));
146 free(conn);
147 if ((ssock = socket(PF_UNIX, SOCK_STREAM
148#ifdef SOCK_CLOEXEC
149 | SOCK_CLOEXEC
150#endif
151 , 0)) == -1) {
152 return(newErr("cannot open socket: %s", strerror(errno)));
153 }
154#if !defined(SOCK_CLOEXEC) && defined(HAVE_FCNTL)
155 (void) fcntl(ssock, F_SETFD, FD_CLOEXEC);
156#endif
157 if (connect(ssock, (SOCKPTR) &server, sizeof(struct sockaddr_un)) == -1) {
158 closesocket(ssock);
159 return(newErr("cannot connect: %s", strerror(errno)));
160 }
161
162 /* send first byte, nothing special to happen */
163 msg.msg_name = NULL;
164 msg.msg_namelen = 0;
165 *buf = '1'; /* pass fd */
166 vec.iov_base = buf;
167 vec.iov_len = 1;
168 msg.msg_iov = &vec;
169 msg.msg_iovlen = 1;
170 msg.msg_control = ccmsg;
171 msg.msg_controllen = sizeof(ccmsg);
172 cmsg = CMSG_FIRSTHDR(&msg);
173 cmsg->cmsg_level = SOL_SOCKET;
174 cmsg->cmsg_type = SCM_RIGHTS;
175 cmsg->cmsg_len = CMSG_LEN(sizeof(psock));
176 /* HACK to avoid
177 * "dereferencing type-punned pointer will break strict-aliasing rules"
178 * (with gcc 4.5.1 on Fedora 14)
179 */
180 c_d = (int *)CMSG_DATA(cmsg);
181 *c_d = psock;
182 msg.msg_controllen = cmsg->cmsg_len;
183 msg.msg_flags = 0;
184
185 Mfprintf(stdout, "target connection is on local UNIX domain socket, "
186 "passing on filedescriptor instead of proxying\n");
187 if (sendmsg(ssock, &msg, 0) < 0) {
188 closesocket(ssock);
189 return(newErr("could not send initial byte: %s", strerror(errno)));
190 }
191 /* block until the server acknowledges that it has psock
192 * connected with itself */
193 if (recv(ssock, buf, 1, 0) == -1) {
194 closesocket(ssock);
195 return(newErr("could not receive initial byte: %s", strerror(errno)));
196 }
197 shutdown(ssock, SHUT_RDWR);
198 closesocket(ssock);
199 /* psock is the underlying socket of cfdin/cfout which we
200 * passed on to the client; we need to close the socket, but
201 * not call shutdown() on it, which would happen if we called
202 * close_stream(), so we call closesocket to close the socket
203 * and mnstr_destroy to free memory */
204 closesocket(psock);
205 mnstr_destroy(cfdin);
206 mnstr_destroy(cfout);
207 return(NO_ERR);
208 } else {
209 int check;
210 struct addrinfo *results, *rp, hints = (struct addrinfo) {
211 .ai_family = AF_UNSPEC,
212 .ai_socktype = SOCK_STREAM,
213 .ai_protocol = IPPROTO_TCP,
214 };
215
216 if ((check = getaddrinfo(conn, port, &hints, &results)) != 0) {
217 err x = newErr("cannot get address for hostname '%s': %s", conn, gai_strerror(check));
218 free(conn);
219 return(x);
220 }
221 free(conn);
222
223 for (rp = results; rp; rp = rp->ai_next) {
224 ssock = socket(rp->ai_family, rp->ai_socktype
225#ifdef SOCK_CLOEXEC
226 | SOCK_CLOEXEC
227#endif
228 , rp->ai_protocol);
229 if (ssock == -1)
230 continue;
231 if (connect(ssock, rp->ai_addr, rp->ai_addrlen) == -1) {
232 closesocket(ssock);
233 continue;
234 } else {
235#if !defined(SOCK_CLOEXEC) && defined(HAVE_FCNTL)
236 (void) fcntl(ssock, F_SETFD, FD_CLOEXEC);
237#endif
238 break;
239 }
240 }
241 if (results)
242 freeaddrinfo(results);
243 if (rp == NULL)
244 return(newErr("cannot open socket: %s", strerror(errno)));
245 }
246
247 sfdin = block_stream(socket_rstream(ssock, "merovingian<-server (proxy read)"));
248 sfout = block_stream(socket_wstream(ssock, "merovingian->server (proxy write)"));
249
250 if (sfdin == 0 || sfout == 0) {
251 close_stream(sfout);
252 close_stream(sfdin);
253 return(newErr("merovingian-server inputstream or outputstream problems"));
254 }
255
256 /* our proxy schematically looks like this:
257 *
258 * A___>___B
259 * out in | | out in
260 * client --------- | M | --------- server
261 * in out |_____| in out
262 * C < D
263 *
264 * the thread that does A -> B is called ctos, C -> D stoc
265 * the merovingian_proxy structs are filled like:
266 * ctos: in = A, out = B, co_in = D, co_out = C
267 * stoc: in = D, out = C, co_in = A, co_out = B
268 */
269
270 pstoc = malloc(sizeof(merovingian_proxy));
271 pstoc->in = sfdin;
272 pstoc->out = cfout;
273 pstoc->co_in = cfdin;
274 pstoc->co_out = sfout;
275 pstoc->name = NULL; /* we want only one log-message on disconnect */
276 pstoc->co_thr = 0;
277
278 if ((thret = pthread_create(&ptid, NULL,
279 proxyThread, (void *)pstoc)) != 0)
280 {
281 close_stream(sfout);
282 close_stream(sfdin);
283 return(newErr("failed to create proxy thread: %s", strerror(thret)));
284 }
285
286 pctos = malloc(sizeof(merovingian_proxy));
287 pctos->in = cfdin;
288 pctos->out = sfout;
289 pctos->co_in = sfdin;
290 pctos->co_out = cfout;
291 pctos->name = strdup(client);
292 pctos->co_thr = ptid;
293
294 pthread_attr_init(&detachattr);
295 pthread_attr_setdetachstate(&detachattr, PTHREAD_CREATE_DETACHED);
296 if ((thret = pthread_create(&ptid, &detachattr,
297 proxyThread, (void *)pctos)) != 0)
298 {
299 close_stream(sfout);
300 close_stream(sfdin);
301 return(newErr("failed to create proxy thread: %s", strerror(thret)));
302 }
303
304 return(NO_ERR);
305}
306
#ifdef MYSQL_EMULATION_BLEEDING_EDGE_STUFF
/*
 * Experimental start of a MySQL handshake on sock; only compiled when
 * MYSQL_EMULATION_BLEEDING_EDGE_STUFF is defined.
 *
 * Opens a read and a write stream on sock, assembles a handshake
 * initialization packet in a local buffer and flushes the write stream.
 * Returns an error from newErr() if a stream cannot be created,
 * NO_ERR otherwise.
 *
 * NOTE(review): the assembled packet is never handed to a write call in
 * this function, only mnstr_flush() is invoked -- confirm whether a
 * write is missing.
 */
static err
handleMySQLClient(int sock)
{
	stream *fdin, *fout;
	/* NOTE(review): declared as an array of str while the cursor below
	 * is a single str; presumably a plain byte buffer was meant --
	 * confirm */
	str pkt[8096];
	str cur;
	int pktlen;

	fdin = socket_rstream(sock, "merovingian<-mysqlclient (read)");
	if (fdin == 0)
		return(newErr("merovingian-mysqlclient inputstream problems"));

	fout = socket_wstream(sock, "merovingian->mysqlclient (write)");
	if (fout == 0) {
		/* don't leave the read side behind */
		close_stream(fdin);
		return(newErr("merovingian-mysqlclient outputstream problems"));
	}

	/* helpers that store an integer byte by byte at P and advance P;
	 * NOTE(review): the byte order chosen depends on WORDS_BIGENDIAN
	 * although shifting is independent of the host order -- confirm */
#ifdef WORDS_BIGENDIAN
#define le_int(P, X) \
	*(P)++ = (unsigned int)X & 255; \
	*(P)++ = ((unsigned int)X >> 8) & 255; \
	*(P)++ = ((unsigned int)X >> 16) & 255; \
	*(P)++ = ((unsigned int)X >> 24) & 255;
#define le_sht(P, X) \
	*(P)++ = (unsigned short)X & 255; \
	*(P)++ = ((unsigned short)X >> 8) & 255;
#else
#define le_int(P, X) \
	*(P)++ = ((unsigned int)X >> 24) & 255; \
	*(P)++ = ((unsigned int)X >> 16) & 255; \
	*(P)++ = ((unsigned int)X >> 8) & 255; \
	*(P)++ = (unsigned int)X & 255;
#define le_sht(P, X) \
	*(P)++ = ((unsigned short)X >> 8) & 255; \
	*(P)++ = (unsigned short)X & 255;
#endif

	/* Handshake Initialization Packet; the first four positions are
	 * reserved for the packet header that is filled in afterwards */
	cur = pkt + 4;
	*cur++ = 0x10;										/* protocol_version */
	cur += sprintf(cur, VERSION "-merovingian") + 1;	/* server_version\0 */
	le_int(cur, 0);										/* thread_number */
	cur += sprintf(cur, "voidvoid");					/* scramble_buff */
	*cur++ = 0x00;										/* filler */
	/* server_capabilities:
	 * CLIENT_CONNECT_WITH_DB CLIENT_NO_SCHEMA CLIENT_PROTOCOL_41
	 * CLIENT_INTERACTIVE CLIENT_MULTI_STATEMENTS CLIENT_MULTI_RESULTS
	 */
	le_sht(cur, (8 | 16 | 512 | 1024 | 8192 | 65536 | 131072));
	*cur++ = 0x33;			/* server_language = utf8_general_ci */
	le_sht(cur, 2);			/* server_status = SERVER_STATUS_AUTOCOMMIT */
	/* filler; NOTE(review): the original comment announces 14 bytes,
	 * confirm that the literal has the intended length */
	cur += sprintf(cur, " ");

	/* packet header: store the length at the start of the buffer */
	pktlen = cur - pkt;
	cur = pkt;
	le_int(cur, pktlen);
	/* NOTE(review): le_int has advanced cur past the four header
	 * positions, so the three moves below act on what follows the
	 * header -- confirm this is intended */
	for (int i = 0; i < 3; i++) {
		*cur = *(cur + 1);
		cur++;
	}
	*cur = 0x00;			/* packet number */
	mnstr_flush(fout);

	return(NO_ERR);
}
#endif
375
376/* vim:set ts=4 sw=4 noexpandtab: */
377