1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9#include "monetdb_config.h"
10#include <string.h> /* str* */
11#include <sys/types.h>
12#include <sys/socket.h>
13#include <netdb.h>
14#ifdef HAVE_POLL_H
15#include <poll.h>
16#endif
17#include <fcntl.h>
18#include <time.h>
19
20#include "msabaoth.h"
21#include "utils/glob.h"
22#include "utils/utils.h"
23#include "utils/properties.h"
24
25#include "merovingian.h"
26#include "multiplex-funnel.h"
27#include "discoveryrunner.h"
28
29
/* Head of the singly-linked list of remote databases as discovered
 * through announcements from neighbours; NULL while none are known. */
remotedb _mero_remotedbs = NULL;
/* Lock protecting _mero_remotedbs.  The message-tap register/unregister
 * functions in this file take this same lock for their own list. */
pthread_mutex_t _mero_remotedb_lock = PTHREAD_MUTEX_INITIALIZER;
34
35void
36broadcast(char *msg)
37{
38 int len = strlen(msg) + 1;
39 if (_mero_broadcastsock < 0)
40 return;
41 if (sendto(_mero_broadcastsock, msg, len, 0,
42 (struct sockaddr *)&_mero_broadcastaddr,
43 sizeof(_mero_broadcastaddr)) != len)
44 Mfprintf(_mero_discerr, "error while sending broadcast "
45 "message: %s\n", strerror(errno));
46}
47
48static int
49removeRemoteDB(const char *dbname, const char *conn)
50{
51 remotedb rdb;
52 remotedb prv;
53 char hadmatch = 0;
54
55 pthread_mutex_lock(&_mero_remotedb_lock);
56
57 prv = NULL;
58 rdb = _mero_remotedbs;
59 while (rdb != NULL) {
60 /* look for the database, and verify that its "conn"
61 * (merovingian) is the same */
62 if (strcmp(dbname, rdb->dbname) == 0 &&
63 strcmp(conn, rdb->conn) == 0)
64 {
65 /* found, let's remove */
66 if (prv == NULL) {
67 _mero_remotedbs = rdb->next;
68 } else {
69 prv->next = rdb->next;
70 }
71
72 /* inform multiplex-funnels about this removal */
73 multiplexNotifyRemovedDB(rdb->fullname);
74
75 Mfprintf(_mero_discout,
76 "removed neighbour database %s%s\n",
77 conn, rdb->fullname);
78 free(rdb->dbname);
79 free(rdb->conn);
80 free(rdb->fullname);
81 free(rdb);
82 rdb = prv;
83 hadmatch = 1;
84 /* in the future, there may be more, so keep looking */
85 }
86 prv = rdb;
87 if (rdb == NULL) {
88 rdb = _mero_remotedbs;
89 } else {
90 rdb = rdb->next;
91 }
92 }
93
94 pthread_mutex_unlock(&_mero_remotedb_lock);
95
96 return(hadmatch);
97}
98
99static int
100addRemoteDB(const char *dbname, const char *conn, const int ttl) {
101 remotedb rdb;
102 remotedb prv;
103 char *tag;
104
105 pthread_mutex_lock(&_mero_remotedb_lock);
106
107 if (_mero_remotedbs == NULL) {
108 rdb = _mero_remotedbs = malloc(sizeof(struct _remotedb));
109 } else {
110 prv = NULL;
111 rdb = _mero_remotedbs;
112 while (rdb != NULL) {
113 if (strcmp(dbname, rdb->fullname) == 0 &&
114 strcmp(conn, rdb->conn) == 0)
115 {
116 /* refresh ttl */
117 rdb->ttl = time(NULL) + ttl;
118 rdb = prv;
119 break;
120 }
121 prv = rdb;
122 rdb = rdb->next;
123 }
124 if (rdb == prv) {
125 pthread_mutex_unlock(&_mero_remotedb_lock);
126 return(0);
127 }
128 rdb = prv->next = malloc(sizeof(struct _remotedb));
129 }
130 rdb->fullname = strdup(dbname);
131 rdb->dbname = strdup(dbname);
132 if ((tag = strchr(rdb->dbname, '/')) != NULL)
133 *tag++ = '\0';
134 rdb->tag = tag;
135 rdb->conn = strdup(conn);
136 rdb->ttl = time(NULL) + ttl;
137 rdb->next = NULL;
138
139 pthread_mutex_unlock(&_mero_remotedb_lock);
140
141 /* inform multiplex-funnels about this addition */
142 multiplexNotifyAddedDB(rdb->fullname);
143
144 return(1);
145}
146
147sabdb *
148getRemoteDB(char *database)
149{
150 struct _remotedb dummy = { NULL, NULL, NULL, NULL, 0, NULL };
151 remotedb rdb = NULL;
152 remotedb pdb = NULL;
153 remotedb down = NULL;
154 sabdb *walk = NULL;
155 sabdb *stats = NULL;
156 size_t dbsize = strlen(database);
157 char *mdatabase = malloc(sizeof(char) * (dbsize + 2 + 1));
158 char mfullname[8096]; /* should be enough for everyone... */
159
160 /* each request has an implicit /'* (without ') added to match
161 * all sub-levels to the request, such that a request for e.g. X
162 * will return X/level1/level2/... */
163 memcpy(mdatabase, database, dbsize + 1);
164 if (dbsize <= 2 ||
165 mdatabase[dbsize - 2] != '/' ||
166 mdatabase[dbsize - 1] != '*')
167 {
168 mdatabase[dbsize++] = '/';
169 mdatabase[dbsize++] = '*';
170 mdatabase[dbsize++] = '\0';
171 }
172
173 /* check the remote databases, in private */
174 pthread_mutex_lock(&_mero_remotedb_lock);
175
176 dummy.next = _mero_remotedbs;
177 rdb = dummy.next;
178 pdb = &dummy;
179 while (rdb != NULL) {
180 snprintf(mfullname, sizeof(mfullname), "%s/", rdb->fullname);
181 if (db_glob(mdatabase, mfullname) == 1) {
182 /* create a fake sabdb struct, chain where necessary */
183 if (walk != NULL) {
184 walk = walk->next = malloc(sizeof(sabdb));
185 } else {
186 walk = stats = malloc(sizeof(sabdb));
187 }
188 walk->dbname = strdup(rdb->dbname);
189 walk->path = walk->dbname; /* only freed by sabaoth */
190 walk->locked = false;
191 walk->state = SABdbRunning;
192 walk->scens = malloc(sizeof(sablist));
193 walk->scens->val = strdup("sql");
194 walk->scens->next = NULL;
195 walk->conns = malloc(sizeof(sablist));
196 walk->conns->val = strdup(rdb->conn);
197 walk->conns->next = NULL;
198 walk->uri = NULL;
199 walk->next = NULL;
200 walk->uplog = NULL;
201
202 /* cut out first returned entry, put it down the list
203 * later, as to implement a round-robin DNS-like
204 * algorithm */
205 if (down == NULL) {
206 down = rdb;
207 if (pdb->next == _mero_remotedbs) {
208 _mero_remotedbs = pdb->next = rdb->next;
209 } else {
210 pdb->next = rdb->next;
211 }
212 rdb->next = NULL;
213 rdb = pdb;
214 }
215 }
216 pdb = rdb;
217 rdb = rdb->next;
218 }
219
220 if (down != NULL)
221 pdb->next = down;
222
223 pthread_mutex_unlock(&_mero_remotedb_lock);
224
225 free(mdatabase);
226
227 return(stats);
228}
229
/* Node of the singly-linked list of message taps.  Each tap holds a
 * file descriptor to which discoveryRunner writes a copy of the
 * discovery messages it receives from other hosts. */
