1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9#include "monetdb_config.h"
10
11#include <string.h> /* strerror, strchr, strcmp */
12#include <sys/types.h>
13#include <sys/socket.h>
14#include <sys/un.h>
15#include <netdb.h>
16#include <netinet/in.h>
17#ifdef HAVE_POLL_H
18#include <poll.h>
19#endif
20#ifdef HAVE_SYS_UIO_H
21# include <sys/uio.h>
22#endif
23#include <fcntl.h>
24
25#include "msabaoth.h"
26#include "mcrypt.h"
27#include "stream.h"
28#include "stream_socket.h"
29#include "utils/utils.h" /* freeConfFile */
30#include "utils/properties.h" /* readProps */
31
32#include "merovingian.h"
33#include "forkmserver.h"
34#include "proxy.h"
35#include "multiplex-funnel.h"
36#include "controlrunner.h"
37#include "client.h"
38#include "handlers.h"
39
/* Fallback for platforms lacking accept4(2), or lacking SOCK_CLOEXEC to
 * pass to it: map the call onto plain accept(2).  The flags argument is
 * dropped without being evaluated, so the accepted descriptor is not
 * close-on-exec; see the fcntl(F_SETFD, FD_CLOEXEC) calls guarded by the
 * same condition at the call sites. */
#if !(defined(HAVE_ACCEPT4) && defined(SOCK_CLOEXEC))
#define accept4(sockfd, addr, addrlen, flags) \
	accept((sockfd), (addr), (addrlen))
#endif
43
/* One node per handleClient thread started by acceptConnections; the
 * nodes form a singly linked list that acceptConnections walks to join
 * threads that have finished. */
struct threads {
	struct threads *next;   /* following node, NULL at the end */
	pthread_t tid;          /* id of the thread running handleClient */
	/* set to 1 by handleClient right before it returns.
	 * NOTE(review): volatile gives no memory-ordering guarantee; the
	 * pthread_join performed after seeing this flag is what actually
	 * synchronises with the thread */
	volatile char dead;
};
/* Start-up argument for a handleClient thread.  Allocated with malloc by
 * acceptConnections and freed by handleClient once it copied the fields. */
