1 | /* |
2 | * This Source Code Form is subject to the terms of the Mozilla Public |
3 | * License, v. 2.0. If a copy of the MPL was not distributed with this |
4 | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
5 | * |
6 | * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V. |
7 | */ |
8 | |
9 | #include "monetdb_config.h" |
10 | #include <unistd.h> |
11 | #include <string.h> |
12 | #include <sys/types.h> |
13 | #ifdef HAVE_POLL_H |
14 | #include <poll.h> |
15 | #endif |
16 | |
17 | #include "mapi.h" |
18 | #include "mutils.h" /* MT_lockf */ |
19 | #include <fcntl.h> |
20 | |
21 | #include "utils/glob.h" |
22 | |
23 | #include "merovingian.h" |
24 | #include "discoveryrunner.h" |
25 | #include "multiplex-funnel.h" |
26 | |
27 | #ifndef HAVE_PIPE2 |
28 | #define pipe2(pipefd, flags) pipe(pipefd) |
29 | #endif |
30 | |
31 | #ifndef O_CLOEXEC |
32 | #define O_CLOEXEC 0 |
33 | #endif |
34 | |
/* singly-linked list node tying all active multiplex-funnels together */
typedef struct _multiplexlist {
	multiplex *m;
	struct _multiplexlist *next;
} multiplexlist;

/* head of the list of active funnels; guarded by mpl_lock */
static multiplexlist *multiplexes = NULL;
static pthread_mutex_t mpl_lock = PTHREAD_MUTEX_INITIALIZER;
/* thread id of MFconnectionManager; 0 means the manager is not running */
static pthread_t mfmanager = 0;
/* notification pipe to the manager; a pointer to a malloced string is
 * written through it (see multiplexNotify* below), freed by the reader */
static int mfpipe[2];

