1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9/*
10 * @a N.J. Nes P. Boncz, S. Mullender, M. Kersten
11 * @v 1.1
12 * @+ MAPI interface
13 * The complete Mapi library is available to setup
14 * communication with another Mserver.
15 *
16 * Clients may initialize a private listener to implement
17 * specific services. For example, in an OLTP environment
18 * it may make sense to have a listener for each transaction
19 * type, which simply parses a sequence of transaction parameters.
20 *
21 * Authorization of access to the server is handled as part
22 * of the client record initialization phase.
23 *
24 * This library internally uses pointer handles, which we replace with
25 * an index in a locally maintained table. It provides a handle
26 * to easily detect havoc clients.
27 *
 * A cleaner and simpler interface for distributed processing is available in
 * the module remote.
30 */
31#include "monetdb_config.h"
32#ifdef HAVE_MAPI
33#include "mal_mapi.h"
34#include <sys/types.h>
35#include "stream_socket.h"
36#include "mapi.h"
37#ifdef HAVE_OPENSSL
38# include <openssl/rand.h> /* RAND_bytes() */
39#else
40#ifdef HAVE_COMMONCRYPTO
41# include <CommonCrypto/CommonCrypto.h>
42# include <CommonCrypto/CommonRandom.h>
43#endif
44#endif
45#ifdef HAVE_WINSOCK_H /* Windows specific */
46# include <winsock.h>
47#else /* UNIX specific */
48# include <sys/select.h>
49# include <sys/socket.h>
50# include <unistd.h> /* gethostname() */
51# include <netinet/in.h> /* hton and ntoh */
52# include <arpa/inet.h> /* addr_in */
53#endif
54#ifdef HAVE_SYS_UN_H
55# include <sys/un.h>
56#endif
57#ifdef HAVE_NETDB_H
58# include <netdb.h>
59# include <netinet/in.h>
60#endif
61#ifdef HAVE_POLL_H
62#include <poll.h>
63#endif
64#ifdef HAVE_SYS_UIO_H
65# include <sys/uio.h>
66#endif
67#ifdef HAVE_FCNTL_H
68#include <fcntl.h>
69#endif
70
71#define SOCKPTR struct sockaddr *
72#ifdef HAVE_SOCKLEN_T
73#define SOCKLEN socklen_t
74#else
75#define SOCKLEN int
76#endif
77
78#if !defined(HAVE_ACCEPT4) || !defined(SOCK_CLOEXEC)
79#define accept4(sockfd, addr, addrlen, flags) accept(sockfd, addr, addrlen)
80#endif
81
/* Alphabet the challenge characters are drawn from: a-z, A-Z and the
 * digits.  The array is not NUL-terminated, so sizeof(seedChars) is the
 * number of symbols (62). */
static const char seedChars[] = {
	'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j',
	'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x',
	'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L',
	'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
	'1', '2', '3', '4', '5', '6', '7', '8', '9', '0'
};

/*
 * Try to fill dst with len bytes from the cryptographic random
 * generator that was configured at build time (OpenSSL first, else
 * CommonCrypto).  Returns true only when the generator reported
 * success; returns false when it failed or when no generator is
 * compiled in, in which case the contents of dst must not be used.
 */
static bool
secureRandomBytes(void *dst, size_t len)
{
#if defined(HAVE_OPENSSL)
	/* RAND_bytes() signals success with 1; both 0 and -1 are failures */
	return RAND_bytes((unsigned char *) dst, (int) len) == 1;
#elif defined(HAVE_COMMONCRYPTO)
	return CCRandomGenerateBytes(dst, len) == kCCSuccess;
#else
	(void) dst;
	(void) len;
	return false;
#endif
}

/*
 * Write a random challenge string into buf.
 *
 * buf  destination; must have room for max bytes (up to max - 1
 *      characters plus the terminating NUL)
 * min  minimum number of characters
 * max  exclusive upper bound on the number of characters; when
 *      max <= min exactly min characters are produced
 *
 * The string consists of characters from seedChars only.  When no
 * cryptographic generator is available (or it fails) rand() is used
 * instead.
 */
