1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9#include "monetdb_config.h"
10
11#include <sys/types.h>
12#include <sys/socket.h>
13#include <sys/un.h>
14#include <sys/wait.h>
15#include <netdb.h>
16#include <netinet/in.h>
17#ifdef HAVE_POLL_H
18#include <poll.h>
19#endif
20#include <time.h>
21#include <string.h> /* strerror */
22#include <unistd.h> /* select */
23#include <signal.h>
24#include <fcntl.h>
25
26#include "monet_options.h"
27#include "msabaoth.h"
28#include "mcrypt.h"
29#include "utils/utils.h"
30#include "utils/properties.h"
31#include "utils/database.h"
32#include "utils/control.h"
33
34#include "gdk.h" /* these three for creation of dbs with password */
35#include "mal_authorize.h"
36
37#include "merovingian.h"
38#include "discoveryrunner.h" /* broadcast, remotedb */
39#include "forkmserver.h"
40#include "controlrunner.h"
41#include "multiplex-funnel.h"
42
43#if !defined(HAVE_ACCEPT4) || !defined(SOCK_CLOEXEC)
44#define accept4(sockfd, addr, addrlen, flags) accept(sockfd, addr, addrlen)
45#endif
46
47static void
48leavedb(char *name)
49{
50 char buf[128];
51 snprintf(buf, sizeof(buf),
52 "LEAV %s mapi:monetdb://%s:%u/",
53 name, _mero_hostname,
54 (unsigned int)getConfNum(_mero_props, "port"));
55 broadcast(buf);
56}
57
58static void
59leavedbS(sabdb *stats)
60{
61 confkeyval *props = getDefaultProps();
62 char *shared;
63 readProps(props, stats->path);
64 shared = getConfVal(props, "shared");
65 if (!stats->locked && (shared == NULL || strcmp(shared, "no") != 0))
66 leavedb(stats->dbname);
67 freeConfFile(props);
68 free(props);
69}
70
71static char _internal_uri_buf[256];
72static void
73setURI(sabdb *stats)
74{
75 confkeyval *props = getDefaultProps();
76 char *shared;
77 readProps(props, stats->path);
78 shared = getConfVal(props, "shared");
79 if (!stats->locked && (shared == NULL || strcmp(shared, "no") != 0)) {
80 snprintf(_internal_uri_buf, sizeof(_internal_uri_buf),
81 "mapi:monetdb://%s:%u/%s%s%s",
82 _mero_hostname,
83 (unsigned int)getConfNum(_mero_props, "port"),
84 stats->dbname,
85 shared == NULL ? "" : "/",
86 shared == NULL ? "" : shared);
87 stats->uri = _internal_uri_buf;
88 }
89 freeConfFile(props);
90 free(props);
91}
92
93static void
94anncdbS(sabdb *stats)
95{
96 char buf[128];
97 confkeyval *props = getDefaultProps();
98 char *shared;
99 readProps(props, stats->path);
100 shared = getConfVal(props, "shared");
101 if (!stats->locked && (shared == NULL || strcmp(shared, "no") != 0)) {
102 snprintf(buf, sizeof(buf),
103 "ANNC %s%s%s mapi:monetdb://%s:%u/ %d",
104 stats->dbname,
105 shared == NULL ? "" : "/",
106 shared == NULL ? "" : shared,
107 _mero_hostname,
108 (unsigned int)getConfNum(_mero_props, "port"),
109 getConfNum(_mero_props, "discoveryttl") + 60);
110 broadcast(buf);
111 }
112 freeConfFile(props);
113 free(props);
114}
115
116inline static int
117recvWithTimeout(int msgsock, stream *fdin, char *buf, size_t buflen)
118{
119 int retval;
120#ifdef HAVE_POLL
121 struct pollfd pfd = (struct pollfd) {.fd = msgsock, .events = POLLIN};
122
123 retval = poll(&pfd, 1, 1000);
124#else
125 fd_set fds;
126 struct timeval tv;
127
128 FD_ZERO(&fds);
129 FD_SET(msgsock, &fds);
130
131 /* Wait up to 1 second. If a client doesn't make this, it's too slow */
132 tv.tv_sec = 1;
133 tv.tv_usec = 0;
134 retval = select(msgsock + 1, &fds, NULL, NULL, &tv);
135#endif
136 if (retval <= 0) {
137 /* nothing interesting has happened */
138 return(-2);
139 }
140
141 if (fdin != NULL) {
142 ssize_t ret;
143 /* stream.h is sooo broken :( */
144 memset(buf, '\0', buflen);
145 ret = mnstr_read_block(fdin, buf, buflen - 1, 1);
146 return(ret >= 0 ? (int)strlen(buf) : mnstr_errnr(fdin) < 0 ? -1 : 0);
147 } else {
148 return(recv(msgsock, buf, buflen, 0));
149 }
150}
151
152char
153control_authorise(
154 const char *host,
155 const char *chal,
156 const char *algo,
157 const char *passwd,
158 stream *fout)
159{
160 char *pwd;
161
162 if (getConfNum(_mero_props, "control") == 0 ||
163 getConfVal(_mero_props, "passphrase") == NULL)
164 {
165 Mfprintf(_mero_ctlout, "%s: remote control disabled\n", host);
166 mnstr_printf(fout, "!access denied\n");
167 mnstr_flush(fout);
168 return 0;
169 }
170
171 pwd = mcrypt_hashPassword(algo,
172 getConfVal(_mero_props, "passphrase"), chal);
173 if (!pwd) {
174 Mfprintf(_mero_ctlout, "%s: Allocation failure during authentication\n", host);
175 mnstr_printf(fout, "!allocation failure\n");
176 mnstr_flush(fout);
177 return 0;
178 }
179 if (strcmp(pwd, passwd) != 0) {
180 free(pwd);
181 Mfprintf(_mero_ctlout, "%s: permission denied "
182 "(bad passphrase)\n", host);
183 mnstr_printf(fout, "!access denied\n");
184 mnstr_flush(fout);
185 return 0;
186 }
187 free(pwd);
188
189 mnstr_printf(fout, "=OK\n");
190 mnstr_flush(fout);
191
192 return 1;
193}
194
/* send_client(P): deliver the reply currently held in buf2.
 * P must be a string literal; it is glued in front of the "%s" format.
 * Relies on these names at the point of use: fout, msgsock, buf2, len
 * and senderror.
 *  - without an output stream (fout == NULL) the first len bytes of
 *    buf2 are sent over msgsock as-is (P is not sent), and a failing
 *    send() stores errno in senderror;
 *  - with an output stream, P followed by buf2 is printed to fout,
 *    which is then flushed. */