static void *multiplexThread(void *d);
46 | |
47 | |
48 | /** |
49 | * Connections from all multiplex funnels are maintained by a single |
50 | * thread that resolves and creates connections upon updates on the |
51 | * discovery space. Connections aren't made/checked/updated upon their |
52 | * usage, because this introduces delays for the clients. This is in |
53 | * particular an issue when a target is updated to point to another |
54 | * database. To maintain a stable query performance, the connection |
55 | * creation must happen in the background and set life once established. |
56 | */ |
57 | static void * |
58 | MFconnectionManager(void *d) |
59 | { |
60 | int i; |
61 | multiplex *m; |
62 | multiplexlist *w; |
63 | char buf[1024]; |
64 | size_t len; |
65 | char *msg; |
66 | #ifdef HAVE_POLL |
67 | struct pollfd pfd; |
68 | #else |
69 | struct timeval tv; |
70 | fd_set fds; |
71 | #endif |
72 | |
73 | (void)d; |
74 | |
75 | while (_mero_keep_listening == 1) { |
76 | #ifdef HAVE_POLL |
77 | pfd = (struct pollfd) {.fd = mfpipe[0], .events = POLLIN}; |
78 | /* wait up to 5 seconds */ |
79 | i = poll(&pfd, 1, 5000); |
80 | #else |
81 | FD_ZERO(&fds); |
82 | FD_SET(mfpipe[0], &fds); |
83 | |
84 | /* wait up to 5 seconds */ |
85 | tv.tv_sec = 5; |
86 | tv.tv_usec = 0; |
87 | i = select(mfpipe[0] + 1, &fds, NULL, NULL, &tv); |
88 | #endif |
89 | if (i == 0) |
90 | continue; |
91 | if (i == -1 && errno != EINTR) { |
92 | Mfprintf(stderr, "failed to select on mfpipe: %s\n" , |
93 | strerror(errno)); |
94 | break; |
95 | } |
96 | /* coverity[string_null_argument] */ |
97 | if (read(mfpipe[0], &msg, sizeof(msg)) < 0) { |
98 | Mfprintf(stderr, "failed reading from notification pipe: %s\n" , |
99 | strerror(errno)); |
100 | break; |
101 | } |
102 | /* we just received a POINTER to a string! */ |
103 | |
104 | /* intended behaviour: |
105 | * - additions don't change any connection targets, they only |
106 | * fill in gaps (conn == NULL) |
107 | * - removals of targets in use, cause a re-lookup of the |
108 | * original pattern, on failure, conn is left NULL |
109 | */ |
110 | pthread_mutex_lock(&mpl_lock); |
111 | if (msg[0] == '+') { /* addition */ |
112 | for (w = multiplexes; w != NULL; w = w->next) { |
113 | m = w->m; |
114 | for (i = 0; i < m->dbcc; i++) { |
115 | if (m->dbcv[i]->conn == NULL) { |
116 | len = snprintf(buf, sizeof(buf), "%s/*" , |
117 | m->dbcv[i]->database); |
118 | if (len >= sizeof(buf)) { |
119 | Mfprintf(stderr, "buffer buf too small, " |
120 | "increase size in %s:%d\n" , |
121 | __FILE__, __LINE__); |
122 | continue; |
123 | } |
124 | /* avoid double /'*'/'* (no ') */ |
125 | if (len >= 4 && |
126 | buf[len - 3] == '*' && buf[len - 4] == '/') |
127 | buf[len - 2] = '\0'; |
128 | if (db_glob(buf, msg + 1) == 1) { |
129 | sabdb *stats; |
130 | Mapi tm = NULL; |
131 | /* match! eat away trailing / (for matching) */ |
132 | msg[strlen(msg) - 1] = '\0'; |
133 | stats = getRemoteDB(msg + 1); |
134 | if (stats == NULL) { |
135 | Mfprintf(stderr, "target %s cannot be resolved " |
136 | "despite being just discovered as %s\n" , |
137 | m->dbcv[i]->database, msg + 1); |
138 | continue; |
139 | } |
140 | snprintf(buf, sizeof(buf), "%s%s" , |
141 | stats->conns->val, stats->dbname); |
142 | msab_freeStatus(&stats); |
143 | Mfprintf(stdout, "setting up multiplexer " |
144 | "target %s->%s\n" , |
145 | m->dbcv[i]->database, buf); |
146 | tm = mapi_mapiuri(buf, |
147 | m->dbcv[i]->user, m->dbcv[i]->pass, "sql" ); |
148 | if (mapi_reconnect(tm) == MOK) { |
149 | m->dbcv[i]->conn = tm; |
150 | mapi_cache_limit(tm, -1); /* don't page */ |
151 | } else { |
152 | Mfprintf(stdout, "failed to connect to %s: %s\n" , |
153 | buf, mapi_error_str(tm)); |
154 | mapi_destroy(tm); |
155 | } |
156 | } |
157 | } |
158 | } |
159 | } |
160 | } else { /* removal */ |
161 | for (w = multiplexes; w != NULL; w = w->next) { |
162 | m = w->m; |
163 | for (i = 0; i < m->dbcc; i++) { |
164 | if (m->dbcv[i]->conn != NULL) { |
165 | len = snprintf(buf, sizeof(buf), "%s/*" , |
166 | m->dbcv[i]->database); |
167 | if (len >= sizeof(buf)) { |
168 | Mfprintf(stderr, "buffer buf too small, " |
169 | "increase size in %s:%d\n" , |
170 | __FILE__, __LINE__); |
171 | continue; |
172 | } |
173 | /* avoid double /'*'/'* (no ') */ |
174 | if (len >= 4 && |
175 | buf[len - 3] == '*' && buf[len - 4] == '/') |
176 | buf[len - 2] = '\0'; |
177 | if (db_glob(buf, msg + 1) == 1) { |
178 | /* reevaluate, to see if connection is still |
179 | * available */ |
180 | sabdb *walk; |
181 | sabdb *stats = getRemoteDB(m->dbcv[i]->database); |
182 | Mapi tm = m->dbcv[i]->conn; |
183 | const char *uri = mapi_get_uri(tm); |
184 | if (stats == NULL) { |
185 | Mfprintf(stderr, "target %s can no longer " |
186 | "be resolved\n" , |
187 | m->dbcv[i]->database); |
188 | /* schedule to drop connection */ |
189 | m->dbcv[i]->newconn = NULL; |
190 | m->dbcv[i]->connupdate = 1; |
191 | continue; |
192 | } |
193 | /* walk all connections, in an attempt to |
194 | * see if the original connection is still |
195 | * available, despite the removal of the |
196 | * server we got a message for */ |
197 | for (walk = stats; walk != NULL; walk = walk->next) { |
198 | snprintf(buf, sizeof(buf), "%s%s" , |
199 | walk->conns->val, walk->dbname); |
200 | if (strcmp(uri, buf) == 0) |
201 | break; |
202 | } |
203 | if (walk == NULL) { |
204 | snprintf(buf, sizeof(buf), "%s%s" , |
205 | stats->conns->val, stats->dbname); |
206 | Mfprintf(stdout, "changing multiplexer target %s: %s->%s\n" , |
207 | m->dbcv[i]->database, uri, buf); |
208 | tm = mapi_mapiuri(buf, |
209 | m->dbcv[i]->user, m->dbcv[i]->pass, |
210 | "sql" ); |
211 | if (mapi_reconnect(tm) != MOK) { |
212 | Mfprintf(stderr, "mapi_reconnect for %s " |
213 | "failed: %s\n" , |
214 | m->dbcv[i]->database, |
215 | mapi_error_str(tm)); |
216 | mapi_destroy(tm); |
217 | /* schedule connection for removal */ |
218 | m->dbcv[i]->newconn = NULL; |
219 | m->dbcv[i]->connupdate = 1; |
220 | msab_freeStatus(&stats); |
221 | continue; |
222 | } |
223 | mapi_cache_limit(tm, -1); /* don't page */ |
224 | |
225 | /* let the new connection go live */ |
226 | m->dbcv[i]->newconn = tm; |
227 | m->dbcv[i]->connupdate = 1; |
228 | } |
229 | msab_freeStatus(&stats); |
230 | } |
231 | } |
232 | } |
233 | } |
234 | } |
235 | pthread_mutex_unlock(&mpl_lock); |
236 | |
237 | free(msg); /* alloced by multiplexNotify* */ |
238 | } |
239 | return NULL; |
240 | } |
241 | |
242 | void |
243 | multiplexNotifyAddedDB(const char *database) |
244 | { |
245 | char dbslash[256]; |
246 | char *p; |
247 | |
248 | if (mfmanager == 0) |
249 | return; |
250 | |
251 | snprintf(dbslash, sizeof(dbslash), "+%s/" , database); |
252 | p = strdup(dbslash); |
253 | if (write(mfpipe[1], &p, sizeof(p)) != sizeof(p)) { |
254 | Mfprintf(stderr, "failed to write notify added message to mfpipe\n" ); |
255 | free(p); |
256 | } |
257 | /* p is freed by MFconnectionManager */ |
258 | /* coverity[leaked_storage] */ |
259 | } |
260 | |
261 | void |
262 | multiplexNotifyRemovedDB(const char *database) |
263 | { |
264 | char dbslash[256]; |
265 | char *p; |
266 | |
267 | if (mfmanager == 0) |
268 | return; |
269 | |
270 | snprintf(dbslash, sizeof(dbslash), "-%s/" , database); |
271 | p = strdup(dbslash); |
272 | if (write(mfpipe[1], &p, sizeof(p)) != sizeof(p)) { |
273 | Mfprintf(stderr, "failed to write notify removed message to mfpipe\n" ); |
274 | free(p); |
275 | } |
276 | /* p is freed by MFconnectionManager */ |
277 | /* coverity[leaked_storage] */ |
278 | } |
279 | |
280 | /* ultra ugly, we peek inside Sabaoth's internals to update the uplog |
281 | * file */ |
282 | extern char *_sabaoth_internal_dbname; |
283 | |
/**
 * Creates and starts a multiplex-funnel called `name` that fans out to
 * the comma-separated list of targets in `pattern`, each of the form
 * "user+pass@database".  Spawns the shared MFconnectionManager thread
 * on first use, resolves initial connections, registers the funnel in
 * the global list and with sabaoth, and starts its service thread.
 * Returns NO_ERR on success, or a malloced error string (ownership
 * transfers to the caller, to be freed with freeErr).
 * sout/serr ownership transfers to the funnel (closed by its thread).
 *
 * NOTE(review): malloc/strdup results (m, m->name, m->pool, m->dbcv and
 * the per-target allocations) are not checked for NULL -- confirm
 * whether merovingian has an abort-on-OOM policy.  Also, the error
 * paths below free m->pool but not m->name -- looks like a leak;
 * confirm.
 */
