1 | /* |
2 | * This Source Code Form is subject to the terms of the Mozilla Public |
3 | * License, v. 2.0. If a copy of the MPL was not distributed with this |
4 | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
5 | * |
6 | * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V. |
7 | */ |
8 | |
9 | #include "monetdb_config.h" |
10 | |
11 | #include <sys/types.h> |
12 | #include <sys/socket.h> |
13 | #include <sys/un.h> |
14 | #include <sys/wait.h> |
15 | #include <netdb.h> |
16 | #include <netinet/in.h> |
17 | #ifdef HAVE_POLL_H |
18 | #include <poll.h> |
19 | #endif |
20 | #include <time.h> |
21 | #include <string.h> /* strerror */ |
22 | #include <unistd.h> /* select */ |
23 | #include <signal.h> |
24 | #include <fcntl.h> |
25 | |
26 | #include "monet_options.h" |
27 | #include "msabaoth.h" |
28 | #include "mcrypt.h" |
29 | #include "utils/utils.h" |
30 | #include "utils/properties.h" |
31 | #include "utils/database.h" |
32 | #include "utils/control.h" |
33 | |
34 | #include "gdk.h" /* these three for creation of dbs with password */ |
35 | #include "mal_authorize.h" |
36 | |
37 | #include "merovingian.h" |
38 | #include "discoveryrunner.h" /* broadcast, remotedb */ |
39 | #include "forkmserver.h" |
40 | #include "controlrunner.h" |
41 | #include "multiplex-funnel.h" |
42 | |
/* Fallback for builds without accept4() or without SOCK_CLOEXEC: map
 * accept4() onto plain accept(), dropping the flags argument.  The accept
 * loop in controlRunner() then sets FD_CLOEXEC via fcntl() when available. */
#if !defined(HAVE_ACCEPT4) || !defined(SOCK_CLOEXEC)
#define accept4(sockfd, addr, addrlen, flags)	accept(sockfd, addr, addrlen)
#endif
46 | |
47 | static void |
48 | leavedb(char *name) |
49 | { |
50 | char buf[128]; |
51 | snprintf(buf, sizeof(buf), |
52 | "LEAV %s mapi:monetdb://%s:%u/" , |
53 | name, _mero_hostname, |
54 | (unsigned int)getConfNum(_mero_props, "port" )); |
55 | broadcast(buf); |
56 | } |
57 | |
58 | static void |
59 | leavedbS(sabdb *stats) |
60 | { |
61 | confkeyval *props = getDefaultProps(); |
62 | char *shared; |
63 | readProps(props, stats->path); |
64 | shared = getConfVal(props, "shared" ); |
65 | if (!stats->locked && (shared == NULL || strcmp(shared, "no" ) != 0)) |
66 | leavedb(stats->dbname); |
67 | freeConfFile(props); |
68 | free(props); |
69 | } |
70 | |
71 | static char _internal_uri_buf[256]; |
72 | static void |
73 | setURI(sabdb *stats) |
74 | { |
75 | confkeyval *props = getDefaultProps(); |
76 | char *shared; |
77 | readProps(props, stats->path); |
78 | shared = getConfVal(props, "shared" ); |
79 | if (!stats->locked && (shared == NULL || strcmp(shared, "no" ) != 0)) { |
80 | snprintf(_internal_uri_buf, sizeof(_internal_uri_buf), |
81 | "mapi:monetdb://%s:%u/%s%s%s" , |
82 | _mero_hostname, |
83 | (unsigned int)getConfNum(_mero_props, "port" ), |
84 | stats->dbname, |
85 | shared == NULL ? "" : "/" , |
86 | shared == NULL ? "" : shared); |
87 | stats->uri = _internal_uri_buf; |
88 | } |
89 | freeConfFile(props); |
90 | free(props); |
91 | } |
92 | |
93 | static void |
94 | anncdbS(sabdb *stats) |
95 | { |
96 | char buf[128]; |
97 | confkeyval *props = getDefaultProps(); |
98 | char *shared; |
99 | readProps(props, stats->path); |
100 | shared = getConfVal(props, "shared" ); |
101 | if (!stats->locked && (shared == NULL || strcmp(shared, "no" ) != 0)) { |
102 | snprintf(buf, sizeof(buf), |
103 | "ANNC %s%s%s mapi:monetdb://%s:%u/ %d" , |
104 | stats->dbname, |
105 | shared == NULL ? "" : "/" , |
106 | shared == NULL ? "" : shared, |
107 | _mero_hostname, |
108 | (unsigned int)getConfNum(_mero_props, "port" ), |
109 | getConfNum(_mero_props, "discoveryttl" ) + 60); |
110 | broadcast(buf); |
111 | } |
112 | freeConfFile(props); |
113 | free(props); |
114 | } |
115 | |
116 | inline static int |
117 | recvWithTimeout(int msgsock, stream *fdin, char *buf, size_t buflen) |
118 | { |
119 | int retval; |
120 | #ifdef HAVE_POLL |
121 | struct pollfd pfd = (struct pollfd) {.fd = msgsock, .events = POLLIN}; |
122 | |
123 | retval = poll(&pfd, 1, 1000); |
124 | #else |
125 | fd_set fds; |
126 | struct timeval tv; |
127 | |
128 | FD_ZERO(&fds); |
129 | FD_SET(msgsock, &fds); |
130 | |
131 | /* Wait up to 1 second. If a client doesn't make this, it's too slow */ |
132 | tv.tv_sec = 1; |
133 | tv.tv_usec = 0; |
134 | retval = select(msgsock + 1, &fds, NULL, NULL, &tv); |
135 | #endif |
136 | if (retval <= 0) { |
137 | /* nothing interesting has happened */ |
138 | return(-2); |
139 | } |
140 | |
141 | if (fdin != NULL) { |
142 | ssize_t ret; |
143 | /* stream.h is sooo broken :( */ |
144 | memset(buf, '\0', buflen); |
145 | ret = mnstr_read_block(fdin, buf, buflen - 1, 1); |
146 | return(ret >= 0 ? (int)strlen(buf) : mnstr_errnr(fdin) < 0 ? -1 : 0); |
147 | } else { |
148 | return(recv(msgsock, buf, buflen, 0)); |
149 | } |
150 | } |
151 | |
152 | char |
153 | control_authorise( |
154 | const char *host, |
155 | const char *chal, |
156 | const char *algo, |
157 | const char *passwd, |
158 | stream *fout) |
159 | { |
160 | char *pwd; |
161 | |
162 | if (getConfNum(_mero_props, "control" ) == 0 || |
163 | getConfVal(_mero_props, "passphrase" ) == NULL) |
164 | { |
165 | Mfprintf(_mero_ctlout, "%s: remote control disabled\n" , host); |
166 | mnstr_printf(fout, "!access denied\n" ); |
167 | mnstr_flush(fout); |
168 | return 0; |
169 | } |
170 | |
171 | pwd = mcrypt_hashPassword(algo, |
172 | getConfVal(_mero_props, "passphrase" ), chal); |
173 | if (!pwd) { |
174 | Mfprintf(_mero_ctlout, "%s: Allocation failure during authentication\n" , host); |
175 | mnstr_printf(fout, "!allocation failure\n" ); |
176 | mnstr_flush(fout); |
177 | return 0; |
178 | } |
179 | if (strcmp(pwd, passwd) != 0) { |
180 | free(pwd); |
181 | Mfprintf(_mero_ctlout, "%s: permission denied " |
182 | "(bad passphrase)\n" , host); |
183 | mnstr_printf(fout, "!access denied\n" ); |
184 | mnstr_flush(fout); |
185 | return 0; |
186 | } |
187 | free(pwd); |
188 | |
189 | mnstr_printf(fout, "=OK\n" ); |
190 | mnstr_flush(fout); |
191 | |
192 | return 1; |
193 | } |
194 | |
/*
 * send_client(P): send the reply held in buf2 to the control client.
 *
 * Relies on these names in the expanding scope: fout, msgsock, buf2 (a
 * char array), len (the value snprintf returned when filling buf2) and
 * senderror.
 *
 * With a stream (fout != NULL) the reply is written with the string
 * literal P as prefix and the stream is flushed.  Without one, the first
 * len bytes of buf2 go out over msgsock without a prefix; a failed send()
 * stores errno in senderror.
 *
 * snprintf returns the length the text would have had without truncation,
 * so len is clamped to what buf2 can hold before the raw send.  Otherwise
 * an over-long reply would make send() read past the end of buf2.
 */