struct clientdata {
	int sock;               /* accepted connection */
	bool isusock;           /* true when accepted on the UNIX domain socket */
	struct threads *self;   /* list node whose dead flag the thread sets */
	char challenge[32];     /* NUL-terminated salt sent as the challenge */
};
55
56static void *
57handleClient(void *data)
58
59{
60 stream *fdin, *fout;
61 char buf[8096];
62 char chal[32];
63 char *user = NULL, *algo = NULL, *passwd = NULL, *lang = NULL;
64 char *database = NULL, *s;
65 char dbmod[64];
66 char host[512];
67 char port[16];
68 sabdb *top = NULL;
69 sabdb *stat = NULL;
70 struct sockaddr saddr;
71 socklen_t saddrlen = 0;
72 err e;
73 confkeyval *ckv, *kv;
74 char mydoproxy;
75 sabdb redirs[24]; /* do we need more? */
76 int r = 0;
77 int sock;
78 bool isusock;
79 struct threads *self;
80
81 sock = ((struct clientdata *) data)->sock;
82 isusock = ((struct clientdata *) data)->isusock;
83 self = ((struct clientdata *) data)->self;
84 memcpy(chal, ((struct clientdata *) data)->challenge, sizeof(chal));
85 free(data);
86 fdin = socket_rstream(sock, "merovingian<-client (read)");
87 if (fdin == 0) {
88 self->dead = 1;
89 return(newErr("merovingian-client inputstream problems"));
90 }
91 fdin = block_stream(fdin);
92
93 fout = socket_wstream(sock, "merovingian->client (write)");
94 if (fout == 0) {
95 close_stream(fdin);
96 self->dead = 1;
97 return(newErr("merovingian-client outputstream problems"));
98 }
99 fout = block_stream(fout);
100
101 if (isusock) {
102 snprintf(host, sizeof(host), "(local)");
103 } else if (getpeername(sock, &saddr, &saddrlen) == -1) {
104 Mfprintf(stderr, "couldn't get peername of client: %s\n", strerror(errno));
105 snprintf(host, sizeof(host), "(unknown)");
106 } else {
107 char ghost[512];
108 if (getnameinfo(&saddr, saddrlen, ghost, sizeof(ghost), port, sizeof(port),
109 NI_NUMERICSERV | NI_NUMERICHOST) == 0) {
110 snprintf(host, sizeof(host), "%s:%s", ghost, port);
111 } else {
112 snprintf(host, sizeof(host), "(unknown):%s", port);
113 }
114 }
115
116 /* note: since Jan2012 we speak proto 9 for control connections */
117 mnstr_printf(fout, "%s:merovingian:9:%s:%s:%s:",
118 chal,
119 mcrypt_getHashAlgorithms(),
120#ifdef WORDS_BIGENDIAN
121 "BIG",
122#else
123 "LIT",
124#endif
125 MONETDB5_PASSWDHASH
126 );
127 mnstr_flush(fout);
128
129 /* get response */
130 buf[0] = '\0';
131 if (mnstr_read_block(fdin, buf, sizeof(buf) - 1, 1) < 0) {
132 /* we didn't get a terminated block :/ */
133 e = newErr("client %s sent challenge in incomplete block: %s",
134 host, buf);
135 mnstr_printf(fout, "!monetdbd: client sent something this "
136 "server could not understand, sorry\n");
137 mnstr_flush(fout);
138 close_stream(fout);
139 close_stream(fdin);
140 self->dead = 1;
141 return(e);
142 }
143 buf[sizeof(buf) - 1] = '\0';
144
145 /* decode BIG/LIT:user:{cypher}passwordchal:lang:database: line */
146
147 user = buf;
148 /* byte order */
149 s = strchr(user, ':');
150 if (s) {
151 *s = 0;
152 /* we don't use this in merovingian */
153 /* mnstr_set_byteorder(fin->s, strcmp(user, "BIG") == 0); */
154 user = s + 1;
155 } else {
156 e = newErr("client %s challenge error: %s", host, buf);
157 mnstr_printf(fout, "!monetdbd: incomplete challenge '%s'\n", buf);
158 mnstr_flush(fout);
159 close_stream(fout);
160 close_stream(fdin);
161 self->dead = 1;
162 return(e);
163 }
164
165 /* passwd */
166 s = strchr(user, ':');
167 if (s) {
168 *s = 0;
169 passwd = s + 1;
170 /* decode algorithm, i.e. {plain}mypasswordchallenge */
171 if (*passwd != '{') {
172 e = newErr("client %s challenge error: %s", host, buf);
173 mnstr_printf(fout, "!monetdbd: invalid password entry\n");
174 mnstr_flush(fout);
175 close_stream(fout);
176 close_stream(fdin);
177 self->dead = 1;
178 return(e);
179 }
180 algo = passwd + 1;
181 s = strchr(algo, '}');
182 if (!s) {
183 e = newErr("client %s challenge error: %s", host, buf);
184 mnstr_printf(fout, "!monetdbd: invalid password entry\n");
185 mnstr_flush(fout);
186 close_stream(fout);
187 close_stream(fdin);
188 self->dead = 1;
189 return(e);
190 }
191 *s = 0;
192 passwd = s + 1;
193 } else {
194 e = newErr("client %s challenge error: %s", host, buf);
195 mnstr_printf(fout, "!monetdbd: incomplete challenge, missing password after '%s'\n", user);
196 mnstr_flush(fout);
197 close_stream(fout);
198 close_stream(fdin);
199 self->dead = 1;
200 return(e);
201 }
202
203 /* lang */
204 s = strchr(passwd, ':');
205 if (s) {
206 *s = 0;
207 lang = s + 1;
208 } else {
209 e = newErr("client %s challenge error: %s", host, buf);
210 mnstr_printf(fout, "!monetdbd: incomplete challenge, missing language after '%s'\n", passwd);
211 mnstr_flush(fout);
212 close_stream(fout);
213 close_stream(fdin);
214 self->dead = 1;
215 return(e);
216 }
217
218 /* database */
219 s = strchr(lang, ':');
220 if (s) {
221 *s = 0;
222 database = s + 1;
223 /* since we don't know where the string ends, we need to look
224 * for another : */
225 s = strchr(database, ':');
226 if (s == NULL) {
227 e = newErr("client %s challenge error: %s", host, buf);
228 mnstr_printf(fout, "!monetdbd: incomplete challenge, missing trailing colon\n");
229 mnstr_flush(fout);
230 close_stream(fout);
231 close_stream(fdin);
232 self->dead = 1;
233 return(e);
234 } else {
235 *s = '\0';
236 }
237 }
238
239 if (database == NULL || *database == '\0') {
240 /* we need to have a database, if we haven't gotten one,
241 * complain */
242 mnstr_printf(fout, "!monetdbd: please specify a database\n");
243 mnstr_flush(fout);
244 close_stream(fout);
245 close_stream(fdin);
246 self->dead = 1;
247 return(newErr("client %s specified no database", host));
248 }
249
250 if (strcmp(lang, "control") == 0) {
251 /* handle control client */
252 if (control_authorise(host, chal, algo, passwd, fout))
253 control_handleclient(host, sock, fdin, fout);
254 close_stream(fout);
255 close_stream(fdin);
256 self->dead = 1;
257 return(NO_ERR);
258 }
259
260 if (strcmp(lang, "resolve") == 0) {
261 /* ensure the pattern ends with '/\*' such that we force a
262 * remote entry, including those for local databases, this
263 * way we will get a redirect back to merovingian for such
264 * database if it is proxied and hence not remotely
265 * available */
266 size_t len = strlen(database);
267 if (len > 2 &&
268 database[len - 2] != '/' &&
269 database[len - 1] != '*')
270 {
271 snprintf(dbmod, sizeof(dbmod), "%s/*", database);
272 database = dbmod;
273 }
274 }
275
276 if ((e = forkMserver(database, &top, false)) != NO_ERR) {
277 if (top == NULL) {
278 mnstr_printf(fout, "!monetdbd: no such database '%s', please create it first\n", database);
279 } else {
280 mnstr_printf(fout, "!monetdbd: internal error while starting mserver '%s'%s\n", e, strstr(e, "logfile")?"":", please refer to the logs");
281 Mfprintf(_mero_ctlerr, "!monetdbd: an internal error has occurred '%s'\n",e);
282 }
283 mnstr_flush(fout);
284 close_stream(fout);
285 close_stream(fdin);
286 self->dead = 1;
287 return(e);
288 }
289 stat = top;
290
291 /* a multiplex-funnel is a database which has no connections, but a
292 * scenario "mfunnel" */
293 if ((top->conns == NULL || top->conns->val == NULL) &&
294 top->scens != NULL && strcmp(top->scens->val, "mfunnel") == 0)
295 {
296 multiplexAddClient(top->dbname, sock, fout, fdin, host);
297 msab_freeStatus(&top);
298 self->dead = 1;
299 return(NO_ERR);
300 }
301
302 /* collect possible redirects */
303 for (stat = top; stat != NULL; stat = stat->next) {
304 if (stat->conns == NULL || stat->conns->val == NULL) {
305 Mfprintf(stdout, "dropping database without available "
306 "connections: '%s'\n", stat->dbname);
307 } else if (r == 24) {
308 Mfprintf(stdout, "dropping database connection because of "
309 "too many already: %s\n", stat->conns->val);
310 } else {
311 redirs[r++] = *stat;
312 }
313 }
314
315 /* if we can't redirect, our mission ends here */
316 if (r == 0) {
317 if (top->locked) {
318 e = newErr("database '%s' is under maintenance", top->dbname);
319 } else {
320 e = newErr("there are no available connections for '%s'", database);
321 }
322 mnstr_printf(fout, "!monetdbd: %s\n", e);
323 mnstr_flush(fout);
324 close_stream(fout);
325 close_stream(fdin);
326 msab_freeStatus(&top);
327 self->dead = 1;
328 return(e);
329 }
330
331 /* need to send a response, either we are going to proxy, or we send
332 * a redirect, if we have multiple options, a redirect is our only
333 * option, but if the redir is a single remote we need to stick to
334 * our default, there is a special case when the client indicates it
335 * is only resolving a pattern, in which we always need to send
336 * redirects, even if it's one */
337 mydoproxy = 0;
338 if (r == 1 && strcmp(lang, "resolve") != 0) {
339 if (redirs[0].dbname != redirs[0].path) {
340 /* this is a real local database (not a remote) */
341 ckv = getDefaultProps();
342 readProps(ckv, redirs[0].path);
343 kv = findConfKey(ckv, "forward");
344 } else {
345 ckv = NULL;
346 kv = NULL;
347 }
348 if (kv == NULL || kv->val == NULL)
349 kv = findConfKey(_mero_props, "forward");
350 mydoproxy = strcmp(kv->val, "proxy") == 0;
351 if (ckv != NULL) {
352 freeConfFile(ckv);
353 free(ckv);
354 }
355 }
356
357 if (mydoproxy == 0) {
358 fprintf(stdout, "redirecting client %s for database '%s' to",
359 host, database);
360 /* client is in control, send all redirects */
361 while (--r >= 0) {
362 fprintf(stdout, " %s%s",
363 redirs[r].conns->val, redirs[r].dbname);
364 mnstr_printf(fout, "^%s%s\n",
365 redirs[r].conns->val, redirs[r].dbname);
366 }
367 /* flush redirect */
368 fprintf(stdout, "\n");
369 fflush(stdout);
370 mnstr_flush(fout);
371 } else {
372 Mfprintf(stdout, "proxying client %s for database '%s' to "
373 "%s?database=%s\n",
374 host, database, redirs[0].conns->val, redirs[0].dbname);
375 /* merovingian is in control, only consider the first redirect */
376 mnstr_printf(fout, "^mapi:merovingian://proxy?database=%s\n",
377 redirs[0].dbname);
378 /* flush redirect */
379 mnstr_flush(fout);
380
381 /* wait for input, or disconnect in a proxy runner */
382 if ((e = startProxy(sock, fdin, fout,
383 redirs[0].conns->val, host)) != NO_ERR)
384 {
385 /* we need to let the client login in order not to violate
386 * the protocol */
387 mnstr_printf(fout, "void:merovingian:9:%s:BIG:%s:",
388 mcrypt_getHashAlgorithms(), MONETDB5_PASSWDHASH);
389 mnstr_flush(fout);
390 mnstr_read_block(fdin, buf, 8095, 1); /* eat away client response */
391 mnstr_printf(fout, "!monetdbd: an internal error has occurred '%s', refer to the logs for details, please try again later\n",e);
392 mnstr_flush(fout);
393 Mfprintf(_mero_ctlerr, "!monetdbd: an internal error has occurred '%s'\n",e);
394 close_stream(fout);
395 close_stream(fdin);
396 Mfprintf(stdout, "starting a proxy failed: %s\n", e);
397 msab_freeStatus(&top);
398 self->dead = 1;
399 return(e);
400 }
401 }
402
403 msab_freeStatus(&top);
404 self->dead = 1;
405 return(NO_ERR);
406}
407
408char *
409acceptConnections(int sock, int usock)
410{
411 char *msg;
412 int retval;
413#ifdef HAVE_POLL
414 struct pollfd pfd[2];
415#else
416 fd_set fds;
417 struct timeval tv;
418#endif
419 int msgsock;
420 void *e;
421 struct clientdata *data;
422 struct threads *threads = NULL, **threadp, *p;
423 int errnr; /* saved errno */
424
425 do {
426 /* handle socket connections */
427 bool isusock = false;
428
429#ifdef HAVE_POLL
430 pfd[0] = (struct pollfd) {.fd = sock, .events = POLLIN};
431 pfd[1] = (struct pollfd) {.fd = usock, .events = POLLIN};
432
433 /* Wait up to 5 seconds */
434 retval = poll(pfd, 2, 5000);
435#else
436 FD_ZERO(&fds);
437 FD_SET(sock, &fds);
438 FD_SET(usock, &fds);
439
440 /* Wait up to 5 seconds */
441 tv.tv_sec = 5;
442 tv.tv_usec = 0;
443 retval = select((sock > usock ? sock : usock) + 1,
444 &fds, NULL, NULL, &tv);
445#endif
446 errnr = errno;
447 /* join any handleClient threads that we started and that may
448 * have finished by now */
449 for (threadp = &threads; *threadp; threadp = &(*threadp)->next) {
450 if ((*threadp)->dead &&
451 pthread_join((*threadp)->tid, &e) == 0) {
452 p = *threadp;
453 *threadp = p->next;
454 free(p);
455 if (e != NO_ERR) {
456 Mfprintf(stderr, "client error: %s\n",
457 getErrMsg((char *) e));
458 freeErr(e);
459 }
460 if (*threadp == NULL)
461 break;
462 }
463 }
464 childhandler();
465 reinitialize();
466 if (retval == 0) {
467 /* nothing interesting has happened */
468 continue;
469 }
470 if (retval == -1) {
471 if (_mero_keep_listening == 0)
472 break;
473 switch (errnr) {
474 case EINTR:
475 /* interrupted */
476 break;
477 case EMFILE:
478 case ENFILE:
479 case ENOBUFS:
480 case ENOMEM:
481 /* transient failures */
482 break;
483 default:
484 msg = strerror(errnr);
485 goto error;
486 }
487 continue;
488 }
489 if (
490#ifdef HAVE_POLL
491 pfd[0].revents & POLLIN
492#else
493 FD_ISSET(sock, &fds)
494#endif
495 ) {
496 isusock = false;
497 if ((msgsock = accept4(sock, (SOCKPTR)0, (socklen_t *) 0, SOCK_CLOEXEC)) == -1) {
498 if (_mero_keep_listening == 0)
499 break;
500 switch (errno) {
501 case EINTR:
502 /* interrupted */
503 break;
504 case EMFILE:
505 case ENFILE:
506 case ENOBUFS:
507 case ENOMEM:
508 /* transient failures */
509 break;
510 case ECONNABORTED:
511 /* connection aborted before we began */
512 break;
513 default:
514 msg = strerror(errno);
515 goto error;
516 }
517 continue;
518 }
519#if defined(HAVE_FCNTL) && (!defined(SOCK_CLOEXEC) || !defined(HAVE_ACCEPT4))
520 (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC);
521#endif
522 } else if (
523#ifdef HAVE_POLL
524 pfd[1].revents & POLLIN
525#else
526 FD_ISSET(usock, &fds)
527#endif
528 ) {
529 struct msghdr msgh;
530 struct iovec iov;
531 char buf[1];
532 int rv;
533 char ccmsg[CMSG_SPACE(sizeof(int))];
534
535 isusock = true;
536 if ((msgsock = accept4(usock, (SOCKPTR)0, (socklen_t *)0, SOCK_CLOEXEC)) == -1) {
537 if (_mero_keep_listening == 0)
538 break;
539 switch (errno) {
540 case EINTR:
541 /* interrupted */
542 break;
543 case EMFILE:
544 case ENFILE:
545 case ENOBUFS:
546 case ENOMEM:
547 /* transient failures */
548 break;
549 case ECONNABORTED:
550 /* connection aborted before we began */
551 break;
552 default:
553 msg = strerror(errno);
554 goto error;
555 }
556 continue;
557 }
558#if defined(HAVE_FCNTL) && (!defined(SOCK_CLOEXEC) || !defined(HAVE_ACCEPT4))
559 (void) fcntl(usock, F_SETFD, FD_CLOEXEC);
560#endif
561
562 /* BEWARE: unix domain sockets have a slightly different
563 * behaviour initialy than normal sockets, because we can
564 * send filedescriptors or credentials with them. To do so,
565 * we need to use sendmsg/recvmsg, which operates on a bare
566 * socket. Unfortunately we *have* to send something, so it
567 * is one byte that can optionally carry the ancillary data.
568 * This byte is at this moment defined to contain a character:
569 * '0' - there is no ancillary data
570 * '1' - ancillary data for passing a file descriptor
571 * The future may introduce a state for passing credentials.
572 * Any unknown character must be interpreted as some unknown
573 * action, and hence not supported by the server.
574 * Since there is no reason why one would like to pass
575 * descriptors to Merovingian, this is not implemented here. */
576
577 iov.iov_base = buf;
578 iov.iov_len = 1;
579
580 msgh.msg_name = 0;
581 msgh.msg_namelen = 0;
582 msgh.msg_iov = &iov;
583 msgh.msg_iovlen = 1;
584 msgh.msg_control = ccmsg;
585 msgh.msg_controllen = sizeof(ccmsg);
586
587 rv = recvmsg(msgsock, &msgh, 0);
588 if (rv == -1) {
589 closesocket(msgsock);
590 continue;
591 }
592
593 switch (buf[0]) {
594 case '0':
595 /* nothing special, nothing to do */
596 break;
597 case '1':
598 /* filedescriptor, no way */
599 closesocket(msgsock);
600 Mfprintf(stderr, "client error: fd passing not supported\n");
601 continue;
602 default:
603 /* some unknown state */
604 closesocket(msgsock);
605 Mfprintf(stderr, "client error: unknown initial byte\n");
606 continue;
607 }
608 } else
609 continue;
610 /* start handleClient as a thread so that we're not blocked by
611 * a slow client */
612 data = malloc(sizeof(*data)); /* freed by handleClient */
613 p = malloc(sizeof(*p));
614 if (data == NULL || p == NULL) {
615 if (data)
616 free(data);
617 if (p)
618 free(p);
619 closesocket(msgsock);
620 Mfprintf(stderr, "cannot allocate memory\n");
621 continue;
622 }
623 data->sock = msgsock;
624 data->isusock = isusock;
625 p->dead = 0;
626 data->self = p;
627 data->challenge[31] = '\0';
628 generateSalt(data->challenge, 31);
629 if (pthread_create(&p->tid, NULL, handleClient, data) == 0) {
630 p->next = threads;
631 threads = p;
632 } else {
633 closesocket(msgsock);
634 free(data);
635 free(p);
636 }
637 } while (_mero_keep_listening);
638 shutdown(sock, SHUT_RDWR);
639 closesocket(sock);
640 return(NO_ERR);
641
642error:
643 _mero_keep_listening = 0;
644 closesocket(sock);
645 return(newErr("accept connection: %s", msg));
646}
647
648/* vim:set ts=4 sw=4 noexpandtab: */
649