1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9#include "monetdb_config.h"
10#include <unistd.h>
11#include <string.h>
12#include <sys/types.h>
13#ifdef HAVE_POLL_H
14#include <poll.h>
15#endif
16
17#include "mapi.h"
18#include "mutils.h" /* MT_lockf */
19#include <fcntl.h>
20
21#include "utils/glob.h"
22
23#include "merovingian.h"
24#include "discoveryrunner.h"
25#include "multiplex-funnel.h"
26
27#ifndef HAVE_PIPE2
28#define pipe2(pipefd, flags) pipe(pipefd)
29#endif
30
31#ifndef O_CLOEXEC
32#define O_CLOEXEC 0
33#endif
34
/* node in the global singly-linked list of active multiplex funnels */
typedef struct _multiplexlist {
	multiplex *m;
	struct _multiplexlist *next;
} multiplexlist;

/* all active funnels; any traversal/mutation must hold mpl_lock */
static multiplexlist *multiplexes = NULL;
static pthread_mutex_t mpl_lock = PTHREAD_MUTEX_INITIALIZER;
/* thread id of MFconnectionManager; 0 means not (yet) started
 * (NOTE(review): comparing pthread_t against 0 is not strictly
 * portable, but is how this file tracks "manager running") */
static pthread_t mfmanager = 0;
/* self-pipe towards MFconnectionManager; each message is a single
 * pointer to a malloced string "+db/" or "-db/" (see multiplexNotify*) */
static int mfpipe[2];