#define send_client(P) \
	do { \
		if (fout == NULL) { \
			if (send(msgsock, buf2, len, 0) < 0) \
				senderror = errno; \
		} else { \
			mnstr_printf(fout, P "%s", buf2); \
			mnstr_flush(fout); \
		} \
	} while (0)
205
/* send_list(): send "OK" followed by the newline-separated list held
 * in pbuf.  Relies on these names at the point of use: fout, msgsock,
 * buf2, len, pbuf and senderror.
 *  - with an output stream, "=OK" is printed and then every line of
 *    pbuf gets its own "=" prefix; pbuf is cut up in place (newlines
 *    are overwritten with NULs) and the stream is flushed at the end;
 *  - without one, "OK\n" and the untouched pbuf are sent over msgsock,
 *    and a failing send() stores errno in senderror. */
#define send_list() \
	do { \
		len = snprintf(buf2, sizeof(buf2), "OK\n"); \
		if (fout != NULL) { \
			char *sl_line = pbuf; \
			char *sl_eol; \
			mnstr_printf(fout, "=OK\n"); \
			for (sl_eol = strchr(sl_line, '\n'); \
				 sl_eol != NULL; \
				 sl_eol = strchr(sl_line, '\n')) { \
				*sl_eol = '\0'; \
				mnstr_printf(fout, "=%s\n", sl_line); \
				sl_line = sl_eol + 1; \
			} \
			if (*sl_line != '\0') \
				mnstr_printf(fout, "=%s\n", sl_line); \
			mnstr_flush(fout); \
		} else if (send(msgsock, buf2, strlen(buf2), 0) < 0 || \
				   send(msgsock, pbuf, strlen(pbuf), 0) < 0) { \
			senderror = errno; \
		} \
	} while (0)
