1 | /* |
2 | * This Source Code Form is subject to the terms of the Mozilla Public |
3 | * License, v. 2.0. If a copy of the MPL was not distributed with this |
4 | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
5 | * |
6 | * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V. |
7 | */ |
8 | |
9 | #include "monetdb_config.h" |
10 | #include <string.h> /* str* */ |
11 | #include <sys/types.h> |
12 | #include <sys/socket.h> |
13 | #include <netdb.h> |
14 | #ifdef HAVE_POLL_H |
15 | #include <poll.h> |
16 | #endif |
17 | #include <fcntl.h> |
18 | #include <time.h> |
19 | |
20 | #include "msabaoth.h" |
21 | #include "utils/glob.h" |
22 | #include "utils/utils.h" |
23 | #include "utils/properties.h" |
24 | |
25 | #include "merovingian.h" |
26 | #include "multiplex-funnel.h" |
27 | #include "discoveryrunner.h" |
28 | |
29 | |
/* Head of the singly-linked list of remote databases as discovered
 * (entries are added when ANNC messages are processed and removed on
 * LEAV messages or when their ttl expires). */
remotedb _mero_remotedbs = NULL;
/* Lock protecting _mero_remotedbs; the message-tap registration
 * functions in this file take the same lock for their own list. */
pthread_mutex_t _mero_remotedb_lock = PTHREAD_MUTEX_INITIALIZER;
34 | |
35 | void |
36 | broadcast(char *msg) |
37 | { |
38 | int len = strlen(msg) + 1; |
39 | if (_mero_broadcastsock < 0) |
40 | return; |
41 | if (sendto(_mero_broadcastsock, msg, len, 0, |
42 | (struct sockaddr *)&_mero_broadcastaddr, |
43 | sizeof(_mero_broadcastaddr)) != len) |
44 | Mfprintf(_mero_discerr, "error while sending broadcast " |
45 | "message: %s\n" , strerror(errno)); |
46 | } |
47 | |
48 | static int |
49 | removeRemoteDB(const char *dbname, const char *conn) |
50 | { |
51 | remotedb rdb; |
52 | remotedb prv; |
53 | char hadmatch = 0; |
54 | |
55 | pthread_mutex_lock(&_mero_remotedb_lock); |
56 | |
57 | prv = NULL; |
58 | rdb = _mero_remotedbs; |
59 | while (rdb != NULL) { |
60 | /* look for the database, and verify that its "conn" |
61 | * (merovingian) is the same */ |
62 | if (strcmp(dbname, rdb->dbname) == 0 && |
63 | strcmp(conn, rdb->conn) == 0) |
64 | { |
65 | /* found, let's remove */ |
66 | if (prv == NULL) { |
67 | _mero_remotedbs = rdb->next; |
68 | } else { |
69 | prv->next = rdb->next; |
70 | } |
71 | |
72 | /* inform multiplex-funnels about this removal */ |
73 | multiplexNotifyRemovedDB(rdb->fullname); |
74 | |
75 | Mfprintf(_mero_discout, |
76 | "removed neighbour database %s%s\n" , |
77 | conn, rdb->fullname); |
78 | free(rdb->dbname); |
79 | free(rdb->conn); |
80 | free(rdb->fullname); |
81 | free(rdb); |
82 | rdb = prv; |
83 | hadmatch = 1; |
84 | /* in the future, there may be more, so keep looking */ |
85 | } |
86 | prv = rdb; |
87 | if (rdb == NULL) { |
88 | rdb = _mero_remotedbs; |
89 | } else { |
90 | rdb = rdb->next; |
91 | } |
92 | } |
93 | |
94 | pthread_mutex_unlock(&_mero_remotedb_lock); |
95 | |
96 | return(hadmatch); |
97 | } |
98 | |
99 | static int |
100 | addRemoteDB(const char *dbname, const char *conn, const int ttl) { |
101 | remotedb rdb; |
102 | remotedb prv; |
103 | char *tag; |
104 | |
105 | pthread_mutex_lock(&_mero_remotedb_lock); |
106 | |
107 | if (_mero_remotedbs == NULL) { |
108 | rdb = _mero_remotedbs = malloc(sizeof(struct _remotedb)); |
109 | } else { |
110 | prv = NULL; |
111 | rdb = _mero_remotedbs; |
112 | while (rdb != NULL) { |
113 | if (strcmp(dbname, rdb->fullname) == 0 && |
114 | strcmp(conn, rdb->conn) == 0) |
115 | { |
116 | /* refresh ttl */ |
117 | rdb->ttl = time(NULL) + ttl; |
118 | rdb = prv; |
119 | break; |
120 | } |
121 | prv = rdb; |
122 | rdb = rdb->next; |
123 | } |
124 | if (rdb == prv) { |
125 | pthread_mutex_unlock(&_mero_remotedb_lock); |
126 | return(0); |
127 | } |
128 | rdb = prv->next = malloc(sizeof(struct _remotedb)); |
129 | } |
130 | rdb->fullname = strdup(dbname); |
131 | rdb->dbname = strdup(dbname); |
132 | if ((tag = strchr(rdb->dbname, '/')) != NULL) |
133 | *tag++ = '\0'; |
134 | rdb->tag = tag; |
135 | rdb->conn = strdup(conn); |
136 | rdb->ttl = time(NULL) + ttl; |
137 | rdb->next = NULL; |
138 | |
139 | pthread_mutex_unlock(&_mero_remotedb_lock); |
140 | |
141 | /* inform multiplex-funnels about this addition */ |
142 | multiplexNotifyAddedDB(rdb->fullname); |
143 | |
144 | return(1); |
145 | } |
146 | |
147 | sabdb * |
148 | getRemoteDB(char *database) |
149 | { |
150 | struct _remotedb dummy = { NULL, NULL, NULL, NULL, 0, NULL }; |
151 | remotedb rdb = NULL; |
152 | remotedb pdb = NULL; |
153 | remotedb down = NULL; |
154 | sabdb *walk = NULL; |
155 | sabdb *stats = NULL; |
156 | size_t dbsize = strlen(database); |
157 | char *mdatabase = malloc(sizeof(char) * (dbsize + 2 + 1)); |
158 | char mfullname[8096]; /* should be enough for everyone... */ |
159 | |
160 | /* each request has an implicit /'* (without ') added to match |
161 | * all sub-levels to the request, such that a request for e.g. X |
162 | * will return X/level1/level2/... */ |
163 | memcpy(mdatabase, database, dbsize + 1); |
164 | if (dbsize <= 2 || |
165 | mdatabase[dbsize - 2] != '/' || |
166 | mdatabase[dbsize - 1] != '*') |
167 | { |
168 | mdatabase[dbsize++] = '/'; |
169 | mdatabase[dbsize++] = '*'; |
170 | mdatabase[dbsize++] = '\0'; |
171 | } |
172 | |
173 | /* check the remote databases, in private */ |
174 | pthread_mutex_lock(&_mero_remotedb_lock); |
175 | |
176 | dummy.next = _mero_remotedbs; |
177 | rdb = dummy.next; |
178 | pdb = &dummy; |
179 | while (rdb != NULL) { |
180 | snprintf(mfullname, sizeof(mfullname), "%s/" , rdb->fullname); |
181 | if (db_glob(mdatabase, mfullname) == 1) { |
182 | /* create a fake sabdb struct, chain where necessary */ |
183 | if (walk != NULL) { |
184 | walk = walk->next = malloc(sizeof(sabdb)); |
185 | } else { |
186 | walk = stats = malloc(sizeof(sabdb)); |
187 | } |
188 | walk->dbname = strdup(rdb->dbname); |
189 | walk->path = walk->dbname; /* only freed by sabaoth */ |
190 | walk->locked = false; |
191 | walk->state = SABdbRunning; |
192 | walk->scens = malloc(sizeof(sablist)); |
193 | walk->scens->val = strdup("sql" ); |
194 | walk->scens->next = NULL; |
195 | walk->conns = malloc(sizeof(sablist)); |
196 | walk->conns->val = strdup(rdb->conn); |
197 | walk->conns->next = NULL; |
198 | walk->uri = NULL; |
199 | walk->next = NULL; |
200 | walk->uplog = NULL; |
201 | |
202 | /* cut out first returned entry, put it down the list |
203 | * later, as to implement a round-robin DNS-like |
204 | * algorithm */ |
205 | if (down == NULL) { |
206 | down = rdb; |
207 | if (pdb->next == _mero_remotedbs) { |
208 | _mero_remotedbs = pdb->next = rdb->next; |
209 | } else { |
210 | pdb->next = rdb->next; |
211 | } |
212 | rdb->next = NULL; |
213 | rdb = pdb; |
214 | } |
215 | } |
216 | pdb = rdb; |
217 | rdb = rdb->next; |
218 | } |
219 | |
220 | if (down != NULL) |
221 | pdb->next = down; |
222 | |
223 | pthread_mutex_unlock(&_mero_remotedb_lock); |
224 | |
225 | free(mdatabase); |
226 | |
227 | return(stats); |
228 | } |
229 | |
/* A message tap: a file descriptor to which the discovery runner
 * forwards the raw messages it receives from other hosts.  Taps form a
 * singly-linked list. */