static void *multiplexThread(void *d);
46
47
48/**
49 * Connections from all multiplex funnels are maintained by a single
50 * thread that resolves and creates connections upon updates on the
51 * discovery space. Connections aren't made/checked/updated upon their
52 * usage, because this introduces delays for the clients. This is in
53 * particular an issue when a target is updated to point to another
54 * database. To maintain a stable query performance, the connection
55 * creation must happen in the background and set life once established.
56 */
57static void *
58MFconnectionManager(void *d)
59{
60 int i;
61 multiplex *m;
62 multiplexlist *w;
63 char buf[1024];
64 size_t len;
65 char *msg;
66#ifdef HAVE_POLL
67 struct pollfd pfd;
68#else
69 struct timeval tv;
70 fd_set fds;
71#endif
72
73 (void)d;
74
75 while (_mero_keep_listening == 1) {
76#ifdef HAVE_POLL
77 pfd = (struct pollfd) {.fd = mfpipe[0], .events = POLLIN};
78 /* wait up to 5 seconds */
79 i = poll(&pfd, 1, 5000);
80#else
81 FD_ZERO(&fds);
82 FD_SET(mfpipe[0], &fds);
83
84 /* wait up to 5 seconds */
85 tv.tv_sec = 5;
86 tv.tv_usec = 0;
87 i = select(mfpipe[0] + 1, &fds, NULL, NULL, &tv);
88#endif
89 if (i == 0)
90 continue;
91 if (i == -1 && errno != EINTR) {
92 Mfprintf(stderr, "failed to select on mfpipe: %s\n",
93 strerror(errno));
94 break;
95 }
96 /* coverity[string_null_argument] */
97 if (read(mfpipe[0], &msg, sizeof(msg)) < 0) {
98 Mfprintf(stderr, "failed reading from notification pipe: %s\n",
99 strerror(errno));
100 break;
101 }
102 /* we just received a POINTER to a string! */
103
104 /* intended behaviour:
105 * - additions don't change any connection targets, they only
106 * fill in gaps (conn == NULL)
107 * - removals of targets in use, cause a re-lookup of the
108 * original pattern, on failure, conn is left NULL
109 */
110 pthread_mutex_lock(&mpl_lock);
111 if (msg[0] == '+') { /* addition */
112 for (w = multiplexes; w != NULL; w = w->next) {
113 m = w->m;
114 for (i = 0; i < m->dbcc; i++) {
115 if (m->dbcv[i]->conn == NULL) {
116 len = snprintf(buf, sizeof(buf), "%s/*",
117 m->dbcv[i]->database);
118 if (len >= sizeof(buf)) {
119 Mfprintf(stderr, "buffer buf too small, "
120 "increase size in %s:%d\n",
121 __FILE__, __LINE__);
122 continue;
123 }
124 /* avoid double /'*'/'* (no ') */
125 if (len >= 4 &&
126 buf[len - 3] == '*' && buf[len - 4] == '/')
127 buf[len - 2] = '\0';
128 if (db_glob(buf, msg + 1) == 1) {
129 sabdb *stats;
130 Mapi tm = NULL;
131 /* match! eat away trailing / (for matching) */
132 msg[strlen(msg) - 1] = '\0';
133 stats = getRemoteDB(msg + 1);
134 if (stats == NULL) {
135 Mfprintf(stderr, "target %s cannot be resolved "
136 "despite being just discovered as %s\n",
137 m->dbcv[i]->database, msg + 1);
138 continue;
139 }
140 snprintf(buf, sizeof(buf), "%s%s",
141 stats->conns->val, stats->dbname);
142 msab_freeStatus(&stats);
143 Mfprintf(stdout, "setting up multiplexer "
144 "target %s->%s\n",
145 m->dbcv[i]->database, buf);
146 tm = mapi_mapiuri(buf,
147 m->dbcv[i]->user, m->dbcv[i]->pass, "sql");
148 if (mapi_reconnect(tm) == MOK) {
149 m->dbcv[i]->conn = tm;
150 mapi_cache_limit(tm, -1); /* don't page */
151 } else {
152 Mfprintf(stdout, "failed to connect to %s: %s\n",
153 buf, mapi_error_str(tm));
154 mapi_destroy(tm);
155 }
156 }
157 }
158 }
159 }
160 } else { /* removal */
161 for (w = multiplexes; w != NULL; w = w->next) {
162 m = w->m;
163 for (i = 0; i < m->dbcc; i++) {
164 if (m->dbcv[i]->conn != NULL) {
165 len = snprintf(buf, sizeof(buf), "%s/*",
166 m->dbcv[i]->database);
167 if (len >= sizeof(buf)) {
168 Mfprintf(stderr, "buffer buf too small, "
169 "increase size in %s:%d\n",
170 __FILE__, __LINE__);
171 continue;
172 }
173 /* avoid double /'*'/'* (no ') */
174 if (len >= 4 &&
175 buf[len - 3] == '*' && buf[len - 4] == '/')
176 buf[len - 2] = '\0';
177 if (db_glob(buf, msg + 1) == 1) {
178 /* reevaluate, to see if connection is still
179 * available */
180 sabdb *walk;
181 sabdb *stats = getRemoteDB(m->dbcv[i]->database);
182 Mapi tm = m->dbcv[i]->conn;
183 const char *uri = mapi_get_uri(tm);
184 if (stats == NULL) {
185 Mfprintf(stderr, "target %s can no longer "
186 "be resolved\n",
187 m->dbcv[i]->database);
188 /* schedule to drop connection */
189 m->dbcv[i]->newconn = NULL;
190 m->dbcv[i]->connupdate = 1;
191 continue;
192 }
193 /* walk all connections, in an attempt to
194 * see if the original connection is still
195 * available, despite the removal of the
196 * server we got a message for */
197 for (walk = stats; walk != NULL; walk = walk->next) {
198 snprintf(buf, sizeof(buf), "%s%s",
199 walk->conns->val, walk->dbname);
200 if (strcmp(uri, buf) == 0)
201 break;
202 }
203 if (walk == NULL) {
204 snprintf(buf, sizeof(buf), "%s%s",
205 stats->conns->val, stats->dbname);
206 Mfprintf(stdout, "changing multiplexer target %s: %s->%s\n",
207 m->dbcv[i]->database, uri, buf);
208 tm = mapi_mapiuri(buf,
209 m->dbcv[i]->user, m->dbcv[i]->pass,
210 "sql");
211 if (mapi_reconnect(tm) != MOK) {
212 Mfprintf(stderr, "mapi_reconnect for %s "
213 "failed: %s\n",
214 m->dbcv[i]->database,
215 mapi_error_str(tm));
216 mapi_destroy(tm);
217 /* schedule connection for removal */
218 m->dbcv[i]->newconn = NULL;
219 m->dbcv[i]->connupdate = 1;
220 msab_freeStatus(&stats);
221 continue;
222 }
223 mapi_cache_limit(tm, -1); /* don't page */
224
225 /* let the new connection go live */
226 m->dbcv[i]->newconn = tm;
227 m->dbcv[i]->connupdate = 1;
228 }
229 msab_freeStatus(&stats);
230 }
231 }
232 }
233 }
234 }
235 pthread_mutex_unlock(&mpl_lock);
236
237 free(msg); /* alloced by multiplexNotify* */
238 }
239 return NULL;
240}
241
242void
243multiplexNotifyAddedDB(const char *database)
244{
245 char dbslash[256];
246 char *p;
247
248 if (mfmanager == 0)
249 return;
250
251 snprintf(dbslash, sizeof(dbslash), "+%s/", database);
252 p = strdup(dbslash);
253 if (write(mfpipe[1], &p, sizeof(p)) != sizeof(p)) {
254 Mfprintf(stderr, "failed to write notify added message to mfpipe\n");
255 free(p);
256 }
257 /* p is freed by MFconnectionManager */
258 /* coverity[leaked_storage] */
259}
260
261void
262multiplexNotifyRemovedDB(const char *database)
263{
264 char dbslash[256];
265 char *p;
266
267 if (mfmanager == 0)
268 return;
269
270 snprintf(dbslash, sizeof(dbslash), "-%s/", database);
271 p = strdup(dbslash);
272 if (write(mfpipe[1], &p, sizeof(p)) != sizeof(p)) {
273 Mfprintf(stderr, "failed to write notify removed message to mfpipe\n");
274 free(p);
275 }
276 /* p is freed by MFconnectionManager */
277 /* coverity[leaked_storage] */
278}
279
280/* ultra ugly, we peek inside Sabaoth's internals to update the uplog
281 * file */
282extern char *_sabaoth_internal_dbname;
283
284err
285multiplexInit(char *name, char *pattern, FILE *sout, FILE *serr)
286{
287 multiplex *m = malloc(sizeof(multiplex));
288 multiplexlist *mpl;
289 char buf[256];
290 char *p, *q;
291 int i;
292
293 /* the multiplex targets are given separated by commas, split them
294 * out in multiplex_database entries */
295 /* user+pass@pattern,user+pass@pattern,... */
296
297 m->tid = 0;
298 m->gdklock = -1;
299 m->shutdown = 0;
300 m->name = strdup(name);
301 m->pool = strdup(pattern);
302 m->sout = sout;
303 m->serr = serr;
304 m->dbcc = 1;
305 p = m->pool;
306 while ((p = strchr(p, ',')) != NULL) {
307 m->dbcc++;
308 p++;
309 }
310 m->dbcv = malloc(sizeof(multiplex_database *) * m->dbcc);
311 p = m->pool;
312 i = 0;
313 while ((q = strchr(p, ',')) != NULL) {
314 m->dbcv[i] = malloc(sizeof(multiplex_database));
315 m->dbcv[i]->user = malloc(sizeof(char) * (q - p + 1));
316 memcpy(m->dbcv[i]->user, p, q - p);
317 m->dbcv[i]->user[q - p] = '\0';
318 if ((p = strchr(m->dbcv[i]->user, '+')) == NULL) {
319 err e = newErr("illegal target %s: missing '+'", m->dbcv[i]->user);
320 for (; i >= 0; i--) {
321 free(m->dbcv[i]->user);
322 free(m->dbcv[i]);
323 }
324 free(m->dbcv);
325 free(m->pool);
326 free(m);
327 return(e);
328 }
329 *p = '\0';
330 m->dbcv[i]->pass = p + 1;
331 if ((p = strchr(m->dbcv[i]->pass, '@')) == NULL) {
332 err e = newErr("illegal target %s+%s: missing '@'",
333 m->dbcv[i]->user, m->dbcv[i]->pass);
334 for (; i >= 0; i--) {
335 free(m->dbcv[i]->user);
336 free(m->dbcv[i]);
337 }
338 free(m->dbcv);
339 free(m->pool);
340 free(m);
341 return(e);
342 }
343 *p = '\0';
344 m->dbcv[i]->database = p + 1;
345 m->dbcv[i]->conn = NULL;
346 m->dbcv[i]->newconn = NULL;
347 m->dbcv[i]->connupdate = 0;
348
349 i++;
350 p = q + 1;
351 }
352 m->dbcv[i] = malloc(sizeof(multiplex_database));
353 m->dbcv[i]->user = strdup(p);
354 if ((p = strchr(m->dbcv[i]->user, '+')) == NULL) {
355 err e = newErr("illegal target %s: missing '+'", m->dbcv[i]->user);
356 for (; i >= 0; i--) {
357 free(m->dbcv[i]->user);
358 free(m->dbcv[i]);
359 }
360 free(m->dbcv);
361 free(m->pool);
362 free(m);
363 return(e);
364 } else {
365 *p = '\0';
366 m->dbcv[i]->pass = p + 1;
367 if ((p = strchr(m->dbcv[i]->pass, '@')) == NULL) {
368 err e = newErr("illegal target %s+%s: missing '@'",
369 m->dbcv[i]->user, m->dbcv[i]->pass);
370 for (; i >= 0; i--) {
371 free(m->dbcv[i]->user);
372 free(m->dbcv[i]);
373 }
374 free(m->dbcv);
375 free(m->pool);
376 free(m);
377 return(e);
378 } else {
379 *p = '\0';
380 m->dbcv[i]->database = p + 1;
381 m->dbcv[i]->conn = NULL;
382 m->dbcv[i]->newconn = NULL;
383 m->dbcv[i]->connupdate = 0;
384 }
385 }
386
387 m->clients = NULL; /* initially noone is connected */
388
389 if (mfmanager == 0) {
390 pthread_attr_t detach;
391 pthread_attr_init(&detach);
392 pthread_attr_setdetachstate(&detach, PTHREAD_CREATE_DETACHED);
393
394 /* create communication channel */
395 if (pipe2(mfpipe, O_CLOEXEC) != 0)
396 Mfprintf(stderr, "failed to create mfpipe: %s\n", strerror(errno));
397 else {
398#if !defined(HAVE_PIPE2) || O_CLOEXEC == 0
399 (void) fcntl(mfpipe[0], F_SETFD, FD_CLOEXEC);
400 (void) fcntl(mfpipe[1], F_SETFD, FD_CLOEXEC);
401#endif
402 Mfprintf(stdout, "starting multiplex-funnel connection manager\n");
403 if ((i = pthread_create(&mfmanager, &detach,
404 MFconnectionManager, NULL)) != 0) {
405 Mfprintf(stderr, "failed to start MFconnectionManager: %s\n",
406 strerror(i));
407 mfmanager = 0;
408 }
409 }
410 }
411
412 for (i = 0; i < m->dbcc; i++) {
413 sabdb *stats = getRemoteDB(m->dbcv[i]->database);
414 if (stats == NULL) {
415 Mfprintf(serr, "mfunnel: target %s cannot be resolved\n",
416 m->dbcv[i]->database);
417 continue;
418 }
419 snprintf(buf, sizeof(buf), "%s%s", stats->conns->val, stats->dbname);
420 Mfprintf(sout, "mfunnel: setting up multiplexer target %s->%s\n",
421 m->dbcv[i]->database, buf);
422 m->dbcv[i]->conn = mapi_mapiuri(buf,
423 m->dbcv[i]->user, m->dbcv[i]->pass, "sql");
424 msab_freeStatus(&stats);
425 }
426
427 pthread_mutex_lock(&mpl_lock);
428 mpl = malloc(sizeof(multiplexlist));
429 mpl->next = multiplexes;
430 mpl->m = m;
431 multiplexes = mpl;
432 pthread_mutex_unlock(&mpl_lock);
433
434 if ((i = pthread_create(&m->tid, NULL,
435 multiplexThread, (void *)m)) != 0)
436 {
437 /* FIXME: we don't cleanup here */
438 return(newErr("starting thread for multiplex-funnel %s failed: %s",
439 name, strerror(i)));
440 }
441
442 /* fake lock such that sabaoth believes we are (still) running, we
443 * rely on merovingian moving to dbfarm here */
444 snprintf(buf, sizeof(buf), "%s/.gdk_lock", name);
445 if ((m->gdklock = MT_lockf(buf, F_TLOCK, 4, 1)) == -1) {
446 /* locking failed, FIXME: cleanup here */
447 Mfprintf(serr, "mfunnel: another instance is already running?\n");
448 return(newErr("cannot lock for %s, already locked", name));
449 } else if (m->gdklock == -2) {
450 /* directory or something doesn't exist, FIXME: cleanup */
451 Mfprintf(serr, "mfunnel: unable to create %s file: %s\n",
452 buf, strerror(errno));
453 return(newErr("cannot create lock for %s", name));
454 }
455
456 /* hack alert: set sabaoth uplog status by cheating with its
457 * internals -- we know dbname should be NULL, and hack it for the
458 * purpose of this moment, see also extern declaration before this
459 * function */
460 _sabaoth_internal_dbname = name;
461 if ((p = msab_registerStarting()) != NULL ||
462 (p = msab_registerStarted()) != NULL ||
463 (p = msab_marchScenario("mfunnel")) != NULL)
464 {
465 err em;
466
467 _sabaoth_internal_dbname = NULL;
468
469 Mfprintf(serr, "mfunnel: unable to startup %s: %s\n",
470 name, p);
471 em = newErr("cannot create funnel %s due to sabaoth: %s", name, p);
472 free(p);
473
474 return(em);
475 }
476 _sabaoth_internal_dbname = NULL;
477
478 return(NO_ERR);
479}
480
481void
482multiplexDestroy(char *mp)
483{
484 multiplexlist *ml, *mlp;
485 multiplex *m = NULL;
486 char *msg;
487
488 /* lock and remove */
489 pthread_mutex_lock(&mpl_lock);
490 mlp = NULL;
491 for (ml = multiplexes; ml != NULL; ml = ml->next) {
492 if (strcmp(ml->m->name, mp) == 0) {
493 m = ml->m;
494 if (mlp == NULL) {
495 multiplexes = ml->next;
496 } else {
497 mlp->next = ml->next;
498 }
499 break;
500 }
501 mlp = ml;
502 }
503 pthread_mutex_unlock(&mpl_lock);
504
505 if (m == NULL) {
506 Mfprintf(stderr, "request to remove non-existing "
507 "multiplex-funnel: %s\n", mp);
508 return;
509 }
510
511 /* deregister from sabaoth, same hack alert as at Init */
512 _sabaoth_internal_dbname = m->name;
513 if ((msg = msab_registerStop()) != NULL ||
514 (msg = msab_wildRetreat()) != NULL) {
515 Mfprintf(stderr, "mfunnel: %s\n", msg);
516 free(msg);
517 }
518 _sabaoth_internal_dbname = NULL;
519
520 /* signal the thread to stop and cleanup */
521 m->shutdown = 1;
522 pthread_join(m->tid, NULL);
523}
524
/**
 * Forwards the query in buf to every database in funnel m, merges the
 * (required to be consistent) results and writes them to fout.  The
 * query is sent to all servers first, then the responses are collected,
 * so the servers work concurrently.  On any node error, an !-message is
 * written to fout and all handles are closed.
 */