226
227static void ctl_handle_client(
228 const char *origin,
229 int msgsock,
230 stream *fdin,
231 stream *fout)
232{
233 /* TODO: this function may actually stall the entire client
234 * handler, so we should probably at some point implement a take
235 * over of the socket such that a separate (controlrunner) thread is
236 * going to handle the traffic and negotiations, instead of the
237 * client thread that just goes inside this program here. */
238 char buf[8096];
239 char buf2[8096];
240 char *p, *q;
241 sabdb *stats;
242 int pos = 0;
243 size_t len;
244 err e;
245 int senderror = 0;
246
247 while (_mero_keep_listening && !senderror) {
248 if (pos == 0) {
249 if ((pos = recvWithTimeout(msgsock, fdin, buf, sizeof(buf))) == 0) {
250 /* EOF */
251 break;
252 } else if (pos == -1) {
253 /* we got interrupted ... so what? */
254 if (errno == EINTR) {
255 pos = 0;
256 continue;
257 }
258 /* hmmm error ... give up */
259 Mfprintf(_mero_ctlerr, "%s: error reading from control "
260 "channel: %s\n", origin, strerror(errno));
261 break;
262 } else if (pos == -2) {
263 Mfprintf(_mero_ctlerr, "%s: time-out reading from "
264 "control channel, disconnecting client\n", origin);
265 break;
266 } else {
267 buf[pos] = '\0';
268 pos = 0;
269 }
270 }
271 q = buf + pos;
272 p = strchr(q, '\n');
273 if (p == NULL) {
274 /* skip, must be garbage */
275 Mfprintf(_mero_ctlerr, "%s: skipping garbage on control "
276 "channel: %s\n", origin, buf);
277 pos = 0;
278 continue;
279 }
280 *p++ = '\0';
281 if (*p == '\0') {
282 pos = 0;
283 } else {
284 pos = p - buf;
285 }
286
287 /* format is simple: database<space>command */
288 if ((p = strchr(q, ' ')) == NULL) {
289 Mfprintf(_mero_ctlerr, "%s: malformed control signal: %s\n",
290 origin, q);
291 } else {
292 *p++ = '\0';
293 if (strcmp(p, "ping") == 0) {
294 len = snprintf(buf2, sizeof(buf2), "OK\n");
295 send_client("=");
296 } else if (strcmp(p, "start") == 0) {
297 err e;
298 if ((e = msab_getStatus(&stats, q)) != NULL) {
299 len = snprintf(buf2, sizeof(buf2),
300 "internal error, please review the logs\n");
301 send_client("!");
302 Mfprintf(_mero_ctlerr, "%s: start: msab_getStatus: "
303 "%s\n", origin, e);
304 freeErr(e);
305 continue;
306 } else {
307 if (stats == NULL) {
308 Mfprintf(_mero_ctlerr, "%s: received start signal "
309 "for database not under merovingian "
310 "control: %s\n", origin, q);
311 len = snprintf(buf2, sizeof(buf2),
312 "no such database: %s\n", q);
313 send_client("!");
314 continue;
315 }
316
317 if (stats->state == SABdbRunning) {
318 Mfprintf(_mero_ctlerr, "%s: received start signal "
319 "for already running database: %s\n",
320 origin, q);
321 len = snprintf(buf2, sizeof(buf2),
322 "database is already running: %s\n", q);
323 send_client("!");
324 msab_freeStatus(&stats);
325 continue;
326 }
327
328 msab_freeStatus(&stats);
329 }
330 if ((e = forkMserver(q, &stats, true)) != NO_ERR) {
331 Mfprintf(_mero_ctlerr, "%s: failed to fork mserver: "
332 "%s\n", origin, getErrMsg(e));
333 len = snprintf(buf2, sizeof(buf2),
334 "starting '%s' failed: %s\n",
335 q, getErrMsg(e));
336 send_client("!");
337 freeErr(e);
338 stats = NULL;
339 } else {
340 len = snprintf(buf2, sizeof(buf2), "OK\n");
341 send_client("=");
342 Mfprintf(_mero_ctlout, "%s: started '%s'\n",
343 origin, q);
344 }
345
346 if (stats != NULL)
347 msab_freeStatus(&stats);
348 } else if (strcmp(p, "stop") == 0 ||
349 strcmp(p, "kill") == 0)
350 {
351 dpair dp;
352 /* we need to find the right dpair, that is we
353 * sort of assume the control signal is right */
354 pthread_mutex_lock(&_mero_topdp_lock);
355 dp = _mero_topdp->next; /* don't need the console/log */
356 while (dp != NULL) {
357 if (dp->type == MERODB && strcmp(dp->dbname, q) == 0) {
358 if (strcmp(p, "stop") == 0) {
359 pid_t pid = dp->pid;
360 char *dbname = strdup(dp->dbname);
361 mtype type = dp->type;
362 pthread_mutex_unlock(&_mero_topdp_lock);
363 /* Try to shutdown the profiler before the DB.
364 * If we are unable to shutdown the profiler, we
365 * should still try to shutdown the server. In
366 * other words: ignore any errors that shutdown_profiler
367 * may have encountered.
368 */
369 shutdown_profiler(dbname, &stats);
370 terminateProcess(pid, dbname, type, 1);
371 Mfprintf(_mero_ctlout, "%s: stopped "
372 "database '%s'\n", origin, q);
373 } else {
374 kill(dp->pid, SIGKILL);
375 pthread_mutex_unlock(&_mero_topdp_lock);
376 Mfprintf(_mero_ctlout, "%s: killed "
377 "database '%s'\n", origin, q);
378 }
379 len = snprintf(buf2, sizeof(buf2), "OK\n");
380 send_client("=");
381 break;
382 } else if (dp->type == MEROFUN && strcmp(dp->dbname, q) == 0) {
383 /* multiplexDestroy needs topdp lock to remove itself */
384 char *dbname = strdup(dp->dbname);
385 pthread_mutex_unlock(&_mero_topdp_lock);
386 multiplexDestroy(dbname);
387 free(dbname);
388 len = snprintf(buf2, sizeof(buf2), "OK\n");
389 send_client("=");
390 break;
391 }
392
393 dp = dp->next;
394 }
395 if (dp == NULL) {
396 pthread_mutex_unlock(&_mero_topdp_lock);
397 Mfprintf(_mero_ctlerr, "%s: received stop signal for "
398 "non running database: %s\n", origin, q);
399 len = snprintf(buf2, sizeof(buf2),
400 "database is not running: %s\n", q);
401 send_client("!");
402 }
403 } else if (strcmp(p, "create") == 0 ||
404 strncmp(p, "create password=", strlen("create password=")) == 0) {
405 err e;
406
407 p += strlen("create");
408 if (*p == ' ')
409 p += strlen(" password=");
410
411 e = db_create(q);
412 if (e != NO_ERR) {
413 Mfprintf(_mero_ctlerr, "%s: failed to create "
414 "database '%s': %s\n", origin, q, getErrMsg(e));
415 len = snprintf(buf2, sizeof(buf2),
416 "%s\n", getErrMsg(e));
417 send_client("!");
418 free(e);
419 } else {
420 if (*p != '\0') {
421 pid_t child;
422 if ((child = fork()) == 0) {
423 FILE *secretf;
424 size_t len;
425 char *err;
426 char *vaultkey;
427 opt *set = malloc(sizeof(opt) * 2);
428 int setlen = 0;
429 char *sadbfarm;
430
431 if ((err = msab_getDBfarm(&sadbfarm)) != NULL) {
432 Mfprintf(_mero_ctlerr, "%s: internal error: %s\n",
433 origin, err);
434 exit(0);
435 }
436 snprintf(buf2, sizeof(buf2), "%s/%s", sadbfarm, q);
437 free(sadbfarm);
438 setlen = mo_add_option(&set, setlen, opt_cmdline, "gdk_dbpath", buf2);
439 setlen = mo_system_config(&set, setlen);
440 if (BBPaddfarm(buf2, (1 << PERSISTENT) | (1 << TRANSIENT)) != GDK_SUCCEED) {
441 Mfprintf(_mero_ctlerr, "%s: could not add farm to "
442 "'%s': %d: %s\n", origin, q, errno, strerror(errno));
443 exit(0);
444 }
445 /* the child, pollute scope by loading BBP */
446 if (chdir(q) < 0) {
447 /* Fabian says "Ignore the output.
448 * The idea is that the stuff below
449 * will also fail, and therefore emit
450 * some error, but if that happens,
451 * the world already is in such a bad
452 * shape that that most likely isn't
453 * your biggest problem.
454 * Hence a (void) probably does.
455 * If not, a fake if.
456 * (exit(0) should be fine)."
457 * (https://www.monetdb.org/pipermail/developers-list/2014-February/004238.html)
458 */
459 Mfprintf(_mero_ctlerr, "%s: could not chdir to "
460 "'%s': %d: %s\n", origin, q, errno, strerror(errno));
461 exit(0);
462 }
463
464 buf2[0] = '\0';
465 if ((secretf = fopen(".vaultkey", "r")) != NULL) {
466 len = fread(buf2, 1, sizeof(buf2), secretf);
467 buf2[len] = '\0';
468 len = strlen(buf2); /* secret can contain null-bytes */
469 fclose(secretf);
470 }
471 if (GDKinit(set, setlen) != GDK_SUCCEED) {
472 Mfprintf(_mero_ctlerr, "%s: could not "
473 "initialize database '%s'\n",
474 origin, q);
475 exit(0);
476 }
477 vaultkey = buf2;
478 if ((err = AUTHunlockVault(vaultkey)) != NULL ||
479 (err = AUTHinitTables(p)) != NULL) {
480 Mfprintf(_mero_ctlerr, "%s: could not setup "
481 "database '%s': %s\n", origin, q, err);
482 freeException(err);
483 } else {
484 /* don't start locked */
485 remove(".maintenance");
486 }
487
488 exit(0); /* return to the parent */
489 } else if (child > 0) {
490 /* wait for the child to finish */
491 waitpid(child, NULL, 0);
492 } else {
493 Mfprintf(_mero_ctlout, "%s: forking failed\n",
494 origin);
495 }
496 }
497
498 Mfprintf(_mero_ctlout, "%s: created database '%s'\n",
499 origin, q);
500 len = snprintf(buf2, sizeof(buf2), "OK\n");
501 send_client("=");
502 }
503 } else if (strncmp(p, "create mfunnel=", strlen("create mfunnel=")) == 0) {
504 err e = NO_ERR;
505 char *r;
506
507 /* check mfunnel definition for correctness first */
508 p += strlen("create mfunnel=");
509 /* user+pass@pattern,user+pass@pattern,... */
510 r = p;
511 do {
512 for (; *r != '\0' && *r != '+'; r++)
513 ;
514 if (*r == '\0' || *(r - 1) == ',' || (*(r - 1) == '=')) {
515 e = "missing user";
516 break;
517 }
518 for (r++; *r != '\0' && *r != '@'; r++)
519 ;
520 if (*r == '\0' || *(r - 1) == '+') {
521 e = "missing password";
522 break;
523 }
524 for (r++; *r != '\0' && *r != ','; r++)
525 ;
526 if (*(r - 1) == '@') {
527 e = "missing pattern";
528 break;
529 }
530 if (*r == '\0')
531 break;
532 r++;
533 } while(1);
534 if (e != NO_ERR) {
535 Mfprintf(_mero_ctlerr, "%s: invalid multiplex-funnel "
536 "specification '%s': %s at char %zu\n",
537 origin, p, getErrMsg(e), (size_t)(r - p));
538 len = snprintf(buf2, sizeof(buf2),
539 "invalid pattern: %s\n", getErrMsg(e));
540 send_client("!");
541 } else if ((e = db_create(q)) != NO_ERR) {
542 Mfprintf(_mero_ctlerr, "%s: failed to create "
543 "multiplex-funnel '%s': %s\n",
544 origin, q, getErrMsg(e));
545 len = snprintf(buf2, sizeof(buf2),
546 "%s\n", getErrMsg(e));
547 send_client("!");
548 free(e);
549 } else {
550 confkeyval *props = getDefaultProps();
551 confkeyval *kv;
552 char *dbfarm;
553 /* write the funnel config */
554 kv = findConfKey(props, "type");
555 setConfVal(kv, "mfunnel");
556 kv = findConfKey(props, "mfunnel");
557 setConfVal(kv, p);
558 if ((e = msab_getDBfarm(&dbfarm)) != NULL) {
559 Mfprintf(_mero_ctlerr, "%s: failed to retrieve "
560 "dbfarm: %s\n", origin, e);
561 free(e);
562 /* try, hopefully this succeeds */
563 if ((e = db_destroy(q)) != NO_ERR) {
564 Mfprintf(_mero_ctlerr, "%s: could not destroy: "
565 "%s\n", origin, getErrMsg(e));
566 free(e);
567 }
568 len = snprintf(buf2, sizeof(buf2),
569 "failed to prepare multiplex-funnel\n");
570 send_client("!");
571 } else {
572 snprintf(buf2, sizeof(buf2), "%s/%s", dbfarm, q);
573 free(dbfarm);
574 writeProps(props, buf2);
575 Mfprintf(_mero_ctlout,
576 "%s: created multiplex-funnel '%s'\n",
577 origin, q);
578 len = snprintf(buf2, sizeof(buf2), "OK\n");
579 send_client("!");
580 }
581 freeConfFile(props);
582 free(props);
583 }
584 } else if (strcmp(p, "destroy") == 0) {
585 err e = db_destroy(q);
586 if (e != NO_ERR) {
587 Mfprintf(_mero_ctlerr, "%s: failed to destroy "
588 "database '%s': %s\n", origin, q, getErrMsg(e));
589 len = snprintf(buf2, sizeof(buf2),
590 "%s\n", getErrMsg(e));
591 send_client("!");
592 free(e);
593 } else {
594 /* we can leave without tag, will remove all,
595 * generates an "leave request for unknown
596 * database" if not shared (e.g. when under
597 * maintenance) */
598 leavedb(q);
599 Mfprintf(_mero_ctlout, "%s: destroyed database '%s'\n",
600 origin, q);
601 len = snprintf(buf2, sizeof(buf2), "OK\n");
602 send_client("=");
603 }
604 } else if (strcmp(p, "lock") == 0) {
605 char *e = db_lock(q);
606 if (e != NULL) {
607 Mfprintf(_mero_ctlerr, "%s: failed to lock "
608 "database '%s': %s\n", origin, q, getErrMsg(e));
609 len = snprintf(buf2, sizeof(buf2),
610 "%s\n", getErrMsg(e));
611 send_client("!");
612 free(e);
613 } else {
614 /* we go under maintenance, unshare it, take
615 * spam if database happened to be unshared "for
616 * love" */
617 leavedb(q);
618 Mfprintf(_mero_ctlout, "%s: locked database '%s'\n",
619 origin, q);
620 len = snprintf(buf2, sizeof(buf2), "OK\n");
621 send_client("=");
622 }
623 } else if (strcmp(p, "release") == 0) {
624 char *e = db_release(q);
625 if (e != NULL) {
626 Mfprintf(_mero_ctlerr, "%s: failed to release "
627 "database '%s': %s\n", origin, q, getErrMsg(e));
628 len = snprintf(buf2, sizeof(buf2),
629 "%s\n", getErrMsg(e));
630 send_client("!");
631 free(e);
632 } else {
633 /* announce database, but need to do it the
634 * right way so we don't accidentially announce
635 * an unshared database */
636 if ((e = msab_getStatus(&stats, q)) != NULL) {
637 len = snprintf(buf2, sizeof(buf2),
638 "internal error, please review the logs\n");
639 send_client("!");
640 Mfprintf(_mero_ctlerr, "%s: release: "
641 "msab_getStatus: %s\n", origin, e);
642 freeErr(e);
643 /* we need to OK regardless, as releasing
644 * succeed */
645 } else {
646 anncdbS(stats);
647 msab_freeStatus(&stats);
648 }
649 Mfprintf(_mero_ctlout, "%s: released database '%s'\n",
650 origin, q);
651 len = snprintf(buf2, sizeof(buf2), "OK\n");
652 send_client("=");
653 }
654 } else if (strncmp(p, "profilerstart", strlen("profilerstart")) == 0) {
655 char *log_path = NULL;
656 char *e = fork_profiler(q, &stats, &log_path);
657 if (e != NULL) {
658 Mfprintf(_mero_ctlerr, "%s: failed to start the profiler "
659 "database '%s': %s\n", origin, q, getErrMsg(e));
660 len = snprintf(buf2, sizeof(buf2),
661 "%s\n", getErrMsg(e));
662 send_client("!");
663 freeErr(e);
664 } else {
665 len = snprintf(buf2, sizeof(buf2), "OK\n");
666 send_client("=");
667 Mfprintf(_mero_ctlout, "%s: started profiler for '%s'\n",
668 origin, q);
669 Mfprintf(_mero_ctlout, "%s: logs at: %s\n",
670 origin, log_path);
671 }
672 msab_freeStatus(&stats);
673 } else if (strncmp(p, "profilerstop", strlen("profilerstop")) == 0) {
674 char *e = shutdown_profiler(q, &stats);
675 if (e != NULL) {
676 Mfprintf(_mero_ctlerr, "%s: failed to shutdown the profiler "
677 "database '%s': %s\n", origin, q, getErrMsg(e));
678 len = snprintf(buf2, sizeof(buf2),
679 "%s\n", getErrMsg(e));
680 send_client("!");
681 freeErr(e);
682 } else {
683 len = snprintf(buf2, sizeof(buf2), "OK\n");
684 send_client("=");
685 Mfprintf(_mero_ctlout, "%s: profiler shut down for '%s'\n",
686 origin, q);
687 }
688 msab_freeStatus(&stats);
689 } else if (strncmp(p, "name=", strlen("name=")) == 0) {
690 char *e;
691
692 p += strlen("name=");
693 e = db_rename(q, p);
694 if (e != NULL) {
695 Mfprintf(_mero_ctlerr, "%s: %s\n", origin, e);
696 len = snprintf(buf2, sizeof(buf2), "%s\n", e);
697 send_client("!");
698 free(e);
699 } else {
700 if ((e = msab_getStatus(&stats, p)) != NULL) {
701 Mfprintf(_mero_ctlerr, "%s: name: msab_getStatus:"
702 " %s\n", origin, e);
703 freeErr(e);
704 /* should not fail, since the rename was
705 * already successful */
706 } else {
707 leavedb(q); /* could be spam, but shouldn't harm */
708 anncdbS(stats);
709 msab_freeStatus(&stats);
710 }
711 Mfprintf(_mero_ctlout, "%s: renamed database '%s' "
712 "to '%s'\n", origin, q, p);
713 len = snprintf(buf2, sizeof(buf2), "OK\n");
714 send_client("=");
715 }
716 } else if (strchr(p, '=') != NULL) { /* set */
717 char *val;
718 char doshare = 0;
719
720 if ((e = msab_getStatus(&stats, q)) != NULL) {
721 len = snprintf(buf2, sizeof(buf2),
722 "internal error, please review the logs\n");
723 send_client("!");
724 Mfprintf(_mero_ctlerr, "%s: set: msab_getStatus: "
725 "%s\n", origin, e);
726 freeErr(e);
727 continue;
728 }
729 if (stats == NULL) {
730 Mfprintf(_mero_ctlerr, "%s: received property signal "
731 "for unknown database: %s\n", origin, q);
732 len = snprintf(buf2, sizeof(buf2),
733 "unknown database: %s\n", q);
734 send_client("!");
735 continue;
736 }
737
738 val = strchr(p, '=');
739 assert(val != NULL); /* see above */
740 *val++ = '\0';
741 if (*val == '\0')
742 val = NULL;
743
744 if ((doshare = !strcmp(p, "shared"))) {
745 /* bail out if we don't do discovery at all */
746 if (getConfNum(_mero_props, "discovery") == 0) {
747 len = snprintf(buf2, sizeof(buf2),
748 "discovery service is globally disabled, "
749 "enable it first\n");
750 send_client("!");
751 Mfprintf(_mero_ctlerr, "%s: set: cannot perform "
752 "client share request: discovery service "
753 "is globally disabled\n", origin);
754 msab_freeStatus(&stats);
755 continue;
756 }
757
758 /* we're going to change the way it is shared,
759 * so remove it now in its old form */
760 leavedbS(stats);
761 } else if (stats->state == SABdbRunning) {
762 Mfprintf(_mero_ctlerr, "%s: cannot set property '%s' "
763 "on running database\n", origin, p);
764 len = snprintf(buf2, sizeof(buf2),
765 "cannot set property '%s' on running "
766 "database\n", p);
767 send_client("!");
768 msab_freeStatus(&stats);
769 continue;
770 }
771
772 if ((e = setProp(stats->path, p, val)) != NULL) {
773 if (doshare)
774 /* reannounce again, there was an error */
775 anncdbS(stats);
776 Mfprintf(_mero_ctlerr, "%s: setting property failed: "
777 "%s\n", origin, e);
778 len = snprintf(buf2, sizeof(buf2),
779 "%s\n", e);
780 send_client("!");
781 free(e);
782 msab_freeStatus(&stats);
783 continue;
784 } else if (doshare) {
785 /* announce in new personality */
786 anncdbS(stats);
787 }
788
789 msab_freeStatus(&stats);
790
791 if (val != NULL) {
792 Mfprintf(_mero_ctlout, "%s: set property '%s' for "
793 "database '%s' to '%s'\n", origin, p, q, val);
794 } else {
795 Mfprintf(_mero_ctlout, "%s: inherited property '%s' "
796 "for database '%s'\n", origin, p, q);
797 }
798 len = snprintf(buf2, sizeof(buf2), "OK\n");
799 send_client("=");
800
801 /* comands below this point are multi line and hence you can't
802 * combine them, so they disconnect the client afterwards */
803
804 } else if (strcmp(p, "version") == 0) {
805 len = snprintf(buf2, sizeof(buf2), "OK\n");
806 send_client("=");
807 len = snprintf(buf2, sizeof(buf2), "%s (%s)\n",
808 VERSION,
809#ifdef MONETDB_RELEASE
810 MONETDB_RELEASE
811#else
812 "unreleased"
813#endif
814 );
815 send_client("=");
816 break;
817 } else if (strcmp(p, "mserver") == 0) {
818 len = snprintf(buf2, sizeof(buf2), "OK\n");
819 send_client("=");
820 len = snprintf(buf2, sizeof(buf2), "%s\n", _mero_mserver);
821 send_client("=");
822 break;
823 } else if (strcmp(p, "get") == 0) {
824 confkeyval *props = getDefaultProps();
825 char *pbuf;
826
827 if (strcmp(q, "#defaults") == 0) {
828 /* send defaults to client */
829 writePropsBuf(_mero_db_props, &pbuf);
830 send_list();
831
832 Mfprintf(_mero_ctlout, "%s: served default property "
833 "list\n", origin);
834 freeConfFile(props);
835 free(props);
836 free(pbuf);
837 break;
838 }
839
840 if ((e = msab_getStatus(&stats, q)) != NULL) {
841 len = snprintf(buf2, sizeof(buf2),
842 "internal error, please review the logs\n");
843 send_client("!");
844 Mfprintf(_mero_ctlerr, "%s: get: msab_getStatus: "
845 "%s\n", origin, e);
846 freeErr(e);
847 freeConfFile(props);
848 free(props);
849 break;
850 }
851 if (stats == NULL) {
852 Mfprintf(_mero_ctlerr, "%s: received get signal for "
853 "unknown database: %s\n", origin, q);
854 len = snprintf(buf2, sizeof(buf2),
855 "unknown database: %s\n", q);
856 send_client("!");
857 freeConfFile(props);
858 free(props);
859 break;
860 }
861
862 /* from here we'll always succeed, even if we don't
863 * send anything */
864 readProps(props, stats->path);
865 writePropsBuf(props, &pbuf);
866 send_list();
867 freeConfFile(props);
868 free(props);
869 free(pbuf);
870 msab_freeStatus(&stats);
871
872 Mfprintf(_mero_ctlout, "%s: served property list for "
873 "database '%s'\n", origin, q);
874 break;
875 } else if (strcmp(p, "status") == 0) {
876 sabdb *stats;
877 sabdb *topdb;
878 char *sdb = NULL;
879
880 if (strcmp(q, "#all") == 0)
881 /* list all */
882 q = NULL;
883
884 /* return a list of sabdb structs for our local
885 * databases */
886 if ((e = msab_getStatus(&stats, q)) != NULL) {
887 len = snprintf(buf2, sizeof(buf2),
888 "internal error, please review the logs\n");
889 send_client("!");
890 Mfprintf(_mero_ctlerr, "%s: status: msab_getStatus: "
891 "%s\n", origin, e);
892 freeErr(e);
893 break;
894 }
895
896 if (stats == NULL && q != NULL) {
897 Mfprintf(_mero_ctlerr, "%s: received status signal for "
898 "unknown database: %s\n", origin, q);
899 len = snprintf(buf2, sizeof(buf2),
900 "unknown database: %s\n", q);
901 len = snprintf(buf2, sizeof(buf2), "no such database '%s'\n", q);
902 send_client("!");
903 break;
904 }
905
906 len = snprintf(buf2, sizeof(buf2), "OK\n");
907 if (fout == NULL) {
908 if (send(msgsock, buf2, len, 0) < 0)
909 senderror = errno;
910 } else {
911 mnstr_printf(fout, "=%s", buf2);
912 }
913
914 for (topdb = stats; stats != NULL; stats = stats->next) {
915 /* set uri */
916 setURI(stats);
917 /* currently never fails (just crashes) */
918 if ((e = msab_serialise(&sdb, stats)) != NULL)
919 break;
920 stats->uri = NULL;
921 len = snprintf(buf2, sizeof(buf2), "%s\n", sdb);
922 if (fout == NULL) {
923 if (send(msgsock, buf2, len, 0) < 0)
924 senderror = errno;
925 } else {
926 mnstr_printf(fout, "=%s", buf2);
927 }
928 free(sdb);
929 }
930 if (e != NULL) {
931 len = snprintf(buf2, sizeof(buf2),
932 "internal error, please review the logs\n");
933 send_client("!");
934 Mfprintf(_mero_ctlerr, "%s: status: msab_getStatus: "
935 "%s\n", origin, e);
936 msab_freeStatus(&topdb);
937 freeErr(e);
938 break;
939 }
940
941 if (fout != NULL)
942 mnstr_flush(fout);
943
944 if (q == NULL) {
945 Mfprintf(_mero_ctlout, "%s: served status list\n",
946 origin);
947 } else {
948 Mfprintf(_mero_ctlout, "%s: returned status for "
949 "'%s'\n", origin, q);
950 }
951
952 msab_freeStatus(&topdb);
953 break;
954 } else if (strcmp(q, "anelosimus") == 0 &&
955 strcmp(p, "eximius") == 0)
956 {
957 /* return a list of remote databases from our Aranita */
958 remotedb rdb;
959
960 pthread_mutex_lock(&_mero_remotedb_lock);
961
962 /* this never fails */
963 len = snprintf(buf2, sizeof(buf2), "OK\n");
964 if (fout == NULL) {
965 if (send(msgsock, buf2, len, 0) < 0)
966 senderror = errno;
967 } else {
968 mnstr_printf(fout, "=%s", buf2);
969 }
970
971 rdb = _mero_remotedbs;
972 while (rdb != NULL && !senderror) {
973 len = snprintf(buf2, sizeof(buf2), "%s\t%s\n",
974 rdb->fullname,
975 rdb->conn);
976 if (fout == NULL) {
977 if (send(msgsock, buf2, len, 0) < 0)
978 senderror = errno;
979 } else {
980 mnstr_printf(fout, "=%s", buf2);
981 }
982 rdb = rdb->next;
983 }
984
985 if (fout != NULL)
986 mnstr_flush(fout);
987
988 pthread_mutex_unlock(&_mero_remotedb_lock);
989
990 Mfprintf(_mero_ctlout, "%s: served neighbour list\n",
991 origin);
992 break;
993 } else {
994 Mfprintf(_mero_ctlerr, "%s: unknown control command: %s\n",
995 origin, p);
996 len = snprintf(buf2, sizeof(buf2),
997 "unknown command: %s\n", p);
998 send_client("!");
999 break;
1000 }
1001 }
1002 }
1003 if (senderror)
1004 Mfprintf(_mero_ctlerr, "%s: error sending to control "
1005 "channel: %s\n", origin, strerror(senderror));
1006}
1007
1008void
1009control_handleclient(const char *host, int sock, stream *fdin, stream *fout)
1010{
1011 ctl_handle_client(host, sock, fdin, fout);
1012}
1013
/* Thread start routine for one connection accepted by controlRunner().
 * p points to a heap-allocated int holding the connection's socket;
 * the allocation is released here.  The client is served without
 * streams under the origin "(local)", after which the socket is shut
 * down and closed.  Always returns NULL. */