typedef struct _disc_message_tap {
	int fd;                          /* descriptor the messages are written to */
	struct _disc_message_tap *next;  /* next tap, NULL at the end of the list */
} *disc_message_tap;

/* list of hooks for incoming messages; modified only while holding
 * _mero_remotedb_lock */
static disc_message_tap _mero_disc_msg_taps = NULL;
237
238void
239registerMessageTap(int fd)
240{
241 disc_message_tap h;
242 /* make sure we never block in the main loop below because we can't
243 * write to the pipe */
244 (void) fcntl(fd, F_SETFD, O_NONBLOCK);
245 pthread_mutex_lock(&_mero_remotedb_lock);
246 h = _mero_disc_msg_taps;
247 if (h == NULL) {
248 h = malloc(sizeof(struct _disc_message_tap));
249 _mero_disc_msg_taps = h;
250 } else {
251 for (; h->next != NULL; h = h->next)
252 ;
253 h = h->next = malloc(sizeof(struct _disc_message_tap));
254 }
255 h->next = NULL;
256 h->fd = fd;
257 pthread_mutex_unlock(&_mero_remotedb_lock);
258}
259
260void
261unregisterMessageTap(int fd)
262{
263 disc_message_tap h, lasth;
264 pthread_mutex_lock(&_mero_remotedb_lock);
265 h = _mero_disc_msg_taps;
266 for (lasth = NULL; h != NULL; lasth = h, h = h->next) {
267 if (h->fd == fd) {
268 if (lasth == NULL) {
269 _mero_disc_msg_taps = h->next;
270 } else {
271 lasth->next = h->next;
272 }
273 free(h);
274 break;
275 }
276 }
277 pthread_mutex_unlock(&_mero_remotedb_lock);
278}
279
280void *
281discoveryRunner(void *d)
282{
283 int sock = *(int *)d;
284 int s = -1;
285 struct sockaddr_storage peer_addr;
286 socklen_t peer_addr_len;
287#ifdef HAVE_POLL
288 struct pollfd pfd;
289#else
290 fd_set fds;
291 struct timeval tv;
292#endif
293 /* avoid first announce, the HELO will cause an announce when it's
294 * received by ourself */
295 time_t deadline = 1;
296 time_t now = 0;
297 int forceannc = 0;
298 sabdb *orig;
299 sabdb *stats;
300 confkeyval *ckv;
301 confkeyval *kv;
302 confkeyval *discttl;
303 err e;
304 remotedb rdb;
305 remotedb prv;
306 char *val;
307
308 ssize_t nread;
309 char buf[512]; /* our packages should be pretty small */
310 char host[128];
311 char service[8];
312
313 /* start shouting around that we're here ;) request others to tell
314 * what databases they have */
315 snprintf(buf, 512, "HELO %s", _mero_hostname);
316 broadcast(buf);
317
318 ckv = getDefaultProps();
319 discttl = findConfKey(_mero_props, "discoveryttl");
320
321 /* main loop */
322 while (_mero_keep_listening == 1) {
323 now = time(NULL);
324 /* do a round of announcements, we're ahead of the ttl because
325 * when we announce, we add 60 seconds to avoid a "gap" */
326 if (forceannc == 1 || deadline <= now) {
327 forceannc = 0;
328 /* set new deadline */
329 deadline = now + discttl->ival;
330
331 /* list all known databases */
332 if ((e = msab_getStatus(&stats, NULL)) != NULL) {
333 Mfprintf(_mero_discerr, "msab_getStatus error: %s, "
334 "discovery services disabled\n", e);
335 free(e);
336 free(ckv);
337 closesocket(sock);
338 return NULL;
339 }
340
341 for (orig = stats; stats != NULL; stats = stats->next) {
342 readProps(ckv, stats->path);
343 kv = findConfKey(ckv, "shared");
344 val = kv->val == NULL ? "" : kv->val;
345 /* skip databases under maintenance */
346 if (strcmp(val, "no") != 0 && !stats->locked) {
347 /* craft ANNC message for this db */
348 if (strcmp(val, "yes") == 0)
349 val = "";
350 snprintf(buf, 512, "ANNC %s%s%s mapi:monetdb://%s:%u/ %d",
351 stats->dbname, val[0] == '\0' ? "" : "/", val,
352 _mero_hostname, (unsigned int)getConfNum(_mero_props, "port"),
353 discttl->ival + 60);
354 broadcast(buf);
355 }
356 freeConfFile(ckv);
357 }
358
359 if (orig != NULL)
360 msab_freeStatus(&orig);
361
362 if (getConfNum(_mero_props, "control") != 0) {
363 /* announce control port */
364 snprintf(buf, 512, "ANNC * %s:%u %d",
365 _mero_hostname, (unsigned int)getConfNum(_mero_props, "port"),
366 discttl->ival + 60);
367 /* coverity[string_null] */
368 broadcast(buf);
369 }
370 }
371
372 /* do a round to see if we have to cleanup anything (expired
373 * ttl) */
374 pthread_mutex_lock(&_mero_remotedb_lock);
375
376 prv = NULL;
377 rdb = _mero_remotedbs;
378 while (rdb != NULL) {
379 if (rdb->ttl > 0 && rdb->ttl <= now) {
380 /* expired, let's remove */
381 if (prv == NULL) {
382 _mero_remotedbs = rdb->next;
383 } else {
384 prv->next = rdb->next;
385 }
386 Mfprintf(_mero_discout, "neighbour database %s%s "
387 "has expired\n", rdb->conn, rdb->fullname);
388 free(rdb->dbname);
389 free(rdb->conn);
390 free(rdb->fullname);
391 free(rdb);
392 break;
393 }
394 prv = rdb;
395 rdb = rdb->next;
396 }
397
398 pthread_mutex_unlock(&_mero_remotedb_lock);
399
400 peer_addr_len = sizeof(struct sockaddr_storage);
401 /* Wait up to 5 seconds. */
402 for (s = 0; s < 5; s++) {
403#ifdef HAVE_POLL
404 pfd = (struct pollfd) {.fd = sock, .events = POLLIN};
405 nread = poll(&pfd, 1, 1000);
406#else
407 FD_ZERO(&fds);
408 FD_SET(sock, &fds);
409 tv.tv_sec = 1;
410 tv.tv_usec = 0;
411 nread = select(sock + 1, &fds, NULL, NULL, &tv);
412#endif
413 if (nread != 0)
414 break;
415 if (!_mero_keep_listening)
416 goto breakout;
417 }
418 if (nread <= 0) { /* assume only failure is EINTR */
419 /* nothing interesting has happened */
420 buf[0] = '\0';
421 continue;
422 }
423 nread = recvfrom(sock, buf, 512, 0,
424 (struct sockaddr *)&peer_addr, &peer_addr_len);
425 if (nread == -1) {
426 buf[0] = '\0';
427 continue; /* ignore failed request */
428 }
429
430 s = getnameinfo((struct sockaddr *)&peer_addr,
431 peer_addr_len, host, 128,
432 service, 8, NI_NUMERICSERV);
433 if (s != 0) {
434 Mfprintf(_mero_discerr, "cannot retrieve name info: %s\n",
435 gai_strerror(s));
436 continue; /* skip this message */
437 }
438
439 /* ignore messages from broadcast interface */
440 if (strcmp(host, "0.0.0.0") == 0)
441 continue;
442 /* forward messages not coming from ourself to all routes that
443 * are active */
444 if (strcmp(host, _mero_hostname) != 0) {
445 disc_message_tap h = _mero_disc_msg_taps;
446 for (; h != NULL; h = h->next) {
447 if (write(h->fd, buf, nread) == -1) {
448 /* really nothing to be done here, since this is
449 * best effort stuff, keep the condition to keep
450 * fortification warnings off */
451 }
452 }
453 }
454
455 if (strncmp(buf, "HELO ", 5) == 0) {
456 /* HELLO message, respond with current databases */
457 Mfprintf(_mero_discout, "new neighbour %s (%s)\n", buf + 5, host);
458 /* sleep a random amount of time to avoid an avalanche of
459 * ANNC messages flooding the network */
460#ifndef STATIC_CODE_ANALYSIS /* hide rand() from Coverity */
461 sleep_ms(1 + (int)(2500.0 * (rand() / (RAND_MAX + 1.0))));
462#endif
463 /* force an announcement round by dropping the deadline */
464 forceannc = 1;
465 continue;
466 } else if (strncmp(buf, "LEAV ", 5) == 0) {
467 /* LEAVE message, unregister database */
468 char *sp = NULL;
469 char *dbname;
470 char *conn;
471
472 strtok_r(buf, " ", &sp); /* discard the msg type */
473 dbname = strtok_r(NULL, " ", &sp);
474 conn = strtok_r(NULL, " ", &sp);
475
476 if (dbname == NULL || conn == NULL)
477 continue;
478
479 if (removeRemoteDB(dbname, conn) == 0)
480 Mfprintf(_mero_discout,
481 "received leave request for unknown database "
482 "%s%s from %s\n", conn, dbname, host);
483 } else if (strncmp(buf, "ANNC ", 5) == 0) {
484 /* ANNOUNCE message, register database */
485 char *sp = NULL;
486 char *dbname;
487 char *conn;
488 char *ttl;
489
490 strtok_r(buf, " ", &sp); /* discard the msg type */
491 dbname = strtok_r(NULL, " ", &sp);
492 conn = strtok_r(NULL, " ", &sp);
493 ttl = strtok_r(NULL, " ", &sp);
494
495 if (dbname == NULL || conn == NULL || ttl == NULL)
496 continue;
497
498 if (addRemoteDB(dbname, conn, atoi(ttl)) == 1) {
499 if (strcmp(dbname, "*") == 0) {
500 Mfprintf(_mero_discout, "registered neighbour %s\n",
501 conn);
502 } else {
503 Mfprintf(_mero_discout, "new database "
504 "%s%s (ttl=%ss)\n",
505 conn, dbname, ttl);
506 }
507 }
508 } else {
509 Mfprintf(_mero_discout, "ignoring unknown message from "
510 "%s:%s: '%s'\n", host, service, buf);
511 }
512 }
513 breakout:
514
515 shutdown(sock, SHUT_WR);
516 closesocket(sock);
517
518 /* now notify of imminent absence ;) */
519
520 /* list all known databases */
521 if ((e = msab_getStatus(&stats, NULL)) != NULL) {
522 Mfprintf(_mero_discerr, "msab_getStatus error: %s, "
523 "discovery services disabled\n", e);
524 free(e);
525 free(ckv);
526 return NULL;
527 }
528
529 /* craft LEAV messages for each db */
530 orig = stats;
531 while (stats != NULL) {
532 readProps(ckv, stats->path);
533 kv = findConfKey(ckv, "shared");
534 if (kv->val != NULL && strcmp(kv->val, "no") != 0) {
535 snprintf(buf, 512, "LEAV %s mapi:monetdb://%s:%u/",
536 stats->dbname, _mero_hostname,
537 (unsigned int)getConfNum(_mero_props, "port"));
538 broadcast(buf);
539 }
540 freeConfFile(ckv);
541 stats = stats->next;
542 }
543
544 if (orig != NULL)
545 msab_freeStatus(&orig);
546
547 /* deregister this merovingian, so it doesn't remain a stale entry */
548 if (getConfNum(_mero_props, "control") != 0) {
549 snprintf(buf, 512, "LEAV * %s:%u",
550 _mero_hostname, (unsigned int)getConfNum(_mero_props, "port"));
551 broadcast(buf);
552 }
553
554 free(ckv);
555 return NULL;
556}
557
558/* vim:set ts=4 sw=4 noexpandtab: */
559