static void
generateChallenge(char *buf, int min, int max)
{
	/* never divide by zero, also not for a degenerate range */
	size_t span = max > min ? (size_t) (max - min) : 1;
	size_t size = 0;
	size_t i;

	/* don't seed the randomiser here, or you get the same challenge
	 * during the same second */
	if (!secureRandomBytes(&size, sizeof(size)))
		size = (size_t) rand();
	size = size % span + (size_t) (min > 0 ? min : 0);

	if (secureRandomBytes(buf, size)) {
		/* map every random byte onto the alphabet */
		for (i = 0; i < size; i++)
			buf[i] = seedChars[(unsigned char) buf[i] % sizeof(seedChars)];
	} else {
		for (i = 0; i < size; i++)
			buf[i] = seedChars[(size_t) rand() % sizeof(seedChars)];
	}
	buf[size] = '\0';
}
125
126struct challengedata {
127 stream *in;
128 stream *out;
129 char challenge[13];
130};
131
132static void
133doChallenge(void *data)
134{
135 char *buf = GDKmalloc(BLOCK + 1);
136 char challenge[13];
137
138 stream *fdin = ((struct challengedata *) data)->in;
139 stream *fdout = ((struct challengedata *) data)->out;
140 bstream *bs;
141 ssize_t len = 0;
142 protocol_version protocol = PROTOCOL_9;
143 size_t buflen = BLOCK;
144
145 MT_thread_setworking("challenging client");
146#ifdef _MSC_VER
147 srand((unsigned int) GDKusec());
148#endif
149 memcpy(challenge, ((struct challengedata *) data)->challenge, sizeof(challenge));
150 GDKfree(data);
151 if (buf == NULL) {
152 fprintf(stderr, "#doChallenge" MAL_MALLOC_FAIL);
153 close_stream(fdin);
154 close_stream(fdout);
155 return;
156 }
157
158 // send the challenge over the block stream
159 mnstr_printf(fdout, "%s:mserver:9:%s:%s:%s:",
160 challenge,
161 mcrypt_getHashAlgorithms(),
162#ifdef WORDS_BIGENDIAN
163 "BIG",
164#else
165 "LIT",
166#endif
167 MONETDB5_PASSWDHASH
168 );
169 mnstr_flush(fdout);
170 /* get response */
171 if ((len = mnstr_read_block(fdin, buf, 1, BLOCK)) < 0) {
172 /* the client must have gone away, so no reason to write anything */
173 close_stream(fdin);
174 close_stream(fdout);
175 GDKfree(buf);
176 return;
177 }
178 buf[len] = 0;
179
180 if (strstr(buf, "PROT10")) {
181 char *errmsg = NULL;
182 char *buflenstrend, *buflenstr = strstr(buf, "PROT10");
183 compression_method comp;
184 protocol = PROTOCOL_10;
185 if ((buflenstr = strchr(buflenstr, ':')) == NULL ||
186 (buflenstr = strchr(buflenstr + 1, ':')) == NULL) {
187 mnstr_printf(fdout, "!buffer size needs to be set and bigger than %d\n", BLOCK);
188 close_stream(fdin);
189 close_stream(fdout);
190 GDKfree(buf);
191 return;
192 }
193 buflenstr++; /* position after ':' */
194 buflenstrend = strchr(buflenstr, ':');
195
196 if (buflenstrend) buflenstrend[0] = '\0';
197 buflen = atol(buflenstr);
198 if (buflenstrend) buflenstrend[0] = ':';
199
200 if (buflen < BLOCK) {
201 mnstr_printf(fdout, "!buffer size needs to be set and bigger than %d\n", BLOCK);
202 close_stream(fdin);
203 close_stream(fdout);
204 GDKfree(buf);
205 return;
206 }
207
208 comp = COMPRESSION_NONE;
209 if (strstr(buf, "COMPRESSION_SNAPPY")) {
210#ifdef HAVE_LIBSNAPPY
211 comp = COMPRESSION_SNAPPY;
212#else
213 errmsg = "!server does not support Snappy compression.\n";
214#endif
215 } else if (strstr(buf, "COMPRESSION_LZ4")) {
216#ifdef HAVE_LIBLZ4
217 comp = COMPRESSION_LZ4;
218#else
219 errmsg = "!server does not support LZ4 compression.\n";
220#endif
221 } else if (strstr(buf, "COMPRESSION_NONE")) {
222 comp = COMPRESSION_NONE;
223 } else {
224 errmsg = "!no compression type specified.\n";
225 }
226
227 if (errmsg) {
228 // incorrect compression type specified
229 mnstr_printf(fdout, "%s", errmsg);
230 close_stream(fdin);
231 close_stream(fdout);
232 GDKfree(buf);
233 return;
234 }
235
236 {
237 // convert the block_stream into a block_stream2
238 stream *from, *to;
239 from = block_stream2(fdin, buflen, comp);
240 to = block_stream2(fdout, buflen, comp);
241 if (from == NULL || to == NULL) {
242 GDKsyserror("SERVERlisten:"MAL_MALLOC_FAIL);
243 close_stream(fdin);
244 close_stream(fdout);
245 GDKfree(buf);
246 return;
247 }
248 fdin = from;
249 fdout = to;
250 }
251 }
252
253#ifdef DEBUG_SERVER
254 fprintf(stderr,"mal_mapi:Client accepted %s\n", buf);
255 fflush(stderr);
256#endif
257 bs = bstream_create(fdin, 128 * BLOCK);
258
259 if (bs == NULL){
260 mnstr_printf(fdout, "!allocation failure in the server\n");
261 close_stream(fdin);
262 close_stream(fdout);
263 GDKfree(buf);
264 GDKsyserror("SERVERlisten:"MAL_MALLOC_FAIL);
265 return;
266 }
267 bs->eof = true;
268 MSscheduleClient(buf, challenge, bs, fdout, protocol, buflen);
269}
270
271static ATOMIC_TYPE nlistener = ATOMIC_VAR_INIT(0); /* nr of listeners */
272static ATOMIC_TYPE serveractive = ATOMIC_VAR_INIT(0);
273static ATOMIC_TYPE serverexiting = ATOMIC_VAR_INIT(0); /* listeners should exit */
274static ATOMIC_TYPE threadno = ATOMIC_VAR_INIT(0); /* thread sequence no */
275
276static void
277SERVERlistenThread(SOCKET *Sock)
278{
279 char *msg = 0;
280 int retval;
281#ifdef HAVE_POLL
282 struct pollfd pfd[2];
283 nfds_t npfd;
284#else
285 struct timeval tv;
286 fd_set fds;
287#endif
288 SOCKET sock = INVALID_SOCKET;
289 SOCKET usock = INVALID_SOCKET;
290 SOCKET msgsock = INVALID_SOCKET;
291 struct challengedata *data;
292 MT_Id tid;
293 stream *s;
294
295 sock = Sock[0];
296 usock = Sock[1];
297 GDKfree(Sock);
298
299 (void) ATOMIC_INC(&nlistener);
300
301 do {
302#ifdef HAVE_POLL
303 npfd = 0;
304 if (sock != INVALID_SOCKET)
305 pfd[npfd++] = (struct pollfd) {.fd = sock, .events = POLLIN};
306#ifdef HAVE_SYS_UN_H
307 if (usock != INVALID_SOCKET)
308 pfd[npfd++] = (struct pollfd) {.fd = usock, .events = POLLIN};
309#endif
310 /* Wait up to 0.025 seconds (0.001 if testing) */
311 retval = poll(pfd, npfd, GDKdebug & FORCEMITOMASK ? 10 : 25);
312#else
313 FD_ZERO(&fds);
314 if (sock != INVALID_SOCKET)
315 FD_SET(sock, &fds);
316#ifdef HAVE_SYS_UN_H
317 if (usock != INVALID_SOCKET)
318 FD_SET(usock, &fds);
319#endif
320 /* Wait up to 0.025 seconds (0.001 if testing) */
321 tv.tv_sec = 0;
322 tv.tv_usec = GDKdebug & FORCEMITOMASK ? 10000 : 25000;
323
324 /* temporarily use msgsock to record the larger of sock and usock */
325 msgsock = sock;
326#ifdef HAVE_SYS_UN_H
327 if (usock != INVALID_SOCKET && (sock == INVALID_SOCKET || usock > sock))
328 msgsock = usock;
329#endif
330 retval = select((int)msgsock + 1, &fds, NULL, NULL, &tv);
331#endif
332 if (ATOMIC_GET(&serverexiting) || GDKexiting())
333 break;
334 if (retval == 0) {
335 /* nothing interesting has happened */
336 continue;
337 }
338 if (retval == SOCKET_ERROR) {
339 if (
340#ifdef _MSC_VER
341 WSAGetLastError() != WSAEINTR
342#else
343 errno != EINTR
344#endif
345 ) {
346 msg = "select failed";
347 goto error;
348 }
349 continue;
350 }
351 if (sock != INVALID_SOCKET &&
352#ifdef HAVE_POLL
353 (npfd > 0 && pfd[0].fd == sock && pfd[0].revents & POLLIN)
354#else
355 FD_ISSET(sock, &fds)
356#endif
357 ) {
358 if ((msgsock = accept4(sock, (SOCKPTR)0, (socklen_t *)0, SOCK_CLOEXEC)) == INVALID_SOCKET) {
359 if (
360#ifdef _MSC_VER
361 WSAGetLastError() != WSAEINTR
362#else
363 errno != EINTR
364#endif
365 || !ATOMIC_GET(&serveractive)) {
366 msg = "accept failed";
367 goto error;
368 }
369 continue;
370 }
371#if defined(HAVE_FCNTL) && (!defined(SOCK_CLOEXEC) || !defined(HAVE_ACCEPT4))
372 (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC);
373#endif
374#ifdef HAVE_SYS_UN_H
375 } else if (usock != INVALID_SOCKET &&
376#ifdef HAVE_POLL
377 ((npfd > 0 && pfd[0].fd == usock && pfd[0].revents & POLLIN) ||
378 (npfd > 1 && pfd[1].fd == usock && pfd[1].revents & POLLIN))
379#else
380 FD_ISSET(usock, &fds)
381#endif
382 ) {
383 struct msghdr msgh;
384 struct iovec iov;
385 char buf[1];
386 int rv;
387 char ccmsg[CMSG_SPACE(sizeof(int))];
388 struct cmsghdr *cmsg;
389
390 if ((msgsock = accept4(usock, (SOCKPTR)0, (socklen_t *)0, SOCK_CLOEXEC)) == INVALID_SOCKET) {
391 if (
392#ifdef _MSC_VER
393 WSAGetLastError() != WSAEINTR
394#else
395 errno != EINTR
396#endif
397 ) {
398 msg = "accept failed";
399 goto error;
400 }
401 continue;
402 }
403#if defined(HAVE_FCNTL) && (!defined(SOCK_CLOEXEC) || !defined(HAVE_ACCEPT4))
404 (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC);
405#endif
406
407 /* BEWARE: unix domain sockets have a slightly different
408 * behaviour initialy than normal sockets, because we can
409 * send filedescriptors or credentials with them. To do so,
410 * we need to use sendmsg/recvmsg, which operates on a bare
411 * socket. Unfortunately we *have* to send something, so it
412 * is one byte that can optionally carry the ancillary data.
413 * This byte is at this moment defined to contain a character:
414 * '0' - there is no ancillary data
415 * '1' - ancillary data for passing a file descriptor
416 * The future may introduce a state for passing credentials.
417 * Any unknown character must be interpreted as some unknown
418 * action, and hence not supported by the server. */
419
420 iov.iov_base = buf;
421 iov.iov_len = 1;
422
423 msgh.msg_name = 0;
424 msgh.msg_namelen = 0;
425 msgh.msg_iov = &iov;
426 msgh.msg_iovlen = 1;
427 msgh.msg_control = ccmsg;
428 msgh.msg_controllen = sizeof(ccmsg);
429
430 rv = recvmsg(msgsock, &msgh, 0);
431 if (rv == -1) {
432 closesocket(msgsock);
433 continue;
434 }
435
436 switch (buf[0]) {
437 case '0':
438 /* nothing special, nothing to do */
439 break;
440 case '1':
441 { int *c_d;
442 /* filedescriptor, put it in place of msgsock */
443 cmsg = CMSG_FIRSTHDR(&msgh);
444 (void) shutdown(msgsock, SHUT_WR);
445 closesocket(msgsock);
446 if (!cmsg || cmsg->cmsg_type != SCM_RIGHTS) {
447 fprintf(stderr, "!mal_mapi.listen: "
448 "expected filedescriptor, but "
449 "received something else\n");
450 continue;
451 }
452 /* HACK to avoid
453 * "dereferencing type-punned pointer will break strict-aliasing rules"
454 * (with gcc 4.5.1 on Fedora 14)
455 */
456 c_d = (int*)CMSG_DATA(cmsg);
457 msgsock = *c_d;
458 }
459 break;
460 default:
461 /* some unknown state */
462 closesocket(msgsock);
463 fprintf(stderr, "!mal_mapi.listen: unknown command type in first byte\n");
464 continue;
465 }
466#endif
467 } else {
468 continue;
469 }
470#ifdef DEBUG_SERVER
471 fprintf(stderr,"server:accepted\n");
472 fflush(stderr);
473#endif
474 data = GDKmalloc(sizeof(*data));
475 if( data == NULL){
476 closesocket(msgsock);
477 fprintf(stderr, "#initClient " SQLSTATE(HY001) MAL_MALLOC_FAIL);
478 continue;
479 }
480 data->in = socket_rstream(msgsock, "Server read");
481 data->out = socket_wstream(msgsock, "Server write");
482 if (data->in == NULL || data->out == NULL) {
483 stream_alloc_fail:
484 mnstr_destroy(data->in);
485 mnstr_destroy(data->out);
486 GDKfree(data);
487 closesocket(msgsock);
488 fprintf(stderr, "!initClient cannot allocate stream");
489 continue;
490 }
491 s = block_stream(data->in);
492 if (s == NULL) {
493 goto stream_alloc_fail;
494 }
495 data->in = s;
496 s = block_stream(data->out);
497 if (s == NULL) {
498 goto stream_alloc_fail;
499 }
500 data->out = s;
501 char name[16];
502 snprintf(name, sizeof(name), "client%d",
503 (int) ATOMIC_INC(&threadno));
504
505 /* generate the challenge string */
506 generateChallenge(data->challenge, 8, 12);
507
508 if ((tid = THRcreate(doChallenge, data, MT_THR_DETACHED, name)) == 0) {
509 mnstr_destroy(data->in);
510 mnstr_destroy(data->out);
511 GDKfree(data);
512 closesocket(msgsock);
513 fprintf(stderr, "!initClient:cannot fork new client thread");
514 continue;
515 }
516 } while (!ATOMIC_GET(&serverexiting) && !GDKexiting());
517 (void) ATOMIC_DEC(&nlistener);
518 if (sock != INVALID_SOCKET)
519 closesocket(sock);
520 if (usock != INVALID_SOCKET)
521 closesocket(usock);
522 return;
523error:
524 fprintf(stderr, "!mal_mapi.listen: %s, terminating listener\n", msg);
525 if (sock != INVALID_SOCKET)
526 closesocket(sock);
527 if (usock != INVALID_SOCKET)
528 closesocket(usock);
529}
530
/* Constant IPv6 addresses used when binding the TCP listener and when
 * reporting the address it can be reached on: the loopback address and
 * the wildcard ("any") address. */
