1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9/*
10 * (c) Fabian Groffen, Martin Kersten
11 * Remote querying functionality
12 * Communication with other mservers at the MAL level is a delicate task.
13 * However, it is indispensable for any distributed functionality. This
14 * module provides an abstract way to store and retrieve objects on a
15 * remote site. Additionally, functions on a remote site can be executed
 * using objects available in the remote session context. This yields
17 * four primitive functions that form the basis for distribution methods:
18 * get, put, register and exec.
19 *
20 * The get method simply retrieves a copy of a remote object. Objects can
21 * be simple values, strings or Column. The same holds for the put method,
22 * but the other way around. A local object can be stored on a remote
23 * site. Upon a successful store, the put method returns the remote
24 * identifier for the stored object. With this identifier the object can
25 * be addressed, e.g. using the get method to retrieve the object that was
26 * stored using put.
27 *
28 * The get and put methods are symmetric. Performing a get on an
29 * identifier that was returned by put, results in an object with the same
30 * value and type as the one that was put. The result of such an operation is
31 * equivalent to making an (expensive) copy of the original object.
32 *
33 * The register function takes a local MAL function and makes it known at a
34 * remote site. It ensures that it does not overload an already known
35 * operation remotely, which could create a semantic conflict.
36 * Deregistering a function is forbidden, because it would allow for taking
37 * over the remote site completely.
38 * C-implemented functions, such as io.print() cannot be remotely stored.
39 * It would require even more complicated (byte) code shipping and remote
40 * compilation to make it work.
41 *
42 * The choice to let exec only execute functions avoids problems
43 * to decide what should be returned to the caller. With a function it is
44 * clear and simple to return that what the function signature prescribes.
 * Any side effects (e.g. io.print calls) may cause havoc in the system,
 * but they are currently ignored.
47 *
48 * This leads to the final contract of this module. The methods should be
49 * used correctly, by obeying their contract. Failing to do so will result
50 * in errors and possibly undefined behaviour.
51 *
52 * The resolve() function can be used to query Merovingian. It returns one
53 * or more databases discovered in its vicinity matching the given pattern.
54 *
55 */
56#include "monetdb_config.h"
57#include "remote.h"
58
59#include "mal_authorize.h"
60
61/*
62 * Technically, these methods need to be serialised per connection,
63 * hence a scheduler that interleaves e.g. multiple get calls, simply
64 * violates this constraint. If parallelism to the same site is
65 * desired, a user could create a second connection. This is not always
66 * easy to generate at the proper place, e.g. overloading the dataflow
67 * optimizer to patch connections structures is not acceptable.
68 *
69 * Instead, we maintain a simple lock with each connection, which can be
70 * used to issue a safe, but blocking get/put/exec/register request.
71 */
72#ifdef HAVE_MAPI
73
/* Head of the singly linked list of open remote connections.  The
 * functions in this file that walk or modify the list do so while
 * holding mal_remoteLock. */
static connection conns = NULL;
/* Binary-format descriptor of this server, filled in by RMTprelude().
 * RMTget() treats the initial value 0177 as "not determined" and then
 * falls back to the textual transfer of BATs. */
static unsigned char localtype = 0177;

/* forward declarations of helpers that are used before (or without)
 * a visible definition at the point of use */