#define send_client(P)									\
	do {												\
		if (fout != NULL) {								\
			mnstr_printf(fout, P "%s", buf2);			\
			mnstr_flush(fout);							\
		} else {										\
			if (len >= sizeof(buf2))					\
				len = sizeof(buf2) - 1;					\
			if (send(msgsock, buf2, len, 0) < 0)		\
				senderror = errno;						\
		}												\
	} while (0)
205 | |
/*
 * send_list(): send "OK" followed by the newline-separated list in pbuf.
 *
 * Relies on these names in the expanding scope: fout, msgsock, buf2, len,
 * pbuf (a NUL-terminated, writable string) and senderror.
 *
 * Without a stream (fout == NULL) "OK\n" and then pbuf are sent as-is over
 * msgsock; a failing send() stores errno in senderror.  With a stream,
 * "=OK" is written first and then every line of pbuf as its own "=<line>"
 * line, after which the stream is flushed.  In stream mode pbuf is
 * modified in place: each '\n' is overwritten with '\0'.
 */
#define send_list()										\
	do {												\
		len = snprintf(buf2, sizeof(buf2), "OK\n");		\
		if (fout == NULL) {								\
			if (send(msgsock, buf2, strlen(buf2), 0) < 0 ||	\
				send(msgsock, pbuf, strlen(pbuf), 0) < 0)	\
				senderror = errno;						\
		} else {										\
			char *p, *q = pbuf;							\
			mnstr_printf(fout, "=OK\n");				\
			while ((p = strchr(q, '\n')) != NULL) {		\
				*p++ = '\0';							\
				mnstr_printf(fout, "=%s\n", q);			\
				q = p;									\
			}											\
			if (*q != '\0')								\
				mnstr_printf(fout, "=%s\n", q);			\
			mnstr_flush(fout);							\
		}												\
	} while (0)