err
multiplexInit(char *name, char *pattern, FILE *sout, FILE *serr)
{
	multiplex *m = malloc(sizeof(multiplex));
	multiplexlist *mpl;
	char buf[256];
	char *p, *q;
	int i;

	/* the multiplex targets are given separated by commas, split them
	 * out in multiplex_database entries */
	/* user+pass@pattern,user+pass@pattern,... */

	m->tid = 0;
	m->gdklock = -1;
	m->shutdown = 0;
	m->name = strdup(name);
	m->pool = strdup(pattern);
	m->sout = sout;
	m->serr = serr;
	/* count commas to size the target array */
	m->dbcc = 1;
	p = m->pool;
	while ((p = strchr(p, ',')) != NULL) {
		m->dbcc++;
		p++;
	}
	m->dbcv = malloc(sizeof(multiplex_database *) * m->dbcc);
	p = m->pool;
	i = 0;
	/* parse all targets but the last (those followed by a comma);
	 * user gets its own allocation, pass and database are pointers
	 * into that same buffer, split in place on '+' and '@' */
	while ((q = strchr(p, ',')) != NULL) {
		m->dbcv[i] = malloc(sizeof(multiplex_database));
		m->dbcv[i]->user = malloc(sizeof(char) * (q - p + 1));
		memcpy(m->dbcv[i]->user, p, q - p);
		m->dbcv[i]->user[q - p] = '\0';
		if ((p = strchr(m->dbcv[i]->user, '+')) == NULL) {
			err e = newErr("illegal target %s: missing '+'", m->dbcv[i]->user);
			/* unwind: free the current and all previously parsed targets */
			for (; i >= 0; i--) {
				free(m->dbcv[i]->user);
				free(m->dbcv[i]);
			}
			free(m->dbcv);
			free(m->pool);
			free(m);
			return(e);
		}
		*p = '\0';
		m->dbcv[i]->pass = p + 1;
		if ((p = strchr(m->dbcv[i]->pass, '@')) == NULL) {
			err e = newErr("illegal target %s+%s: missing '@'",
					m->dbcv[i]->user, m->dbcv[i]->pass);
			for (; i >= 0; i--) {
				free(m->dbcv[i]->user);
				free(m->dbcv[i]);
			}
			free(m->dbcv);
			free(m->pool);
			free(m);
			return(e);
		}
		*p = '\0';
		m->dbcv[i]->database = p + 1;
		m->dbcv[i]->conn = NULL;
		m->dbcv[i]->newconn = NULL;
		m->dbcv[i]->connupdate = 0;

		i++;
		p = q + 1;
	}
	/* the last (or only) target, not terminated by a comma */
	m->dbcv[i] = malloc(sizeof(multiplex_database));
	m->dbcv[i]->user = strdup(p);
	if ((p = strchr(m->dbcv[i]->user, '+')) == NULL) {
		err e = newErr("illegal target %s: missing '+'", m->dbcv[i]->user);
		for (; i >= 0; i--) {
			free(m->dbcv[i]->user);
			free(m->dbcv[i]);
		}
		free(m->dbcv);
		free(m->pool);
		free(m);
		return(e);
	} else {
		*p = '\0';
		m->dbcv[i]->pass = p + 1;
		if ((p = strchr(m->dbcv[i]->pass, '@')) == NULL) {
			err e = newErr("illegal target %s+%s: missing '@'",
					m->dbcv[i]->user, m->dbcv[i]->pass);
			for (; i >= 0; i--) {
				free(m->dbcv[i]->user);
				free(m->dbcv[i]);
			}
			free(m->dbcv);
			free(m->pool);
			free(m);
			return(e);
		} else {
			*p = '\0';
			m->dbcv[i]->database = p + 1;
			m->dbcv[i]->conn = NULL;
			m->dbcv[i]->newconn = NULL;
			m->dbcv[i]->connupdate = 0;
		}
	}

	m->clients = NULL; /* initially noone is connected */

	/* lazily start the shared connection-manager thread on the first
	 * funnel ever created */
	if (mfmanager == 0) {
		pthread_attr_t detach;
		pthread_attr_init(&detach);
		pthread_attr_setdetachstate(&detach, PTHREAD_CREATE_DETACHED);

		/* create communication channel */
		if (pipe2(mfpipe, O_CLOEXEC) != 0)
			Mfprintf(stderr, "failed to create mfpipe: %s\n", strerror(errno));
		else {
#if !defined(HAVE_PIPE2) || O_CLOEXEC == 0
			/* pipe2/O_CLOEXEC unavailable: set close-on-exec manually */
			(void) fcntl(mfpipe[0], F_SETFD, FD_CLOEXEC);
			(void) fcntl(mfpipe[1], F_SETFD, FD_CLOEXEC);
#endif
			Mfprintf(stdout, "starting multiplex-funnel connection manager\n");
			if ((i = pthread_create(&mfmanager, &detach,
							MFconnectionManager, NULL)) != 0) {
				Mfprintf(stderr, "failed to start MFconnectionManager: %s\n",
						strerror(i));
				mfmanager = 0;
			}
		}
	}

	/* resolve the initial connections; unresolved targets keep
	 * conn == NULL and are filled in later by the manager thread */
	for (i = 0; i < m->dbcc; i++) {
		sabdb *stats = getRemoteDB(m->dbcv[i]->database);
		if (stats == NULL) {
			Mfprintf(serr, "mfunnel: target %s cannot be resolved\n",
					m->dbcv[i]->database);
			continue;
		}
		snprintf(buf, sizeof(buf), "%s%s", stats->conns->val, stats->dbname);
		Mfprintf(sout, "mfunnel: setting up multiplexer target %s->%s\n",
				m->dbcv[i]->database, buf);
		m->dbcv[i]->conn = mapi_mapiuri(buf,
				m->dbcv[i]->user, m->dbcv[i]->pass, "sql");
		msab_freeStatus(&stats);
	}

	/* publish the funnel in the global list */
	pthread_mutex_lock(&mpl_lock);
	mpl = malloc(sizeof(multiplexlist));
	mpl->next = multiplexes;
	mpl->m = m;
	multiplexes = mpl;
	pthread_mutex_unlock(&mpl_lock);

	if ((i = pthread_create(&m->tid, NULL,
					multiplexThread, (void *)m)) != 0)
	{
		/* FIXME: we don't cleanup here */
		return(newErr("starting thread for multiplex-funnel %s failed: %s",
					name, strerror(i)));
	}

	/* fake lock such that sabaoth believes we are (still) running, we
	 * rely on merovingian moving to dbfarm here */
	snprintf(buf, sizeof(buf), "%s/.gdk_lock", name);
	if ((m->gdklock = MT_lockf(buf, F_TLOCK, 4, 1)) == -1) {
		/* locking failed, FIXME: cleanup here */
		Mfprintf(serr, "mfunnel: another instance is already running?\n");
		return(newErr("cannot lock for %s, already locked", name));
	} else if (m->gdklock == -2) {
		/* directory or something doesn't exist, FIXME: cleanup */
		Mfprintf(serr, "mfunnel: unable to create %s file: %s\n",
				buf, strerror(errno));
		return(newErr("cannot create lock for %s", name));
	}

	/* hack alert: set sabaoth uplog status by cheating with its
	 * internals -- we know dbname should be NULL, and hack it for the
	 * purpose of this moment, see also extern declaration before this
	 * function */
	_sabaoth_internal_dbname = name;
	if ((p = msab_registerStarting()) != NULL ||
			(p = msab_registerStarted()) != NULL ||
			(p = msab_marchScenario("mfunnel")) != NULL)
	{
		err em;

		_sabaoth_internal_dbname = NULL;

		Mfprintf(serr, "mfunnel: unable to startup %s: %s\n",
				name, p);
		em = newErr("cannot create funnel %s due to sabaoth: %s", name, p);
		free(p);

		return(em);
	}
	_sabaoth_internal_dbname = NULL;

	return(NO_ERR);
}
480 | |
481 | void |
482 | multiplexDestroy(char *mp) |
483 | { |
484 | multiplexlist *ml, *mlp; |
485 | multiplex *m = NULL; |
486 | char *msg; |
487 | |
488 | /* lock and remove */ |
489 | pthread_mutex_lock(&mpl_lock); |
490 | mlp = NULL; |
491 | for (ml = multiplexes; ml != NULL; ml = ml->next) { |
492 | if (strcmp(ml->m->name, mp) == 0) { |
493 | m = ml->m; |
494 | if (mlp == NULL) { |
495 | multiplexes = ml->next; |
496 | } else { |
497 | mlp->next = ml->next; |
498 | } |
499 | break; |
500 | } |
501 | mlp = ml; |
502 | } |
503 | pthread_mutex_unlock(&mpl_lock); |
504 | |
505 | if (m == NULL) { |
506 | Mfprintf(stderr, "request to remove non-existing " |
507 | "multiplex-funnel: %s\n" , mp); |
508 | return; |
509 | } |
510 | |
511 | /* deregister from sabaoth, same hack alert as at Init */ |
512 | _sabaoth_internal_dbname = m->name; |
513 | if ((msg = msab_registerStop()) != NULL || |
514 | (msg = msab_wildRetreat()) != NULL) { |
515 | Mfprintf(stderr, "mfunnel: %s\n" , msg); |
516 | free(msg); |
517 | } |
518 | _sabaoth_internal_dbname = NULL; |
519 | |
520 | /* signal the thread to stop and cleanup */ |
521 | m->shutdown = 1; |
522 | pthread_join(m->tid, NULL); |
523 | } |
524 | |
/**
 * Forwards the query in `buf` to all targets of funnel `m` and writes a
 * merged result to the client stream `fout`.
 *
 * The query is first sent to all servers so they work concurrently,
 * then the responses are collected in order.  All servers must return
 * the same result type (and, for tables, the same field count); any
 * failure or mismatch aborts the whole query with a "!..." error line
 * on fout.  Q_TABLE results are concatenated (headers taken from the
 * first node), Q_UPDATE counts are summed, Q_SCHEMA collapses to a
 * single acknowledgement and Q_TRANS reports the shared autocommit
 * state.
 */