static void
multiplexQuery(multiplex *m, char *buf, stream *fout)
{
	int i;
	const char *t;
	MapiHdl h;
	int64_t rlen;   /* accumulated row/update count over all nodes */
	int fcnt;       /* field count (Q_TABLE) or autocommit state (Q_TRANS) */
	int qtype;      /* result type all nodes must agree on */

	/* first send the query to all, such that we don't waste time
	 * waiting for each server to produce an answer, but wait for all of
	 * them concurrently */
	for (i = 0; i < m->dbcc; i++) {
		if (m->dbcv[i]->conn == NULL) {
			/* target still unresolved: refuse the query as a whole */
			mnstr_printf(fout, "!connection for %s is currently unresolved\n",
					m->dbcv[i]->database);
			mnstr_flush(fout);
			Mfprintf(m->serr, "failed to find a provider for %s\n",
					m->dbcv[i]->database);
			return;
		}
		if (!mapi_is_connected(m->dbcv[i]->conn)) {
			/* (re)establish a dropped connection before sending */
			if (mapi_reconnect(m->dbcv[i]->conn) != MOK) {
				mnstr_printf(fout, "!failed to establish connection "
						"for %s: %s\n", m->dbcv[i]->database,
						mapi_error_str(m->dbcv[i]->conn));
				mnstr_flush(fout);
				Mfprintf(m->serr, "mapi_reconnect for %s failed: %s\n",
						m->dbcv[i]->database,
						mapi_error_str(m->dbcv[i]->conn));
				return;
			}
			mapi_cache_limit(m->dbcv[i]->conn, -1); /* don't page */
		}

		m->dbcv[i]->hdl = mapi_send(m->dbcv[i]->conn, buf);
	}
	/* fail as soon as one of the servers fails */
	t = NULL;
	rlen = 0;
	fcnt = -1;
	qtype = -1;
	for (i = 0; i < m->dbcc; i++) {
		h = m->dbcv[i]->hdl;
		/* check for responses */
		if (mapi_read_response(h) != MOK) {
			t = mapi_result_error(h);
			mnstr_printf(fout, "!node %s failed: %s\n",
					m->dbcv[i]->database, t ? t : "no response");
			Mfprintf(m->serr, "mapi_read_response for %s failed: %s\n",
					m->dbcv[i]->database, t ? t : "(no error)");
			break;
		}
		/* check for errors */
		if ((t = mapi_result_error(h)) != NULL) {
			mnstr_printf(fout, "!node %s failed: %s\n",
					m->dbcv[i]->database, t);
			Mfprintf(m->serr, "mapi_result_error for %s: %s\n",
					m->dbcv[i]->database, t);
			break;
		}
		/* check for result type consistency: the first node sets the
		 * expected type, every other node must match it */
		if (qtype == -1) {
			qtype = mapi_get_querytype(h);
		} else if (qtype != mapi_get_querytype(h)) {
			t = "err"; /* for cleanup code below */
			mnstr_printf(fout, "!node %s returned a different type of result "
					"than the previous node\n", m->dbcv[i]->database);
			Mfprintf(m->serr, "encountered mix of result types, "
					"got %d, expected %d\n", mapi_get_querytype(h), qtype);
			break;
		}

		/* determine correctness based on headers */
		switch (qtype) {
		case Q_PARSE:
			/* mapi returns Q_PARSE for empty results */
			continue;
		case Q_TABLE:
			/* prepare easily appending of all results */
			rlen += mapi_get_row_count(h);
			if (fcnt == -1) {
				fcnt = mapi_get_field_count(h);
			} else if (mapi_get_field_count(h) != fcnt) {
				t = "err"; /* for cleanup code below */
				mnstr_printf(fout, "!node %s has mismatch in result fields\n",
						m->dbcv[i]->database);
				Mfprintf(m->serr, "mapi_get_field_count inconsistent for %s: "
						"got %d, expected %d\n",
						m->dbcv[i]->database,
						mapi_get_field_count(h), fcnt);
			}
			break;
		case Q_UPDATE:
			/* just pile up the update counts */
			rlen += mapi_rows_affected(h);
			break;
		case Q_SCHEMA:
			/* accept, just write ok lateron */
			break;
		case Q_TRANS:
			/* just check all servers end up in the same state */
			if (fcnt == -1) {
				fcnt = mapi_get_autocommit(m->dbcv[i]->conn);
			} else if (fcnt != mapi_get_autocommit(m->dbcv[i]->conn)) {
				t = "err"; /* for cleanup code below */
				mnstr_printf(fout, "!node %s has mismatch in transaction state\n",
						m->dbcv[i]->database);
				Mfprintf(m->serr, "mapi_get_autocommit inconsistent for %s: "
						"got %d, expected %d\n",
						m->dbcv[i]->database,
						mapi_get_autocommit(m->dbcv[i]->conn), fcnt);
			}
			break;
		default:
			t = "err"; /* for cleanup code below */
			mnstr_printf(fout, "!node %s returned unhandled result type\n",
					m->dbcv[i]->database);
			Mfprintf(m->serr, "unhandled querytype for %s: %d\n",
					m->dbcv[i]->database, mapi_get_querytype(h));
			break;
		}
		if (t != NULL)
			break;
	}

	/* error or empty result, just end here */
	if (t != NULL || qtype == Q_PARSE) {
		mnstr_flush(fout);
		for (i = 0; i < m->dbcc; i++)
			mapi_close_handle(m->dbcv[i]->hdl);
		return;
	}

	/* write output to client */
	switch (qtype) {
	case Q_TABLE:
		/* Compose the header. For the table id, we just send 0,
		 * such that we never get a close request. Steal headers
		 * from the first node. */
		mnstr_printf(fout, "&%d 0 %" PRId64 " %d %" PRId64 "\n",
				Q_TABLE, rlen, fcnt, rlen);
		/* now read the answers, and write them directly to the client */
		for (i = 0; i < m->dbcc; i++) {
			h = m->dbcv[i]->hdl;
			while ((t = mapi_fetch_line(h)) != NULL)
				if (i == 0 || *t != '%') /* skip other server's headers */
					mnstr_printf(fout, "%s\n", t);
		}
		break;
	case Q_UPDATE:
		/* Write a single header for all update counts, to sort of
		 * complement the transparency created for Q_TABLE results,
		 * but forget about last id data (wouldn't make sense if
		 * we'd emit multiple update counts either) */
		mnstr_printf(fout, "&%d %" PRId64 " -1\n", Q_UPDATE, rlen);
		break;
	case Q_SCHEMA:
		mnstr_printf(fout, "&%d\n", Q_SCHEMA);
		break;
	case Q_TRANS:
		mnstr_printf(fout, "&%d %c\n", Q_TRANS, fcnt ? 't' : 'f');
		break;
	}
	mnstr_flush(fout);
	/* finish up */
	for (i = 0; i < m->dbcc; i++)
		mapi_close_handle(m->dbcv[i]->hdl);
}
695
/**
 * Per-funnel worker thread: waits for data from upstream clients,
 * forwards each (SQL-only) query block via multiplexQuery, and applies
 * deferred connection swaps scheduled by MFconnectionManager.  On
 * shutdown it releases all clients, connections and the funnel itself,
 * and unregisters the funnel from the logger list.
 */