226 | |
227 | static void ctl_handle_client( |
228 | const char *origin, |
229 | int msgsock, |
230 | stream *fdin, |
231 | stream *fout) |
232 | { |
233 | /* TODO: this function may actually stall the entire client |
234 | * handler, so we should probably at some point implement a take |
235 | * over of the socket such that a separate (controlrunner) thread is |
236 | * going to handle the traffic and negotiations, instead of the |
237 | * client thread that just goes inside this program here. */ |
238 | char buf[8096]; |
239 | char buf2[8096]; |
240 | char *p, *q; |
241 | sabdb *stats; |
242 | int pos = 0; |
243 | size_t len; |
244 | err e; |
245 | int senderror = 0; |
246 | |
247 | while (_mero_keep_listening && !senderror) { |
248 | if (pos == 0) { |
249 | if ((pos = recvWithTimeout(msgsock, fdin, buf, sizeof(buf))) == 0) { |
250 | /* EOF */ |
251 | break; |
252 | } else if (pos == -1) { |
253 | /* we got interrupted ... so what? */ |
254 | if (errno == EINTR) { |
255 | pos = 0; |
256 | continue; |
257 | } |
258 | /* hmmm error ... give up */ |
259 | Mfprintf(_mero_ctlerr, "%s: error reading from control " |
260 | "channel: %s\n" , origin, strerror(errno)); |
261 | break; |
262 | } else if (pos == -2) { |
263 | Mfprintf(_mero_ctlerr, "%s: time-out reading from " |
264 | "control channel, disconnecting client\n" , origin); |
265 | break; |
266 | } else { |
267 | buf[pos] = '\0'; |
268 | pos = 0; |
269 | } |
270 | } |
271 | q = buf + pos; |
272 | p = strchr(q, '\n'); |
273 | if (p == NULL) { |
274 | /* skip, must be garbage */ |
275 | Mfprintf(_mero_ctlerr, "%s: skipping garbage on control " |
276 | "channel: %s\n" , origin, buf); |
277 | pos = 0; |
278 | continue; |
279 | } |
280 | *p++ = '\0'; |
281 | if (*p == '\0') { |
282 | pos = 0; |
283 | } else { |
284 | pos = p - buf; |
285 | } |
286 | |
287 | /* format is simple: database<space>command */ |
288 | if ((p = strchr(q, ' ')) == NULL) { |
289 | Mfprintf(_mero_ctlerr, "%s: malformed control signal: %s\n" , |
290 | origin, q); |
291 | } else { |
292 | *p++ = '\0'; |
293 | if (strcmp(p, "ping" ) == 0) { |
294 | len = snprintf(buf2, sizeof(buf2), "OK\n" ); |
295 | send_client("=" ); |
296 | } else if (strcmp(p, "start" ) == 0) { |
297 | err e; |
298 | if ((e = msab_getStatus(&stats, q)) != NULL) { |
299 | len = snprintf(buf2, sizeof(buf2), |
300 | "internal error, please review the logs\n" ); |
301 | send_client("!" ); |
302 | Mfprintf(_mero_ctlerr, "%s: start: msab_getStatus: " |
303 | "%s\n" , origin, e); |
304 | freeErr(e); |
305 | continue; |
306 | } else { |
307 | if (stats == NULL) { |
308 | Mfprintf(_mero_ctlerr, "%s: received start signal " |
309 | "for database not under merovingian " |
310 | "control: %s\n" , origin, q); |
311 | len = snprintf(buf2, sizeof(buf2), |
312 | "no such database: %s\n" , q); |
313 | send_client("!" ); |
314 | continue; |
315 | } |
316 | |
317 | if (stats->state == SABdbRunning) { |
318 | Mfprintf(_mero_ctlerr, "%s: received start signal " |
319 | "for already running database: %s\n" , |
320 | origin, q); |
321 | len = snprintf(buf2, sizeof(buf2), |
322 | "database is already running: %s\n" , q); |
323 | send_client("!" ); |
324 | msab_freeStatus(&stats); |
325 | continue; |
326 | } |
327 | |
328 | msab_freeStatus(&stats); |
329 | } |
330 | if ((e = forkMserver(q, &stats, true)) != NO_ERR) { |
331 | Mfprintf(_mero_ctlerr, "%s: failed to fork mserver: " |
332 | "%s\n" , origin, getErrMsg(e)); |
333 | len = snprintf(buf2, sizeof(buf2), |
334 | "starting '%s' failed: %s\n" , |
335 | q, getErrMsg(e)); |
336 | send_client("!" ); |
337 | freeErr(e); |
338 | stats = NULL; |
339 | } else { |
340 | len = snprintf(buf2, sizeof(buf2), "OK\n" ); |
341 | send_client("=" ); |
342 | Mfprintf(_mero_ctlout, "%s: started '%s'\n" , |
343 | origin, q); |
344 | } |
345 | |
346 | if (stats != NULL) |
347 | msab_freeStatus(&stats); |
348 | } else if (strcmp(p, "stop" ) == 0 || |
349 | strcmp(p, "kill" ) == 0) |
350 | { |
351 | dpair dp; |
352 | /* we need to find the right dpair, that is we |
353 | * sort of assume the control signal is right */ |
354 | pthread_mutex_lock(&_mero_topdp_lock); |
355 | dp = _mero_topdp->next; /* don't need the console/log */ |
356 | while (dp != NULL) { |
357 | if (dp->type == MERODB && strcmp(dp->dbname, q) == 0) { |
358 | if (strcmp(p, "stop" ) == 0) { |
359 | pid_t pid = dp->pid; |
360 | char *dbname = strdup(dp->dbname); |
361 | mtype type = dp->type; |
362 | pthread_mutex_unlock(&_mero_topdp_lock); |
363 | /* Try to shutdown the profiler before the DB. |
364 | * If we are unable to shutdown the profiler, we |
365 | * should still try to shutdown the server. In |
366 | * other words: ignore any errors that shutdown_profiler |
367 | * may have encountered. |
368 | */ |
369 | shutdown_profiler(dbname, &stats); |
370 | terminateProcess(pid, dbname, type, 1); |
371 | Mfprintf(_mero_ctlout, "%s: stopped " |
372 | "database '%s'\n" , origin, q); |
373 | } else { |
374 | kill(dp->pid, SIGKILL); |
375 | pthread_mutex_unlock(&_mero_topdp_lock); |
376 | Mfprintf(_mero_ctlout, "%s: killed " |
377 | "database '%s'\n" , origin, q); |
378 | } |
379 | len = snprintf(buf2, sizeof(buf2), "OK\n" ); |
380 | send_client("=" ); |
381 | break; |
382 | } else if (dp->type == MEROFUN && strcmp(dp->dbname, q) == 0) { |
383 | /* multiplexDestroy needs topdp lock to remove itself */ |
384 | char *dbname = strdup(dp->dbname); |
385 | pthread_mutex_unlock(&_mero_topdp_lock); |
386 | multiplexDestroy(dbname); |
387 | free(dbname); |
388 | len = snprintf(buf2, sizeof(buf2), "OK\n" ); |
389 | send_client("=" ); |
390 | break; |
391 | } |
392 | |
393 | dp = dp->next; |
394 | } |
395 | if (dp == NULL) { |
396 | pthread_mutex_unlock(&_mero_topdp_lock); |
397 | Mfprintf(_mero_ctlerr, "%s: received stop signal for " |
398 | "non running database: %s\n" , origin, q); |
399 | len = snprintf(buf2, sizeof(buf2), |
400 | "database is not running: %s\n" , q); |
401 | send_client("!" ); |
402 | } |
403 | } else if (strcmp(p, "create" ) == 0 || |
404 | strncmp(p, "create password=" , strlen("create password=" )) == 0) { |
405 | err e; |
406 | |
407 | p += strlen("create" ); |
408 | if (*p == ' ') |
409 | p += strlen(" password=" ); |
410 | |
411 | e = db_create(q); |
412 | if (e != NO_ERR) { |
413 | Mfprintf(_mero_ctlerr, "%s: failed to create " |
414 | "database '%s': %s\n" , origin, q, getErrMsg(e)); |
415 | len = snprintf(buf2, sizeof(buf2), |
416 | "%s\n" , getErrMsg(e)); |
417 | send_client("!" ); |
418 | free(e); |
419 | } else { |
420 | if (*p != '\0') { |
421 | pid_t child; |
422 | if ((child = fork()) == 0) { |
423 | FILE *secretf; |
424 | size_t len; |
425 | char *err; |
426 | char *vaultkey; |
427 | opt *set = malloc(sizeof(opt) * 2); |
428 | int setlen = 0; |
429 | char *sadbfarm; |
430 | |
431 | if ((err = msab_getDBfarm(&sadbfarm)) != NULL) { |
432 | Mfprintf(_mero_ctlerr, "%s: internal error: %s\n" , |
433 | origin, err); |
434 | exit(0); |
435 | } |
436 | snprintf(buf2, sizeof(buf2), "%s/%s" , sadbfarm, q); |
437 | free(sadbfarm); |
438 | setlen = mo_add_option(&set, setlen, opt_cmdline, "gdk_dbpath" , buf2); |
439 | setlen = mo_system_config(&set, setlen); |
440 | if (BBPaddfarm(buf2, (1 << PERSISTENT) | (1 << TRANSIENT)) != GDK_SUCCEED) { |
441 | Mfprintf(_mero_ctlerr, "%s: could not add farm to " |
442 | "'%s': %d: %s\n" , origin, q, errno, strerror(errno)); |
443 | exit(0); |
444 | } |
445 | /* the child, pollute scope by loading BBP */ |
446 | if (chdir(q) < 0) { |
447 | /* Fabian says "Ignore the output. |
448 | * The idea is that the stuff below |
449 | * will also fail, and therefore emit |
450 | * some error, but if that happens, |
451 | * the world already is in such a bad |
452 | * shape that that most likely isn't |
453 | * your biggest problem. |
454 | * Hence a (void) probably does. |
455 | * If not, a fake if. |
456 | * (exit(0) should be fine)." |
457 | * (https://www.monetdb.org/pipermail/developers-list/2014-February/004238.html) |
458 | */ |
459 | Mfprintf(_mero_ctlerr, "%s: could not chdir to " |
460 | "'%s': %d: %s\n" , origin, q, errno, strerror(errno)); |
461 | exit(0); |
462 | } |
463 | |
464 | buf2[0] = '\0'; |
465 | if ((secretf = fopen(".vaultkey" , "r" )) != NULL) { |
466 | len = fread(buf2, 1, sizeof(buf2), secretf); |
467 | buf2[len] = '\0'; |
468 | len = strlen(buf2); /* secret can contain null-bytes */ |
469 | fclose(secretf); |
470 | } |
471 | if (GDKinit(set, setlen) != GDK_SUCCEED) { |
472 | Mfprintf(_mero_ctlerr, "%s: could not " |
473 | "initialize database '%s'\n" , |
474 | origin, q); |
475 | exit(0); |
476 | } |
477 | vaultkey = buf2; |
478 | if ((err = AUTHunlockVault(vaultkey)) != NULL || |
479 | (err = AUTHinitTables(p)) != NULL) { |
480 | Mfprintf(_mero_ctlerr, "%s: could not setup " |
481 | "database '%s': %s\n" , origin, q, err); |
482 | freeException(err); |
483 | } else { |
484 | /* don't start locked */ |
485 | remove(".maintenance" ); |
486 | } |
487 | |
488 | exit(0); /* return to the parent */ |
489 | } else if (child > 0) { |
490 | /* wait for the child to finish */ |
491 | waitpid(child, NULL, 0); |
492 | } else { |
493 | Mfprintf(_mero_ctlout, "%s: forking failed\n" , |
494 | origin); |
495 | } |
496 | } |
497 | |
498 | Mfprintf(_mero_ctlout, "%s: created database '%s'\n" , |
499 | origin, q); |
500 | len = snprintf(buf2, sizeof(buf2), "OK\n" ); |
501 | send_client("=" ); |
502 | } |
503 | } else if (strncmp(p, "create mfunnel=" , strlen("create mfunnel=" )) == 0) { |
504 | err e = NO_ERR; |
505 | char *r; |
506 | |
507 | /* check mfunnel definition for correctness first */ |
508 | p += strlen("create mfunnel=" ); |
509 | /* user+pass@pattern,user+pass@pattern,... */ |
510 | r = p; |
511 | do { |
512 | for (; *r != '\0' && *r != '+'; r++) |
513 | ; |
514 | if (*r == '\0' || *(r - 1) == ',' || (*(r - 1) == '=')) { |
515 | e = "missing user" ; |
516 | break; |
517 | } |
518 | for (r++; *r != '\0' && *r != '@'; r++) |
519 | ; |
520 | if (*r == '\0' || *(r - 1) == '+') { |
521 | e = "missing password" ; |
522 | break; |
523 | } |
524 | for (r++; *r != '\0' && *r != ','; r++) |
525 | ; |
526 | if (*(r - 1) == '@') { |
527 | e = "missing pattern" ; |
528 | break; |
529 | } |
530 | if (*r == '\0') |
531 | break; |
532 | r++; |
533 | } while(1); |
534 | if (e != NO_ERR) { |
535 | Mfprintf(_mero_ctlerr, "%s: invalid multiplex-funnel " |
536 | "specification '%s': %s at char %zu\n" , |
537 | origin, p, getErrMsg(e), (size_t)(r - p)); |
538 | len = snprintf(buf2, sizeof(buf2), |
539 | "invalid pattern: %s\n" , getErrMsg(e)); |
540 | send_client("!" ); |
541 | } else if ((e = db_create(q)) != NO_ERR) { |
542 | Mfprintf(_mero_ctlerr, "%s: failed to create " |
543 | "multiplex-funnel '%s': %s\n" , |
544 | origin, q, getErrMsg(e)); |
545 | len = snprintf(buf2, sizeof(buf2), |
546 | "%s\n" , getErrMsg(e)); |
547 | send_client("!" ); |
548 | free(e); |
549 | } else { |
550 | confkeyval *props = getDefaultProps(); |
551 | confkeyval *kv; |
552 | char *dbfarm; |
553 | /* write the funnel config */ |
554 | kv = findConfKey(props, "type" ); |
555 | setConfVal(kv, "mfunnel" ); |
556 | kv = findConfKey(props, "mfunnel" ); |
557 | setConfVal(kv, p); |
558 | if ((e = msab_getDBfarm(&dbfarm)) != NULL) { |
559 | Mfprintf(_mero_ctlerr, "%s: failed to retrieve " |
560 | "dbfarm: %s\n" , origin, e); |
561 | free(e); |
562 | /* try, hopefully this succeeds */ |
563 | if ((e = db_destroy(q)) != NO_ERR) { |
564 | Mfprintf(_mero_ctlerr, "%s: could not destroy: " |
565 | "%s\n" , origin, getErrMsg(e)); |
566 | free(e); |
567 | } |
568 | len = snprintf(buf2, sizeof(buf2), |
569 | "failed to prepare multiplex-funnel\n" ); |
570 | send_client("!" ); |
571 | } else { |
572 | snprintf(buf2, sizeof(buf2), "%s/%s" , dbfarm, q); |
573 | free(dbfarm); |
574 | writeProps(props, buf2); |
575 | Mfprintf(_mero_ctlout, |
576 | "%s: created multiplex-funnel '%s'\n" , |
577 | origin, q); |
578 | len = snprintf(buf2, sizeof(buf2), "OK\n" ); |
579 | send_client("!" ); |
580 | } |
581 | freeConfFile(props); |
582 | free(props); |
583 | } |
584 | } else if (strcmp(p, "destroy" ) == 0) { |
585 | err e = db_destroy(q); |
586 | if (e != NO_ERR) { |
587 | Mfprintf(_mero_ctlerr, "%s: failed to destroy " |
588 | "database '%s': %s\n" , origin, q, getErrMsg(e)); |
589 | len = snprintf(buf2, sizeof(buf2), |
590 | "%s\n" , getErrMsg(e)); |
591 | send_client("!" ); |
592 | free(e); |
593 | } else { |
594 | /* we can leave without tag, will remove all, |
595 | * generates an "leave request for unknown |
596 | * database" if not shared (e.g. when under |
597 | * maintenance) */ |
598 | leavedb(q); |
599 | Mfprintf(_mero_ctlout, "%s: destroyed database '%s'\n" , |
600 | origin, q); |
601 | len = snprintf(buf2, sizeof(buf2), "OK\n" ); |
602 | send_client("=" ); |
603 | } |
604 | } else if (strcmp(p, "lock" ) == 0) { |
605 | char *e = db_lock(q); |
606 | if (e != NULL) { |
607 | Mfprintf(_mero_ctlerr, "%s: failed to lock " |
608 | "database '%s': %s\n" , origin, q, getErrMsg(e)); |
609 | len = snprintf(buf2, sizeof(buf2), |
610 | "%s\n" , getErrMsg(e)); |
611 | send_client("!" ); |
612 | free(e); |
613 | } else { |
614 | /* we go under maintenance, unshare it, take |
615 | * spam if database happened to be unshared "for |
616 | * love" */ |
617 | leavedb(q); |
618 | Mfprintf(_mero_ctlout, "%s: locked database '%s'\n" , |
619 | origin, q); |
620 | len = snprintf(buf2, sizeof(buf2), "OK\n" ); |
621 | send_client("=" ); |
622 | } |
623 | } else if (strcmp(p, "release" ) == 0) { |
624 | char *e = db_release(q); |
625 | if (e != NULL) { |
626 | Mfprintf(_mero_ctlerr, "%s: failed to release " |
627 | "database '%s': %s\n" , origin, q, getErrMsg(e)); |
628 | len = snprintf(buf2, sizeof(buf2), |
629 | "%s\n" , getErrMsg(e)); |
630 | send_client("!" ); |
631 | free(e); |
632 | } else { |
633 | /* announce database, but need to do it the |
634 | * right way so we don't accidentially announce |
635 | * an unshared database */ |
636 | if ((e = msab_getStatus(&stats, q)) != NULL) { |
637 | len = snprintf(buf2, sizeof(buf2), |
638 | "internal error, please review the logs\n" ); |
639 | send_client("!" ); |
640 | Mfprintf(_mero_ctlerr, "%s: release: " |
641 | "msab_getStatus: %s\n" , origin, e); |
642 | freeErr(e); |
643 | /* we need to OK regardless, as releasing |
644 | * succeed */ |
645 | } else { |
646 | anncdbS(stats); |
647 | msab_freeStatus(&stats); |
648 | } |
649 | Mfprintf(_mero_ctlout, "%s: released database '%s'\n" , |
650 | origin, q); |
651 | len = snprintf(buf2, sizeof(buf2), "OK\n" ); |
652 | send_client("=" ); |
653 | } |
654 | } else if (strncmp(p, "profilerstart" , strlen("profilerstart" )) == 0) { |
655 | char *log_path = NULL; |
656 | char *e = fork_profiler(q, &stats, &log_path); |
657 | if (e != NULL) { |
658 | Mfprintf(_mero_ctlerr, "%s: failed to start the profiler " |
659 | "database '%s': %s\n" , origin, q, getErrMsg(e)); |
660 | len = snprintf(buf2, sizeof(buf2), |
661 | "%s\n" , getErrMsg(e)); |
662 | send_client("!" ); |
663 | freeErr(e); |
664 | } else { |
665 | len = snprintf(buf2, sizeof(buf2), "OK\n" ); |
666 | send_client("=" ); |
667 | Mfprintf(_mero_ctlout, "%s: started profiler for '%s'\n" , |
668 | origin, q); |
669 | Mfprintf(_mero_ctlout, "%s: logs at: %s\n" , |
670 | origin, log_path); |
671 | } |
672 | msab_freeStatus(&stats); |
673 | } else if (strncmp(p, "profilerstop" , strlen("profilerstop" )) == 0) { |
674 | char *e = shutdown_profiler(q, &stats); |
675 | if (e != NULL) { |
676 | Mfprintf(_mero_ctlerr, "%s: failed to shutdown the profiler " |
677 | "database '%s': %s\n" , origin, q, getErrMsg(e)); |
678 | len = snprintf(buf2, sizeof(buf2), |
679 | "%s\n" , getErrMsg(e)); |
680 | send_client("!" ); |
681 | freeErr(e); |
682 | } else { |
683 | len = snprintf(buf2, sizeof(buf2), "OK\n" ); |
684 | send_client("=" ); |
685 | Mfprintf(_mero_ctlout, "%s: profiler shut down for '%s'\n" , |
686 | origin, q); |
687 | } |
688 | msab_freeStatus(&stats); |
689 | } else if (strncmp(p, "name=" , strlen("name=" )) == 0) { |
690 | char *e; |
691 | |
692 | p += strlen("name=" ); |
693 | e = db_rename(q, p); |
694 | if (e != NULL) { |
695 | Mfprintf(_mero_ctlerr, "%s: %s\n" , origin, e); |
696 | len = snprintf(buf2, sizeof(buf2), "%s\n" , e); |
697 | send_client("!" ); |
698 | free(e); |
699 | } else { |
700 | if ((e = msab_getStatus(&stats, p)) != NULL) { |
701 | Mfprintf(_mero_ctlerr, "%s: name: msab_getStatus:" |
702 | " %s\n" , origin, e); |
703 | freeErr(e); |
704 | /* should not fail, since the rename was |
705 | * already successful */ |
706 | } else { |
707 | leavedb(q); /* could be spam, but shouldn't harm */ |
708 | anncdbS(stats); |
709 | msab_freeStatus(&stats); |
710 | } |
711 | Mfprintf(_mero_ctlout, "%s: renamed database '%s' " |
712 | "to '%s'\n" , origin, q, p); |
713 | len = snprintf(buf2, sizeof(buf2), "OK\n" ); |
714 | send_client("=" ); |
715 | } |
716 | } else if (strchr(p, '=') != NULL) { /* set */ |
717 | char *val; |
718 | char doshare = 0; |
719 | |
720 | if ((e = msab_getStatus(&stats, q)) != NULL) { |
721 | len = snprintf(buf2, sizeof(buf2), |
722 | "internal error, please review the logs\n" ); |
723 | send_client("!" ); |
724 | Mfprintf(_mero_ctlerr, "%s: set: msab_getStatus: " |
725 | "%s\n" , origin, e); |
726 | freeErr(e); |
727 | continue; |
728 | } |
729 | if (stats == NULL) { |
730 | Mfprintf(_mero_ctlerr, "%s: received property signal " |
731 | "for unknown database: %s\n" , origin, q); |
732 | len = snprintf(buf2, sizeof(buf2), |
733 | "unknown database: %s\n" , q); |
734 | send_client("!" ); |
735 | continue; |
736 | } |
737 | |
738 | val = strchr(p, '='); |
739 | assert(val != NULL); /* see above */ |
740 | *val++ = '\0'; |
741 | if (*val == '\0') |
742 | val = NULL; |
743 | |
744 | if ((doshare = !strcmp(p, "shared" ))) { |
745 | /* bail out if we don't do discovery at all */ |
746 | if (getConfNum(_mero_props, "discovery" ) == 0) { |
747 | len = snprintf(buf2, sizeof(buf2), |
748 | "discovery service is globally disabled, " |
749 | "enable it first\n" ); |
750 | send_client("!" ); |
751 | Mfprintf(_mero_ctlerr, "%s: set: cannot perform " |
752 | "client share request: discovery service " |
753 | "is globally disabled\n" , origin); |
754 | msab_freeStatus(&stats); |
755 | continue; |
756 | } |
757 | |
758 | /* we're going to change the way it is shared, |
759 | * so remove it now in its old form */ |
760 | leavedbS(stats); |
761 | } else if (stats->state == SABdbRunning) { |
762 | Mfprintf(_mero_ctlerr, "%s: cannot set property '%s' " |
763 | "on running database\n" , origin, p); |
764 | len = snprintf(buf2, sizeof(buf2), |
765 | "cannot set property '%s' on running " |
766 | "database\n" , p); |
767 | send_client("!" ); |
768 | msab_freeStatus(&stats); |
769 | continue; |
770 | } |
771 | |
772 | if ((e = setProp(stats->path, p, val)) != NULL) { |
773 | if (doshare) |
774 | /* reannounce again, there was an error */ |
775 | anncdbS(stats); |
776 | Mfprintf(_mero_ctlerr, "%s: setting property failed: " |
777 | "%s\n" , origin, e); |
778 | len = snprintf(buf2, sizeof(buf2), |
779 | "%s\n" , e); |
780 | send_client("!" ); |
781 | free(e); |
782 | msab_freeStatus(&stats); |
783 | continue; |
784 | } else if (doshare) { |
785 | /* announce in new personality */ |
786 | anncdbS(stats); |
787 | } |
788 | |
789 | msab_freeStatus(&stats); |
790 | |
791 | if (val != NULL) { |
792 | Mfprintf(_mero_ctlout, "%s: set property '%s' for " |
793 | "database '%s' to '%s'\n" , origin, p, q, val); |
794 | } else { |
795 | Mfprintf(_mero_ctlout, "%s: inherited property '%s' " |
796 | "for database '%s'\n" , origin, p, q); |
797 | } |
798 | len = snprintf(buf2, sizeof(buf2), "OK\n" ); |
799 | send_client("=" ); |
800 | |
801 | /* comands below this point are multi line and hence you can't |
802 | * combine them, so they disconnect the client afterwards */ |
803 | |
804 | } else if (strcmp(p, "version" ) == 0) { |
805 | len = snprintf(buf2, sizeof(buf2), "OK\n" ); |
806 | send_client("=" ); |
807 | len = snprintf(buf2, sizeof(buf2), "%s (%s)\n" , |
808 | VERSION, |
809 | #ifdef MONETDB_RELEASE |
810 | MONETDB_RELEASE |
811 | #else |
812 | "unreleased" |
813 | #endif |
814 | ); |
815 | send_client("=" ); |
816 | break; |
817 | } else if (strcmp(p, "mserver" ) == 0) { |
818 | len = snprintf(buf2, sizeof(buf2), "OK\n" ); |
819 | send_client("=" ); |
820 | len = snprintf(buf2, sizeof(buf2), "%s\n" , _mero_mserver); |
821 | send_client("=" ); |
822 | break; |
823 | } else if (strcmp(p, "get" ) == 0) { |
824 | confkeyval *props = getDefaultProps(); |
825 | char *pbuf; |
826 | |
827 | if (strcmp(q, "#defaults" ) == 0) { |
828 | /* send defaults to client */ |
829 | writePropsBuf(_mero_db_props, &pbuf); |
830 | send_list(); |
831 | |
832 | Mfprintf(_mero_ctlout, "%s: served default property " |
833 | "list\n" , origin); |
834 | freeConfFile(props); |
835 | free(props); |
836 | free(pbuf); |
837 | break; |
838 | } |
839 | |
840 | if ((e = msab_getStatus(&stats, q)) != NULL) { |
841 | len = snprintf(buf2, sizeof(buf2), |
842 | "internal error, please review the logs\n" ); |
843 | send_client("!" ); |
844 | Mfprintf(_mero_ctlerr, "%s: get: msab_getStatus: " |
845 | "%s\n" , origin, e); |
846 | freeErr(e); |
847 | freeConfFile(props); |
848 | free(props); |
849 | break; |
850 | } |
851 | if (stats == NULL) { |
852 | Mfprintf(_mero_ctlerr, "%s: received get signal for " |
853 | "unknown database: %s\n" , origin, q); |
854 | len = snprintf(buf2, sizeof(buf2), |
855 | "unknown database: %s\n" , q); |
856 | send_client("!" ); |
857 | freeConfFile(props); |
858 | free(props); |
859 | break; |
860 | } |
861 | |
862 | /* from here we'll always succeed, even if we don't |
863 | * send anything */ |
864 | readProps(props, stats->path); |
865 | writePropsBuf(props, &pbuf); |
866 | send_list(); |
867 | freeConfFile(props); |
868 | free(props); |
869 | free(pbuf); |
870 | msab_freeStatus(&stats); |
871 | |
872 | Mfprintf(_mero_ctlout, "%s: served property list for " |
873 | "database '%s'\n" , origin, q); |
874 | break; |
875 | } else if (strcmp(p, "status" ) == 0) { |
876 | sabdb *stats; |
877 | sabdb *topdb; |
878 | char *sdb = NULL; |
879 | |
880 | if (strcmp(q, "#all" ) == 0) |
881 | /* list all */ |
882 | q = NULL; |
883 | |
884 | /* return a list of sabdb structs for our local |
885 | * databases */ |
886 | if ((e = msab_getStatus(&stats, q)) != NULL) { |
887 | len = snprintf(buf2, sizeof(buf2), |
888 | "internal error, please review the logs\n" ); |
889 | send_client("!" ); |
890 | Mfprintf(_mero_ctlerr, "%s: status: msab_getStatus: " |
891 | "%s\n" , origin, e); |
892 | freeErr(e); |
893 | break; |
894 | } |
895 | |
896 | if (stats == NULL && q != NULL) { |
897 | Mfprintf(_mero_ctlerr, "%s: received status signal for " |
898 | "unknown database: %s\n" , origin, q); |
899 | len = snprintf(buf2, sizeof(buf2), |
900 | "unknown database: %s\n" , q); |
901 | len = snprintf(buf2, sizeof(buf2), "no such database '%s'\n" , q); |
902 | send_client("!" ); |
903 | break; |
904 | } |
905 | |
906 | len = snprintf(buf2, sizeof(buf2), "OK\n" ); |
907 | if (fout == NULL) { |
908 | if (send(msgsock, buf2, len, 0) < 0) |
909 | senderror = errno; |
910 | } else { |
911 | mnstr_printf(fout, "=%s" , buf2); |
912 | } |
913 | |
914 | for (topdb = stats; stats != NULL; stats = stats->next) { |
915 | /* set uri */ |
916 | setURI(stats); |
917 | /* currently never fails (just crashes) */ |
918 | if ((e = msab_serialise(&sdb, stats)) != NULL) |
919 | break; |
920 | stats->uri = NULL; |
921 | len = snprintf(buf2, sizeof(buf2), "%s\n" , sdb); |
922 | if (fout == NULL) { |
923 | if (send(msgsock, buf2, len, 0) < 0) |
924 | senderror = errno; |
925 | } else { |
926 | mnstr_printf(fout, "=%s" , buf2); |
927 | } |
928 | free(sdb); |
929 | } |
930 | if (e != NULL) { |
931 | len = snprintf(buf2, sizeof(buf2), |
932 | "internal error, please review the logs\n" ); |
933 | send_client("!" ); |
934 | Mfprintf(_mero_ctlerr, "%s: status: msab_getStatus: " |
935 | "%s\n" , origin, e); |
936 | msab_freeStatus(&topdb); |
937 | freeErr(e); |
938 | break; |
939 | } |
940 | |
941 | if (fout != NULL) |
942 | mnstr_flush(fout); |
943 | |
944 | if (q == NULL) { |
945 | Mfprintf(_mero_ctlout, "%s: served status list\n" , |
946 | origin); |
947 | } else { |
948 | Mfprintf(_mero_ctlout, "%s: returned status for " |
949 | "'%s'\n" , origin, q); |
950 | } |
951 | |
952 | msab_freeStatus(&topdb); |
953 | break; |
954 | } else if (strcmp(q, "anelosimus" ) == 0 && |
955 | strcmp(p, "eximius" ) == 0) |
956 | { |
957 | /* return a list of remote databases from our Aranita */ |
958 | remotedb rdb; |
959 | |
960 | pthread_mutex_lock(&_mero_remotedb_lock); |
961 | |
962 | /* this never fails */ |
963 | len = snprintf(buf2, sizeof(buf2), "OK\n" ); |
964 | if (fout == NULL) { |
965 | if (send(msgsock, buf2, len, 0) < 0) |
966 | senderror = errno; |
967 | } else { |
968 | mnstr_printf(fout, "=%s" , buf2); |
969 | } |
970 | |
971 | rdb = _mero_remotedbs; |
972 | while (rdb != NULL && !senderror) { |
973 | len = snprintf(buf2, sizeof(buf2), "%s\t%s\n" , |
974 | rdb->fullname, |
975 | rdb->conn); |
976 | if (fout == NULL) { |
977 | if (send(msgsock, buf2, len, 0) < 0) |
978 | senderror = errno; |
979 | } else { |
980 | mnstr_printf(fout, "=%s" , buf2); |
981 | } |
982 | rdb = rdb->next; |
983 | } |
984 | |
985 | if (fout != NULL) |
986 | mnstr_flush(fout); |
987 | |
988 | pthread_mutex_unlock(&_mero_remotedb_lock); |
989 | |
990 | Mfprintf(_mero_ctlout, "%s: served neighbour list\n" , |
991 | origin); |
992 | break; |
993 | } else { |
994 | Mfprintf(_mero_ctlerr, "%s: unknown control command: %s\n" , |
995 | origin, p); |
996 | len = snprintf(buf2, sizeof(buf2), |
997 | "unknown command: %s\n" , p); |
998 | send_client("!" ); |
999 | break; |
1000 | } |
1001 | } |
1002 | } |
1003 | if (senderror) |
1004 | Mfprintf(_mero_ctlerr, "%s: error sending to control " |
1005 | "channel: %s\n" , origin, strerror(senderror)); |
1006 | } |
1007 | |
/* Public entry point for serving a control client: forwards all arguments
 * unchanged to ctl_handle_client(), with `host` used as the origin label
 * in its log messages. */