static const struct in6_addr ipv6_loopback_addr = IN6ADDR_LOOPBACK_INIT;
static const struct in6_addr ipv6_any_addr = IN6ADDR_ANY_INIT;
534
535static str
536SERVERlisten(int port, const char *usockfile, int maxusers)
537{
538 struct sockaddr* server = NULL;
539 struct sockaddr_in server_ipv4;
540 struct sockaddr_in6 server_ipv6;
541 SOCKET sock = INVALID_SOCKET;
542 SOCKET *psock;
543 bool bind_ipv6 = false;
544 bool accept_any = false;
545 bool autosense = false;
546#ifdef HAVE_SYS_UN_H
547 struct sockaddr_un userver;
548 SOCKET usock = INVALID_SOCKET;
549#endif
550 SOCKLEN length = 0;
551 int on = 1;
552 int i = 0;
553 MT_Id pid;
554 str buf;
555 char host[128];
556 const char *listenaddr;
557#ifdef DEBUG_SERVER
558 char msg[512], host[512];
559#endif
560
561 accept_any = GDKgetenv_istrue("mapi_open");
562 bind_ipv6 = GDKgetenv_istrue("mapi_ipv6");
563 autosense = GDKgetenv_istrue("mapi_autosense");
564 listenaddr = GDKgetenv("mapi_listenaddr");
565
566 /* early way out, we do not want to listen on any port when running in embedded mode */
567 if (GDKgetenv_istrue("mapi_disable")) {
568 return MAL_SUCCEED;
569 }
570
571 psock = GDKmalloc(sizeof(SOCKET) * 2);
572 if (psock == NULL)
573 throw(MAL,"mal_mapi.listen", SQLSTATE(HY001) MAL_MALLOC_FAIL);
574
575 if (usockfile == NULL || strcmp(usockfile, str_nil) == 0) {
576 usockfile = NULL;
577 } else {
578#ifndef HAVE_SYS_UN_H
579 GDKfree(psock);
580 throw(IO, "mal_mapi.listen", OPERATION_FAILED ": UNIX domain sockets are not supported");
581#endif
582 }
583 maxusers = (maxusers ? maxusers : SERVERMAXUSERS);
584
585 if (port <= 0 && usockfile == NULL) {
586 GDKfree(psock);
587 throw(ILLARG, "mal_mapi.listen", OPERATION_FAILED ": no port or socket file specified");
588 }
589
590 if (port > 65535) {
591 GDKfree(psock);
592 throw(ILLARG, "mal_mapi.listen", OPERATION_FAILED ": port number should be between 1 and 65535");
593 }
594
595 if (port > 0) {
596 if (listenaddr && *listenaddr) {
597 int check = 0, e = errno;
598 char sport[16];
599 struct addrinfo *result = NULL, *rp = NULL, hints = (struct addrinfo) {
600 .ai_family = bind_ipv6 ? AF_INET6 : AF_INET,
601 .ai_socktype = SOCK_STREAM,
602 .ai_flags = AI_PASSIVE,
603 .ai_protocol = IPPROTO_TCP,
604 };
605
606 do {
607 snprintf(sport, 16, "%d", port);
608 check = getaddrinfo(listenaddr, sport, &hints, &result);
609 if (check != 0) {
610 if (autosense && port <= 65535) {
611 port++;
612 continue;
613 }
614 GDKfree(psock);
615 throw(IO, "mal_mapi.listen", OPERATION_FAILED
616 ": bind to stream socket on address %s and port %d failed: %s", listenaddr, port,
617 gai_strerror(check));
618 }
619
620 for (rp = result; rp != NULL; rp = rp->ai_next) {
621 int bind_check;
622 sock = socket(rp->ai_family, rp->ai_socktype
623#ifdef SOCK_CLOEXEC
624 | SOCK_CLOEXEC
625#endif
626 , rp->ai_protocol);
627 if (sock == INVALID_SOCKET) {
628 e = errno;
629 continue;
630 }
631#if !defined(SOCK_CLOEXEC) && defined(HAVE_FCNTL)
632 (void) fcntl(sock, F_SETFD, FD_CLOEXEC);
633#endif
634
635 if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof on) == SOCKET_ERROR) {
636 e = errno;
637 closesocket(sock);
638 continue;
639 }
640
641 bind_check = bind(sock, (SOCKPTR) rp->ai_addr, (socklen_t) rp->ai_addrlen);
642 e = errno;
643 if (bind_check == SOCKET_ERROR) {
644 closesocket(sock);
645 continue;
646 } else
647 break;
648 }
649 if (result)
650 freeaddrinfo(result);
651 errno = e;
652 if (errno == 0 && sock != INVALID_SOCKET)
653 break;
654
655 if (port > 65535) {
656 GDKfree(psock);
657 throw(IO, "mal_mapi.listen", OPERATION_FAILED ": bind to stream socket port %d failed", port);
658 } else if (
659#ifdef _MSC_VER
660 WSAGetLastError() == WSAEADDRINUSE &&
661#else
662#ifdef EADDRINUSE
663 errno == EADDRINUSE &&
664#else
665#endif
666#endif
667 autosense && port <= 65535) {
668 port++;
669 continue;
670 }
671 GDKfree(psock);
672 errno = e;
673 throw(IO, "mal_mapi.listen", OPERATION_FAILED ": bind to stream socket port %d failed: %s", port,
674#ifdef _MSC_VER
675 wsaerror(WSAGetLastError())
676#else
677 strerror(errno)
678#endif
679 );
680 } while (1);
681 } else {
682 sock = socket(bind_ipv6 ? AF_INET6 : AF_INET, SOCK_STREAM
683#ifdef SOCK_CLOEXEC
684 | SOCK_CLOEXEC
685#endif
686 , 0);
687 if (sock == INVALID_SOCKET) {
688 int e = errno;
689 GDKfree(psock);
690 errno = e;
691 throw(IO, "mal_mapi.listen",
692 OPERATION_FAILED ": bind to stream socket port %d "
693 "failed: %s", port,
694#ifdef _MSC_VER
695 wsaerror(WSAGetLastError())
696#else
697 strerror(errno)
698#endif
699 );
700 }
701#if !defined(SOCK_CLOEXEC) && defined(HAVE_FCNTL)
702 (void) fcntl(sock, F_SETFD, FD_CLOEXEC);
703#endif
704
705 if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof on) == SOCKET_ERROR) {
706 int e = errno;
707#ifdef _MSC_VER
708 const char *err = wsaerror(WSAGetLastError());
709#else
710 const char *err = strerror(errno);
711#endif
712 GDKfree(psock);
713 closesocket(sock);
714 errno = e;
715 throw(IO, "mal_mapi.listen", OPERATION_FAILED ": setsockptr failed %s", err);
716 }
717
718 if (bind_ipv6) {
719 memset(&server_ipv6, 0, sizeof(server_ipv6));
720 server_ipv6.sin6_family = AF_INET6;
721 if (accept_any)
722 server_ipv6.sin6_addr = ipv6_any_addr;
723 else
724 server_ipv6.sin6_addr = ipv6_loopback_addr;
725 server = (struct sockaddr*) &server_ipv6;
726 length = (SOCKLEN) sizeof(server_ipv6);
727 } else {
728 server_ipv4.sin_family = AF_INET;
729 if (accept_any)
730 server_ipv4.sin_addr.s_addr = htonl(INADDR_ANY);
731 else
732 server_ipv4.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
733 for (i = 0; i < 8; i++)
734 server_ipv4.sin_zero[i] = 0;
735 server = (struct sockaddr*) &server_ipv4;
736 length = (SOCKLEN) sizeof(server_ipv4);
737 }
738
739 for (;;) {
740 if (bind_ipv6)
741 server_ipv6.sin6_port = htons((unsigned short) ((port) & 0xFFFF));
742 else
743 server_ipv4.sin_port = htons((unsigned short) ((port) & 0xFFFF));
744
745 if (bind(sock, (SOCKPTR) server, length) == SOCKET_ERROR) {
746 int e = errno;
747 if (
748#ifdef _MSC_VER
749 WSAGetLastError() == WSAEADDRINUSE &&
750#else
751#ifdef EADDRINUSE
752 errno == EADDRINUSE &&
753#else
754#endif
755#endif
756 autosense && port <= 65535)
757 {
758 port++;
759 continue;
760 }
761 closesocket(sock);
762 GDKfree(psock);
763 errno = e;
764 throw(IO, "mal_mapi.listen", OPERATION_FAILED ": bind to stream socket port %d failed: %s", port,
765#ifdef _MSC_VER
766 wsaerror(WSAGetLastError())
767#else
768 strerror(errno)
769#endif
770 );
771 } else {
772 break;
773 }
774 }
775
776 if (getsockname(sock, server, &length) == SOCKET_ERROR) {
777 int e = errno;
778 closesocket(sock);
779 GDKfree(psock);
780 errno = e;
781 throw(IO, "mal_mapi.listen", OPERATION_FAILED ": failed getting socket name: %s",
782#ifdef _MSC_VER
783 wsaerror(WSAGetLastError())
784#else
785 strerror(errno)
786#endif
787 );
788 }
789 }
790
791 if (listen(sock, maxusers) == SOCKET_ERROR) {
792 int e = errno;
793 GDKfree(psock);
794 if (sock != INVALID_SOCKET)
795 closesocket(sock);
796 errno = e;
797 throw(IO, "mal_mapi.listen",
798 OPERATION_FAILED ": failed to set socket to listen %s",
799#ifdef _MSC_VER
800 wsaerror(WSAGetLastError())
801#else
802 strerror(errno)
803#endif
804 );
805 }
806 }
807#ifdef HAVE_SYS_UN_H
808 if (usockfile) {
809 /* prevent silent truncation, sun_path is typically around 108
810 * chars long :/ */
811 if (strlen(usockfile) >= sizeof(userver.sun_path)) {
812 char *e;
813 if (sock != INVALID_SOCKET)
814 closesocket(sock);
815 GDKfree(psock);
816 e = createException(MAL, "mal_mapi.listen",
817 OPERATION_FAILED ": UNIX socket path too long: %s",
818 usockfile);
819 return e;
820 }
821
822 usock = socket(AF_UNIX, SOCK_STREAM
823#ifdef SOCK_CLOEXEC
824 | SOCK_CLOEXEC
825#endif
826 , 0);
827 if (usock == INVALID_SOCKET) {
828 int e = errno;
829 GDKfree(psock);
830 errno = e;
831 if (sock != INVALID_SOCKET)
832 closesocket(sock);
833 errno = e;
834 throw(IO, "mal_mapi.listen",
835 OPERATION_FAILED ": creation of UNIX socket failed: %s",
836#ifdef _MSC_VER
837 wsaerror(WSAGetLastError())
838#else
839 strerror(errno)
840#endif
841 );
842 }
843#if !defined(SOCK_CLOEXEC) && defined(HAVE_FCNTL)
844 (void) fcntl(usock, F_SETFD, FD_CLOEXEC);
845#endif
846
847 userver.sun_family = AF_UNIX;
848 size_t ulen = strlen(usockfile);
849 if (ulen >= sizeof(userver.sun_path)) {
850 if (sock != INVALID_SOCKET)
851 closesocket(sock);
852 closesocket(usock);
853 GDKfree(psock);
854 throw(IO, "mal_mapi.listen", "usockfile name is too large");
855 }
856 memcpy(userver.sun_path, usockfile, ulen + 1);
857 length = (SOCKLEN) sizeof(userver);
858 if(remove(usockfile) == -1 && errno != ENOENT) {
859 char *e = createException(IO, "mal_mapi.listen", OPERATION_FAILED ": remove UNIX socket file");
860 if (sock != INVALID_SOCKET)
861 closesocket(sock);
862 closesocket(usock);
863 GDKfree(psock);
864 return e;
865 }
866 if (bind(usock, (SOCKPTR) &userver, length) == SOCKET_ERROR) {
867 char *e;
868 int err = errno;
869 if (sock != INVALID_SOCKET)
870 closesocket(sock);
871 closesocket(usock);
872 (void) remove(usockfile);
873 GDKfree(psock);
874 errno = err;
875 e = createException(IO, "mal_mapi.listen",
876 OPERATION_FAILED
877 ": binding to UNIX socket file %s failed: %s",
878 usockfile,
879#ifdef _MSC_VER
880 wsaerror(WSAGetLastError())
881#else
882 strerror(errno)
883#endif
884 );
885 return e;
886 }
887 if(listen(usock, maxusers) == SOCKET_ERROR) {
888 char *e;
889 int err = errno;
890 if (sock != INVALID_SOCKET)
891 closesocket(sock);
892 closesocket(usock);
893 (void) remove(usockfile);
894 GDKfree(psock);
895 errno = err;
896 e = createException(IO, "mal_mapi.listen",
897 OPERATION_FAILED
898 ": setting UNIX socket file %s to listen failed: %s",
899 usockfile,
900#ifdef _MSC_VER
901 wsaerror(WSAGetLastError())
902#else
903 strerror(errno)
904#endif
905 );
906 return e;
907 }
908 }
909#endif
910
911#ifdef DEBUG_SERVER
912 fprintf(stderr, "#SERVERlisten:Network started at %d\n", port);
913#endif
914
915 psock[0] = sock;
916#ifdef HAVE_SYS_UN_H
917 psock[1] = usock;
918#else
919 psock[1] = INVALID_SOCKET;
920#endif
921 if (MT_create_thread(&pid, (void (*)(void *)) SERVERlistenThread, psock,
922 MT_THR_DETACHED, "listenThread") != 0) {
923 if (sock != INVALID_SOCKET)
924 closesocket(sock);
925#ifdef HAVE_SYS_UN_H
926 if (usock != INVALID_SOCKET)
927 closesocket(usock);
928#endif
929 GDKfree(psock);
930 throw(MAL, "mal_mapi.listen", OPERATION_FAILED ": starting thread failed");
931 }
932#ifdef DEBUG_SERVER
933 gethostname(host, (int) 512);
934 snprintf(msg, (int) 512, "#Ready to accept connections on %s:%d\n", host, port);
935 fprintf(stderr, "%s", msg);
936#endif
937
938 /* seed the randomiser such that our challenges aren't
939 * predictable... */
940 srand((unsigned int) GDKusec());
941
942 if (port > 0) {
943 if (bind_ipv6) {
944 if (memcmp(server_ipv6.sin6_addr.s6_addr, &ipv6_loopback_addr, sizeof(struct in6_addr)) == 0) {
945 sprintf(host, "[::1]");
946 } else if (memcmp(server_ipv6.sin6_addr.s6_addr, &ipv6_any_addr, sizeof(struct in6_addr)) == 0) {
947 gethostname(host, sizeof(host));
948 host[sizeof(host) - 1] = '\0';
949 } else {
950 snprintf(host, sizeof(host),"[%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x]",
951 (int)server_ipv6.sin6_addr.s6_addr[0], (int)server_ipv6.sin6_addr.s6_addr[1],
952 (int)server_ipv6.sin6_addr.s6_addr[2], (int)server_ipv6.sin6_addr.s6_addr[3],
953 (int)server_ipv6.sin6_addr.s6_addr[4], (int)server_ipv6.sin6_addr.s6_addr[5],
954 (int)server_ipv6.sin6_addr.s6_addr[6], (int)server_ipv6.sin6_addr.s6_addr[7],
955 (int)server_ipv6.sin6_addr.s6_addr[8], (int)server_ipv6.sin6_addr.s6_addr[9],
956 (int)server_ipv6.sin6_addr.s6_addr[10], (int)server_ipv6.sin6_addr.s6_addr[11],
957 (int)server_ipv6.sin6_addr.s6_addr[12], (int)server_ipv6.sin6_addr.s6_addr[13],
958 (int)server_ipv6.sin6_addr.s6_addr[14], (int)server_ipv6.sin6_addr.s6_addr[15]);
959 }
960 } else {
961 if (server_ipv4.sin_addr.s_addr == INADDR_ANY) {
962 gethostname(host, sizeof(host));
963 host[sizeof(host) - 1] = '\0';
964 } else {
965 /* avoid doing this, it requires some includes that probably
966 * give trouble on windowz
967 host = inet_ntoa(addr);
968 */
969 snprintf(host, sizeof(host), "%u.%u.%u.%u",
970 (unsigned) ((ntohl(server_ipv4.sin_addr.s_addr) >> 24) & 0xff),
971 (unsigned) ((ntohl(server_ipv4.sin_addr.s_addr) >> 16) & 0xff),
972 (unsigned) ((ntohl(server_ipv4.sin_addr.s_addr) >> 8) & 0xff),
973 (unsigned) (ntohl(server_ipv4.sin_addr.s_addr) & 0xff));
974 }
975 }
976
977 if (!GDKinmemory() && (buf = msab_marchConnection(host, port)) != NULL)
978 free(buf);
979 else
980 /* announce that we're now reachable */
981 printf("# Listening for connection requests on "
982 "mapi:monetdb://%s:%i/\n", host, port);
983 }
984 if (usockfile != NULL) {
985 port = 0;
986 if (!GDKinmemory() && (buf = msab_marchConnection(usockfile, port)) != NULL)
987 free(buf);
988 else
989 /* announce that we're now reachable */
990 printf("# Listening for UNIX domain connection requests on "
991 "mapi:monetdb://%s\n", usockfile);
992 }
993
994 return MAL_SUCCEED;
995}
996
997/*
998 * @- Wrappers
999 * The MonetDB Version 5 wrappers are collected here
1000 * The latest port known to gain access is stored
1001 * in the database, so that others can more easily
1002 * be notified.
1003 */
1004str
1005SERVERlisten_default(int *ret)
1006{
1007 int port = SERVERPORT;
1008 const char* p = GDKgetenv("mapi_port");
1009
1010 (void) ret;
1011 if (p)
1012 port = (int) strtol(p, NULL, 10);
1013 p = GDKgetenv("mapi_usock");
1014 return SERVERlisten(port, p, SERVERMAXUSERS);
1015}
1016
1017str
1018SERVERlisten_usock(int *ret, str *usock)
1019{
1020 (void) ret;
1021 return SERVERlisten(0, usock ? *usock : NULL, SERVERMAXUSERS);
1022}
1023
1024str
1025SERVERlisten_port(int *ret, int *pid)
1026{
1027 (void) ret;
1028 return SERVERlisten(*pid, NULL, SERVERMAXUSERS);
1029}
1030/*
1031 * The internet connection listener may be terminated from the server console,
1032 * or temporarily suspended to enable system maintenance.
1033 * It is advisable to trace the interactions of clients on the server
1034 * side. At least as far as it concerns requests received.
1035 * The kernel supports this 'spying' behavior with a file descriptor
1036 * field in the client record.
1037 */
1038
1039str
1040SERVERstop(void *ret)
1041{
1042fprintf(stderr, "SERVERstop\n");
1043 ATOMIC_SET(&serverexiting, 1);
1044 /* wait until they all exited, but skip the wait if the whole
1045 * system is going down */
1046 while (ATOMIC_GET(&nlistener) > 0 && !GDKexiting())
1047 MT_sleep_ms(100);
1048 (void) ret; /* fool compiler */
1049 return MAL_SUCCEED;
1050}
1051
1052
1053str
1054SERVERsuspend(void *res)
1055{
1056 (void) res;
1057 ATOMIC_SET(&serveractive, 0);
1058 return MAL_SUCCEED;
1059}
1060
1061str
1062SERVERresume(void *res)
1063{
1064 ATOMIC_SET(&serveractive, 1);
1065 (void) res;
1066 return MAL_SUCCEED;
1067}
1068
1069str
1070SERVERclient(void *res, const Stream *In, const Stream *Out)
1071{
1072 struct challengedata *data;
1073 MT_Id tid;
1074
1075 (void) res;
1076 /* in embedded mode we allow just one client */
1077 data = GDKmalloc(sizeof(*data));
1078 if( data == NULL)
1079 throw(MAL, "mapi.SERVERclient", SQLSTATE(HY001) MAL_MALLOC_FAIL);
1080 data->in = block_stream(*In);
1081 data->out = block_stream(*Out);
1082 if (data->in == NULL || data->out == NULL) {
1083 mnstr_destroy(data->in);
1084 mnstr_destroy(data->out);
1085 GDKfree(data);
1086 throw(MAL, "mapi.SERVERclient", SQLSTATE(HY001) MAL_MALLOC_FAIL);
1087 }
1088 char name[16];
1089 snprintf(name, sizeof(name), "client%d",
1090 (int) ATOMIC_INC(&threadno));
1091
1092 /* generate the challenge string */
1093 generateChallenge(data->challenge, 8, 12);
1094
1095 if ((tid = THRcreate(doChallenge, data, MT_THR_DETACHED, name)) == 0) {
1096 mnstr_destroy(data->in);
1097 mnstr_destroy(data->out);
1098 GDKfree(data);
1099 throw(MAL, "mapi.SERVERclient", "cannot fork new client thread");
1100 }
1101 return MAL_SUCCEED;
1102}
1103
1104/*
1105 * @+ Remote Processing
1106 * The remainder of the file contains the wrappers around
1107 * the Mapi library used by application programmers.
1108 * Details on the functions can be found there.
1109 *
1110 * Sessions have a lifetime different from dynamic scopes.
1111 * This means the user should use a session identifier
1112 * to select the correct handle.
1113 * For the time being we use the index in the global
1114 * session table. The client pointer is retained to
1115 * perform access control.
1116 *
 * We use a single result set handle. All data should be
 * consumed before continuing.
1119 *
1120 * A few extra routines should be defined to
1121 * dump and inspect the sessions table.
1122 *
 * The remote site may return a single error
 * consisting of a series of error lines. Each of
 * these lines then starts with a '!'; they are all stripped here.
1126 */
/*
 * catchErrors(fcn): check the MAPI connection for an error and, if
 * there is one, return a MAL exception from the enclosing function.
 *
 * The macro expects these names to be in scope at the point of use:
 *   mid  the Mapi connection that is checked
 *   hdl  the MapiHdl of the last request (may be NULL)
 *   i    index of the session in SERVERsessions
 * fcn must be a string literal naming the calling MAL function.
 *
 * The error text is copied into a scratch buffer; every '!' that
 * directly follows a newline is replaced by an exception prefix, and
 * the result is wrapped in an exception that is returned.  When the
 * scratch buffer cannot be allocated an allocation failure exception
 * is returned instead.
 */