static void *
multiplexThread(void *d)
{
	multiplex *m = (multiplex *)d;
#ifdef HAVE_POLL
	struct pollfd *pfd;
#else
	struct timeval tv;
	fd_set fds;
#endif
	multiplex_client *c;
	int msock = -1;
	char buf[10 * BLOCK + 1];
	ssize_t len;
	int r, i;
	dpair p, q;

	/* select on upstream clients, on new data, read query, forward,
	 * union all results, send back, and restart cycle. */

	while (m->shutdown == 0) {
#ifdef HAVE_POLL
		/* count clients, then build a pollfd per client
		 * NOTE(review): malloc result is not checked; with zero
		 * clients this is malloc(0)/poll(.,0,.) which is harmless */
		msock = 0;
		for (c = m->clients; c != NULL; c = c->next) {
			msock++;
		}
		pfd = malloc(msock * sizeof(struct pollfd));
		msock = 0;
		for (c = m->clients; c != NULL; c = c->next) {
			pfd[msock++] = (struct pollfd) {.fd = c->sock, .events = POLLIN};
		}
		/* wait up to 1 second. */
		r = poll(pfd, msock, 1000);
#else
		FD_ZERO(&fds);
		for (c = m->clients; c != NULL; c = c->next) {
			FD_SET(c->sock, &fds);
			if (c->sock > msock)
				msock = c->sock;
		}

		/* wait up to 1 second. */
		tv.tv_sec = 1;
		tv.tv_usec = 0;
		r = select(msock + 1, &fds, NULL, NULL, &tv);
#endif

		/* evaluate if connections have to be switched; connupdate and
		 * newconn are set by MFconnectionManager */
		for (i = 0; i < m->dbcc; i++) {
			if (m->dbcv[i]->connupdate) {
				if (m->dbcv[i]->newconn != NULL) {
					/* put new connection live */
					Mfprintf(m->sout, "performing deferred connection cycle "
							"for %s from %s to %s\n",
							m->dbcv[i]->database,
							m->dbcv[i]->conn != NULL ?
								mapi_get_uri(m->dbcv[i]->conn) :
								"<unconnected>",
							mapi_get_uri(m->dbcv[i]->newconn));
					mapi_disconnect(m->dbcv[i]->conn);
					mapi_destroy(m->dbcv[i]->conn);
					m->dbcv[i]->conn = m->dbcv[i]->newconn;
					m->dbcv[i]->newconn = NULL;
					m->dbcv[i]->connupdate = 0;
				} else {
					/* drop the connection (target disappeared) */
					Mfprintf(m->sout, "performing deferred connection drop "
							"for %s from %s\n",
							m->dbcv[i]->database,
							m->dbcv[i]->conn != NULL ?
								mapi_get_uri(m->dbcv[i]->conn) :
								"<unconnected>");
					mapi_disconnect(m->dbcv[i]->conn);
					mapi_destroy(m->dbcv[i]->conn);
					m->dbcv[i]->conn = NULL;
					m->dbcv[i]->connupdate = 0;
				}
			}
		}

		/* nothing interesting has happened */
		if (r <= 0) {
#ifdef HAVE_POLL
			free(pfd);
#endif
			continue;
		}
		for (c = m->clients; c != NULL; c = c->next) {
#ifdef HAVE_POLL
			/* find this client's pollfd entry, skip if no input */
			for (i = 0; i < msock; i++) {
				if (pfd[i].fd == c->sock)
					break;
			}
			if (i == msock || (pfd[i].revents & POLLIN) == 0)
				continue;
#else
			if (!FD_ISSET(c->sock, &fds))
				continue;
#endif
			if ((len = mnstr_read(c->fdin, buf, 1, 10 * BLOCK)) < 0) {
				/* error, or some garbage */
				multiplexRemoveClient(m, c);
				/* don't crash on now stale c */
				break;
			} else if (len == 0) {
				/* flush from client, ignore */
				continue;
			}

			buf[len] = '\0';
			/* first character is the MAPI language modifier */
			switch (*buf) {
			case 's':
			case 'S':
				/* accepted, just SQL queries */
				break;
			case 'X':
				/* ignored, some clients just really insist on using
				 * these */
				mnstr_flush(c->fout);
				continue;
			default:
				mnstr_printf(c->fout, "!modifier %c not supported by "
						"multiplex-funnel\n", *buf);
				mnstr_flush(c->fout);
				Mfprintf(m->serr, "client attempted to perform %c "
						"type query: %s\n", *buf, buf);
				continue;
			}
			/* we assume (and require) the query to fit in one block,
			 * that is, we only forward the first block, without having
			 * any idea what it is */
			multiplexQuery(m, buf + 1, c->fout);
		}
#ifdef HAVE_POLL
		free(pfd);
#endif
	}

	Mfprintf(stdout, "stopping mfunnel '%s'\n", m->name);

	/* free, cleanup, etc. */
	while (m->clients != NULL) {
		c = m->clients;
		close_stream(c->fdin);
		close_stream(c->fout);
		free(c->name);
		m->clients = m->clients->next;
		free(c);
	}
	for (i = 0; i < m->dbcc; i++) {
		if (m->dbcv[i]->connupdate && m->dbcv[i]->newconn != NULL)
			mapi_destroy(m->dbcv[i]->newconn);
		if (m->dbcv[i]->conn != NULL)
			mapi_destroy(m->dbcv[i]->conn);
		free(m->dbcv[i]->user);
		/* pass and database belong to the same malloced block from user */
	}
	fflush(m->sout);
	fclose(m->sout);
	fflush(m->serr);
	fclose(m->serr);
	close(m->gdklock);
	free(m->pool);

	/* last bit, remove from logger structure */
	pthread_mutex_lock(&_mero_topdp_lock);

	q = _mero_topdp->next; /* skip console */
	p = q->next;
	while (p != NULL) {
		if (p->type == MEROFUN && strcmp(p->dbname, m->name) == 0) {
			/* log everything that's still in the pipes */
			logFD(p->out, "MSG", p->dbname, (long long int)p->pid, _mero_logfile, 1);
			/* remove from the list */
			q->next = p->next;
			/* close the descriptors */
			close(p->out);
			close(p->err);
			Mfprintf(stdout, "mfunnel '%s' has stopped\n", p->dbname);
			free(p->dbname);
			free(p);
			break;
		}
		q = p;
		p = q->next;
	}

	pthread_mutex_unlock(&_mero_topdp_lock);

	free(m->name);
	free(m);
	return NULL;
}
889
890void
891multiplexAddClient(char *mp, int sock, stream *fout, stream *fdin, char *name)
892{
893 multiplex_client *w;
894 multiplex_client *n = malloc(sizeof(multiplex_client));
895 multiplexlist *ml;
896 multiplex *m;
897
898 n->sock = sock;
899 n->fdin = fdin;
900 n->fout = fout;
901 n->name = strdup(name);
902 n->next = NULL;
903
904 pthread_mutex_lock(&mpl_lock);
905 for (ml = multiplexes; ml != NULL; ml = ml->next) {
906 if (strcmp(ml->m->name, mp) == 0)
907 break;
908 }
909 if (ml == NULL) {
910 pthread_mutex_unlock(&mpl_lock);
911 Mfprintf(stderr, "failed to find multiplex-funnel '%s' for client %s\n",
912 mp, name);
913 mnstr_printf(fout, "!monetdbd: internal error: could not find multiplex-funnel '%s'\n", mp);
914 mnstr_flush(fout);
915 close_stream(fdin);
916 close_stream(fout);
917 free(n->name);
918 free(n);
919 return;
920 }
921 m = ml->m;
922
923 if (m->clients == NULL) {
924 m->clients = n;
925 } else {
926 for (w = m->clients; w->next != NULL; w = w->next)
927 ;
928 w->next = n;
929 }
930 pthread_mutex_unlock(&mpl_lock);
931
932 Mfprintf(m->sout, "mfunnel: added new client %s\n", n->name);
933
934 /* send client a prompt */
935 mnstr_flush(fout);
936}
937
938void
939multiplexRemoveClient(multiplex *m, multiplex_client *c)
940{
941 multiplex_client *w;
942 multiplex_client *p = NULL;
943
944 Mfprintf(m->sout, "mfunnel: removing client %s\n", c->name);
945
946 for (w = m->clients; w != NULL; w = w->next) {
947 if (w == c) {
948 if (w == m->clients) {
949 m->clients = w->next;
950 } else {
951 p->next = w->next;
952 }
953 c->next = NULL;
954 close_stream(c->fdin);
955 close_stream(c->fout);
956 free(c->name);
957 free(c);
958 break;
959 }
960 p = w;
961 }
962}
963
964/* vim:set ts=4 sw=4 noexpandtab: */
965