1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9#include "monetdb_config.h"
10#include <sys/types.h>
11#include <sys/un.h>
12#include <sys/stat.h>
13#include <signal.h>
14#include <unistd.h>
15#include <string.h> /* char ** */
16#include <time.h> /* localtime */
17
18#include "msabaoth.h"
19#include "utils/utils.h"
20#include "utils/glob.h"
21#include "utils/properties.h"
22#include "mutils.h"
23
24#include "merovingian.h"
25#include "discoveryrunner.h" /* remotedb */
26#include "multiplex-funnel.h" /* multiplexInit */
27#include "forkmserver.h"
28
29static pthread_mutex_t fork_lock = PTHREAD_MUTEX_INITIALIZER;
30
31/**
32 * The terminateProcess function tries to let the given mserver process
33 * shut down gracefully within a given time-out. If that fails, it
34 * sends the deadly SIGKILL signal to the mserver process and returns.
35 */
36void
37terminateProcess(pid_t pid, char *dbname, mtype type, int lock)
38{
39 sabdb *stats;
40 char *er;
41 int i;
42 confkeyval *kv;
43
44 if (lock)
45 pthread_mutex_lock(&fork_lock);
46
47 er = msab_getStatus(&stats, dbname);
48 if (er != NULL) {
49 if (lock)
50 pthread_mutex_unlock(&fork_lock);
51 Mfprintf(stderr, "cannot terminate process %lld: %s\n",
52 (long long int)pid, er);
53 free(er);
54 free(dbname);
55 return;
56 }
57
58 if (stats == NULL) {
59 if (lock)
60 pthread_mutex_unlock(&fork_lock);
61 Mfprintf(stderr, "strange, process %lld serves database '%s' "
62 "which does not exist\n", (long long int)pid, dbname);
63 free(dbname);
64 return;
65 }
66
67 switch (stats->state) {
68 case SABdbRunning:
69 /* ok, what we expect */
70 break;
71 case SABdbCrashed:
72 if (lock)
73 pthread_mutex_unlock(&fork_lock);
74 Mfprintf(stderr, "cannot shut down database '%s', mserver "
75 "(pid %lld) has crashed\n",
76 dbname, (long long int)pid);
77 msab_freeStatus(&stats);
78 free(dbname);
79 return;
80 case SABdbInactive:
81 if (lock)
82 pthread_mutex_unlock(&fork_lock);
83 Mfprintf(stdout, "database '%s' appears to have shut down already\n",
84 dbname);
85 fflush(stdout);
86 msab_freeStatus(&stats);
87 free(dbname);
88 return;
89 case SABdbStarting:
90 Mfprintf(stderr, "database '%s' appears to be starting up\n",
91 dbname);
92 /* starting up, so we'll go to the shut down phase */
93 break;
94 default:
95 if (lock)
96 pthread_mutex_unlock(&fork_lock);
97 Mfprintf(stderr, "unknown state: %d\n", (int)stats->state);
98 msab_freeStatus(&stats);
99 free(dbname);
100 return;
101 }
102
103 if (type == MEROFUN) {
104 if (lock)
105 pthread_mutex_unlock(&fork_lock);
106 multiplexDestroy(dbname);
107 msab_freeStatus(&stats);
108 free(dbname);
109 return;
110 } else if (type != MERODB) {
111 /* barf */
112 if (lock)
113 pthread_mutex_unlock(&fork_lock);
114 Mfprintf(stderr, "cannot stop merovingian process role: %s\n", dbname);
115 msab_freeStatus(&stats);
116 free(dbname);
117 return;
118 }
119
120 /* ok, once we get here, we'll be shutting down the server */
121 Mfprintf(stdout, "sending process %lld (database '%s') the "
122 "TERM signal\n", (long long int)pid, dbname);
123 kill(pid, SIGTERM);
124 kv = findConfKey(_mero_props, "exittimeout");
125 for (i = 0; i < atoi(kv->val) * 2; i++) {
126 if (stats != NULL)
127 msab_freeStatus(&stats);
128 sleep_ms(500);
129 er = msab_getStatus(&stats, dbname);
130 if (er != NULL) {
131 Mfprintf(stderr, "unexpected problem: %s\n", er);
132 free(er);
133 /* don't die, just continue, so we KILL in the end */
134 } else if (stats == NULL) {
135 Mfprintf(stderr, "hmmmm, database '%s' suddenly doesn't exist "
136 "any more\n", dbname);
137 } else {
138 switch (stats->state) {
139 case SABdbRunning:
140 case SABdbStarting:
141 /* ok, try again */
142 break;
143 case SABdbCrashed:
144 if (lock)
145 pthread_mutex_unlock(&fork_lock);
146 Mfprintf (stderr, "database '%s' crashed after SIGTERM\n",
147 dbname);
148 msab_freeStatus(&stats);
149 free(dbname);
150 return;
151 case SABdbInactive:
152 if (lock)
153 pthread_mutex_unlock(&fork_lock);
154 Mfprintf(stdout, "database '%s' has shut down\n", dbname);
155 fflush(stdout);
156 msab_freeStatus(&stats);
157 free(dbname);
158 return;
159 default:
160 Mfprintf(stderr, "unknown state: %d\n", (int)stats->state);
161 break;
162 }
163 }
164 }
165 Mfprintf(stderr, "timeout of %s seconds expired, sending process %lld"
166 " (database '%s') the KILL signal\n",
167 kv->val, (long long int)pid, dbname);
168 kill(pid, SIGKILL);
169 msab_freeStatus(&stats);
170 free(dbname);
171 if (lock)
172 pthread_mutex_unlock(&fork_lock);
173}
174
175/**
176 * Fork an mserver and detach. Before forking off, Sabaoth is consulted
177 * to see if forking makes sense, or whether it is necessary at all, or
178 * forbidden by restart policy, e.g. when in maintenance.
179 */
180
181#define MAX_NR_ARGS 511
182
183err
184forkMserver(char *database, sabdb** stats, bool force)
185{
186 pid_t pid;
187 char *er;
188 sabuplog info;
189 struct tm *t;
190 char tstr[20];
191 int pfdo[2];
192 int pfde[2];
193 dpair dp;
194 char vaultkey[512];
195 struct stat statbuf;
196 char upmin[8];
197 char upavg[8];
198 char upmax[8];
199 confkeyval *ckv, *kv, *list;
200 SABdbState state;
201 char *sabdbfarm;
202 char dbpath[1024];
203 char dbextra_path[1024];
204 char port[32];
205 char listenaddr[512];
206 char muri[512]; /* possibly undersized */
207 char usock[512];
208 bool mydoproxy;
209 char nthreads[32];
210 char nclients[32];
211 char pipeline[512];
212 char memmaxsize[64];
213 char vmmaxsize[64];
214 char *readonly = NULL;
215 char *embeddedr = NULL;
216 char *embeddedpy = NULL;
217 char *embeddedc = NULL;
218 char *ipv6 = NULL;
219 char *dbextra = NULL;
220 char *mserver5_extra = NULL;
221 char *mserver5_extra_token = NULL;
222 char *argv[MAX_NR_ARGS+1]; /* for the exec arguments */
223 char property_other[1024];
224 int c = 0;
225 unsigned int mport;
226
227 er = msab_getStatus(stats, database);
228 if (er != NULL) {
229 err e = newErr("%s", er);
230 free(er);
231 return(e);
232 }
233
234 /* NOTE: remotes also include locals through self announcement */
235 if (*stats == NULL) {
236 *stats = getRemoteDB(database);
237
238 if (*stats != NULL)
239 return(NO_ERR);
240
241 return(newErr("no such database: %s", database));
242 }
243
244 /* Since we ask for a specific database, it should be either there
245 * or not there. Since we checked the latter case above, it should
246 * just be there, and be the right one. There also shouldn't be
247 * more than one entry in the list, so we assume we have the right
248 * one here. */
249
250 if ((*stats)->state == SABdbRunning)
251 /* return before doing expensive stuff, when this db just seems
252 * to be running */
253 return(NO_ERR);
254
255 /* Make sure we only start one mserver5 at the same time, this is a
256 * horsedrug for preventing race-conditions where two or more
257 * clients start the same database at the same time, because they
258 * were all identified as being SABdbInactive. If this "global"
259 * lock ever becomes a problem, we can reduce it to a per-database
260 * lock instead. */
261 pthread_mutex_lock(&fork_lock);
262
263 /* refetch the status, as it may have changed */
264 msab_freeStatus(stats);
265 er = msab_getStatus(stats, database);
266 if (er != NULL) {
267 err e = newErr("%s", er);
268 free(er);
269 pthread_mutex_unlock(&fork_lock);
270 return(e);
271 }
272
273 ckv = getDefaultProps();
274 readAllProps(ckv, (*stats)->path);
275 kv = findConfKey(ckv, "type");
276 if (kv->val == NULL)
277 kv = findConfKey(_mero_db_props, "type");
278
279 if ((*stats)->locked) {
280 if (!force) {
281 Mfprintf(stdout, "%s '%s' is under maintenance\n",
282 kv->val, database);
283 freeConfFile(ckv);
284 free(ckv);
285 pthread_mutex_unlock(&fork_lock);
286 return(NO_ERR);
287 } else {
288 Mfprintf(stdout, "startup of %s under maintenance "
289 "'%s' forced\n", kv->val, database);
290 }
291 }
292
293 /* retrieve uplog information to print a short conclusion */
294 er = msab_getUplogInfo(&info, *stats);
295 if (er != NULL) {
296 err e = newErr("could not retrieve uplog information: %s", er);
297 free(er);
298 msab_freeStatus(stats);
299 freeConfFile(ckv);
300 free(ckv);
301 pthread_mutex_unlock(&fork_lock);
302 return(e);
303 }
304
305 switch ((*stats)->state) {
306 case SABdbRunning:
307 freeConfFile(ckv);
308 free(ckv);
309 pthread_mutex_unlock(&fork_lock);
310 return(NO_ERR);
311 case SABdbCrashed:
312 t = localtime(&info.lastcrash);
313 strftime(tstr, sizeof(tstr), "%Y-%m-%d %H:%M:%S", t);
314 secondsToString(upmin, info.minuptime, 1);
315 secondsToString(upavg, info.avguptime, 1);
316 secondsToString(upmax, info.maxuptime, 1);
317 Mfprintf(stdout, "%s '%s' has crashed after start on %s, "
318 "attempting restart, "
319 "up min/avg/max: %s/%s/%s, "
320 "crash average: %d.00 %.2f %.2f (%d-%d=%d)\n",
321 kv->val, database, tstr,
322 upmin, upavg, upmax,
323 info.crashavg1, info.crashavg10, info.crashavg30,
324 info.startcntr, info.stopcntr, info.crashcntr);
325 break;
326 case SABdbInactive:
327 secondsToString(upmin, info.minuptime, 1);
328 secondsToString(upavg, info.avguptime, 1);
329 secondsToString(upmax, info.maxuptime, 1);
330 Mfprintf(stdout, "starting %s '%s', "
331 "up min/avg/max: %s/%s/%s, "
332 "crash average: %d.00 %.2f %.2f (%d-%d=%d)\n",
333 kv->val, database,
334 upmin, upavg, upmax,
335 info.crashavg1, info.crashavg10, info.crashavg30,
336 info.startcntr, info.stopcntr, info.crashcntr);
337 break;
338 default:
339 /* this also includes SABdbStarting, which we shouldn't ever
340 * see due to the global starting lock */
341 state = (*stats)->state;
342 msab_freeStatus(stats);
343 freeConfFile(ckv);
344 free(ckv);
345 pthread_mutex_unlock(&fork_lock);
346 return(newErr("unknown or impossible state: %d",
347 (int)state));
348 }
349
350 /* create the pipes (filedescriptors) now, such that we and the
351 * child have the same descriptor set */
352 if (pipe(pfdo) == -1) {
353 int e = errno;
354 msab_freeStatus(stats);
355 freeConfFile(ckv);
356 free(ckv);
357 pthread_mutex_unlock(&fork_lock);
358 return(newErr("unable to create pipe: %s", strerror(e)));
359 }
360 if (pipe(pfde) == -1) {
361 int e = errno;
362 close(pfdo[0]);
363 close(pfdo[1]);
364 msab_freeStatus(stats);
365 freeConfFile(ckv);
366 free(ckv);
367 pthread_mutex_unlock(&fork_lock);
368 return(newErr("unable to create pipe: %s", strerror(e)));
369 }
370
371 /* a multiplex-funnel means starting a separate thread */
372 if (strcmp(kv->val, "mfunnel") == 0) {
373 FILE *f1, *f2;
374 /* create a dpair entry */
375 pthread_mutex_lock(&_mero_topdp_lock);
376
377 dp = _mero_topdp;
378 while (dp->next != NULL)
379 dp = dp->next;
380 dp = dp->next = malloc(sizeof(struct _dpair));
381 dp->out = pfdo[0];
382 dp->err = pfde[0];
383 dp->next = NULL;
384 dp->type = MEROFUN;
385 dp->pid = getpid();
386 dp->dbname = strdup(database);
387 dp->flag = 0;
388
389 pthread_mutex_unlock(&_mero_topdp_lock);
390
391 kv = findConfKey(ckv, "mfunnel");
392 if(!(f1 = fdopen(pfdo[1], "a"))) {
393 freeConfFile(ckv);
394 free(ckv);
395 pthread_mutex_unlock(&fork_lock);
396 return newErr("Failed to open file descriptor\n");
397 }
398 if(!(f2 = fdopen(pfde[1], "a"))) {
399 fclose(f1);
400 freeConfFile(ckv);
401 free(ckv);
402 pthread_mutex_unlock(&fork_lock);
403 return newErr("Failed to open file descriptor\n");
404 }
405 if ((er = multiplexInit(database, kv->val, f1, f2)) != NO_ERR) {
406 Mfprintf(stderr, "failed to create multiplex-funnel: %s\n",
407 getErrMsg(er));
408 freeConfFile(ckv);
409 free(ckv);
410 pthread_mutex_unlock(&fork_lock);
411 return(er);
412 }
413 freeConfFile(ckv);
414 free(ckv);
415
416 /* refresh stats, now we will have a connection registered */
417 msab_freeStatus(stats);
418 er = msab_getStatus(stats, database);
419 pthread_mutex_unlock(&fork_lock);
420 if (er != NULL) {
421 /* since the client mserver lives its own life anyway,
422 * it's not really a problem we exit here */
423 err e = newErr("%s", er);
424 free(er);
425 return(e);
426 }
427 return(NO_ERR);
428 }
429
430 /* check if the vaultkey is there, otherwise abort early (value
431 * lateron reused when server is started) */
432 snprintf(vaultkey, sizeof(vaultkey), "%s/.vaultkey", (*stats)->path);
433 if (stat(vaultkey, &statbuf) == -1) {
434 msab_freeStatus(stats);
435 freeConfFile(ckv);
436 free(ckv);
437 pthread_mutex_unlock(&fork_lock);
438 return(newErr("cannot start database '%s': no .vaultkey found "
439 "(did you create the database with `monetdb create %s`?)",
440 database, database));
441 }
442
443 er = msab_getDBfarm(&sabdbfarm);
444 if (er != NULL) {
445 freeConfFile(ckv);
446 free(ckv);
447 pthread_mutex_unlock(&fork_lock);
448 return(er);
449 }
450
451 mydoproxy = strcmp(getConfVal(_mero_props, "forward"), "proxy") == 0;
452
453 kv = findConfKey(ckv, "nthreads");
454 if (kv->val == NULL)
455 kv = findConfKey(_mero_db_props, "nthreads");
456 if (kv->val != NULL) {
457 snprintf(nthreads, sizeof(nthreads), "gdk_nr_threads=%s", kv->val);
458 } else {
459 nthreads[0] = '\0';
460 }
461
462 kv = findConfKey(ckv, "nclients");
463 if (kv->val == NULL)
464 kv = findConfKey(_mero_db_props, "nclients");
465 if (kv->val != NULL) {
466 snprintf(nclients, sizeof(nclients), "max_clients=%s", kv->val);
467 } else {
468 nclients[0] = '\0';
469 }
470
471 kv = findConfKey(ckv, "optpipe");
472 if (kv->val == NULL)
473 kv = findConfKey(_mero_db_props, "optpipe");
474 if (kv->val != NULL) {
475 snprintf(pipeline, sizeof(pipeline), "sql_optimizer=%s", kv->val);
476 } else {
477 pipeline[0] = '\0';
478 }
479
480 kv = findConfKey(ckv, "memmaxsize");
481 if (kv->val != NULL) {
482 snprintf(memmaxsize, sizeof(memmaxsize), "gdk_mem_maxsize=%s", kv->val);
483 } else {
484 memmaxsize[0] = '\0';
485 }
486
487 kv = findConfKey(ckv, "vmmaxsize");
488 if (kv->val != NULL) {
489 snprintf(vmmaxsize, sizeof(vmmaxsize), "gdk_vm_maxsize=%s", kv->val);
490 } else {
491 vmmaxsize[0] = '\0';
492 }
493
494 kv = findConfKey(ckv, "readonly");
495 if (kv->val != NULL && strcmp(kv->val, "no") != 0)
496 readonly = "--readonly";
497
498 kv = findConfKey(ckv, "embedr");
499 if (kv->val != NULL && strcmp(kv->val, "no") != 0)
500 embeddedr = "embedded_r=true";
501
502 kv = findConfKey(ckv, "embedpy");
503 if (kv->val != NULL && strcmp(kv->val, "no") != 0)
504 embeddedpy = "embedded_py=2";
505
506 kv = findConfKey(ckv, "embedpy3");
507 if (kv->val != NULL && strcmp(kv->val, "no") != 0) {
508 if (embeddedpy) {
509 // only one python version can be active at a time
510 freeConfFile(ckv);
511 free(ckv);
512 pthread_mutex_unlock(&fork_lock);
513 free(sabdbfarm);
514 return newErr("attempting to start mserver with both embedded python2 and embedded python3; only one python version can be active at a time\n");
515 }
516 embeddedpy = "embedded_py=3";
517 }
518 kv = findConfKey(ckv, "embedc");
519 if (kv->val != NULL && strcmp(kv->val, "no") != 0)
520 embeddedc = "embedded_c=true";
521 kv = findConfKey(ckv, "dbextra");
522 if (kv != NULL && kv->val != NULL) {
523 dbextra = kv->val;
524 }
525
526 kv = findConfKey(ckv, "listenaddr");
527 if (kv->val != NULL) {
528 if (mydoproxy) {
529 // listenaddr is only available on forwarding method
530 freeConfFile(ckv);
531 free(ckv);
532 pthread_mutex_unlock(&fork_lock);
533 free(sabdbfarm);
534 return newErr("attempting to start mserver with listening address while being proxied by monetdbd; this option is only possible on forward method\n");
535 }
536 snprintf(listenaddr, sizeof(listenaddr), "mapi_listenaddr=%s", kv->val);
537 } else {
538 listenaddr[0] = '\0';
539 }
540 mport = (unsigned int)getConfNum(_mero_props, "port");
541 ipv6 = getConfNum(_mero_props, "ipv6") == 1 ? "mapi_ipv6=true" : "mapi_ipv6=false";
542
543 /* ok, now exec that mserver we want */
544 snprintf(dbpath, sizeof(dbpath),
545 "--dbpath=%s/%s", sabdbfarm, database);
546 free(sabdbfarm);
547 snprintf(vaultkey, sizeof(vaultkey),
548 "monet_vault_key=%s/.vaultkey", (*stats)->path);
549 snprintf(muri, sizeof(muri),
550 "merovingian_uri=mapi:monetdb://%s:%u/%s",
551 _mero_hostname, mport, database);
552 argv[c++] = _mero_mserver;
553 argv[c++] = dbpath;
554 argv[c++] = "--set"; argv[c++] = muri;
555 if (dbextra != NULL) {
556 snprintf(dbextra_path, sizeof(dbextra_path),
557 "--dbextra=%s", dbextra);
558 argv[c++] = dbextra_path;
559 }
560 if (mydoproxy) {
561 struct sockaddr_un s; /* only for sizeof(s.sun_path) :( */
562 argv[c++] = "--set"; argv[c++] = "mapi_open=false";
563 /* we "proxy", so we can just solely use UNIX domain sockets
564 * internally. Before we hit our head, check if we can
565 * actually use a UNIX socket (due to pathlength) */
566 if (strlen((*stats)->path) + 11 < sizeof(s.sun_path)) {
567 snprintf(port, sizeof(port), "mapi_port=0");
568 snprintf(usock, sizeof(usock), "mapi_usock=%s/.mapi.sock",
569 (*stats)->path);
570 } else {
571 argv[c++] = "--set"; argv[c++] = "mapi_autosense=true";
572 /* for logic here, see comment below */
573 snprintf(port, sizeof(port), "mapi_port=%u", mport + 1);
574 snprintf(usock, sizeof(usock), "mapi_usock=");
575 }
576 } else {
577 if (listenaddr[0] != '\0') {
578 argv[c++] = "--set"; argv[c++] = listenaddr;
579 } else {
580 argv[c++] = "--set"; argv[c++] = "mapi_open=true";
581 }
582 argv[c++] = "--set"; argv[c++] = "mapi_autosense=true";
583 /* avoid this mserver binding to the same port as merovingian
584 * but on another interface, (INADDR_ANY ... sigh) causing
585 * endless redirects since 0.0.0.0 is not a valid address to
586 * connect to, and hence the hostname is advertised instead */
587 snprintf(port, sizeof(port), "mapi_port=%u", mport + 1);
588 snprintf(usock, sizeof(usock), "mapi_usock=");
589 }
590 argv[c++] = "--set"; argv[c++] = ipv6;
591 argv[c++] = "--set"; argv[c++] = port;
592 argv[c++] = "--set"; argv[c++] = usock;
593 argv[c++] = "--set"; argv[c++] = vaultkey;
594 if (nthreads[0] != '\0') {
595 argv[c++] = "--set"; argv[c++] = nthreads;
596 }
597 if (nclients[0] != '\0') {
598 argv[c++] = "--set"; argv[c++] = nclients;
599 }
600 if (pipeline[0] != '\0') {
601 argv[c++] = "--set"; argv[c++] = pipeline;
602 }
603 if (memmaxsize[0] != '\0') {
604 argv[c++] = "--set"; argv[c++] = memmaxsize;
605 }
606 if (vmmaxsize[0] != '\0') {
607 argv[c++] = "--set"; argv[c++] = vmmaxsize;
608 }
609 if (embeddedr != NULL) {
610 argv[c++] = "--set"; argv[c++] = embeddedr;
611 }
612 if (embeddedpy != NULL) {
613 argv[c++] = "--set"; argv[c++] = embeddedpy;
614 }
615 if (embeddedc != NULL) {
616 argv[c++] = "--set"; argv[c++] = embeddedc;
617 }
618 if (readonly != NULL) {
619 argv[c++] = readonly;
620 }
621 /* get the rest (non-default) mserver props set in the conf file */
622 list = ckv;
623 while (list->key != NULL) {
624 if (list->val != NULL && !defaultProperty(list->key)) {
625 argv[c++] = "--set";
626 snprintf(property_other, sizeof(property_other), "%s=%s", list->key, list->val);
627 argv[c++] = strdup(property_other);
628 }
629 list++;
630 }
631
632 /* Let's get extra mserver5 args from the environment */
633 mserver5_extra = getenv("MSERVER5_EXTRA_ARGS");
634 while (c < MAX_NR_ARGS && (mserver5_extra_token = strsep(&mserver5_extra, " ")))
635 argv[c++] = mserver5_extra_token;
636
637 argv[c++] = NULL;
638
639 freeConfFile(ckv);
640 free(ckv); /* can make ckv static and reuse it all the time */
641
642 /* make sure no entries are shot while adding and that we
643 * deliver a consistent state */
644 pthread_mutex_lock(&_mero_topdp_lock);
645
646 pid = fork();
647 if (pid == 0) {
648 /* redirect stdout and stderr to a new pair of fds for
649 * logging help */
650 ssize_t write_error; /* to avoid compiler warning */
651 int dup_err;
652 close(pfdo[0]);
653 dup_err = dup2(pfdo[1], 1);
654 close(pfdo[1]);
655
656 close(pfde[0]);
657 if(dup_err == -1)
658 perror("dup2");
659 dup_err = dup2(pfde[1], 2);
660 close(pfde[1]);
661 if(dup_err == -1)
662 perror("dup2");
663
664 write_error = write(1, "arguments:", 10);
665 for (c = 0; argv[c] != NULL; c++) {
666 /* very stupid heuristic to make copy/paste easier from
667 * merovingian's log */
668 int q = strchr(argv[c], ' ') != NULL;
669 write_error |= write(1, " \"", 1 + q);
670 write_error |= write(1, argv[c], strlen(argv[c]));
671 if (q)
672 write_error |= write(1, "\"", 1);
673 }
674 write_error |= write(1, "\n", 1);
675 if (write_error < 0)
676 perror("write");
677
678 execv(_mero_mserver, argv);
679 /* if the exec returns, it is because of a failure */
680 perror("executing failed");
681 exit(1);
682 } else if (pid > 0) {
683 /* parent: fine, let's add the pipes for this child */
684 dp = _mero_topdp;
685 while (dp->next != NULL)
686 dp = dp->next;
687 dp = dp->next = malloc(sizeof(struct _dpair));
688 dp->out = pfdo[0];
689 close(pfdo[1]);
690 dp->err = pfde[0];
691 close(pfde[1]);
692 dp->next = NULL;
693 dp->type = MERODB;
694 dp->pid = pid;
695 dp->dbname = strdup(database);
696 dp->flag = 0;
697
698 pthread_mutex_unlock(&_mero_topdp_lock);
699
700 /* wait for the child to finish starting, at some point we
701 * decided that we should wait indefinitely here because if the
702 * mserver needs time to start up, we shouldn't interrupt it,
703 * and if it hangs, we're just doomed, with the drawback that we
704 * completely kill the functionality of monetdbd too */
705 do {
706 /* give the database a break */
707 sleep_ms(500);
708
709 /* in the meanwhile, if the server has stopped, it will
710 * have been removed from the dpair list, so check if
711 * it's still there. */
712 pthread_mutex_lock(&_mero_topdp_lock);
713 dp = _mero_topdp;
714 while (dp != NULL && dp->pid != pid)
715 dp = dp->next;
716 pthread_mutex_unlock(&_mero_topdp_lock);
717
718 /* stats cannot be NULL, as we don't allow starting non
719 * existing databases, note that we need to run this loop at
720 * least once not to leak */
721 msab_freeStatus(stats);
722 er = msab_getStatus(stats, database);
723 if (er != NULL) {
724 /* since the client mserver lives its own life anyway,
725 * it's not really a problem we exit here */
726 err e = newErr("%s", er);
727 free(er);
728 pthread_mutex_unlock(&fork_lock);
729 return(e);
730 }
731
732 /* server doesn't run, no need to wait any longer */
733 if (dp == NULL)
734 break;
735 } while ((*stats)->state != SABdbRunning);
736
737 /* check if the SQL scenario was loaded */
738 if (dp != NULL && (*stats)->state == SABdbRunning &&
739 (*stats)->conns != NULL &&
740 (*stats)->conns->val != NULL &&
741 (*stats)->scens != NULL &&
742 (*stats)->scens->val != NULL) {
743 sablist *scen = (*stats)->scens;
744 do {
745 if (scen->val != NULL && strcmp(scen->val, "sql") == 0)
746 break;
747 } while ((scen = scen->next) != NULL);
748 if (scen == NULL) {
749 /* we don't know what it's doing, but we don't like it
750 * any case, so kill it */
751 terminateProcess(pid, strdup(database), MERODB, 0);
752 msab_freeStatus(stats);
753 pthread_mutex_unlock(&fork_lock);
754 return(newErr("database '%s' did not initialise the sql "
755 "scenario", database));
756 }
757 } else if (dp != NULL) {
758 terminateProcess(pid, strdup(database), MERODB, 0);
759 msab_freeStatus(stats);
760 pthread_mutex_unlock(&fork_lock);
761 return(newErr(
762 "database '%s' started up, but failed to open up "
763 "a communication channel", database));
764 }
765
766 pthread_mutex_unlock(&fork_lock);
767
768 /* try to be clear on why starting the database failed */
769 if (dp == NULL) {
770 state = (*stats)->state;
771
772 /* starting failed */
773 msab_freeStatus(stats);
774
775 switch ((int)state) {
776 case SABdbRunning:
777 /* right, it's not there, but it's running */
778 return(newErr(
779 "database '%s' has inconsistent state "
780 "(sabaoth administration reports running, "
781 "but process seems gone), "
782 "review monetdbd's "
783 "logfile (%s) for any peculiarities", database,
784 getConfVal(_mero_props, "logfile")));
785 case SABdbCrashed:
786 return(newErr(
787 "database '%s' has crashed after starting, "
788 "manual intervention needed, "
789 "check monetdbd's logfile (%s) for details",
790 database, getConfVal(_mero_props, "logfile")));
791 case SABdbInactive:
792 return(newErr(
793 "database '%s' appears to shut "
794 "itself down after starting, "
795 "check monetdbd's logfile (%s) for possible "
796 "hints", database,
797 getConfVal(_mero_props, "logfile")));
798 case SABdbStarting:
799 return(newErr(
800 "database '%s' has inconsistent state "
801 "(sabaoth administration reports starting up, "
802 "but process seems gone), "
803 "review monetdbd's "
804 "logfile (%s) for any peculiarities", database,
805 getConfVal(_mero_props, "logfile")));
806 default:
807 return(newErr("unknown state: %d", (int)state));
808 }
809 }
810
811 if ((*stats)->locked) {
812 Mfprintf(stdout, "database '%s' has been put into maintenance "
813 "mode during startup\n", database);
814 }
815
816 return(NO_ERR);
817 }
818 int e = errno;
819 pthread_mutex_unlock(&_mero_topdp_lock);
820
821 /* forking failed somehow, cleanup the pipes */
822 close(pfdo[0]);
823 close(pfdo[1]);
824 close(pfde[0]);
825 close(pfde[1]);
826 pthread_mutex_unlock(&fork_lock);
827 return(newErr("%s", strerror(e)));
828}
829
830#define BUFLEN 1024
831
832/**
833 * Fork stethoscope and detatch, after performing sanity checks. The assumption
834 * is that each mserver5 process can have at most one stethoscope process
835 * attached to it.
836 */
837err
838fork_profiler(char *dbname, sabdb **stats, char **log_path)
839{
840 pid_t pid;
841 char *error = NO_ERR;
842 char *pidfilename = NULL;
843 confkeyval *ckv = NULL, *kv;
844 size_t pidfnlen;
845 FILE *pidfile;
846 char *profiler_executable;
847 char *beat_frequency = NULL;
848 char *tmp_exe;
849 struct stat path_info;
850 int error_code;
851
852 error = msab_getStatus(stats, dbname);
853 if (error != NULL) {
854 return error;
855 }
856
857 if (*stats == NULL) {
858 /* TODO: What now? */
859 error = newErr("Null stats for db %s", dbname);
860 return error;
861 }
862
863 /* Find the profiler executable. The mserver is running as
864 * /path/to/installation/mserver5
865 * and the profiler executable should be:
866 * /path/to/installation/stethoscope
867 */
868 tmp_exe = strdup(_mero_mserver);
869 if (tmp_exe == NULL) {
870 error = newErr("Cannot find the profiler executable");
871 return error;
872 } else {
873 char *server_filename = "mserver5";
874 char *profiler_filename = "stethoscope";
875 char *s = strstr(tmp_exe, server_filename);
876 size_t executable_len = 0;
877
878 if (s == NULL || strncmp(s, server_filename, strlen(server_filename)) != 0) {
879 error = newErr("Unexpected executable (missing the string \"%s\")", server_filename);
880 free(tmp_exe);
881 return error;
882 }
883
884 executable_len = strlen(tmp_exe) + strlen(profiler_filename) - strlen(server_filename) + 1;
885 *s = '\0';
886 profiler_executable = malloc(executable_len);
887 snprintf(profiler_executable, executable_len, "%s%s%s",
888 tmp_exe, profiler_filename, s + 8);
889 free(tmp_exe);
890 if (stat(profiler_executable, &path_info) == -1) {
891 error = newErr("Cannot find profiler executable");
892 goto cleanup;
893 }
894 /* free(tmp_exe); */
895 }
896
897 pthread_mutex_lock(&fork_lock);
898
899 /* Verify that the requested db is running */
900 if ((*stats)->state != SABdbRunning) {
901 /* server is not running, shoo */
902 error = newErr("Database %s is not running.", dbname);
903 goto cleanup;
904 }
905
906 /* find the path that the profiler will be storing files */
907 ckv = getDefaultProps();
908 readAllProps(ckv, (*stats)->path);
909 kv = findConfKey(ckv, PROFILERBEATFREQ);
910 if (kv) {
911 beat_frequency = kv->val;
912 }
913 kv = findConfKey(ckv, PROFILERLOGPROPERTY);
914
915 if (kv == NULL) {
916 error = newErr("Property '"PROFILERLOGPROPERTY"' not set for db %s\n",
917 dbname);
918 goto cleanup;
919 }
920
921 *log_path = strdup(kv->val);
922
923 /* Check that the log_path exists and create it if it does not */
924 error_code = stat(*log_path, &path_info);
925 if (error_code == -1) { /* stat failed */
926 if (errno == ENOENT) { /* dir does not exist, create it */
927 mode_t mode = 0755;
928 if (mkdir(*log_path, mode) == -1) { /* mkdir failed, bail out */
929 char error_message[BUFSIZ];
930 if (strerror_r(errno, error_message, BUFSIZ) != 0)
931 strcpy(error_message, "unknown error");
932 error = newErr("%s", error_message);
933 free(*log_path);
934 *log_path = NULL;
935 goto cleanup;
936 }
937 } else { /* Something else went wrong, can't handle the heat */
938 char error_message[BUFSIZ];
939 if (strerror_r(errno, error_message, BUFSIZ) != 0)
940 strcpy(error_message, "unknown error");
941 error = newErr("%s", error_message);
942 free(*log_path);
943 *log_path = NULL;
944 goto cleanup;
945 }
946 } else { /* stat succeeded */
947 if(!S_ISDIR(path_info.st_mode)) { /* file exists but is not a directory, bail out */
948 error = newErr("File %s exists but is not a directory.", *log_path);
949 free(*log_path);
950 *log_path = NULL;
951 goto cleanup;
952 }
953 }
954
955 /* construct the filename of the pid file */
956 pidfnlen = strlen(*log_path) + strlen("/profiler.pid") + 1;
957 pidfilename = malloc(pidfnlen);
958 if (pidfilename == NULL) {
959 error = newErr("Cannot allocate buffer while starting profiler");
960 goto cleanup;
961 }
962 snprintf(pidfilename, pidfnlen, "%s/profiler.pid", *log_path);
963
964 /* Make sure another instance of stethoscope is not running. */
965 error_code = stat(pidfilename, &path_info);
966 if (error_code != -1) {
967 char buf[8];
968 long pid;
969 /* The pid file exists. See if a process with this pid exists,
970 * and if yes, if it is stethoscope.
971 */
972
973 /* We cannot open the pidfile, bail out */
974 if ((pidfile = fopen(pidfilename, "r")) == NULL) {
975 error = newErr("pid file %s already exists, but is not accessible. Is the profiler already running?",
976 pidfilename);
977 free(*log_path);
978 *log_path = NULL;
979 goto cleanup;
980 }
981
982 if (fgets(buf, sizeof(buf), pidfile) == NULL) {
983 error = newErr("cannot read from pid file %s: %s\n",
984 pidfilename, strerror(errno));
985 fclose(pidfile);
986 free(*log_path);
987 *log_path = NULL;
988 goto cleanup;
989 }
990 fclose(pidfile);
991
992 /* Verify that what we read is actually a number */
993 errno = 0;
994 pid = strtol(buf, NULL, 10);
995 if (errno != 0) {
996 error = newErr("contents of the pid file are not correct: %s\n",
997 strerror(errno));
998 free(*log_path);
999 *log_path = NULL;
1000 goto cleanup;
1001 }
1002
1003 // Open /proc/<pid>/comm and compare the contents to "stethoscope"
1004 // This of course is specific to Linux
1005 size_t fn_size = strlen("/proc/comm") + 9;
1006 char *filename = malloc(fn_size);
1007 if (filename == NULL) {
1008 error = newErr("cannot allocate %zu bytes: %s\n",
1009 fn_size, strerror(errno));
1010 free(*log_path);
1011 *log_path = NULL;
1012 goto cleanup;
1013 }
1014 snprintf(filename, fn_size, "/proc/%ld/comm", pid);
1015
1016 FILE *comm = fopen(filename, "r");
1017 if (comm == NULL) {
1018 /* We cannot open the file for the process with the specified pid,
1019 * so the process is not running.
1020 */
1021 free(filename);
1022 goto startup;
1023 }
1024 char buf2[BUFLEN];
1025 size_t len = fread(buf2, 1, BUFLEN, comm);
1026
1027 if(ferror(comm)) {
1028 error = newErr("cannot read from file %s\n", filename);
1029 free(filename);
1030 free(*log_path);
1031 fclose(comm);
1032 *log_path = NULL;
1033 goto cleanup;
1034 }
1035 if (len == BUFLEN)
1036 len--;
1037 buf2[len] = 0;
1038
1039 char expected_command[] = "stethoscope";
1040 size_t command_len = strlen(expected_command);
1041 if (strncmp(buf2, expected_command, command_len) == 0) {
1042 error = newErr("profiler already running for %s\n", dbname);
1043 free(filename);
1044 free(*log_path);
1045 fclose(comm);
1046 *log_path = NULL;
1047 goto cleanup;
1048 }
1049
1050 fclose(comm);
1051 free(filename);
1052 }
1053
1054 startup:
1055 /* Open the pid file */
1056 if ((pidfile = fopen(pidfilename, "w")) == NULL) {
1057 error = newErr("unable to open %s for writing", pidfilename);
1058 free(*log_path);
1059 *log_path = NULL;
1060 goto cleanup;
1061 }
1062
1063 pid = fork();
1064 if (pid == 0) {
1065 char timestamp[20];
1066 char *argv[512];
1067 int arg_idx = 0;
1068 size_t log_filename_len;
1069 char *log_filename;
1070 time_t current_time;
1071 struct tm *tm_ctime;
1072
1073 fclose(pidfile);
1074 /* construct the log output file */
1075 log_filename_len = strlen(*log_path) + strlen("/proflog_") + strlen(dbname) + 26;
1076 log_filename = malloc(log_filename_len);
1077 if (log_filename == NULL) {
1078 /* TODO What now? */
1079 Mfprintf(stderr, "failed to allocate buffer\n");
1080 exit(1);
1081 }
1082 current_time = time(NULL);
1083 tm_ctime = localtime(&current_time);
1084 strftime(timestamp, sizeof(timestamp), "%Y-%m-%d_%H:%M:%S", tm_ctime);
1085 snprintf(log_filename, log_filename_len, "%s/proflog_%s_%s.json",
1086 *log_path, dbname, timestamp);
1087
1088 /* build the arguments */
1089 argv[arg_idx++] = profiler_executable;
1090 argv[arg_idx++] = "-j"; /* JSON output */
1091 argv[arg_idx++] = "-d";
1092 argv[arg_idx++] = dbname;
1093 if (beat_frequency) {
1094 argv[arg_idx++] = "-b";
1095 argv[arg_idx++] = beat_frequency;
1096 }
1097 argv[arg_idx++] = "-o";
1098 argv[arg_idx++] = log_filename;
1099 /* execute */
1100 execv(profiler_executable, argv);
1101 exit(1);
1102 } else {
1103 /* write pid of stethoscope */
1104 Mfprintf(pidfile, "%d", (int)pid);
1105 fclose(pidfile);
1106 }
1107
1108 cleanup:
1109 freeConfFile(ckv);
1110 free(ckv);
1111 free(profiler_executable);
1112 free(pidfilename);
1113 pthread_mutex_unlock(&fork_lock);
1114 return error;
1115}
1116
1117err
1118shutdown_profiler(char *dbname, sabdb **stats)
1119{
1120 err error=NO_ERR;
1121 confkeyval *ckv = NULL, *kv;
1122 size_t pidfnlen = 0;
1123 char *pidfilename = NULL;
1124 FILE *pidfile;
1125 char buf[BUFSIZ];
1126 size_t nbytes;
1127 pid_t pid;
1128
1129 error = msab_getStatus(stats, dbname);
1130 if (error != NULL) {
1131 return error;
1132 }
1133
1134 if (*stats == NULL) {
1135 /* TODO: What now? */
1136 error = newErr("Null stats for db %s", dbname);
1137 return error;
1138 }
1139
1140 /* Verify that the requested db is running */
1141 if ((*stats)->state != SABdbRunning) {
1142 /* server is not running, shoo */
1143 error = newErr("Database %s is not running.", dbname);
1144 goto cleanup;
1145 }
1146
1147 /* Find the pid file and make sure the profiler is running */
1148 ckv = getDefaultProps();
1149 readAllProps(ckv, (*stats)->path);
1150 kv = findConfKey(ckv, PROFILERLOGPROPERTY);
1151
1152 if (kv == NULL) {
1153 error = newErr("Property '"PROFILERLOGPROPERTY"' not set for db %s\n",
1154 dbname);
1155 goto cleanup;
1156 }
1157
1158 /* construct the filename of the pid file */
1159 pidfnlen = strlen(kv->val) + strlen("/profiler.pid") + 1;
1160 pidfilename = malloc(pidfnlen);
1161 if (pidfilename == NULL) {
1162 error = newErr("Cannot allocate buffer while shutting down of profiler");
1163 goto cleanup;
1164 }
1165 snprintf(pidfilename, pidfnlen, "%s/profiler.pid", kv->val);
1166
1167 if ((pidfile = fopen(pidfilename, "r")) == NULL) {
1168 error = newErr("Unable to open %s for reading", pidfilename);
1169 goto cleanup;
1170 }
1171
1172 clearerr(pidfile);
1173 nbytes = fread(buf, 1, BUFSIZ, pidfile);
1174
1175 if (ferror(pidfile)) {
1176 error = newErr("Cannot read pid (%s) from file %s", buf, pidfilename);
1177 fclose(pidfile);
1178 goto cleanup;
1179 }
1180 fclose(pidfile);
1181
1182 if (buf[nbytes - 1] == '\n') {
1183 buf[nbytes - 1] = '\0';
1184 }
1185
1186 pid = (pid_t)strtol(buf, NULL, 10);
1187 if (pid == 0 && errno == EINVAL) {
1188 error = newErr("File contents %s not a valid pid", buf);
1189 goto cleanup;
1190 }
1191
1192 if (kill(pid, SIGTERM) == -1) {
1193 char error_message[BUFSIZ];
1194 if (strerror_r(errno, error_message, BUFSIZ) != 0)
1195 strcpy(error_message, "unknown error");
1196 error = newErr("%s", error_message);
1197 goto cleanup;
1198 }
1199
1200 /* All went well. Remove the pidfile */
1201 if (remove(pidfilename) != 0) {
1202 error = newErr("Profiler seems to have stopped, but cannot remove pid file.");
1203 }
1204
1205 cleanup:
1206 freeConfFile(ckv);
1207 free(pidfilename);
1208 return error;
1209}
1210
1211/* vim:set ts=4 sw=4 noexpandtab: */
1212