#define catchErrors(fcn) \
	do { \
		int rn = mapi_error(mid); \
		/* any non-zero status is an error */ \
		if (rn != 0) { \
			const char *err, *e; \
			str newerr; \
			str ret; \
			size_t l; \
			char *f; \
 \
			if (hdl && mapi_result_error(hdl)) \
				err = mapi_result_error(hdl); \
			else \
				err = mapi_result_error(SERVERsessions[i].hdl); \
 \
			if (err == NULL) \
				err = "(no additional error message)"; \
 \
			l = 2 * strlen(err) + 8192; \
			newerr = (str) GDKmalloc(l); \
			if (newerr == NULL) \
				throw(MAL, fcn, SQLSTATE(HY001) MAL_MALLOC_FAIL); \
 \
			f = newerr; \
			/* I think this code tries to deal with multiple errors, this \
			 * will fail this way if it does, since no ! is in the error \
			 * string, only newlines to separate them */ \
			for (e = err; *e && l > 1; e++) { \
				/* e > err: never look in front of the string */ \
				if (*e == '!' && e > err && e[-1] == '\n') { \
					snprintf(f, l, "MALException:" fcn ":remote error:"); \
					l -= strlen(f); \
					while (*f) \
						f++; \
				} else { \
					*f++ = *e; \
					l--; \
				} \
			} \
 \
			*f = 0; \
			ret = createException(MAL, fcn, \
					      OPERATION_FAILED ": remote error: %s", \
					      newerr); \
			GDKfree(newerr); \
			return ret; \
		} \
	} while (0)