void
control_handleclient(const char *host, int sock, stream *fdin, stream *fout)
{
	ctl_handle_client(host, sock, fdin, fout);
}
1013 | |
/* Thread body for one client accepted by controlRunner().  `p` points to a
 * heap-allocated int holding the client socket; it is freed here.  The
 * client is served without streams under the origin "(local)", after which
 * the socket is shut down and closed.  Always returns NULL. */
static void *
handle_client(void *p)
{
	int *boxed = p;
	int clientfd = *boxed;

	/* the socket number has been copied out, the cell is no longer needed */
	free(boxed);

	ctl_handle_client("(local)", clientfd, NULL, NULL);

	shutdown(clientfd, SHUT_RDWR);
	closesocket(clientfd);
	return NULL;
}
1025 | |
1026 | void * |
1027 | controlRunner(void *d) |
1028 | { |
1029 | int usock = *(int *)d; |
1030 | int retval; |
1031 | #ifdef HAVE_POLL |
1032 | struct pollfd pfd; |
1033 | #else |
1034 | fd_set fds; |
1035 | struct timeval tv; |
1036 | #endif |
1037 | int msgsock; |
1038 | pthread_t tid; |
1039 | int *p; |
1040 | |
1041 | do { |
1042 | if ((p = malloc(sizeof(int))) == NULL) { |
1043 | Mfprintf(_mero_ctlerr, "malloc failed" ); |
1044 | break; |
1045 | } |
1046 | #ifdef HAVE_POLL |
1047 | pfd = (struct pollfd) {.fd = usock, .events = POLLIN}; |
1048 | retval = poll(&pfd, 1, 1000); |
1049 | #else |
1050 | FD_ZERO(&fds); |
1051 | FD_SET(usock, &fds); |
1052 | |
1053 | /* limit waiting time in order to check whether we need to exit */ |
1054 | tv.tv_sec = 1; |
1055 | tv.tv_usec = 0; |
1056 | retval = select(usock + 1, &fds, NULL, NULL, &tv); |
1057 | #endif |
1058 | if (retval == 0) { |
1059 | /* nothing interesting has happened */ |
1060 | continue; |
1061 | } |
1062 | if (retval == -1) { |
1063 | continue; |
1064 | } |
1065 | |
1066 | #ifdef HAVE_POLL |
1067 | if ((pfd.revents & POLLIN) == 0) |
1068 | continue; |
1069 | #else |
1070 | if (!FD_ISSET(usock, &fds)) { |
1071 | continue; |
1072 | } |
1073 | #endif |
1074 | |
1075 | if ((msgsock = accept4(usock, (SOCKPTR) 0, (socklen_t *) 0, SOCK_CLOEXEC)) == -1) { |
1076 | if (_mero_keep_listening == 0) |
1077 | break; |
1078 | if (errno != EINTR) { |
1079 | Mfprintf(_mero_ctlerr, "error during accept: %s" , |
1080 | strerror(errno)); |
1081 | } |
1082 | continue; |
1083 | } |
1084 | #if defined(HAVE_FCNTL) && (!defined(SOCK_CLOEXEC) || !defined(HAVE_ACCEPT4)) |
1085 | (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC); |
1086 | #endif |
1087 | |
1088 | *p = msgsock; |
1089 | if (pthread_create(&tid, NULL, handle_client, p) != 0) |
1090 | closesocket(msgsock); |
1091 | else |
1092 | pthread_detach(tid); |
1093 | } while (_mero_keep_listening); |
1094 | shutdown(usock, SHUT_RDWR); |
1095 | closesocket(usock); |
1096 | Mfprintf(_mero_ctlout, "control channel closed\n" ); |
1097 | return NULL; |
1098 | } |
1099 | |
1100 | /* vim:set ts=4 sw=4 noexpandtab: */ |
1101 | |