static void *
handle_client(void *p)
{
	int *slot = p;
	int msgsock = *slot;

	free(slot);

	ctl_handle_client("(local)", msgsock, NULL, NULL);

	shutdown(msgsock, SHUT_RDWR);
	closesocket(msgsock);
	return NULL;
}
1025
1026void *
1027controlRunner(void *d)
1028{
1029 int usock = *(int *)d;
1030 int retval;
1031#ifdef HAVE_POLL
1032 struct pollfd pfd;
1033#else
1034 fd_set fds;
1035 struct timeval tv;
1036#endif
1037 int msgsock;
1038 pthread_t tid;
1039 int *p;
1040
1041 do {
1042 if ((p = malloc(sizeof(int))) == NULL) {
1043 Mfprintf(_mero_ctlerr, "malloc failed");
1044 break;
1045 }
1046#ifdef HAVE_POLL
1047 pfd = (struct pollfd) {.fd = usock, .events = POLLIN};
1048 retval = poll(&pfd, 1, 1000);
1049#else
1050 FD_ZERO(&fds);
1051 FD_SET(usock, &fds);
1052
1053 /* limit waiting time in order to check whether we need to exit */
1054 tv.tv_sec = 1;
1055 tv.tv_usec = 0;
1056 retval = select(usock + 1, &fds, NULL, NULL, &tv);
1057#endif
1058 if (retval == 0) {
1059 /* nothing interesting has happened */
1060 continue;
1061 }
1062 if (retval == -1) {
1063 continue;
1064 }
1065
1066#ifdef HAVE_POLL
1067 if ((pfd.revents & POLLIN) == 0)
1068 continue;
1069#else
1070 if (!FD_ISSET(usock, &fds)) {
1071 continue;
1072 }
1073#endif
1074
1075 if ((msgsock = accept4(usock, (SOCKPTR) 0, (socklen_t *) 0, SOCK_CLOEXEC)) == -1) {
1076 if (_mero_keep_listening == 0)
1077 break;
1078 if (errno != EINTR) {
1079 Mfprintf(_mero_ctlerr, "error during accept: %s",
1080 strerror(errno));
1081 }
1082 continue;
1083 }
1084#if defined(HAVE_FCNTL) && (!defined(SOCK_CLOEXEC) || !defined(HAVE_ACCEPT4))
1085 (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC);
1086#endif
1087
1088 *p = msgsock;
1089 if (pthread_create(&tid, NULL, handle_client, p) != 0)
1090 closesocket(msgsock);
1091 else
1092 pthread_detach(tid);
1093 } while (_mero_keep_listening);
1094 shutdown(usock, SHUT_RDWR);
1095 closesocket(usock);
1096 Mfprintf(_mero_ctlout, "control channel closed\n");
1097 return NULL;
1098}
1099
1100/* vim:set ts=4 sw=4 noexpandtab: */
1101