1173
/* Capacity of the session table below. */
#define MAXSESSIONS 32
/*
 * Table of remote sessions.  A slot is considered free while its `c`
 * member is zero; SERVERconnectAll claims a slot by storing the calling
 * client in `c` and a fresh value of `sessionkey` in `key`.
 */
struct{
	int key;		/* identifier handed out to MAL code for this session */
	str dbalias;		/* logical name of the session */
	Client c;		/* client that opened the session; zero marks a free slot */
	Mapi mid;		/* communication channel */
	MapiHdl hdl;	/* result set handle */
} SERVERsessions[MAXSESSIONS];

/* Last session key handed out; incremented for every new session. */
static int sessionkey=0;
1184
1185/* #define MAPI_TEST*/
1186
1187static str
1188SERVERconnectAll(Client cntxt, int *key, str *host, int *port, str *username, str *password, str *lang)
1189{
1190 Mapi mid;
1191 int i;
1192
1193 MT_lock_set(&mal_contextLock);
1194 for(i=1; i< MAXSESSIONS; i++)
1195 if( SERVERsessions[i].c ==0 ) break;
1196
1197 if( i==MAXSESSIONS){
1198 MT_lock_unset(&mal_contextLock);
1199 throw(IO, "mapi.connect", OPERATION_FAILED ": too many sessions");
1200 }
1201 SERVERsessions[i].c= cntxt;
1202 SERVERsessions[i].key= ++sessionkey;
1203 MT_lock_unset(&mal_contextLock);
1204
1205 mid = mapi_connect(*host, *port, *username, *password, *lang, NULL);
1206
1207 if (mapi_error(mid)) {
1208 const char *err = mapi_error_str(mid);
1209 str ex;
1210 if (err == NULL)
1211 err = "(no reason given)";
1212 if (err[0] == '!')
1213 err = err + 1;
1214 SERVERsessions[i].c = NULL;
1215 ex = createException(IO, "mapi.connect", "Could not connect: %s", err);
1216 mapi_destroy(mid);
1217 return(ex);
1218 }
1219
1220#ifdef MAPI_TEST
1221 mnstr_printf(SERVERsessions[i].c->fdout,"Succeeded to establish session\n");
1222#endif
1223 SERVERsessions[i].mid= mid;
1224 *key = SERVERsessions[i].key;
1225 return MAL_SUCCEED;
1226}
1227
1228str
1229SERVERdisconnectALL(int *key){
1230 int i;
1231
1232 MT_lock_set(&mal_contextLock);
1233
1234 for(i=1; i< MAXSESSIONS; i++)
1235 if( SERVERsessions[i].c != 0 ) {
1236#ifdef MAPI_TEST
1237 mnstr_printf(SERVERsessions[i].c->fdout,"Close session %d\n",i);
1238#endif
1239 SERVERsessions[i].c = 0;
1240 if( SERVERsessions[i].dbalias)
1241 GDKfree(SERVERsessions[i].dbalias);
1242 SERVERsessions[i].dbalias = NULL;
1243 *key = SERVERsessions[i].key;
1244 mapi_disconnect(SERVERsessions[i].mid);
1245 }
1246
1247 MT_lock_unset(&mal_contextLock);
1248
1249 return MAL_SUCCEED;
1250}
1251
1252str
1253SERVERdisconnectWithAlias(int *key, str *dbalias){
1254 int i;
1255
1256 MT_lock_set(&mal_contextLock);
1257
1258 for(i=0; i<MAXSESSIONS; i++)
1259 if( SERVERsessions[i].dbalias &&
1260 strcmp(SERVERsessions[i].dbalias, *dbalias)==0){
1261 SERVERsessions[i].c = 0;
1262 if( SERVERsessions[i].dbalias)
1263 GDKfree(SERVERsessions[i].dbalias);
1264 SERVERsessions[i].dbalias = NULL;
1265 *key = SERVERsessions[i].key;
1266 mapi_disconnect(SERVERsessions[i].mid);
1267 break;
1268 }
1269
1270 if( i==MAXSESSIONS){
1271 MT_lock_unset(&mal_contextLock);
1272 throw(IO, "mapi.disconnect", "Impossible to close session for db_alias: '%s'", *dbalias);
1273 }
1274
1275 MT_lock_unset(&mal_contextLock);
1276 return MAL_SUCCEED;
1277}
1278
1279str
1280SERVERconnect(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
1281 int *key =getArgReference_int(stk,pci,0);
1282 str *host = getArgReference_str(stk,pci,1);
1283 int *port = getArgReference_int(stk,pci,2);
1284 str *username = getArgReference_str(stk,pci,3);
1285 str *password= getArgReference_str(stk,pci,4);
1286 str *lang = getArgReference_str(stk,pci,5);
1287
1288 (void) mb;
1289 return SERVERconnectAll(cntxt, key,host,port,username,password,lang);
1290}
1291
1292
1293str
1294SERVERreconnectAlias(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1295{
1296 int *key =getArgReference_int(stk,pci,0);
1297 str *host = getArgReference_str(stk,pci,1);
1298 int *port = getArgReference_int(stk,pci,2);
1299 str *dbalias = getArgReference_str(stk,pci,3);
1300 str *username = getArgReference_str(stk,pci,4);
1301 str *password= getArgReference_str(stk,pci,5);
1302 str *lang = getArgReference_str(stk,pci,6);
1303 int i;
1304 str msg=MAL_SUCCEED;
1305
1306 (void) mb;
1307
1308 for(i=0; i<MAXSESSIONS; i++)
1309 if( SERVERsessions[i].key &&
1310 SERVERsessions[i].dbalias &&
1311 strcmp(SERVERsessions[i].dbalias, *dbalias)==0){
1312 *key = SERVERsessions[i].key;
1313 return msg;
1314 }
1315
1316 msg= SERVERconnectAll(cntxt, key, host, port, username, password, lang);
1317 if( msg == MAL_SUCCEED)
1318 msg = SERVERsetAlias(NULL, key, dbalias);
1319 return msg;
1320}
1321
1322str
1323SERVERreconnectWithoutAlias(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
1324 int *key =getArgReference_int(stk,pci,0);
1325 str *host = getArgReference_str(stk,pci,1);
1326 int *port = getArgReference_int(stk,pci,2);
1327 str *username = getArgReference_str(stk,pci,3);
1328 str *password= getArgReference_str(stk,pci,4);
1329 str *lang = getArgReference_str(stk,pci,5);
1330 int i;
1331 str msg=MAL_SUCCEED, nme= "anonymous";
1332
1333 (void) mb;
1334
1335 for(i=0; i<MAXSESSIONS; i++)
1336 if( SERVERsessions[i].key ){
1337 *key = SERVERsessions[i].key;
1338 return msg;
1339 }
1340
1341 msg= SERVERconnectAll(cntxt, key, host, port, username, password, lang);
1342 if( msg == MAL_SUCCEED)
1343 msg = SERVERsetAlias(NULL, key, &nme);
1344 return msg;
1345}
1346
/*
 * accessTest(val, fcn): look up the in-use session whose key equals
 * `val`.  The enclosing function must declare `int i` and `Mapi mid`;
 * on success `i` is the index of the session in SERVERsessions and `mid`
 * its connection.  When no such session exists the macro RETURNS an
 * exception labelled "mapi." fcn from the enclosing function.  `fcn`
 * must be a string literal.
 */
#define accessTest(val, fcn)						\
	do {								\
		i = 0;							\
		while (i < MAXSESSIONS &&				\
		       !(SERVERsessions[i].c &&				\
			 SERVERsessions[i].key == (val)))		\
			i++;						\
		if (i == MAXSESSIONS)					\
			throw(MAL, "mapi." fcn, "Access violation,"	\
			      " could not find matching session descriptor"); \
		mid = SERVERsessions[i].mid;				\
		(void) mid; /* silence compilers */			\
	} while (0)
1358
1359str
1360SERVERsetAlias(void *ret, int *key, str *dbalias){
1361 int i;
1362 Mapi mid;
1363 accessTest(*key, "setAlias");
1364 SERVERsessions[i].dbalias= GDKstrdup(*dbalias);
1365 if(SERVERsessions[i].dbalias == NULL)
1366 throw(MAL, "mapi.set_alias", SQLSTATE(HY001) MAL_MALLOC_FAIL);
1367 (void) ret;
1368 return MAL_SUCCEED;
1369}
1370
1371str
1372SERVERlookup(int *ret, str *dbalias)
1373{
1374 int i;
1375 for(i=0; i< MAXSESSIONS; i++)
1376 if( SERVERsessions[i].dbalias &&
1377 strcmp(SERVERsessions[i].dbalias, *dbalias)==0){
1378 *ret= SERVERsessions[i].key;
1379 return MAL_SUCCEED;
1380 }
1381 throw(MAL, "mapi.lookup", "Could not find database connection");
1382}
1383
1384str
1385SERVERtrace(void *ret, int *key, int *flag){
1386 (void )ret;
1387 mapi_trace(SERVERsessions[*key].mid,(bool)*flag);
1388 return MAL_SUCCEED;
1389}
1390
1391str
1392SERVERdisconnect(void *ret, int *key){
1393 int i;
1394 Mapi mid;
1395 (void) ret;
1396 accessTest(*key, "disconnect");
1397 mapi_disconnect(mid);
1398 if( SERVERsessions[i].dbalias)
1399 GDKfree(SERVERsessions[i].dbalias);
1400 SERVERsessions[i].c= 0;
1401 SERVERsessions[i].dbalias= 0;
1402 return MAL_SUCCEED;
1403}
1404
1405str
1406SERVERdestroy(void *ret, int *key){
1407 int i;
1408 Mapi mid;
1409 (void) ret;
1410 accessTest(*key, "destroy");
1411 mapi_destroy(mid);
1412 SERVERsessions[i].c= 0;
1413 if( SERVERsessions[i].dbalias)
1414 GDKfree(SERVERsessions[i].dbalias);
1415 SERVERsessions[i].dbalias= 0;
1416 return MAL_SUCCEED;
1417}
1418
1419str
1420SERVERreconnect(void *ret, int *key){
1421 int i;
1422 Mapi mid;
1423 (void) ret;
1424 accessTest(*key, "destroy");
1425 mapi_reconnect(mid);
1426 return MAL_SUCCEED;
1427}
1428
1429str
1430SERVERping(int *ret, int *key){
1431 int i;
1432 Mapi mid;
1433 accessTest(*key, "destroy");
1434 *ret= mapi_ping(mid);
1435 return MAL_SUCCEED;
1436}
1437
1438str
1439SERVERquery(int *ret, int *key, str *qry){
1440 Mapi mid;
1441 MapiHdl hdl=0;
1442 int i;
1443 accessTest(*key, "query");
1444 if( SERVERsessions[i].hdl)
1445 mapi_close_handle(SERVERsessions[i].hdl);
1446 SERVERsessions[i].hdl = mapi_query(mid, *qry);
1447 catchErrors("mapi.query");
1448 *ret = *key;
1449 return MAL_SUCCEED;
1450}
1451
1452str
1453SERVERquery_handle(int *ret, int *key, str *qry){
1454 Mapi mid;
1455 MapiHdl hdl=0;
1456 int i;
1457 accessTest(*key, "query_handle");
1458 mapi_query_handle(SERVERsessions[i].hdl, *qry);
1459 catchErrors("mapi.query_handle");
1460 *ret = *key;
1461 return MAL_SUCCEED;
1462}
1463
1464str
1465SERVERquery_array(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pc){
1466 (void)cntxt, (void) mb; (void) stk; (void) pc;
1467 throw(MAL, "mapi.query_array", SQLSTATE(0A000) PROGRAM_NYI);
1468}
1469
1470str
1471SERVERprepare(int *ret, int *key, str *qry){
1472 Mapi mid;
1473 int i;
1474 accessTest(*key, "prepare");
1475 if( SERVERsessions[i].hdl)
1476 mapi_close_handle(SERVERsessions[i].hdl);
1477 SERVERsessions[i].hdl= mapi_prepare(mid, *qry);
1478 if( mapi_error(mid) )
1479 throw(MAL, "mapi.prepare", "%s",
1480 mapi_result_error(SERVERsessions[i].hdl));
1481 *ret = *key;
1482 return MAL_SUCCEED;
1483}
1484
1485str
1486SERVERfinish(int *ret, int *key){
1487 Mapi mid;
1488 int i;
1489 accessTest(*key, "finish");
1490 mapi_finish(SERVERsessions[i].hdl);
1491 if( mapi_error(mid) )
1492 throw(MAL, "mapi.finish", "%s",
1493 mapi_result_error(SERVERsessions[i].hdl));
1494 *ret = *key;
1495 return MAL_SUCCEED;
1496}
1497
1498str
1499SERVERget_row_count(lng *ret, int *key){
1500 Mapi mid;
1501 int i;
1502 accessTest(*key, "get_row_count");
1503 *ret= (lng) mapi_get_row_count(SERVERsessions[i].hdl);
1504 if( mapi_error(mid) )
1505 throw(MAL, "mapi.get_row_count", "%s",
1506 mapi_result_error(SERVERsessions[i].hdl));
1507 return MAL_SUCCEED;
1508}
1509
1510str
1511SERVERget_field_count(int *ret, int *key){
1512 Mapi mid;
1513 int i;
1514 accessTest(*key, "get_field_count");
1515 *ret= mapi_get_field_count(SERVERsessions[i].hdl);
1516 if( mapi_error(mid) )
1517 throw(MAL, "mapi.get_field_count", "%s",
1518 mapi_result_error(SERVERsessions[i].hdl));
1519 return MAL_SUCCEED;
1520}
1521
1522str
1523SERVERrows_affected(lng *ret, int *key){
1524 Mapi mid;
1525 int i;
1526 accessTest(*key, "rows_affected");
1527 *ret= (lng) mapi_rows_affected(SERVERsessions[i].hdl);
1528 return MAL_SUCCEED;
1529}
1530
1531str
1532SERVERfetch_row(int *ret, int *key){
1533 Mapi mid;
1534 int i;
1535 accessTest(*key, "fetch_row");
1536 *ret= mapi_fetch_row(SERVERsessions[i].hdl);
1537 return MAL_SUCCEED;
1538}
1539
1540str
1541SERVERfetch_all_rows(lng *ret, int *key){
1542 Mapi mid;
1543 int i;
1544 accessTest(*key, "fetch_all_rows");
1545 *ret= (lng) mapi_fetch_all_rows(SERVERsessions[i].hdl);
1546 return MAL_SUCCEED;
1547}
1548
1549str
1550SERVERfetch_field_str(str *ret, int *key, int *fnr){
1551 Mapi mid;
1552 int i;
1553 str fld;
1554 accessTest(*key, "fetch_field");
1555 fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1556 *ret= GDKstrdup(fld? fld: str_nil);
1557 if(*ret == NULL)
1558 throw(MAL, "mapi.fetch_field_str", SQLSTATE(HY001) MAL_MALLOC_FAIL);
1559 if( mapi_error(mid) )
1560 throw(MAL, "mapi.fetch_field_str", "%s",
1561 mapi_result_error(SERVERsessions[i].hdl));
1562 return MAL_SUCCEED;
1563}
1564
1565str
1566SERVERfetch_field_int(int *ret, int *key, int *fnr){
1567 Mapi mid;
1568 int i;
1569 str fld;
1570 accessTest(*key, "fetch_field");
1571 fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1572 *ret= fld? (int) atol(fld): int_nil;
1573 if( mapi_error(mid) )
1574 throw(MAL, "mapi.fetch_field_int", "%s",
1575 mapi_result_error(SERVERsessions[i].hdl));
1576 return MAL_SUCCEED;
1577}
1578
1579str
1580SERVERfetch_field_lng(lng *ret, int *key, int *fnr){
1581 Mapi mid;
1582 int i;
1583 str fld;
1584 accessTest(*key, "fetch_field");
1585 fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1586 *ret= fld? atol(fld): lng_nil;
1587 if( mapi_error(mid) )
1588 throw(MAL, "mapi.fetch_field_lng", "%s",
1589 mapi_result_error(SERVERsessions[i].hdl));
1590 return MAL_SUCCEED;
1591}
1592
#ifdef HAVE_HGE
/*
 * Fetch field *fnr of the current row of the session identified by *key
 * as a hge.  A missing field yields hge_nil.  Returns an exception when
 * the session does not exist or the connection reports an error.
 */
str
SERVERfetch_field_hge(hge *ret, int *key, int *fnr){
	Mapi mid;
	int i;
	str fld;

	accessTest(*key, "fetch_field");
	fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
	/* strtoll rather than atol: long is only 32 bits wide on some
	 * platforms.  Values that need more than 64 bits are still not
	 * converted correctly. */
	*ret= fld? (hge) strtoll(fld, NULL, 10): hge_nil;
	if( mapi_error(mid) )
		throw(MAL, "mapi.fetch_field_hge", "%s",
			mapi_result_error(SERVERsessions[i].hdl));
	return MAL_SUCCEED;
}
#endif
1608
1609str
1610SERVERfetch_field_sht(sht *ret, int *key, int *fnr){
1611 Mapi mid;
1612 int i;
1613 str fld;
1614 accessTest(*key, "fetch_field");
1615 fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1616 *ret= fld? (sht) atol(fld): sht_nil;
1617 if( mapi_error(mid) )
1618 throw(MAL, "mapi.fetch_field", "%s",
1619 mapi_result_error(SERVERsessions[i].hdl));
1620 return MAL_SUCCEED;
1621}
1622
1623str
1624SERVERfetch_field_void(void *ret, int *key, int *fnr){
1625 Mapi mid;
1626 int i;
1627 (void) ret;
1628 (void) fnr;
1629 accessTest(*key, "fetch_field");
1630 throw(MAL, "mapi.fetch_field_void","defaults to nil");
1631}
1632
1633str
1634SERVERfetch_field_oid(oid *ret, int *key, int *fnr){
1635 Mapi mid;
1636 int i;
1637 str fld;
1638 accessTest(*key, "fetch_field");
1639 fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1640 if( mapi_error(mid) )
1641 throw(MAL, "mapi.fetch_field_oid", "%s",
1642 mapi_result_error(SERVERsessions[i].hdl));
1643 if(fld==0 || strcmp(fld,"nil")==0)
1644 *(oid*) ret= void_nil;
1645 else *(oid*) ret = (oid) atol(fld);
1646 return MAL_SUCCEED;
1647}
1648
1649str
1650SERVERfetch_field_bte(bte *ret, int *key, int *fnr){
1651 Mapi mid;
1652 int i;
1653 str fld;
1654 accessTest(*key, "fetch_field");
1655 fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1656 if( mapi_error(mid) )
1657 throw(MAL, "mapi.fetch_field_bte", "%s",
1658 mapi_result_error(SERVERsessions[i].hdl));
1659 if(fld==0 || strcmp(fld,"nil")==0)
1660 *(bte*) ret= bte_nil;
1661 else *(bte*) ret = *fld;
1662 return MAL_SUCCEED;
1663}
1664
1665str
1666SERVERfetch_line(str *ret, int *key){
1667 Mapi mid;
1668 int i;
1669 str fld;
1670 accessTest(*key, "fetch_line");
1671 fld= mapi_fetch_line(SERVERsessions[i].hdl);
1672 if( mapi_error(mid) )
1673 throw(MAL, "mapi.fetch_line", "%s",
1674 mapi_result_error(SERVERsessions[i].hdl));
1675 *ret= GDKstrdup(fld? fld:str_nil);
1676 if(*ret == NULL)
1677 throw(MAL, "mapi.fetch_line", SQLSTATE(HY001) MAL_MALLOC_FAIL);
1678 return MAL_SUCCEED;
1679}
1680
1681str
1682SERVERnext_result(int *ret, int *key){
1683 Mapi mid;
1684 int i;
1685 accessTest(*key, "next_result");
1686 mapi_next_result(SERVERsessions[i].hdl);
1687 if( mapi_error(mid) )
1688 throw(MAL, "mapi.next_result", "%s",
1689 mapi_result_error(SERVERsessions[i].hdl));
1690 *ret= *key;
1691 return MAL_SUCCEED;
1692}
1693
1694str
1695SERVERfetch_reset(int *ret, int *key){
1696 Mapi mid;
1697 int i;
1698 accessTest(*key, "fetch_reset");
1699 mapi_fetch_reset(SERVERsessions[i].hdl);
1700 if( mapi_error(mid) )
1701 throw(MAL, "mapi.fetch_reset", "%s",
1702 mapi_result_error(SERVERsessions[i].hdl));
1703 *ret= *key;
1704 return MAL_SUCCEED;
1705}
1706
1707str
1708SERVERfetch_field_bat(bat *bid, int *key){
1709 int i,j,cnt;
1710 Mapi mid;
1711 char *fld;
1712 BAT *b;
1713
1714 accessTest(*key, "rpc");
1715 b= COLnew(0,TYPE_str,256, TRANSIENT);
1716 if( b == NULL)
1717 throw(MAL,"mapi.fetch", SQLSTATE(HY001) MAL_MALLOC_FAIL);
1718 cnt= mapi_get_field_count(SERVERsessions[i].hdl);
1719 for(j=0; j< cnt; j++){
1720 fld= mapi_fetch_field(SERVERsessions[i].hdl,j);
1721 if( mapi_error(mid) ) {
1722 BBPreclaim(b);
1723 throw(MAL, "mapi.fetch_field_bat", "%s",
1724 mapi_result_error(SERVERsessions[i].hdl));
1725 }
1726 if (BUNappend(b,fld, false) != GDK_SUCCEED) {
1727 BBPreclaim(b);
1728 throw(MAL, "mapi.fetch_field_bat", SQLSTATE(HY001) MAL_MALLOC_FAIL);
1729 }
1730 }
1731 *bid = b->batCacheid;
1732 BBPkeepref(*bid);
1733 return MAL_SUCCEED;
1734}
1735
1736str
1737SERVERerror(int *ret, int *key){
1738 Mapi mid;
1739 int i;
1740 accessTest(*key, "error");
1741 *ret= mapi_error(mid);
1742 return MAL_SUCCEED;
1743}
1744
1745str
1746SERVERgetError(str *ret, int *key){
1747 Mapi mid;
1748 int i;
1749 accessTest(*key, "getError");
1750 *ret= GDKstrdup(mapi_error_str(mid));
1751 if(*ret == NULL)
1752 throw(MAL, "mapi.get_error", SQLSTATE(HY001) MAL_MALLOC_FAIL);
1753 return MAL_SUCCEED;
1754}
1755
1756str
1757SERVERexplain(str *ret, int *key){
1758 Mapi mid;
1759 int i;
1760
1761 accessTest(*key, "explain");
1762 *ret= GDKstrdup(mapi_error_str(mid));
1763 if(*ret == NULL)
1764 throw(MAL, "mapi.explain", SQLSTATE(HY001) MAL_MALLOC_FAIL);
1765 return MAL_SUCCEED;
1766}
1767/*
1768 * The remainder should contain the wrapping of
1769 * relevant SERVER functions. Furthermore, we
1770 * should analyse the return value and update
1771 * the stack trace.
1772 *
1773 * Two routines should be
1774 * mapi.rpc(key,"query")
1775 *
1776 * The generic scheme for handling a remote MAL
1777 * procedure call with a single row answer.
1778 */
1779static int SERVERfieldAnalysis(str fld, int tpe, ValPtr v){
1780 v->vtype= tpe;
1781 switch(tpe){
1782 case TYPE_void:
1783 v->val.oval = void_nil;
1784 break;
1785 case TYPE_oid:
1786 if(fld==0 || strcmp(fld,"nil")==0)
1787 v->val.oval= void_nil;
1788 else v->val.oval = (oid) atol(fld);
1789 break;
1790 case TYPE_bit:
1791 if(fld== 0 || strcmp(fld,"nil")==0)
1792 v->val.btval= bit_nil;
1793 else
1794 if(strcmp(fld,"true")==0)
1795 v->val.btval= TRUE;
1796 else
1797 if(strcmp(fld,"false")==0)
1798 v->val.btval= FALSE;
1799 break;
1800 case TYPE_bte:
1801 if(fld==0 || strcmp(fld,"nil")==0)
1802 v->val.btval= bte_nil;
1803 else
1804 v->val.btval= *fld;
1805 break;
1806 case TYPE_sht:
1807 if(fld==0 || strcmp(fld,"nil")==0)
1808 v->val.shval = sht_nil;
1809 else v->val.shval= (sht) atol(fld);
1810 break;
1811 case TYPE_int:
1812 if(fld==0 || strcmp(fld,"nil")==0)
1813 v->val.ival = int_nil;
1814 else v->val.ival= (int) atol(fld);
1815 break;
1816 case TYPE_lng:
1817 if(fld==0 || strcmp(fld,"nil")==0)
1818 v->val.lval= lng_nil;
1819 else v->val.lval= (lng) atol(fld);
1820 break;
1821#ifdef HAVE_HGE
1822 case TYPE_hge:
1823 if(fld==0 || strcmp(fld,"nil")==0)
1824 v->val.hval= hge_nil;
1825 else v->val.hval= (hge) atol(fld);
1826 break;
1827#endif
1828 case TYPE_flt:
1829 if(fld==0 || strcmp(fld,"nil")==0)
1830 v->val.fval= flt_nil;
1831 else v->val.fval= (flt) atof(fld);
1832 break;
1833 case TYPE_dbl:
1834 if(fld==0 || strcmp(fld,"nil")==0)
1835 v->val.dval= dbl_nil;
1836 else v->val.dval= (dbl) atof(fld);
1837 break;
1838 case TYPE_str:
1839 if(fld==0 || strcmp(fld,"nil")==0){
1840 if((v->val.sval= GDKstrdup(str_nil)) == NULL)
1841 return -1;
1842 v->len = strlen(v->val.sval);
1843 } else {
1844 if((v->val.sval= GDKstrdup(fld)) == NULL)
1845 return -1;
1846 v->len = strlen(fld);
1847 }
1848 break;
1849 }
1850 return 0;
1851}
1852
1853str
1854SERVERmapi_rpc_single_row(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1855{
1856 int key,i,j;
1857 Mapi mid;
1858 MapiHdl hdl;
1859 char *s,*fld, *qry=0;
1860
1861 (void) cntxt;
1862 key= * getArgReference_int(stk,pci,pci->retc);
1863 accessTest(key, "rpc");
1864#ifdef MAPI_TEST
1865 mnstr_printf(cntxt->fdout,"about to send: %s\n",qry);
1866#endif
1867 /* glue all strings together */
1868 for(i= pci->retc+1; i<pci->argc; i++){
1869 fld= * getArgReference_str(stk,pci,i);
1870 if( qry == 0) {
1871 qry= GDKstrdup(fld);
1872 if ( qry == NULL)
1873 throw(MAL, "mapi.rpc",SQLSTATE(HY001) MAL_MALLOC_FAIL);
1874 } else {
1875 s= (char*) GDKmalloc(strlen(qry)+strlen(fld)+1);
1876 if ( s == NULL) {
1877 GDKfree(qry);
1878 throw(MAL, "mapi.rpc", SQLSTATE(HY001) MAL_MALLOC_FAIL);
1879 }
1880 strcpy(s,qry);
1881 strcat(s,fld);
1882 GDKfree(qry);
1883 qry= s;
1884 }
1885 }
1886 hdl= mapi_query(mid, qry);
1887 GDKfree(qry);
1888 catchErrors("mapi.rpc");
1889
1890 i= 0;
1891 while( mapi_fetch_row(hdl)){
1892 for(j=0; j<pci->retc; j++){
1893 fld= mapi_fetch_field(hdl,j);
1894#ifdef MAPI_TEST
1895 mnstr_printf(cntxt->fdout,"Got: %s\n",fld);
1896#endif
1897 switch(getVarType(mb,getArg(pci,j)) ){
1898 case TYPE_void:
1899 case TYPE_oid:
1900 case TYPE_bit:
1901 case TYPE_bte:
1902 case TYPE_sht:
1903 case TYPE_int:
1904 case TYPE_lng:
1905#ifdef HAVE_HGE
1906 case TYPE_hge:
1907#endif
1908 case TYPE_flt:
1909 case TYPE_dbl:
1910 case TYPE_str:
1911 if(SERVERfieldAnalysis(fld,getVarType(mb,getArg(pci,j)),&stk->stk[pci->argv[j]]) < 0)
1912 throw(MAL, "mapi.rpc", SQLSTATE(HY001) MAL_MALLOC_FAIL);
1913 break;
1914 default:
1915 throw(MAL, "mapi.rpc",
1916 "Missing type implementation ");
1917 /* all the other basic types come here */
1918 }
1919 }
1920 i++;
1921 }
1922 if( i>1)
1923 throw(MAL, "mapi.rpc","Too many answers");
1924 return MAL_SUCCEED;
1925}
1926/*
1927 * Transport of the BATs is only slightly more complicated.
1928 * The generic implementation based on a pattern is the next
1929 * step.
1930 */
1931str
1932SERVERmapi_rpc_bat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
1933 bat *ret;
1934 int *key;
1935 str *qry,err= MAL_SUCCEED;
1936 Mapi mid;
1937 MapiHdl hdl;
1938 char *fld2;
1939 BAT *b;
1940 ValRecord tval;
1941 int i=0, tt;
1942
1943 (void) cntxt;
1944 ret= getArgReference_bat(stk,pci,0);
1945 key= getArgReference_int(stk,pci,pci->retc);
1946 qry= getArgReference_str(stk,pci,pci->retc+1);
1947 accessTest(*key, "rpc");
1948 tt= getBatType(getVarType(mb,getArg(pci,0)));
1949
1950 hdl= mapi_query(mid, *qry);
1951 catchErrors("mapi.rpc");
1952
1953 b= COLnew(0,tt,256, TRANSIENT);
1954 if ( b == NULL)
1955 throw(MAL,"mapi.rpc", SQLSTATE(HY001) MAL_MALLOC_FAIL);
1956 while( mapi_fetch_row(hdl)){
1957 fld2= mapi_fetch_field(hdl,1);
1958 if(SERVERfieldAnalysis(fld2, tt, &tval) < 0) {
1959 BBPreclaim(b);
1960 throw(MAL, "mapi.rpc", SQLSTATE(HY001) MAL_MALLOC_FAIL);
1961 }
1962 if (BUNappend(b,VALptr(&tval), false) != GDK_SUCCEED) {
1963 BBPreclaim(b);
1964 throw(MAL, "mapi.rpc", SQLSTATE(HY001) MAL_MALLOC_FAIL);
1965 }
1966 }
1967 *ret = b->batCacheid;
1968 BBPkeepref(*ret);
1969
1970 return err;
1971}
1972
1973str
1974SERVERput(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
1975 int *key;
1976 str *nme;
1977 ptr val;
1978 int i,tpe;
1979 Mapi mid;
1980 MapiHdl hdl=0;
1981 char *w=0, buf[BUFSIZ];
1982
1983 (void) cntxt;
1984 key= getArgReference_int(stk,pci,pci->retc);
1985 nme= getArgReference_str(stk,pci,pci->retc+1);
1986 val= getArgReference(stk,pci,pci->retc+2);
1987 accessTest(*key, "put");
1988 switch( (tpe=getArgType(mb,pci, pci->retc+2)) ){
1989 case TYPE_bat:{
1990 /* generate a tuple batch */
1991 /* and reload it into the proper format */
1992 str ht,tt;
1993 BAT *b= BATdescriptor(BBPindex(*nme));
1994 size_t len;
1995
1996 if( b== NULL){
1997 throw(MAL,"mapi.put","Can not access BAT");
1998 }
1999
2000 /* reconstruct the object */
2001 ht = getTypeName(TYPE_oid);
2002 tt = getTypeName(getBatType(tpe));
2003 snprintf(buf,BUFSIZ,"%s:= bat.new(:%s,%s);", *nme, ht,tt );
2004 len = strlen(buf);
2005 snprintf(buf+len,BUFSIZ-len,"%s:= io.import(%s,tuples);", *nme, *nme);
2006
2007 /* and execute the request */
2008 if( SERVERsessions[i].hdl)
2009 mapi_close_handle(SERVERsessions[i].hdl);
2010 SERVERsessions[i].hdl= mapi_query(mid, buf);
2011
2012 GDKfree(ht); GDKfree(tt);
2013 BBPrelease(b->batCacheid);
2014 break;
2015 }
2016 case TYPE_str:
2017 snprintf(buf,BUFSIZ,"%s:=%s;",*nme,*(char**)val);
2018 if( SERVERsessions[i].hdl)
2019 mapi_close_handle(SERVERsessions[i].hdl);
2020 SERVERsessions[i].hdl= mapi_query(mid, buf);
2021 break;
2022 default:
2023 if ((w = ATOMformat(tpe,val)) == NULL)
2024 throw(MAL, "mapi.put", SQLSTATE(HY001) GDK_EXCEPTION);
2025 snprintf(buf,BUFSIZ,"%s:=%s;",*nme,w);
2026 GDKfree(w);
2027 if( SERVERsessions[i].hdl)
2028 mapi_close_handle(SERVERsessions[i].hdl);
2029 SERVERsessions[i].hdl= mapi_query(mid, buf);
2030 break;
2031 }
2032 catchErrors("mapi.put");
2033 return MAL_SUCCEED;
2034}
2035
2036str
2037SERVERputLocal(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
2038 str *ret, *nme;
2039 ptr val;
2040 int tpe;
2041 char *w=0, buf[BUFSIZ];
2042
2043 (void) cntxt;
2044 ret= getArgReference_str(stk,pci,0);
2045 nme= getArgReference_str(stk,pci,pci->retc);
2046 val= getArgReference(stk,pci,pci->retc+1);
2047 switch( (tpe=getArgType(mb,pci, pci->retc+1)) ){
2048 case TYPE_bat:
2049 case TYPE_ptr:
2050 throw(MAL, "mapi.glue","Unsupported type");
2051 case TYPE_str:
2052 snprintf(buf,BUFSIZ,"%s:=%s;",*nme,*(char**)val);
2053 break;
2054 default:
2055 if ((w = ATOMformat(tpe,val)) == NULL)
2056 throw(MAL, "mapi.glue", SQLSTATE(HY001) GDK_EXCEPTION);
2057 snprintf(buf,BUFSIZ,"%s:=%s;",*nme,w);
2058 GDKfree(w);
2059 break;
2060 }
2061 *ret= GDKstrdup(buf);
2062 if(*ret == NULL)
2063 throw(MAL, "mapi.glue", SQLSTATE(HY001) GDK_EXCEPTION);
2064 return MAL_SUCCEED;
2065}
2066
2067str
2068SERVERbindBAT(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
2069 int *key;
2070 str *nme,*tab,*col;
2071 int i;
2072 Mapi mid;
2073 MapiHdl hdl=0;
2074 char buf[BUFSIZ];
2075
2076 (void) cntxt;
2077 key= getArgReference_int(stk,pci,pci->retc);
2078 nme= getArgReference_str(stk,pci,pci->retc+1);
2079 accessTest(*key, "bind");
2080 if( pci->argc == 6) {
2081 char *tn;
2082 tab= getArgReference_str(stk,pci,pci->retc+2);
2083 col= getArgReference_str(stk,pci,pci->retc+3);
2084 i= *getArgReference_int(stk,pci,pci->retc+4);
2085 tn = getTypeName(getBatType(getVarType(mb,getDestVar(pci))));
2086 snprintf(buf,BUFSIZ,"%s:bat[:%s]:=sql.bind(\"%s\",\"%s\",\"%s\",%d);",
2087 getVarName(mb,getDestVar(pci)),
2088 tn,
2089 *nme, *tab,*col,i);
2090 GDKfree(tn);
2091 } else if( pci->argc == 5) {
2092 tab= getArgReference_str(stk,pci,pci->retc+2);
2093 i= *getArgReference_int(stk,pci,pci->retc+3);
2094 snprintf(buf,BUFSIZ,"%s:bat[:oid]:=sql.bind(\"%s\",\"%s\",0,%d);",
2095 getVarName(mb,getDestVar(pci)),*nme, *tab,i);
2096 } else {
2097 str hn,tn;
2098 int target= getArgType(mb,pci,0);
2099 hn= getTypeName(TYPE_oid);
2100 tn= getTypeName(getBatType(target));
2101 snprintf(buf,BUFSIZ,"%s:bat[:%s]:=bbp.bind(\"%s\");",
2102 getVarName(mb,getDestVar(pci)), tn, *nme);
2103 GDKfree(hn);
2104 GDKfree(tn);
2105 }
2106 if( SERVERsessions[i].hdl)
2107 mapi_close_handle(SERVERsessions[i].hdl);
2108 SERVERsessions[i].hdl= mapi_query(mid, buf);
2109 catchErrors("mapi.bind");
2110 return MAL_SUCCEED;
2111}
2112#else
// this avoids a compiler warning w.r.t. empty compilation units:
// without HAVE_MAPI everything above is compiled out, so this otherwise
// unused definition is all that is left of the file.
int SERVERdummy = 42;
2115#endif // HAVE_MAPI
2116