typedef struct _disc_message_tap {
	int fd;                          /* descriptor the messages are written to */
	struct _disc_message_tap *next;  /* next tap, NULL at the end of the list */
} *disc_message_tap;

/* list of hooks for incoming messages; registerMessageTap and
 * unregisterMessageTap modify it while holding _mero_remotedb_lock */
static disc_message_tap _mero_disc_msg_taps = NULL;
237 | |
238 | void |
239 | registerMessageTap(int fd) |
240 | { |
241 | disc_message_tap h; |
242 | /* make sure we never block in the main loop below because we can't |
243 | * write to the pipe */ |
244 | (void) fcntl(fd, F_SETFD, O_NONBLOCK); |
245 | pthread_mutex_lock(&_mero_remotedb_lock); |
246 | h = _mero_disc_msg_taps; |
247 | if (h == NULL) { |
248 | h = malloc(sizeof(struct _disc_message_tap)); |
249 | _mero_disc_msg_taps = h; |
250 | } else { |
251 | for (; h->next != NULL; h = h->next) |
252 | ; |
253 | h = h->next = malloc(sizeof(struct _disc_message_tap)); |
254 | } |
255 | h->next = NULL; |
256 | h->fd = fd; |
257 | pthread_mutex_unlock(&_mero_remotedb_lock); |
258 | } |
259 | |
260 | void |
261 | unregisterMessageTap(int fd) |
262 | { |
263 | disc_message_tap h, lasth; |
264 | pthread_mutex_lock(&_mero_remotedb_lock); |
265 | h = _mero_disc_msg_taps; |
266 | for (lasth = NULL; h != NULL; lasth = h, h = h->next) { |
267 | if (h->fd == fd) { |
268 | if (lasth == NULL) { |
269 | _mero_disc_msg_taps = h->next; |
270 | } else { |
271 | lasth->next = h->next; |
272 | } |
273 | free(h); |
274 | break; |
275 | } |
276 | } |
277 | pthread_mutex_unlock(&_mero_remotedb_lock); |
278 | } |
279 | |
280 | void * |
281 | discoveryRunner(void *d) |
282 | { |
283 | int sock = *(int *)d; |
284 | int s = -1; |
285 | struct sockaddr_storage peer_addr; |
286 | socklen_t peer_addr_len; |
287 | #ifdef HAVE_POLL |
288 | struct pollfd pfd; |
289 | #else |
290 | fd_set fds; |
291 | struct timeval tv; |
292 | #endif |
293 | /* avoid first announce, the HELO will cause an announce when it's |
294 | * received by ourself */ |
295 | time_t deadline = 1; |
296 | time_t now = 0; |
297 | int forceannc = 0; |
298 | sabdb *orig; |
299 | sabdb *stats; |
300 | confkeyval *ckv; |
301 | confkeyval *kv; |
302 | confkeyval *discttl; |
303 | err e; |
304 | remotedb rdb; |
305 | remotedb prv; |
306 | char *val; |
307 | |
308 | ssize_t nread; |
309 | char buf[512]; /* our packages should be pretty small */ |
310 | char host[128]; |
311 | char service[8]; |
312 | |
313 | /* start shouting around that we're here ;) request others to tell |
314 | * what databases they have */ |
315 | snprintf(buf, 512, "HELO %s" , _mero_hostname); |
316 | broadcast(buf); |
317 | |
318 | ckv = getDefaultProps(); |
319 | discttl = findConfKey(_mero_props, "discoveryttl" ); |
320 | |
321 | /* main loop */ |
322 | while (_mero_keep_listening == 1) { |
323 | now = time(NULL); |
324 | /* do a round of announcements, we're ahead of the ttl because |
325 | * when we announce, we add 60 seconds to avoid a "gap" */ |
326 | if (forceannc == 1 || deadline <= now) { |
327 | forceannc = 0; |
328 | /* set new deadline */ |
329 | deadline = now + discttl->ival; |
330 | |
331 | /* list all known databases */ |
332 | if ((e = msab_getStatus(&stats, NULL)) != NULL) { |
333 | Mfprintf(_mero_discerr, "msab_getStatus error: %s, " |
334 | "discovery services disabled\n" , e); |
335 | free(e); |
336 | free(ckv); |
337 | closesocket(sock); |
338 | return NULL; |
339 | } |
340 | |
341 | for (orig = stats; stats != NULL; stats = stats->next) { |
342 | readProps(ckv, stats->path); |
343 | kv = findConfKey(ckv, "shared" ); |
344 | val = kv->val == NULL ? "" : kv->val; |
345 | /* skip databases under maintenance */ |
346 | if (strcmp(val, "no" ) != 0 && !stats->locked) { |
347 | /* craft ANNC message for this db */ |
348 | if (strcmp(val, "yes" ) == 0) |
349 | val = "" ; |
350 | snprintf(buf, 512, "ANNC %s%s%s mapi:monetdb://%s:%u/ %d" , |
351 | stats->dbname, val[0] == '\0' ? "" : "/" , val, |
352 | _mero_hostname, (unsigned int)getConfNum(_mero_props, "port" ), |
353 | discttl->ival + 60); |
354 | broadcast(buf); |
355 | } |
356 | freeConfFile(ckv); |
357 | } |
358 | |
359 | if (orig != NULL) |
360 | msab_freeStatus(&orig); |
361 | |
362 | if (getConfNum(_mero_props, "control" ) != 0) { |
363 | /* announce control port */ |
364 | snprintf(buf, 512, "ANNC * %s:%u %d" , |
365 | _mero_hostname, (unsigned int)getConfNum(_mero_props, "port" ), |
366 | discttl->ival + 60); |
367 | /* coverity[string_null] */ |
368 | broadcast(buf); |
369 | } |
370 | } |
371 | |
372 | /* do a round to see if we have to cleanup anything (expired |
373 | * ttl) */ |
374 | pthread_mutex_lock(&_mero_remotedb_lock); |
375 | |
376 | prv = NULL; |
377 | rdb = _mero_remotedbs; |
378 | while (rdb != NULL) { |
379 | if (rdb->ttl > 0 && rdb->ttl <= now) { |
380 | /* expired, let's remove */ |
381 | if (prv == NULL) { |
382 | _mero_remotedbs = rdb->next; |
383 | } else { |
384 | prv->next = rdb->next; |
385 | } |
386 | Mfprintf(_mero_discout, "neighbour database %s%s " |
387 | "has expired\n" , rdb->conn, rdb->fullname); |
388 | free(rdb->dbname); |
389 | free(rdb->conn); |
390 | free(rdb->fullname); |
391 | free(rdb); |
392 | break; |
393 | } |
394 | prv = rdb; |
395 | rdb = rdb->next; |
396 | } |
397 | |
398 | pthread_mutex_unlock(&_mero_remotedb_lock); |
399 | |
400 | peer_addr_len = sizeof(struct sockaddr_storage); |
401 | /* Wait up to 5 seconds. */ |
402 | for (s = 0; s < 5; s++) { |
403 | #ifdef HAVE_POLL |
404 | pfd = (struct pollfd) {.fd = sock, .events = POLLIN}; |
405 | nread = poll(&pfd, 1, 1000); |
406 | #else |
407 | FD_ZERO(&fds); |
408 | FD_SET(sock, &fds); |
409 | tv.tv_sec = 1; |
410 | tv.tv_usec = 0; |
411 | nread = select(sock + 1, &fds, NULL, NULL, &tv); |
412 | #endif |
413 | if (nread != 0) |
414 | break; |
415 | if (!_mero_keep_listening) |
416 | goto breakout; |
417 | } |
418 | if (nread <= 0) { /* assume only failure is EINTR */ |
419 | /* nothing interesting has happened */ |
420 | buf[0] = '\0'; |
421 | continue; |
422 | } |
423 | nread = recvfrom(sock, buf, 512, 0, |
424 | (struct sockaddr *)&peer_addr, &peer_addr_len); |
425 | if (nread == -1) { |
426 | buf[0] = '\0'; |
427 | continue; /* ignore failed request */ |
428 | } |
429 | |
430 | s = getnameinfo((struct sockaddr *)&peer_addr, |
431 | peer_addr_len, host, 128, |
432 | service, 8, NI_NUMERICSERV); |
433 | if (s != 0) { |
434 | Mfprintf(_mero_discerr, "cannot retrieve name info: %s\n" , |
435 | gai_strerror(s)); |
436 | continue; /* skip this message */ |
437 | } |
438 | |
439 | /* ignore messages from broadcast interface */ |
440 | if (strcmp(host, "0.0.0.0" ) == 0) |
441 | continue; |
442 | /* forward messages not coming from ourself to all routes that |
443 | * are active */ |
444 | if (strcmp(host, _mero_hostname) != 0) { |
445 | disc_message_tap h = _mero_disc_msg_taps; |
446 | for (; h != NULL; h = h->next) { |
447 | if (write(h->fd, buf, nread) == -1) { |
448 | /* really nothing to be done here, since this is |
449 | * best effort stuff, keep the condition to keep |
450 | * fortification warnings off */ |
451 | } |
452 | } |
453 | } |
454 | |
455 | if (strncmp(buf, "HELO " , 5) == 0) { |
456 | /* HELLO message, respond with current databases */ |
457 | Mfprintf(_mero_discout, "new neighbour %s (%s)\n" , buf + 5, host); |
458 | /* sleep a random amount of time to avoid an avalanche of |
459 | * ANNC messages flooding the network */ |
460 | #ifndef STATIC_CODE_ANALYSIS /* hide rand() from Coverity */ |
461 | sleep_ms(1 + (int)(2500.0 * (rand() / (RAND_MAX + 1.0)))); |
462 | #endif |
463 | /* force an announcement round by dropping the deadline */ |
464 | forceannc = 1; |
465 | continue; |
466 | } else if (strncmp(buf, "LEAV " , 5) == 0) { |
467 | /* LEAVE message, unregister database */ |
468 | char *sp = NULL; |
469 | char *dbname; |
470 | char *conn; |
471 | |
472 | strtok_r(buf, " " , &sp); /* discard the msg type */ |
473 | dbname = strtok_r(NULL, " " , &sp); |
474 | conn = strtok_r(NULL, " " , &sp); |
475 | |
476 | if (dbname == NULL || conn == NULL) |
477 | continue; |
478 | |
479 | if (removeRemoteDB(dbname, conn) == 0) |
480 | Mfprintf(_mero_discout, |
481 | "received leave request for unknown database " |
482 | "%s%s from %s\n" , conn, dbname, host); |
483 | } else if (strncmp(buf, "ANNC " , 5) == 0) { |
484 | /* ANNOUNCE message, register database */ |
485 | char *sp = NULL; |
486 | char *dbname; |
487 | char *conn; |
488 | char *ttl; |
489 | |
490 | strtok_r(buf, " " , &sp); /* discard the msg type */ |
491 | dbname = strtok_r(NULL, " " , &sp); |
492 | conn = strtok_r(NULL, " " , &sp); |
493 | ttl = strtok_r(NULL, " " , &sp); |
494 | |
495 | if (dbname == NULL || conn == NULL || ttl == NULL) |
496 | continue; |
497 | |
498 | if (addRemoteDB(dbname, conn, atoi(ttl)) == 1) { |
499 | if (strcmp(dbname, "*" ) == 0) { |
500 | Mfprintf(_mero_discout, "registered neighbour %s\n" , |
501 | conn); |
502 | } else { |
503 | Mfprintf(_mero_discout, "new database " |
504 | "%s%s (ttl=%ss)\n" , |
505 | conn, dbname, ttl); |
506 | } |
507 | } |
508 | } else { |
509 | Mfprintf(_mero_discout, "ignoring unknown message from " |
510 | "%s:%s: '%s'\n" , host, service, buf); |
511 | } |
512 | } |
513 | breakout: |
514 | |
515 | shutdown(sock, SHUT_WR); |
516 | closesocket(sock); |
517 | |
518 | /* now notify of imminent absence ;) */ |
519 | |
520 | /* list all known databases */ |
521 | if ((e = msab_getStatus(&stats, NULL)) != NULL) { |
522 | Mfprintf(_mero_discerr, "msab_getStatus error: %s, " |
523 | "discovery services disabled\n" , e); |
524 | free(e); |
525 | free(ckv); |
526 | return NULL; |
527 | } |
528 | |
529 | /* craft LEAV messages for each db */ |
530 | orig = stats; |
531 | while (stats != NULL) { |
532 | readProps(ckv, stats->path); |
533 | kv = findConfKey(ckv, "shared" ); |
534 | if (kv->val != NULL && strcmp(kv->val, "no" ) != 0) { |
535 | snprintf(buf, 512, "LEAV %s mapi:monetdb://%s:%u/" , |
536 | stats->dbname, _mero_hostname, |
537 | (unsigned int)getConfNum(_mero_props, "port" )); |
538 | broadcast(buf); |
539 | } |
540 | freeConfFile(ckv); |
541 | stats = stats->next; |
542 | } |
543 | |
544 | if (orig != NULL) |
545 | msab_freeStatus(&orig); |
546 | |
547 | /* deregister this merovingian, so it doesn't remain a stale entry */ |
548 | if (getConfNum(_mero_props, "control" ) != 0) { |
549 | snprintf(buf, 512, "LEAV * %s:%u" , |
550 | _mero_hostname, (unsigned int)getConfNum(_mero_props, "port" )); |
551 | broadcast(buf); |
552 | } |
553 | |
554 | free(ckv); |
555 | return NULL; |
556 | } |
557 | |
558 | /* vim:set ts=4 sw=4 noexpandtab: */ |
559 | |