static inline str RMTquery(MapiHdl *ret, const char *func, Mapi conn, const char *query);
static inline str RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in);
79
80/**
81 * Returns a BAT with valid redirects for the given pattern. If
82 * merovingian is not running, this function throws an error.
83 */
84str RMTresolve(bat *ret, str *pat) {
85#ifdef NATIVE_WIN32
86 (void) ret;
87 (void) pat;
88 throw(MAL, "remote.resolve", "merovingian is not available on "
89 "your platform, sorry"); /* please upgrade to Linux, etc. */
90#else
91 BAT *list;
92 const char *mero_uri;
93 char *p;
94 unsigned int port;
95 char **redirs;
96 char **or;
97
98 if (pat == NULL || *pat == NULL || strcmp(*pat, (str)str_nil) == 0)
99 throw(ILLARG, "remote.resolve",
100 ILLEGAL_ARGUMENT ": pattern is NULL or nil");
101
102 mero_uri = GDKgetenv("merovingian_uri");
103 if (mero_uri == NULL)
104 throw(MAL, "remote.resolve", "this function needs the mserver "
105 "have been started by merovingian");
106
107 list = COLnew(0, TYPE_str, 0, TRANSIENT);
108 if (list == NULL)
109 throw(MAL, "remote.resolve", SQLSTATE(HY001) MAL_MALLOC_FAIL);
110
111 /* extract port from mero_uri, let mapi figure out the rest */
112 mero_uri+=strlen("mapi:monetdb://");
113 if (*mero_uri == '[') {
114 if ((mero_uri = strchr(mero_uri, ']')) == NULL)
115 throw(MAL, "remote.resolve", "illegal IPv6 address on merovingian_uri: %s",
116 GDKgetenv("merovingian_uri"));
117 }
118 if ((p = strchr(mero_uri, ':')) == NULL)
119 throw(MAL, "remote.resolve", "illegal merovingian_uri setting: %s",
120 GDKgetenv("merovingian_uri"));
121 port = (unsigned int)atoi(p + 1);
122
123 or = redirs = mapi_resolve(NULL, port, *pat);
124
125 if (redirs == NULL)
126 throw(MAL, "remote.resolve", "unknown failure when resolving pattern");
127
128 while (*redirs != NULL) {
129 if (BUNappend(list, (ptr)*redirs, false) != GDK_SUCCEED) {
130 BBPreclaim(list);
131 do
132 free(*redirs);
133 while (*++redirs);
134 free(or);
135 throw(MAL, "remote.resolve", SQLSTATE(HY001) MAL_MALLOC_FAIL);
136 }
137 free(*redirs);
138 redirs++;
139 }
140 free(or);
141
142 BBPkeepref(*ret = list->batCacheid);
143 return(MAL_SUCCEED);
144#endif
145}
146
147
148/* for unique connection identifiers */
149static size_t connection_id = 0;
150
151/**
152 * Returns a connection to the given uri. It always returns a newly
153 * created connection.
154 */
155str RMTconnectScen(
156 str *ret,
157 str *ouri,
158 str *user,
159 str *passwd,
160 str *scen)
161{
162 connection c;
163 char conn[BUFSIZ];
164 char *s;
165 Mapi m;
166 MapiHdl hdl;
167 str msg;
168
169 /* just make sure the return isn't garbage */
170 *ret = 0;
171
172 if (ouri == NULL || *ouri == NULL || strcmp(*ouri, (str)str_nil) == 0)
173 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": database uri "
174 "is NULL or nil");
175 if (user == NULL || *user == NULL || strcmp(*user, (str)str_nil) == 0)
176 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": username is "
177 "NULL or nil");
178 if (passwd == NULL || *passwd == NULL || strcmp(*passwd, (str)str_nil) == 0)
179 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": password is "
180 "NULL or nil");
181 if (scen == NULL || *scen == NULL || strcmp(*scen, (str)str_nil) == 0)
182 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario is "
183 "NULL or nil");
184 if (strcmp(*scen, "mal") != 0 && strcmp(*scen, "msql") != 0)
185 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario '%s' "
186 "is not supported", *scen);
187
188 m = mapi_mapiuri(*ouri, *user, *passwd, *scen);
189 if (mapi_error(m)) {
190 msg = createException(MAL, "remote.connect",
191 "unable to connect to '%s': %s",
192 *ouri, mapi_error_str(m));
193 mapi_destroy(m);
194 return msg;
195 }
196
197 MT_lock_set(&mal_remoteLock);
198
199 /* generate an unique connection name, they are only known
200 * within one mserver, id is primary key, the rest is super key */
201 snprintf(conn, BUFSIZ, "%s_%s_%zu", mapi_get_dbname(m), *user, connection_id++);
202 /* make sure we can construct MAL identifiers using conn */
203 for (s = conn; *s != '\0'; s++) {
204 if (!isalnum((unsigned char)*s)) {
205 *s = '_';
206 }
207 }
208
209 if (mapi_reconnect(m) != MOK) {
210 MT_lock_unset(&mal_remoteLock);
211 msg = createException(IO, "remote.connect",
212 "unable to connect to '%s': %s",
213 *ouri, mapi_error_str(m));
214 mapi_destroy(m);
215 return msg;
216 }
217
218 /* connection established, add to list */
219 c = GDKzalloc(sizeof(struct _connection));
220 if ( c == NULL || (c->name = GDKstrdup(conn)) == NULL) {
221 GDKfree(c);
222 mapi_destroy(m);
223 MT_lock_unset(&mal_remoteLock);
224 throw(MAL,"remote.connect", SQLSTATE(HY001) MAL_MALLOC_FAIL);
225 }
226 c->mconn = m;
227 c->nextid = 0;
228 MT_lock_init(&c->lock, c->name);
229 c->next = conns;
230 conns = c;
231
232 msg = RMTquery(&hdl, "remote.connect", m, "remote.bintype();");
233 if (msg)
234 return msg;
235 if (hdl != NULL && mapi_fetch_row(hdl)) {
236 char *val = mapi_fetch_field(hdl, 0);
237 c->type = (unsigned char)atoi(val);
238 mapi_close_handle(hdl);
239 } else {
240 c->type = 0;
241 }
242
243#ifdef _DEBUG_MAPI_
244 mapi_trace(c->mconn, true);
245#endif
246
247 MT_lock_unset(&mal_remoteLock);
248
249 *ret = GDKstrdup(conn);
250 if(*ret == NULL)
251 throw(MAL,"remote.connect", SQLSTATE(HY001) MAL_MALLOC_FAIL);
252 return(MAL_SUCCEED);
253}
254
255str RMTconnect(
256 str *ret,
257 str *uri,
258 str *user,
259 str *passwd)
260{
261 str scen = "mal";
262 return RMTconnectScen(ret, uri, user, passwd, &scen);
263}
264
265str
266RMTconnectTable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
267{
268 char *local_table;
269 char *remoteuser;
270 char *passwd;
271 char *uri;
272 char *tmp;
273 char *ret;
274 str scen;
275 str msg;
276 ValPtr v;
277
278 (void)mb;
279 (void)cntxt;
280
281 local_table = *getArgReference_str(stk, pci, 1);
282 scen = *getArgReference_str(stk, pci, 2);
283 if (local_table == NULL || strcmp(local_table, (str)str_nil) == 0) {
284 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": local table is NULL or nil");
285 }
286
287 rethrow("remote.connect", tmp, AUTHgetRemoteTableCredentials(local_table, &uri, &remoteuser, &passwd));
288
289 /* The password we just got is hashed. Add the byte \1 in front to
290 * signal this fact to the mapi. */
291 size_t pwlen = strlen(passwd);
292 char *pwhash = (char*)GDKmalloc(pwlen + 2);
293 if (pwhash == NULL) {
294 GDKfree(remoteuser);
295 GDKfree(passwd);
296 throw(MAL, "remote.connect", SQLSTATE(HY001) MAL_MALLOC_FAIL);
297 }
298 snprintf(pwhash, pwlen + 2, "\1%s", passwd);
299
300 msg = RMTconnectScen(&ret, &uri, &remoteuser, &pwhash, &scen);
301
302 GDKfree(passwd);
303 GDKfree(pwhash);
304
305 if (msg == MAL_SUCCEED) {
306 v = &stk->stk[pci->argv[0]];
307 v->vtype = TYPE_str;
308 if((v->val.sval = GDKstrdup(ret)) == NULL) {
309 GDKfree(ret);
310 throw(MAL, "remote.connect", SQLSTATE(HY001) MAL_MALLOC_FAIL);
311 }
312 }
313
314 GDKfree(ret);
315 return msg;
316}
317
318
319/**
320 * Disconnects a connection. The connection needs not to exist in the
321 * system, it only needs to exist for the client (i.e. it was once
322 * created).
323 */
324str RMTdisconnect(void *ret, str *conn) {
325 connection c, t;
326
327 if (conn == NULL || *conn == NULL || strcmp(*conn, (str)str_nil) == 0)
328 throw(ILLARG, "remote.disconnect", ILLEGAL_ARGUMENT ": connection "
329 "is NULL or nil");
330
331
332 (void) ret;
333
334 /* we need a lock because the same user can be handled by multiple
335 * threads */
336 MT_lock_set(&mal_remoteLock);
337 c = conns;
338 t = NULL; /* parent */
339 /* walk through the list */
340 while (c != NULL) {
341 if (strcmp(c->name, *conn) == 0) {
342 /* ok, delete it... */
343 if (t == NULL) {
344 conns = c->next;
345 } else {
346 t->next = c->next;
347 }
348
349 MT_lock_set(&c->lock); /* shared connection */
350 mapi_disconnect(c->mconn);
351 mapi_destroy(c->mconn);
352 MT_lock_unset(&c->lock);
353 MT_lock_destroy(&c->lock);
354 GDKfree(c->name);
355 GDKfree(c);
356 MT_lock_unset(&mal_remoteLock);
357 return MAL_SUCCEED;
358 }
359 t = c;
360 c = c->next;
361 }
362
363 MT_lock_unset(&mal_remoteLock);
364 throw(MAL, "remote.disconnect", "no such connection: %s", *conn);
365}
366
367/**
368 * Helper function to return a connection matching a given string, or an
369 * error if it does not exist. Since this function is internal, it
370 * doesn't check the argument conn, as it should have been checked
371 * already.
372 * NOTE: this function acquires the mal_remoteLock before accessing conns
373 */
374static inline str
375RMTfindconn(connection *ret, const char *conn) {
376 connection c;
377
378 /* just make sure the return isn't garbage */
379 *ret = NULL;
380 MT_lock_set(&mal_remoteLock); /* protect c */
381 c = conns;
382 while (c != NULL) {
383 if (strcmp(c->name, conn) == 0) {
384 *ret = c;
385 MT_lock_unset(&mal_remoteLock);
386 return(MAL_SUCCEED);
387 }
388 c = c->next;
389 }
390 MT_lock_unset(&mal_remoteLock);
391 throw(MAL, "remote.<findconn>", "no such connection: %s", conn);
392}
393
394/**
395 * Little helper function that returns a GDKmalloced string containing a
396 * valid identifier that is supposed to be unique in the connection's
397 * remote context. The generated string depends on the module and
398 * function the caller is in. But also the runtime context is important.
399 * The format is rmt<id>_<retvar>_<type>. Every RMTgetId uses a fresh id,
400 * to distinguish amongst different (parallel) execution context.
401 * Re-use of this remote identifier should be done with care.
402 * The encoding of the type allows for ease of type checking later on.
403 */
404static inline str
405RMTgetId(char *buf, MalBlkPtr mb, InstrPtr p, int arg) {
406 InstrPtr f;
407 char *mod;
408 char *var;
409 str rt;
410 static int idtag=0;
411
412 if( p->retc == 0)
413 throw(MAL, "remote.getId", ILLEGAL_ARGUMENT "MAL instruction misses retc");
414
415 var = getArgName(mb, p, arg);
416 f = getInstrPtr(mb, 0); /* top level function */
417 mod = getModuleId(f);
418 if (mod == NULL)
419 mod = "user";
420 rt = getTypeIdentifier(getArgType(mb,p,arg));
421 if (rt == NULL)
422 throw(MAL, "remote.put", SQLSTATE(HY001) MAL_MALLOC_FAIL);
423
424 snprintf(buf, BUFSIZ, "rmt%d_%s_%s", idtag++, var, rt);
425
426 GDKfree(rt);
427 return(MAL_SUCCEED);
428}
429
430/**
431 * Helper function to execute a query over the given connection,
432 * returning the result handle. If communication fails in one way or
433 * another, an error is returned. Since this function is internal, it
434 * doesn't check the input arguments func, conn and query, as they
435 * should have been checked already.
436 * NOTE: this function assumes a lock for conn is set
437 */
438static inline str
439RMTquery(MapiHdl *ret, const char *func, Mapi conn, const char *query) {
440 MapiHdl mhdl;
441
442 *ret = NULL;
443 mhdl = mapi_query(conn, query);
444 if (mhdl) {
445 if (mapi_result_error(mhdl) != NULL) {
446 str err = createException(
447 getExceptionType(mapi_result_error(mhdl)),
448 func,
449 "(mapi:monetdb://%s@%s/%s) %s",
450 mapi_get_user(conn),
451 mapi_get_host(conn),
452 mapi_get_dbname(conn),
453 getExceptionMessage(mapi_result_error(mhdl)));
454 mapi_close_handle(mhdl);
455 return(err);
456 }
457 } else {
458 if (mapi_error(conn) != MOK) {
459 throw(IO, func, "an error occurred on connection: %s",
460 mapi_error_str(conn));
461 } else {
462 throw(MAL, func, "remote function invocation didn't return a result");
463 }
464 }
465
466 *ret = mhdl;
467 return(MAL_SUCCEED);
468}
469
470str RMTprelude(void *ret) {
471 unsigned int type = 0;
472
473 (void)ret;
474#ifdef WORDS_BIGENDIAN
475 type |= RMTT_B_ENDIAN;
476#else
477 type |= RMTT_L_ENDIAN;
478#endif
479#if SIZEOF_SIZE_T == SIZEOF_LNG
480 type |= RMTT_64_BITS;
481#else
482 type |= RMTT_32_BITS;
483#endif
484#if SIZEOF_OID == SIZEOF_LNG
485 type |= RMTT_64_OIDS;
486#else
487 type |= RMTT_32_OIDS;
488#endif
489 localtype = (unsigned char)type;
490
491 return(MAL_SUCCEED);
492}
493
494str RMTepilogue(void *ret) {
495 connection c, t;
496
497 (void)ret;
498
499 MT_lock_set(&mal_remoteLock); /* nobody allowed here */
500 /* free connections list */
501 c = conns;
502 while (c != NULL) {
503 t = c;
504 c = c->next;
505 MT_lock_set(&t->lock);
506 mapi_destroy(t->mconn);
507 MT_lock_unset(&t->lock);
508 MT_lock_destroy(&t->lock);
509 GDKfree(t->name);
510 GDKfree(t);
511 }
512 /* not sure, but better be safe than sorry */
513 conns = NULL;
514 MT_lock_unset(&mal_remoteLock);
515
516 return(MAL_SUCCEED);
517}
518
519/**
520 * get fetches the object referenced by ident over connection conn.
521 * We are only interested in retrieving void-headed BATs, i.e. single columns.
522 */
523str RMTget(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
524 str conn, ident, tmp, rt;
525 connection c;
526 char qbuf[BUFSIZ + 1];
527 MapiHdl mhdl = NULL;
528 int rtype;
529 ValPtr v;
530
531 (void)mb;
532 (void) cntxt;
533
534 conn = *getArgReference_str(stk, pci, 1);
535 if (conn == NULL || strcmp(conn, (str)str_nil) == 0)
536 throw(ILLARG, "remote.get", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
537 ident = *getArgReference_str(stk, pci, 2);
538 if (ident == 0 || isIdentifier(ident) < 0)
539 throw(ILLARG, "remote.get", ILLEGAL_ARGUMENT ": identifier expected, got '%s'", ident);
540
541 /* lookup conn, set c if valid */
542 rethrow("remote.get", tmp, RMTfindconn(&c, conn));
543
544 rtype = getArgType(mb, pci, 0);
545 v = &stk->stk[pci->argv[0]];
546
547 if (rtype == TYPE_any || isAnyExpression(rtype)) {
548 char *tpe, *msg;
549 tpe = getTypeName(rtype);
550 msg = createException(MAL, "remote.get", ILLEGAL_ARGUMENT ": unsupported any type: %s",
551 tpe);
552 GDKfree(tpe);
553 return msg;
554 }
555 /* check if the remote type complies with what we expect.
556 Since the put() encodes the type as known to the remote site
557 we can simple compare it here */
558 rt = getTypeIdentifier(rtype);
559 if (rt == NULL)
560 throw(MAL, "remote.get", SQLSTATE(HY001) MAL_MALLOC_FAIL);
561 if (strcmp(ident + strlen(ident) - strlen(rt), rt)) {
562 tmp = createException(MAL, "remote.get", ILLEGAL_ARGUMENT
563 ": remote object type %s does not match expected type %s",
564 rt, ident);
565 GDKfree(rt);
566 return tmp;
567 }
568 GDKfree(rt);
569
570 if (isaBatType(rtype) && (localtype == 0177 || localtype != c->type ))
571 {
572 int t;
573 size_t s;
574 ptr r;
575 str var;
576 BAT *b;
577
578 snprintf(qbuf, BUFSIZ, "io.print(%s);", ident);
579#ifdef _DEBUG_REMOTE
580 fprintf(stderr, "#remote.get:%s\n", qbuf);
581#endif
582 /* this call should be a single transaction over the channel*/
583 MT_lock_set(&c->lock);
584
585 if ((tmp = RMTquery(&mhdl, "remote.get", c->mconn, qbuf))
586 != MAL_SUCCEED)
587 {
588#ifdef _DEBUG_REMOTE
589 fprintf(stderr, "#REMOTE GET error: %s\n%s\n",
590 qbuf, tmp);
591#endif
592 MT_lock_unset(&c->lock);
593 var = createException(MAL, "remote.get", "%s", tmp);
594 freeException(tmp);
595 return var;
596 }
597 t = getBatType(rtype);
598 b = COLnew(0, t, 0, TRANSIENT);
599 if (b == NULL) {
600 mapi_close_handle(mhdl);
601 MT_lock_unset(&c->lock);
602 throw(MAL, "remote.get", SQLSTATE(HY001) MAL_MALLOC_FAIL);
603 }
604
605 if (ATOMvarsized(t)) {
606 while (mapi_fetch_row(mhdl)) {
607 var = mapi_fetch_field(mhdl, 1);
608 if (BUNappend(b, var == NULL ? str_nil : var, false) != GDK_SUCCEED) {
609 BBPreclaim(b);
610 mapi_close_handle(mhdl);
611 MT_lock_unset(&c->lock);
612 throw(MAL, "remote.get", SQLSTATE(HY001) MAL_MALLOC_FAIL);
613 }
614 }
615 } else
616 while (mapi_fetch_row(mhdl)) {
617 var = mapi_fetch_field(mhdl, 1);
618 if (var == NULL)
619 var = "nil";
620 s = 0;
621 r = NULL;
622 if (ATOMfromstr(t, &r, &s, var, true) < 0 ||
623 BUNappend(b, r, false) != GDK_SUCCEED) {
624 BBPreclaim(b);
625 GDKfree(r);
626 mapi_close_handle(mhdl);
627 MT_lock_unset(&c->lock);
628 throw(MAL, "remote.get", GDK_EXCEPTION);
629 }
630 GDKfree(r);
631 }
632
633 v->val.bval = b->batCacheid;
634 v->vtype = TYPE_bat;
635 BBPkeepref(b->batCacheid);
636
637 mapi_close_handle(mhdl);
638 MT_lock_unset(&c->lock);
639 } else if (isaBatType(rtype)) {
640 /* binary compatible remote host, transfer BAT in binary form */
641 stream *sout;
642 stream *sin;
643 char buf[256];
644 ssize_t sz = 0, rd;
645 BAT *b = NULL;
646
647 /* this call should be a single transaction over the channel*/
648 MT_lock_set(&c->lock);
649
650 /* bypass Mapi from this point to efficiently write all data to
651 * the server */
652 sout = mapi_get_to(c->mconn);
653 sin = mapi_get_from(c->mconn);
654 if (sin == NULL || sout == NULL) {
655 MT_lock_unset(&c->lock);
656 throw(MAL, "remote.get", "Connection lost");
657 }
658
659 /* call our remote helper to do this more efficiently */
660 mnstr_printf(sout, "remote.batbincopy(%s);\n", ident);
661 mnstr_flush(sout);
662
663 /* read the JSON header */
664 while ((rd = mnstr_read(sin, &buf[sz], 1, 1)) == 1 && buf[sz] != '\n') {
665 sz += rd;
666 }
667 if (rd < 0) {
668 MT_lock_unset(&c->lock);
669 throw(MAL, "remote.get", "could not read BAT JSON header");
670 }
671 if (buf[0] == '!') {
672 char *result;
673 MT_lock_unset(&c->lock);
674 if((result = GDKstrdup(buf)) == NULL)
675 throw(MAL, "remote.get", SQLSTATE(HY001) MAL_MALLOC_FAIL);
676 return result;
677 }
678
679 buf[sz] = '\0';
680 if ((tmp = RMTinternalcopyfrom(&b, buf, sin)) != NULL) {
681 MT_lock_unset(&c->lock);
682 return(tmp);
683 }
684
685 v->val.bval = b->batCacheid;
686 v->vtype = TYPE_bat;
687 BBPkeepref(b->batCacheid);
688
689 MT_lock_unset(&c->lock);
690 } else {
691 ptr p = NULL;
692 str val;
693 size_t len = 0;
694
695 snprintf(qbuf, BUFSIZ, "io.print(%s);", ident);
696#ifdef _DEBUG_REMOTE
697 fprintf(stderr, "#remote:%s:%s\n", c->name, qbuf);
698#endif
699 if ((tmp=RMTquery(&mhdl, "remote.get", c->mconn, qbuf)) != MAL_SUCCEED)
700 {
701 return tmp;
702 }
703 (void) mapi_fetch_row(mhdl); /* should succeed */
704 val = mapi_fetch_field(mhdl, 0);
705
706 if (ATOMvarsized(rtype)) {
707 p = GDKstrdup(val == NULL ? str_nil : val);
708 if (p == NULL) {
709 mapi_close_handle(mhdl);
710 throw(MAL, "remote.get", SQLSTATE(HY001) MAL_MALLOC_FAIL);
711 }
712 VALset(v, rtype, p);
713 } else if (ATOMfromstr(rtype, &p, &len, val == NULL ? "nil" : val, true) < 0) {
714 char *msg;
715 msg = createException(MAL, "remote.get",
716 "unable to parse value: %s",
717 val == NULL ? "nil" : val);
718 mapi_close_handle(mhdl);
719 GDKfree(p);
720 return msg;
721 } else {
722 VALset(v, rtype, p);
723 if (ATOMextern(rtype) == 0)
724 GDKfree(p);
725 }
726
727 mapi_close_handle(mhdl);
728 }
729
730 return(MAL_SUCCEED);
731}
732
733/**
734 * stores the given object on the remote host. The identifier of the
735 * object on the remote host is returned for later use.
736 */
737str RMTput(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
738 str conn, tmp;
739 char ident[BUFSIZ];
740 connection c;
741 ValPtr v;
742 int type;
743 ptr value;
744 MapiHdl mhdl = NULL;
745
746 (void)cntxt;
747
748 conn = *getArgReference_str(stk, pci, 1);
749 if (conn == NULL || strcmp(conn, (str)str_nil) == 0)
750 throw(ILLARG, "remote.put", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
751
752 /* lookup conn */
753 rethrow("remote.put", tmp, RMTfindconn(&c, conn));
754
755 /* put the thing */
756 type = getArgType(mb, pci, 2);
757 value = getArgReference(stk, pci, 2);
758
759 /* this call should be a single transaction over the channel*/
760 MT_lock_set(&c->lock);
761
762 /* get a free, typed identifier for the remote host */
763 tmp = RMTgetId(ident, mb, pci, 2);
764 if (tmp != MAL_SUCCEED) {
765 MT_lock_unset(&c->lock);
766 return tmp;
767 }
768
769 /* depending on the input object generate actions to store the
770 * object remotely*/
771 if (type == TYPE_any || type == TYPE_bat || isAnyExpression(type)) {
772 char *tpe, *msg;
773 MT_lock_unset(&c->lock);
774 tpe = getTypeName(type);
775 msg = createException(MAL, "remote.put", "unsupported type: %s", tpe);
776 GDKfree(tpe);
777 return msg;
778 } else if (isaBatType(type) && !is_bat_nil(*(bat*) value)) {
779 BATiter bi;
780 /* naive approach using bat.new() and bat.insert() calls */
781 char *tail;
782 bat bid;
783 BAT *b = NULL;
784 BUN p, q;
785 str tailv;
786 stream *sout;
787
788 tail = getTypeIdentifier(getBatType(type));
789 if (tail == NULL) {
790 MT_lock_unset(&c->lock);
791 throw(MAL, "remote.put", SQLSTATE(HY001) MAL_MALLOC_FAIL);
792 }
793
794 bid = *(bat *)value;
795 if (bid != 0) {
796 if ((b = BATdescriptor(bid)) == NULL){
797 MT_lock_unset(&c->lock);
798 GDKfree(tail);
799 throw(MAL, "remote.put", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
800 }
801 }
802
803 /* bypass Mapi from this point to efficiently write all data to
804 * the server */
805 sout = mapi_get_to(c->mconn);
806
807 /* call our remote helper to do this more efficiently */
808 mnstr_printf(sout,
809 "%s := remote.batload(:%s, " BUNFMT ");\n",
810 ident, tail, (bid == 0 ? 0 : BATcount(b)));
811 mnstr_flush(sout);
812 GDKfree(tail);
813
814 /* b can be NULL if bid == 0 (only type given, ugh) */
815 if (b) {
816 bi = bat_iterator(b);
817 BATloop(b, p, q) {
818 tailv = ATOMformat(getBatType(type), BUNtail(bi, p));
819 if (tailv == NULL) {
820 BBPunfix(b->batCacheid);
821 MT_lock_unset(&c->lock);
822 throw(MAL, "remote.put", GDK_EXCEPTION);
823 }
824 if (getBatType(type) > TYPE_str)
825 mnstr_printf(sout, "\"%s\"\n", tailv);
826 else
827 mnstr_printf(sout, "%s\n", tailv);
828 GDKfree(tailv);
829 }
830 BBPunfix(b->batCacheid);
831 }
832
833 /* write the empty line the server is waiting for, handles
834 * all errors at the same time, if any */
835 if ((tmp = RMTquery(&mhdl, "remote.put", c->mconn, ""))
836 != MAL_SUCCEED)
837 {
838 MT_lock_unset(&c->lock);
839 return tmp;
840 }
841 mapi_close_handle(mhdl);
842 } else if (isaBatType(type) && is_bat_nil(*(bat*) value)) {
843 stream *sout;
844 str typename = getTypeName(type);
845 sout = mapi_get_to(c->mconn);
846 mnstr_printf(sout,
847 "%s := nil:%s;\n", ident, typename);
848 mnstr_flush(sout);
849 GDKfree(typename);
850 } else {
851 size_t l;
852 str val;
853 char *tpe;
854 char qbuf[512], *nbuf = qbuf;
855 if (ATOMvarsized(type)) {
856 val = ATOMformat(type, *(str *)value);
857 } else {
858 val = ATOMformat(type, value);
859 }
860 if (val == NULL) {
861 MT_lock_unset(&c->lock);
862 throw(MAL, "remote.put", GDK_EXCEPTION);
863 }
864 tpe = getTypeIdentifier(type);
865 if (tpe == NULL) {
866 MT_lock_unset(&c->lock);
867 GDKfree(val);
868 throw(MAL, "remote.put", SQLSTATE(HY001) MAL_MALLOC_FAIL);
869 }
870 l = strlen(val) + strlen(tpe) + strlen(ident) + 10;
871 if (l > (ssize_t) sizeof(qbuf) && (nbuf = GDKmalloc(l)) == NULL) {
872 MT_lock_unset(&c->lock);
873 GDKfree(val);
874 GDKfree(tpe);
875 throw(MAL, "remote.put", SQLSTATE(HY001) MAL_MALLOC_FAIL);
876 }
877 if (type <= TYPE_str)
878 snprintf(nbuf, l, "%s := %s:%s;\n", ident, val, tpe);
879 else
880 snprintf(nbuf, l, "%s := \"%s\":%s;\n", ident, val, tpe);
881 GDKfree(tpe);
882 GDKfree(val);
883#ifdef _DEBUG_REMOTE
884 fprintf(stderr, "#remote.put:%s:%s\n", c->name, nbuf);
885#endif
886 tmp = RMTquery(&mhdl, "remote.put", c->mconn, nbuf);
887 if (nbuf != qbuf)
888 GDKfree(nbuf);
889 if (tmp != MAL_SUCCEED) {
890 MT_lock_unset(&c->lock);
891 return tmp;
892 }
893 mapi_close_handle(mhdl);
894 }
895 MT_lock_unset(&c->lock);
896
897 /* return the identifier */
898 v = &stk->stk[pci->argv[0]];
899 v->vtype = TYPE_str;
900 if((v->val.sval = GDKstrdup(ident)) == NULL)
901 throw(MAL, "remote.put", SQLSTATE(HY001) MAL_MALLOC_FAIL);
902 return(MAL_SUCCEED);
903}
904
905/**
906 * stores the given <mod>.<fcn> on the remote host.
907 * An error is returned if the function is already known at the remote site.
908 * The implementation is based on serialisation of the block into a string
909 * followed by remote parsing.
910 */
911static str RMTregisterInternal(Client cntxt, const char *conn, const char *mod, const char *fcn)
912{
913 str tmp, qry, msg;
914 connection c;
915 char buf[BUFSIZ];
916 MapiHdl mhdl = NULL;
917 Symbol sym;
918
919 if (conn == NULL || strcmp(conn, str_nil) == 0)
920 throw(ILLARG, "remote.register", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
921
922 /* find local definition */
923 sym = findSymbol(cntxt->usermodule, putName(mod), putName(fcn));
924 if (sym == NULL)
925 throw(MAL, "remote.register", ILLEGAL_ARGUMENT ": no such function: %s.%s", mod, fcn);
926
927 /* lookup conn */
928 rethrow("remote.register", tmp, RMTfindconn(&c, conn));
929
930 /* this call should be a single transaction over the channel*/
931 MT_lock_set(&c->lock);
932
933 /* check remote definition */
934 snprintf(buf, BUFSIZ, "inspect.getSignature(\"%s\",\"%s\");", mod, fcn);
935#ifdef _DEBUG_REMOTE
936 fprintf(stderr, "#remote.register:%s:%s\n", c->name, buf);
937#endif
938 msg = RMTquery(&mhdl, "remote.register", c->mconn, buf);
939 if (msg == MAL_SUCCEED) {
940 MT_lock_unset(&c->lock);
941 throw(MAL, "remote.register",
942 "function already exists at the remote site: %s.%s",
943 mod, fcn);
944 } else {
945 /* we basically hope/assume this is a "doesn't exist" error */
946 freeException(msg);
947 }
948 if (mhdl)
949 mapi_close_handle(mhdl);
950
951 /* make sure the program is error free */
952 chkProgram(cntxt->usermodule, sym->def);
953 if (sym->def->errors) {
954 MT_lock_unset(&c->lock);
955 throw(MAL, "remote.register",
956 "function '%s.%s' contains syntax or type errors",
957 mod, fcn);
958 }
959
960 qry = mal2str(sym->def, 0, sym->def->stop);
961#ifdef _DEBUG_REMOTE
962 fprintf(stderr, "#remote.register:%s:%s\n", c->name, qry);
963#endif
964 msg = RMTquery(&mhdl, "remote.register", c->mconn, qry);
965 GDKfree(qry);
966 if (mhdl)
967 mapi_close_handle(mhdl);
968
969 MT_lock_unset(&c->lock);
970 return msg;
971}
972
973str RMTregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
974 const char *conn = *getArgReference_str(stk, pci, 1);
975 const char *mod = *getArgReference_str(stk, pci, 2);
976 const char *fcn = *getArgReference_str(stk, pci, 3);
977 (void)mb;
978 return RMTregisterInternal(cntxt, conn, mod, fcn);
979}
980
981/**
982 * exec executes the function with its given arguments on the remote
983 * host, returning the function's return value. exec is purposely kept
984 * very spartan. All arguments need to be handles to previously put()
985 * values. It calls the function with the given arguments at the remote
986 * site, and returns the handle which stores the return value of the
987 * remotely executed function. This return value can be retrieved using
988 * a get call. It handles multiple return arguments.
989 */
990str RMTexec(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
991 str conn, mod, func, tmp;
992 int i;
993 size_t len, buflen;
994 connection c= NULL;
995 char *qbuf;
996 MapiHdl mhdl;
997
998 (void)cntxt;
999 (void)mb;
1000
1001 for (i = 0; i < pci->retc; i++) {
1002 tmp = *getArgReference_str(stk, pci, i);
1003 if (tmp == NULL || strcmp(tmp, (str)str_nil) == 0)
1004 throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT
1005 ": return value %d is NULL or nil", i);
1006 }
1007 conn = *getArgReference_str(stk, pci, i++);
1008 if (conn == NULL || strcmp(conn, (str)str_nil) == 0)
1009 throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
1010 mod = *getArgReference_str(stk, pci, i++);
1011 if (mod == NULL || strcmp(mod, (str)str_nil) == 0)
1012 throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT ": module name is NULL or nil");
1013 func = *getArgReference_str(stk, pci, i++);
1014 if (func == NULL || strcmp(func, (str)str_nil) == 0)
1015 throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT ": function name is NULL or nil");
1016
1017 /* lookup conn */
1018 rethrow("remote.exec", tmp, RMTfindconn(&c, conn));
1019
1020 /* this call should be a single transaction over the channel*/
1021 MT_lock_set(&c->lock);
1022
1023 if(pci->argc - pci->retc < 3) /* conn, mod, func, ... */
1024 throw(MAL, "remote.exec", ILLEGAL_ARGUMENT " MAL instruction misses arguments");
1025
1026 len = 0;
1027 /* count how big a buffer we need */
1028 len += 2 * (pci->retc > 1);
1029 for (i = 0; i < pci->retc; i++) {
1030 len += 2 * (i > 0);
1031 len += strlen(*getArgReference_str(stk, pci, i));
1032 }
1033 len += strlen(mod) + strlen(func) + 6;
1034 for (i = 3; i < pci->argc - pci->retc; i++) {
1035 len += 2 * (i > 3);
1036 len += strlen(*getArgReference_str(stk, pci, pci->retc + i));
1037 }
1038 len += 2;
1039 buflen = len + 1;
1040 if ((qbuf = GDKmalloc(buflen)) == NULL)
1041 throw(MAL, "remote.exec", SQLSTATE(HY001) MAL_MALLOC_FAIL);
1042
1043 len = 0;
1044
1045 if (pci->retc > 1)
1046 qbuf[len++] = '(';
1047 for (i = 0; i < pci->retc; i++)
1048 len += snprintf(&qbuf[len], buflen - len, "%s%s",
1049 (i > 0 ? ", " : ""), *getArgReference_str(stk, pci, i));
1050
1051 if (pci->retc > 1)
1052 qbuf[len++] = ')';
1053
1054 /* build the function invocation string in qbuf */
1055 len += snprintf(&qbuf[len], buflen - len, " := %s.%s(", mod, func);
1056
1057 /* handle the arguments to the function */
1058
1059 /* put the arguments one by one, and dynamically build the
1060 * invocation string */
1061 for (i = 3; i < pci->argc - pci->retc; i++) {
1062 len += snprintf(&qbuf[len], buflen - len, "%s%s",
1063 (i > 3 ? ", " : ""),
1064 *(getArgReference_str(stk, pci, pci->retc + i)));
1065 }
1066
1067 /* finish end execute the invocation string */
1068 len += snprintf(&qbuf[len], buflen - len, ");");
1069#ifdef _DEBUG_REMOTE
1070 fprintf(stderr,"#remote.exec:%s:%s\n",c->name,qbuf);
1071#endif
1072 tmp = RMTquery(&mhdl, "remote.exec", c->mconn, qbuf);
1073 GDKfree(qbuf);
1074 if (mhdl)
1075 mapi_close_handle(mhdl);
1076 MT_lock_unset(&c->lock);
1077 return tmp;
1078}
1079
1080/**
1081 * batload is a helper function to make transferring a BAT with RMTput
1082 * more efficient. It works by creating a BAT, and loading it with the
1083 * data as comma separated values from the input stream, until an empty
1084 * line is read. The given size argument is taken as a hint only, and
1085 * is not enforced to match the number of rows read.
1086 */
1087str RMTbatload(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
1088 ValPtr v;
1089 int t;
1090 int size;
1091 ptr r;
1092 size_t s;
1093 BAT *b;
1094 size_t len;
1095 char *var;
1096 str msg = MAL_SUCCEED;
1097 bstream *fdin = cntxt->fdin;
1098
1099 v = &stk->stk[pci->argv[0]]; /* return */
1100 t = getArgType(mb, pci, 1); /* tail type */
1101 size = *getArgReference_int(stk, pci, 2); /* size */
1102
1103 b = COLnew(0, t, size, TRANSIENT);
1104 if (b == NULL)
1105 throw(MAL, "remote.load", SQLSTATE(HY001) MAL_MALLOC_FAIL);
1106
1107 /* grab the input stream and start reading */
1108 fdin->eof = false;
1109 len = fdin->pos;
1110 while (len < fdin->len || bstream_next(fdin) > 0) {
1111 /* newline hunting (how spartan) */
1112 for (len = fdin->pos; len < fdin->len && fdin->buf[len] != '\n'; len++)
1113 ;
1114 /* unterminated line, request more */
1115 if (fdin->buf[len] != '\n')
1116 continue;
1117 /* empty line, end of input */
1118 if (fdin->pos == len) {
1119 if (isa_block_stream(fdin->s)) {
1120 ssize_t n = bstream_next(fdin);
1121 if( n )
1122 msg = createException(MAL, "remote.load", SQLSTATE(HY001) "Unexpected return from remote");
1123 }
1124 break;
1125 }
1126 fdin->buf[len] = '\0'; /* kill \n */
1127 var = &fdin->buf[fdin->pos];
1128 /* skip over this line */
1129 fdin->pos = ++len;
1130
1131 s = 0;
1132 r = NULL;
1133 if (ATOMfromstr(t, &r, &s, var, true) < 0 ||
1134 BUNappend(b, r, false) != GDK_SUCCEED) {
1135 BBPreclaim(b);
1136 GDKfree(r);
1137 throw(MAL, "remote.get", GDK_EXCEPTION);
1138 }
1139 GDKfree(r);
1140 }
1141
1142 v->val.bval = b->batCacheid;
1143 v->vtype = TYPE_bat;
1144 BBPkeepref(b->batCacheid);
1145
1146 return msg;
1147}
1148
1149/**
1150 * dump given BAT to stream
1151 */
1152str RMTbincopyto(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1153{
1154 bat bid = *getArgReference_bat(stk, pci, 1);
1155 BAT *b = BBPquickdesc(bid, false);
1156 char sendtheap = 0;
1157
1158 (void)mb;
1159 (void)stk;
1160 (void)pci;
1161
1162 if (b == NULL)
1163 throw(MAL, "remote.bincopyto", RUNTIME_OBJECT_UNDEFINED);
1164
1165 if (BBPfix(bid) <= 0)
1166 throw(MAL, "remote.bincopyto", MAL_MALLOC_FAIL);
1167
1168 sendtheap = b->ttype != TYPE_void && b->tvarsized;
1169
1170 mnstr_printf(cntxt->fdout, /*JSON*/"{"
1171 "\"version\":1,"
1172 "\"ttype\":%d,"
1173 "\"hseqbase\":" OIDFMT ","
1174 "\"tseqbase\":" OIDFMT ","
1175 "\"tsorted\":%d,"
1176 "\"trevsorted\":%d,"
1177 "\"tkey\":%d,"
1178 "\"tnonil\":%d,"
1179 "\"tdense\":%d,"
1180 "\"size\":" BUNFMT ","
1181 "\"tailsize\":%zu,"
1182 "\"theapsize\":%zu"
1183 "}\n",
1184 b->ttype,
1185 b->hseqbase, b->tseqbase,
1186 b->tsorted, b->trevsorted,
1187 b->tkey,
1188 b->tnonil,
1189 BATtdense(b),
1190 b->batCount,
1191 (size_t)b->batCount * Tsize(b),
1192 sendtheap && b->batCount > 0 ? b->tvheap->free : 0
1193 );
1194
1195 if (b->batCount > 0) {
1196 mnstr_write(cntxt->fdout, /* tail */
1197 Tloc(b, 0), b->batCount * Tsize(b), 1);
1198 if (sendtheap)
1199 mnstr_write(cntxt->fdout, /* theap */
1200 Tbase(b), b->tvheap->free, 1);
1201 }
1202 /* flush is done by the calling environment (MAL) */
1203
1204 BBPunfix(bid);
1205
1206 return(MAL_SUCCEED);
1207}
1208
1209typedef struct _binbat_v1 {
1210 int Ttype;
1211 oid Hseqbase;
1212 oid Tseqbase;
1213 bool
1214 Tsorted:1,
1215 Trevsorted:1,
1216 Tkey:1,
1217 Tnonil:1,
1218 Tdense:1;
1219 BUN size;
1220 size_t headsize;
1221 size_t tailsize;
1222 size_t theapsize;
1223} binbat;
1224
1225static inline str
1226RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in)
1227{
1228 binbat bb = { 0, 0, 0, false, false, false, false, false, 0, 0, 0, 0 };
1229 char *nme = NULL;
1230 char *val = NULL;
1231 char tmp;
1232 size_t len;
1233 lng lv, *lvp;
1234
1235 BAT *b;
1236
1237 /* hdr is a JSON structure that looks like
1238 * {"version":1,"ttype":6,"tseqbase":0,"tailsize":4,"theapsize":0}
1239 * we take the binary data directly from the stream */
1240
1241 /* could skip whitespace, but we just don't allow that */
1242 if (*hdr++ != '{')
1243 throw(MAL, "remote.bincopyfrom", "illegal input, not a JSON header (got '%s')", hdr - 1);
1244 while (*hdr != '\0') {
1245 switch (*hdr) {
1246 case '"':
1247 /* we assume only numeric values, so all strings are
1248 * elems */
1249 if (nme != NULL) {
1250 *hdr = '\0';
1251 } else {
1252 nme = hdr + 1;
1253 }
1254 break;
1255 case ':':
1256 val = hdr + 1;
1257 break;
1258 case ',':
1259 case '}':
1260 if (val == NULL)
1261 throw(MAL, "remote.bincopyfrom",
1262 "illegal input, JSON value missing");
1263 *hdr = '\0';
1264
1265 lvp = &lv;
1266 len = sizeof(lv);
1267 /* tseqbase can be 1<<31/1<<63 which causes overflow
1268 * in lngFromStr, so we check separately */
1269 if (strcmp(val,
1270#if SIZEOF_OID == 8
1271 "9223372036854775808"
1272#else
1273 "2147483648"
1274#endif
1275 ) == 0 &&
1276 strcmp(nme, "tseqbase") == 0) {
1277 bb.Tseqbase = oid_nil;
1278 } else {
1279 /* all values should be non-negative, so we check that
1280 * here as well */
1281 if (lngFromStr(val, &len, &lvp, true) < 0 ||
1282 lv < 0 /* includes lng_nil */)
1283 throw(MAL, "remote.bincopyfrom",
1284 "bad %s value: %s", nme, val);
1285
1286 /* deal with nme and val */
1287 if (strcmp(nme, "version") == 0) {
1288 if (lv != 1)
1289 throw(MAL, "remote.bincopyfrom",
1290 "unsupported version: %s", val);
1291 } else if (strcmp(nme, "hseqbase") == 0) {
1292#if SIZEOF_OID < SIZEOF_LNG
1293 if (lv > GDK_oid_max)
1294 throw(MAL, "remote.bincopyfrom",
1295 "bad %s value: %s", nme, val);
1296#endif
1297 bb.Hseqbase = (oid)lv;
1298 } else if (strcmp(nme, "ttype") == 0) {
1299 if (lv >= GDKatomcnt)
1300 throw(MAL, "remote.bincopyfrom",
1301 "bad %s value: %s", nme, val);
1302 bb.Ttype = (int) lv;
1303 } else if (strcmp(nme, "tseqbase") == 0) {
1304#if SIZEOF_OID < SIZEOF_LNG
1305 if (lv > GDK_oid_max)
1306 throw(MAL, "remote.bincopyfrom",
1307 "bad %s value: %s", nme, val);
1308#endif
1309 bb.Tseqbase = (oid) lv;
1310 } else if (strcmp(nme, "tsorted") == 0) {
1311 bb.Tsorted = lv != 0;
1312 } else if (strcmp(nme, "trevsorted") == 0) {
1313 bb.Trevsorted = lv != 0;
1314 } else if (strcmp(nme, "tkey") == 0) {
1315 bb.Tkey = lv != 0;
1316 } else if (strcmp(nme, "tnonil") == 0) {
1317 bb.Tnonil = lv != 0;
1318 } else if (strcmp(nme, "tdense") == 0) {
1319 bb.Tdense = lv != 0;
1320 } else if (strcmp(nme, "size") == 0) {
1321 if (lv > (lng) BUN_MAX)
1322 throw(MAL, "remote.bincopyfrom",
1323 "bad %s value: %s", nme, val);
1324 bb.size = (BUN) lv;
1325 } else if (strcmp(nme, "tailsize") == 0) {
1326 bb.tailsize = (size_t) lv;
1327 } else if (strcmp(nme, "theapsize") == 0) {
1328 bb.theapsize = (size_t) lv;
1329 } else {
1330 throw(MAL, "remote.bincopyfrom",
1331 "unknown element: %s", nme);
1332 }
1333 }
1334 nme = val = NULL;
1335 break;
1336 }
1337 hdr++;
1338 }
1339
1340 b = COLnew(0, bb.Ttype, bb.size, TRANSIENT);
1341 if (b == NULL)
1342 throw(MAL, "remote.get", SQLSTATE(HY001) MAL_MALLOC_FAIL);
1343
1344 /* for strings, the width may not match, fix it to match what we
1345 * retrieved */
1346 if (bb.Ttype == TYPE_str && bb.size) {
1347 b->twidth = (unsigned short) (bb.tailsize / bb.size);
1348 b->tshift = ATOMelmshift(Tsize(b));
1349 }
1350
1351 if (bb.tailsize > 0) {
1352 if (HEAPextend(&b->theap, bb.tailsize, true) != GDK_SUCCEED ||
1353 mnstr_read(in, b->theap.base, bb.tailsize, 1) < 0)
1354 goto bailout;
1355 b->theap.dirty = true;
1356 }
1357 if (bb.theapsize > 0) {
1358 if (HEAPextend(b->tvheap, bb.theapsize, true) != GDK_SUCCEED ||
1359 mnstr_read(in, b->tvheap->base, bb.theapsize, 1) < 0)
1360 goto bailout;
1361 b->tvheap->free = bb.theapsize;
1362 b->tvheap->dirty = true;
1363 }
1364
1365 /* set properties */
1366 b->tseqbase = bb.Tdense ? bb.Tseqbase : oid_nil;
1367 b->tsorted = bb.Tsorted;
1368 b->trevsorted = bb.Trevsorted;
1369 b->tkey = bb.Tkey;
1370 b->tnonil = bb.Tnonil;
1371 if (bb.Ttype == TYPE_str && bb.size)
1372 BATsetcapacity(b, (BUN) (bb.tailsize >> b->tshift));
1373 BATsetcount(b, bb.size);
1374 b->batDirtydesc = true;
1375
1376 /* read blockmode flush */
1377 while (mnstr_read(in, &tmp, 1, 1) > 0) {
1378 fprintf(stderr, "!MALexception:remote.bincopyfrom: expected flush, got: %c\n", tmp);
1379 }
1380
1381 BATsettrivprop(b);
1382
1383 *ret = b;
1384 return(MAL_SUCCEED);
1385
1386 bailout:
1387 BBPreclaim(b);
1388 throw(MAL, "remote.bincopyfrom", "reading failed");
1389}
1390
1391/**
1392 * read from the input stream and give the BAT handle back to the caller
1393 */
1394str RMTbincopyfrom(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
1395 BAT *b = NULL;
1396 ValPtr v;
1397 str err;
1398
1399 (void)mb;
1400
1401 /* We receive a normal line, which contains the JSON header, the
1402 * rest is binary data directly on the stream. We get the first
1403 * line from the buffered stream we have here, and pass it on
1404 * together with the raw stream we have. */
1405 cntxt->fdin->eof = false; /* in case it was before */
1406 if (bstream_next(cntxt->fdin) <= 0)
1407 throw(MAL, "remote.bincopyfrom", "expected JSON header");
1408
1409 cntxt->fdin->buf[cntxt->fdin->len] = '\0';
1410 err = RMTinternalcopyfrom(&b,
1411 &cntxt->fdin->buf[cntxt->fdin->pos], cntxt->fdin->s);
1412 /* skip the JSON line */
1413 cntxt->fdin->pos = ++cntxt->fdin->len;
1414 if (err != MAL_SUCCEED)
1415 return(err);
1416
1417 v = &stk->stk[pci->argv[0]];
1418 v->val.bval = b->batCacheid;
1419 v->vtype = TYPE_bat;
1420 BBPkeepref(b->batCacheid);
1421
1422 return(MAL_SUCCEED);
1423}
1424
1425/**
1426 * bintype identifies the system on its binary profile. This is mainly
1427 * used to determine if BATs can be sent binary across.
1428 */
1429str RMTbintype(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
1430 int type = 0;
1431 (void)mb;
1432 (void)stk;
1433 (void)pci;
1434
1435#ifdef WORDS_BIGENDIAN
1436 type |= RMTT_B_ENDIAN;
1437#else
1438 type |= RMTT_L_ENDIAN;
1439#endif
1440#if SIZEOF_SIZE_T == SIZEOF_LNG
1441 type |= RMTT_64_BITS;
1442#else
1443 type |= RMTT_32_BITS;
1444#endif
1445#if SIZEOF_OID == SIZEOF_LNG
1446 type |= RMTT_64_OIDS;
1447#else
1448 type |= RMTT_32_OIDS;
1449#endif
1450
1451 mnstr_printf(cntxt->fdout, "[ %d ]\n", type);
1452
1453 return(MAL_SUCCEED);
1454}
1455
1456/**
1457 * Returns whether the underlying connection is still connected or not.
1458 * Best effort implementation on top of mapi using a ping.
1459 */
1460str
1461RMTisalive(int *ret, str *conn)
1462{
1463 str tmp;
1464 connection c;
1465
1466 if (*conn == NULL || strcmp(*conn, (str)str_nil) == 0)
1467 throw(ILLARG, "remote.get", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
1468
1469 /* lookup conn, set c if valid */
1470 rethrow("remote.get", tmp, RMTfindconn(&c, *conn));
1471
1472 *ret = 0;
1473 if (mapi_is_connected(c->mconn) && mapi_ping(c->mconn) == MOK)
1474 *ret = 1;
1475
1476 return MAL_SUCCEED;
1477}
1478
1479// This is basically a no op
1480str
1481RMTregisterSupervisor(int *ret, str *sup_uuid, str *query_uuid) {
1482 (void)sup_uuid;
1483 (void)query_uuid;
1484
1485 *ret = 0;
1486 return MAL_SUCCEED;
1487}
1488
1489#endif // HAVE_MAPI
1490