static void
multiplexQuery(multiplex *m, char *buf, stream *fout)
{
	int i;
	const char *t;
	MapiHdl h;
	int64_t rlen;
	int fcnt;
	int qtype;

	/* first send the query to all, such that we don't waste time
	 * waiting for each server to produce an answer, but wait for all of
	 * them concurrently */
	for (i = 0; i < m->dbcc; i++) {
		if (m->dbcv[i]->conn == NULL) {
			/* target still unresolved; the manager thread fills these
			 * in asynchronously */
			mnstr_printf(fout, "!connection for %s is currently unresolved\n",
					m->dbcv[i]->database);
			mnstr_flush(fout);
			Mfprintf(m->serr, "failed to find a provider for %s\n",
					m->dbcv[i]->database);
			return;
		}
		if (!mapi_is_connected(m->dbcv[i]->conn)) {
			/* lazily (re)establish the connection on first use */
			if (mapi_reconnect(m->dbcv[i]->conn) != MOK) {
				mnstr_printf(fout, "!failed to establish connection "
						"for %s: %s\n", m->dbcv[i]->database,
						mapi_error_str(m->dbcv[i]->conn));
				mnstr_flush(fout);
				Mfprintf(m->serr, "mapi_reconnect for %s failed: %s\n",
						m->dbcv[i]->database,
						mapi_error_str(m->dbcv[i]->conn));
				return;
			}
			mapi_cache_limit(m->dbcv[i]->conn, -1); /* don't page */
		}

		m->dbcv[i]->hdl = mapi_send(m->dbcv[i]->conn, buf);
	}
	/* fail as soon as one of the servers fails */
	t = NULL;
	rlen = 0;
	fcnt = -1;
	qtype = -1;
	for (i = 0; i < m->dbcc; i++) {
		h = m->dbcv[i]->hdl;
		/* check for responses */
		if (mapi_read_response(h) != MOK) {
			t = mapi_result_error(h);
			mnstr_printf(fout, "!node %s failed: %s\n",
					m->dbcv[i]->database, t ? t : "no response");
			Mfprintf(m->serr, "mapi_read_response for %s failed: %s\n",
					m->dbcv[i]->database, t ? t : "(no error)");
			break;
		}
		/* check for errors */
		if ((t = mapi_result_error(h)) != NULL) {
			mnstr_printf(fout, "!node %s failed: %s\n",
					m->dbcv[i]->database, t);
			Mfprintf(m->serr, "mapi_result_error for %s: %s\n",
					m->dbcv[i]->database, t);
			break;
		}
		/* check for result type consistency; the first node dictates
		 * the expected type */
		if (qtype == -1) {
			qtype = mapi_get_querytype(h);
		} else if (qtype != mapi_get_querytype(h)) {
			t = "err"; /* for cleanup code below */
			mnstr_printf(fout, "!node %s returned a different type of result "
					"than the previous node\n", m->dbcv[i]->database);
			Mfprintf(m->serr, "encountered mix of result types, "
					"got %d, expected %d\n", mapi_get_querytype(h), qtype);
			break;
		}

		/* determine correctness based on headers */
		switch (qtype) {
		case Q_PARSE:
			/* mapi returns Q_PARSE for empty results */
			continue;
		case Q_TABLE:
			/* prepare easily appending of all results */
			rlen += mapi_get_row_count(h);
			if (fcnt == -1) {
				fcnt = mapi_get_field_count(h);
			} else if (mapi_get_field_count(h) != fcnt) {
				t = "err"; /* for cleanup code below */
				mnstr_printf(fout, "!node %s has mismatch in result fields\n",
						m->dbcv[i]->database);
				Mfprintf(m->serr, "mapi_get_field_count inconsistent for %s: "
						"got %d, expected %d\n",
						m->dbcv[i]->database,
						mapi_get_field_count(h), fcnt);
			}
			break;
		case Q_UPDATE:
			/* just pile up the update counts */
			rlen += mapi_rows_affected(h);
			break;
		case Q_SCHEMA:
			/* accept, just write ok lateron */
			break;
		case Q_TRANS:
			/* just check all servers end up in the same state; fcnt is
			 * reused here to hold the autocommit flag */
			if (fcnt == -1) {
				fcnt = mapi_get_autocommit(m->dbcv[i]->conn);
			} else if (fcnt != mapi_get_autocommit(m->dbcv[i]->conn)) {
				t = "err"; /* for cleanup code below */
				mnstr_printf(fout, "!node %s has mismatch in transaction state\n",
						m->dbcv[i]->database);
				Mfprintf(m->serr, "mapi_get_autocommit inconsistent for %s: "
						"got %d, expected %d\n",
						m->dbcv[i]->database,
						mapi_get_autocommit(m->dbcv[i]->conn), fcnt);
			}
			break;
		default:
			t = "err"; /* for cleanup code below */
			mnstr_printf(fout, "!node %s returned unhandled result type\n",
					m->dbcv[i]->database);
			Mfprintf(m->serr, "unhandled querytype for %s: %d\n",
					m->dbcv[i]->database, mapi_get_querytype(h));
			break;
		}
		if (t != NULL)
			break;
	}

	/* error or empty result, just end here */
	if (t != NULL || qtype == Q_PARSE) {
		mnstr_flush(fout);
		for (i = 0; i < m->dbcc; i++)
			mapi_close_handle(m->dbcv[i]->hdl);
		return;
	}

	/* write output to client */
	switch (qtype) {
	case Q_TABLE:
		/* Compose the header.  For the table id, we just send 0,
		 * such that we never get a close request.  Steal headers
		 * from the first node. */
		mnstr_printf(fout, "&%d 0 %" PRId64 " %d %" PRId64 "\n",
				Q_TABLE, rlen, fcnt, rlen);
		/* now read the answers, and write them directly to the client */
		for (i = 0; i < m->dbcc; i++) {
			h = m->dbcv[i]->hdl;
			while ((t = mapi_fetch_line(h)) != NULL)
				if (i == 0 || *t != '%') /* skip other server's headers */
					mnstr_printf(fout, "%s\n", t);
		}
		break;
	case Q_UPDATE:
		/* Write a single header for all update counts, to sort of
		 * complement the transparency created for Q_TABLE results,
		 * but forget about last id data (wouldn't make sense if
		 * we'd emit multiple update counts either) */
		mnstr_printf(fout, "&%d %" PRId64 " -1\n", Q_UPDATE, rlen);
		break;
	case Q_SCHEMA:
		mnstr_printf(fout, "&%d\n", Q_SCHEMA);
		break;
	case Q_TRANS:
		mnstr_printf(fout, "&%d %c\n", Q_TRANS, fcnt ? 't' : 'f');
		break;
	}
	mnstr_flush(fout);
	/* finish up */
	for (i = 0; i < m->dbcc; i++)
		mapi_close_handle(m->dbcv[i]->hdl);
}
695 | |
696 | static void * |
697 | multiplexThread(void *d) |
698 | { |
699 | multiplex *m = (multiplex *)d; |
700 | #ifdef HAVE_POLL |
701 | struct pollfd *pfd; |
702 | #else |
703 | struct timeval tv; |
704 | fd_set fds; |
705 | #endif |
706 | multiplex_client *c; |
707 | int msock = -1; |
708 | char buf[10 * BLOCK + 1]; |
709 | ssize_t len; |
710 | int r, i; |
711 | dpair p, q; |
712 | |
713 | /* select on upstream clients, on new data, read query, forward, |
714 | * union all results, send back, and restart cycle. */ |
715 | |
716 | while (m->shutdown == 0) { |
717 | #ifdef HAVE_POLL |
718 | msock = 0; |
719 | for (c = m->clients; c != NULL; c = c->next) { |
720 | msock++; |
721 | } |
722 | pfd = malloc(msock * sizeof(struct pollfd)); |
723 | msock = 0; |
724 | for (c = m->clients; c != NULL; c = c->next) { |
725 | pfd[msock++] = (struct pollfd) {.fd = c->sock, .events = POLLIN}; |
726 | } |
727 | /* wait up to 1 second. */ |
728 | r = poll(pfd, msock, 1000); |
729 | #else |
730 | FD_ZERO(&fds); |
731 | for (c = m->clients; c != NULL; c = c->next) { |
732 | FD_SET(c->sock, &fds); |
733 | if (c->sock > msock) |
734 | msock = c->sock; |
735 | } |
736 | |
737 | /* wait up to 1 second. */ |
738 | tv.tv_sec = 1; |
739 | tv.tv_usec = 0; |
740 | r = select(msock + 1, &fds, NULL, NULL, &tv); |
741 | #endif |
742 | |
743 | /* evaluate if connections have to be switched */ |
744 | for (i = 0; i < m->dbcc; i++) { |
745 | if (m->dbcv[i]->connupdate) { |
746 | if (m->dbcv[i]->newconn != NULL) { |
747 | /* put new connection live */ |
748 | Mfprintf(m->sout, "performing deferred connection cycle " |
749 | "for %s from %s to %s\n" , |
750 | m->dbcv[i]->database, |
751 | m->dbcv[i]->conn != NULL ? |
752 | mapi_get_uri(m->dbcv[i]->conn) : |
753 | "<unconnected>" , |
754 | mapi_get_uri(m->dbcv[i]->newconn)); |
755 | mapi_disconnect(m->dbcv[i]->conn); |
756 | mapi_destroy(m->dbcv[i]->conn); |
757 | m->dbcv[i]->conn = m->dbcv[i]->newconn; |
758 | m->dbcv[i]->newconn = NULL; |
759 | m->dbcv[i]->connupdate = 0; |
760 | } else { |
761 | /* put new connection live */ |
762 | Mfprintf(m->sout, "performing deferred connection drop " |
763 | "for %s from %s\n" , |
764 | m->dbcv[i]->database, |
765 | m->dbcv[i]->conn != NULL ? |
766 | mapi_get_uri(m->dbcv[i]->conn) : |
767 | "<unconnected>" ); |
768 | mapi_disconnect(m->dbcv[i]->conn); |
769 | mapi_destroy(m->dbcv[i]->conn); |
770 | m->dbcv[i]->conn = NULL; |
771 | m->dbcv[i]->connupdate = 0; |
772 | } |
773 | } |
774 | } |
775 | |
776 | /* nothing interesting has happened */ |
777 | if (r <= 0) { |
778 | #ifdef HAVE_POLL |
779 | free(pfd); |
780 | #endif |
781 | continue; |
782 | } |
783 | for (c = m->clients; c != NULL; c = c->next) { |
784 | #ifdef HAVE_POLL |
785 | for (i = 0; i < msock; i++) { |
786 | if (pfd[i].fd == c->sock) |
787 | break; |
788 | } |
789 | if (i == msock || (pfd[i].revents & POLLIN) == 0) |
790 | continue; |
791 | #else |
792 | if (!FD_ISSET(c->sock, &fds)) |
793 | continue; |
794 | #endif |
795 | if ((len = mnstr_read(c->fdin, buf, 1, 10 * BLOCK)) < 0) { |
796 | /* error, or some garbage */ |
797 | multiplexRemoveClient(m, c); |
798 | /* don't crash on now stale c */ |
799 | break; |
800 | } else if (len == 0) { |
801 | /* flush from client, ignore */ |
802 | continue; |
803 | } |
804 | |
805 | buf[len] = '\0'; |
806 | switch (*buf) { |
807 | case 's': |
808 | case 'S': |
809 | /* accepted, just SQL queries */ |
810 | break; |
811 | case 'X': |
812 | /* ignored, some clients just really insist on using |
813 | * these */ |
814 | mnstr_flush(c->fout); |
815 | continue; |
816 | default: |
817 | mnstr_printf(c->fout, "!modifier %c not supported by " |
818 | "multiplex-funnel\n" , *buf); |
819 | mnstr_flush(c->fout); |
820 | Mfprintf(m->serr, "client attempted to perform %c " |
821 | "type query: %s\n" , *buf, buf); |
822 | continue; |
823 | } |
824 | /* we assume (and require) the query to fit in one block, |
825 | * that is, we only forward the first block, without having |
826 | * any idea what it is */ |
827 | multiplexQuery(m, buf + 1, c->fout); |
828 | } |
829 | #ifdef HAVE_POLL |
830 | free(pfd); |
831 | #endif |
832 | } |
833 | |
834 | Mfprintf(stdout, "stopping mfunnel '%s'\n" , m->name); |
835 | |
836 | /* free, cleanup, etc. */ |
837 | while (m->clients != NULL) { |
838 | c = m->clients; |
839 | close_stream(c->fdin); |
840 | close_stream(c->fout); |
841 | free(c->name); |
842 | m->clients = m->clients->next; |
843 | free(c); |
844 | } |
845 | for (i = 0; i < m->dbcc; i++) { |
846 | if (m->dbcv[i]->connupdate && m->dbcv[i]->newconn != NULL) |
847 | mapi_destroy(m->dbcv[i]->newconn); |
848 | if (m->dbcv[i]->conn != NULL) |
849 | mapi_destroy(m->dbcv[i]->conn); |
850 | free(m->dbcv[i]->user); |
851 | /* pass and database belong to the same malloced block from user */ |
852 | } |
853 | fflush(m->sout); |
854 | fclose(m->sout); |
855 | fflush(m->serr); |
856 | fclose(m->serr); |
857 | close(m->gdklock); |
858 | free(m->pool); |
859 | |
860 | /* last bit, remove from logger structure */ |
861 | pthread_mutex_lock(&_mero_topdp_lock); |
862 | |
863 | q = _mero_topdp->next; /* skip console */ |
864 | p = q->next; |
865 | while (p != NULL) { |
866 | if (p->type == MEROFUN && strcmp(p->dbname, m->name) == 0) { |
867 | /* log everything that's still in the pipes */ |
868 | logFD(p->out, "MSG" , p->dbname, (long long int)p->pid, _mero_logfile, 1); |
869 | /* remove from the list */ |
870 | q->next = p->next; |
871 | /* close the descriptors */ |
872 | close(p->out); |
873 | close(p->err); |
874 | Mfprintf(stdout, "mfunnel '%s' has stopped\n" , p->dbname); |
875 | free(p->dbname); |
876 | free(p); |
877 | break; |
878 | } |
879 | q = p; |
880 | p = q->next; |
881 | } |
882 | |
883 | pthread_mutex_unlock(&_mero_topdp_lock); |
884 | |
885 | free(m->name); |
886 | free(m); |
887 | return NULL; |
888 | } |
889 | |
890 | void |
891 | multiplexAddClient(char *mp, int sock, stream *fout, stream *fdin, char *name) |
892 | { |
893 | multiplex_client *w; |
894 | multiplex_client *n = malloc(sizeof(multiplex_client)); |
895 | multiplexlist *ml; |
896 | multiplex *m; |
897 | |
898 | n->sock = sock; |
899 | n->fdin = fdin; |
900 | n->fout = fout; |
901 | n->name = strdup(name); |
902 | n->next = NULL; |
903 | |
904 | pthread_mutex_lock(&mpl_lock); |
905 | for (ml = multiplexes; ml != NULL; ml = ml->next) { |
906 | if (strcmp(ml->m->name, mp) == 0) |
907 | break; |
908 | } |
909 | if (ml == NULL) { |
910 | pthread_mutex_unlock(&mpl_lock); |
911 | Mfprintf(stderr, "failed to find multiplex-funnel '%s' for client %s\n" , |
912 | mp, name); |
913 | mnstr_printf(fout, "!monetdbd: internal error: could not find multiplex-funnel '%s'\n" , mp); |
914 | mnstr_flush(fout); |
915 | close_stream(fdin); |
916 | close_stream(fout); |
917 | free(n->name); |
918 | free(n); |
919 | return; |
920 | } |
921 | m = ml->m; |
922 | |
923 | if (m->clients == NULL) { |
924 | m->clients = n; |
925 | } else { |
926 | for (w = m->clients; w->next != NULL; w = w->next) |
927 | ; |
928 | w->next = n; |
929 | } |
930 | pthread_mutex_unlock(&mpl_lock); |
931 | |
932 | Mfprintf(m->sout, "mfunnel: added new client %s\n" , n->name); |
933 | |
934 | /* send client a prompt */ |
935 | mnstr_flush(fout); |
936 | } |
937 | |
938 | void |
939 | multiplexRemoveClient(multiplex *m, multiplex_client *c) |
940 | { |
941 | multiplex_client *w; |
942 | multiplex_client *p = NULL; |
943 | |
944 | Mfprintf(m->sout, "mfunnel: removing client %s\n" , c->name); |
945 | |
946 | for (w = m->clients; w != NULL; w = w->next) { |
947 | if (w == c) { |
948 | if (w == m->clients) { |
949 | m->clients = w->next; |
950 | } else { |
951 | p->next = w->next; |
952 | } |
953 | c->next = NULL; |
954 | close_stream(c->fdin); |
955 | close_stream(c->fout); |
956 | free(c->name); |
957 | free(c); |
958 | break; |
959 | } |
960 | p = w; |
961 | } |
962 | } |
963 | |
964 | /* vim:set ts=4 sw=4 noexpandtab: */ |
965 | |