1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9#include "monetdb_config.h"
10#include "opt_remoteQueries.h"
11#include "mal_interpreter.h" /* for showErrors() */
12#include "mal_builder.h"
13
14/*
15 * The instruction sent is produced with a variation of call2str
16 * from the debugger.
17 */
18static str
19RQcall2str(MalBlkPtr mb, InstrPtr p)
20{
21 int k;
22 size_t len=1;
23 str msg;
24 str s,cv= NULL;
25
26 msg = (str) GDKmalloc(BUFSIZ);
27 if (msg == NULL)
28 return NULL;
29 msg[0]='#';
30 msg[1]=0;
31 if( p->barrier)
32 strcat(msg, operatorName(p->barrier));
33
34 if( p->retc > 1) strcat(msg,"(");
35 len = strlen(msg);
36 for (k = 0; k < p->retc; k++) {
37 if( isVarUDFtype(mb, getArg(p,k)) ){
38 str tpe = getTypeName(getVarType(mb, getArg(p, k)));
39 sprintf(msg+len, "%s:%s ", getVarName(mb, getArg(p,k)), tpe);
40 GDKfree(tpe);
41 } else
42 sprintf(msg+len, "%s", getVarName(mb,getArg(p,k)));
43 if (k < p->retc - 1)
44 strcat(msg,",");
45 len = strlen(msg);
46 }
47 if( p->retc > 1) strcat(msg,")");
48 sprintf(msg+len,":= %s.%s(",getModuleId(p),getFunctionId(p));
49 s = strchr(msg, '(');
50 if (s) {
51 s++;
52 *s = 0;
53 len = strlen(msg);
54 for (k = p->retc; k < p->argc; k++) {
55 VarPtr v = getVar(mb, getArg(p, k));
56 if( isVarConstant(mb, getArg(p,k)) ){
57 if( v->type == TYPE_void) {
58 sprintf(msg+len, "nil");
59 } else {
60 if ((cv = VALformat(&v->value)) == NULL) {
61 GDKfree(msg);
62 return NULL;
63 }
64 sprintf(msg+len,"%s:%s",cv, ATOMname(v->type));
65 GDKfree(cv);
66 }
67
68 } else
69 sprintf(msg+len, "%s", v->id);
70 if (k < p->argc - 1)
71 strcat(msg,",");
72 len = strlen(msg);
73 }
74 strcat(msg,");");
75 }
76/* printf("#RQcall:%s\n",msg);*/
77 return msg;
78}
79/*
80 * The algorithm follows the common scheme used so far.
81 * Instructions are taken out one-by-one and copied
82 * to the new block.
83 *
84 * A local cache of connections is established, because
85 * the statements related to a single remote database
86 * should be executed in the same stack context.
87 * A pitfall is to create multiple connections with
88 * their isolated runtime environment.
89 */
/*
 * lookupServer(X): determine the connection handle for instruction p,
 * whose argument X names the remote database, and leave it in j.
 * If the result variable of p has no location yet, the database name is
 * searched in dbalias[]; on a miss a mapi.lookup instruction is emitted
 * and its result variable is cached under that name.  The handle is
 * also recorded in location[] for the result variable of p.
 *
 * A name that is not a constant cannot be compared against the cache
 * (db stays 0), so it always gets a fresh mapi.lookup and is not cached;
 * this avoids calling strcmp with a null pointer.
 *
 * Uses the locals p, r, j, k, db, dbtop, dbalias, location and mb of
 * the expanding function.
 */
#define lookupServer(X)\
	/* lookup the server connection */\
	if( location[getArg(p,0)] == 0){\
		db = 0;\
		if( isVarConstant(mb,getArg(p,X)) )\
			db= getVarConstant(mb, getArg(p,X)).val.sval;\
		k = dbtop;\
		if( db != 0)\
			for(k=0; k<dbtop; k++)\
				if( strcmp(db, dbalias[k].dbname)== 0)\
					break;\
		\
		if( k== dbtop){\
			r= newInstruction(mb,mapiRef,lookupRef);\
			j= getArg(r,0)= newTmpVariable(mb, TYPE_int);\
			r= pushArgument(mb,r, getArg(p,X));\
			pushInstruction(mb,r);\
			if( db != 0){\
				dbalias[dbtop].dbhdl= j;\
				dbalias[dbtop++].dbname= db;\
				if( dbtop== 127) dbtop--;\
			}\
		} else j= dbalias[k].dbhdl;\
		location[getArg(p,0)]= j;\
	} else j= location[getArg(p,0)];
111
/*
 * prepareRemote(TPE): start a mapi.rpc instruction in r whose result is
 * a fresh temporary of type TPE and whose first argument is the
 * connection handle currently held in j.
 */
#define prepareRemote(TPE)\
	r = newInstruction(mb, mapiRef, rpcRef);\
	getArg(r, 0) = newTmpVariable(mb, TPE);\
	r = pushArgument(mb, r, j);
116
/*
 * putRemoteVariables(): emit a mapi.put instruction for every argument
 * of p that is neither a constant nor already located at a remote site,
 * so that the value is available remotely under its variable name.
 *
 * The connection is taken from location[getArg(p,0)], which each
 * expansion site sets before using this macro.  The location of the
 * argument itself cannot be used: it is 0 for exactly the arguments
 * selected here.
 *
 * Overwrites j (used as loop index) and q.
 */
#define putRemoteVariables()\
	for(j=p->retc; j<p->argc; j++)\
		if( location[getArg(p,j)] == 0 && !isVarConstant(mb,getArg(p,j)) ){\
			q= newInstruction(0, mapiRef, putRef);\
			getArg(q,0)= newTmpVariable(mb, TYPE_void);\
			q= pushArgument(mb,q,location[getArg(p,0)]);\
			q= pushStr(mb,q, getVarName(mb,getArg(p,j)));\
			q= pushArgument(mb,q,getArg(p,j));\
			pushInstruction(mb,q);\
		}
127
/*
 * remoteAction(): finish the rpc instruction r by appending the textual
 * form of p (without its leading '#'), store r in the block and drop p.
 *
 * RQcall2str returns NULL on failure.  In that case r is discarded, p is
 * kept in the plan so it is not lost, and the failure is recorded in msg
 * instead of passing an invalid pointer to pushStr.
 *
 * Uses the locals s, r, p, msg, doit and mb of the expanding function.
 */
#define remoteAction()\
	s= RQcall2str(mb,p);\
	if( s == NULL){\
		freeInstruction(r);\
		pushInstruction(mb,p);\
		if( msg == MAL_SUCCEED)\
			msg= createException(MAL, "optimizer.remote", SQLSTATE(HY001) MAL_MALLOC_FAIL);\
	} else {\
		r= pushStr(mb,r,s+1);\
		GDKfree(s);\
		pushInstruction(mb,r);\
		freeInstruction(p);\
		doit++;\
	}
135
136typedef struct{
137 str dbname;
138 int dbhdl;
139} DBalias;
140
141str
142OPTremoteQueriesImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
143{
144 InstrPtr p, q, r, *old;
145 int i, j, cnt, limit, slimit, doit=0;
146 int remoteSite,collectFirst;
147 int *location;
148 DBalias *dbalias;
149 int dbtop,k;
150 char buf[BUFSIZ],*s, *db;
151 ValRecord cst;
152 lng usec = GDKusec();
153 str msg = MAL_SUCCEED;
154
155 cst.vtype= TYPE_int;
156 cst.val.ival= 0;
157 cst.len = 0;
158
159 (void) cntxt;
160 (void) stk;
161 (void) pci;
162
163 limit = mb->stop;
164 slimit = mb->ssize;
165 old = mb->stmt;
166
167 location= (int*) GDKzalloc(mb->vsize * sizeof(int));
168 if ( location == NULL)
169 throw(MAL, "optimizer.remote", SQLSTATE(HY001) MAL_MALLOC_FAIL);
170 dbalias= (DBalias*) GDKzalloc(128 * sizeof(DBalias));
171 if (dbalias == NULL){
172 GDKfree(location);
173 throw(MAL, "optimizer.remote", SQLSTATE(HY001) MAL_MALLOC_FAIL);
174 }
175 dbtop= 0;
176
177 if ( newMalBlkStmt(mb, mb->ssize) < 0){
178 GDKfree(dbalias);
179 GDKfree(location);
180 throw(MAL, "optimizer.remote", SQLSTATE(HY001) MAL_MALLOC_FAIL);
181 }
182
183 for (i = 0; i < limit; i++) {
184 p = old[i];
185
186 /* detect remote instructions */
187 cnt=0;
188 for(j=0; j<p->argc; j++)
189 if (location[getArg(p,j)])
190 cnt++;
191
192 /* detect remote variable binding */
193
194 if( (getModuleId(p)== mapiRef && getFunctionId(p)==bindRef)){
195 if( p->argc == 3 && getArgType(mb,p,1) == TYPE_int ) {
196 int tpe;
197 setVarUDFtype(mb,getArg(p,0));
198 j = getArg(p,1); /* lookupServer with key */
199 tpe = getArgType(mb,p,0);
200 /* result is remote */
201 location[getArg(p,0)]= j;
202
203 /* turn the instruction into a local one */
204 /* one argument less */
205 p->argc--;
206 /* only use the second argument (string) */
207 getArg(p,1)= getArg(p,2);
208
209 getModuleId(p) = bbpRef;
210
211 prepareRemote(tpe)
212 putRemoteVariables()
213 remoteAction()
214 } else
215 pushInstruction(mb,p);
216 } else if( (getModuleId(p)== sqlRef && getFunctionId(p)==evalRef) ){
217 if( p->argc == 3){
218 /* a remote sql eval is needed */
219 lookupServer(1)
220
221 /* turn the instruction into a local one */
222 /* one argument less */
223 p->argc--;
224 /* only use the second argument (string) */
225 getArg(p,1)= getArg(p,2);
226
227 prepareRemote(TYPE_void)
228
229 s= RQcall2str(mb,p);
230 r= pushStr(mb,r,s+1);
231 GDKfree(s);
232 pushInstruction(mb,r);
233 freeInstruction(p);
234 doit++;
235 }
236 } else if( (getModuleId(p)== sqlRef && getFunctionId(p)==bindRef) ){
237
238 if( p->argc == 6 && getArgType(mb,p,4) == TYPE_str ) {
239 int tpe;
240 setVarUDFtype(mb,getArg(p,0));
241 j = getArg(p,1); /* lookupServer with key */
242 tpe = getArgType(mb,p,0);
243
244 lookupServer(4)
245
246 /* turn the instruction into a local one */
247 getArg(p,4)= defConstant(mb, TYPE_int, &cst);
248
249 prepareRemote(tpe)
250 putRemoteVariables()
251 remoteAction()
252 } else
253 pushInstruction(mb,p);
254 } else
255 if(getModuleId(p)== sqlRef && getFunctionId(p)== binddbatRef) {
256
257 if( p->argc == 5 && getArgType(mb,p,3) == TYPE_str ) {
258 lookupServer(3)
259
260 /* turn the instruction into a local one */
261 getArg(p,3)= defConstant(mb, TYPE_int, &cst);
262
263 prepareRemote(TYPE_void)
264 putRemoteVariables()
265 remoteAction()
266 } else {
267 if( OPTdebug & OPTremotequeries){
268 fprintf(stderr, "found remote variable %s ad %d\n",
269 getVarName(mb,getArg(p,0)), location[getArg(p,0)]);
270 }
271 pushInstruction(mb,p);
272 }
273 } else
274 if( getModuleId(p) && strcmp(getModuleId(p),"optimizer")==0 &&
275 getFunctionId(p) && strcmp(getFunctionId(p),"remoteQueries")==0 )
276 freeInstruction(p);
277 else if (cnt == 0 || p->barrier) /* local only or flow control statement */
278 pushInstruction(mb,p);
279 else {
280 /*
281 * The hard part is to decide what to do with instructions that
282 * contain a reference to a remote variable.
283 * In the first implementation we use the following policy.
284 * If there are multiple sites involved, all arguments are
285 * moved local for processing. Moreover, all local arguments
286 * to be shipped should be simple.
287 */
288 remoteSite=0;
289 collectFirst= FALSE;
290 for(j=0; j<p->argc; j++)
291 if( location[getArg(p,j)]){
292 if (remoteSite == 0)
293 remoteSite= location[getArg(p,j)];
294 else if( remoteSite != location[getArg(p,j)])
295 collectFirst= TRUE;
296 }
297 if( getModuleId(p)== ioRef || (getModuleId(p)== sqlRef
298 && (getFunctionId(p)== resultSetRef ||
299 getFunctionId(p)== rsColumnRef)))
300 collectFirst= TRUE;
301
302 /* local BATs are not shipped */
303 if( remoteSite && collectFirst== FALSE)
304 for(j=p->retc; j<p->argc; j++)
305 if( location[getArg(p,j)] == 0 &&
306 isaBatType(getVarType(mb,getArg(p,j))))
307 collectFirst= TRUE;
308
309 if (collectFirst){
310 /* perform locally */
311 for(j=p->retc; j<p->argc; j++)
312 if( location[getArg(p,j)]){
313 q= newInstruction(0,mapiRef,rpcRef);
314 getArg(q,0)= getArg(p,j);
315 q= pushArgument(mb,q,location[getArg(p,j)]);
316 snprintf(buf,BUFSIZ,"io.print(%s);",
317 getVarName(mb,getArg(p,j)) );
318 q= pushStr(mb,q,buf);
319 pushInstruction(mb,q);
320 }
321 pushInstruction(mb,p);
322 /* as of now all the targets are also local */
323 for(j=0; j<p->retc; j++)
324 location[getArg(p,j)]= 0;
325 doit++;
326 } else if (remoteSite){
327 /* single remote site involved */
328 r= newInstruction(mb,mapiRef,rpcRef);
329 getArg(r,0)= newTmpVariable(mb, TYPE_void);
330 r= pushArgument(mb, r, remoteSite);
331
332 for(j=p->retc; j<p->argc; j++)
333 if( location[getArg(p,j)] == 0 && !isVarConstant(mb,getArg(p,j)) ){
334 q= newInstruction(0,mapiRef,putRef);
335 getArg(q,0)= newTmpVariable(mb, TYPE_void);
336 q= pushArgument(mb, q, remoteSite);
337 q= pushStr(mb,q, getVarName(mb,getArg(p,j)));
338 (void) pushArgument(mb, q, getArg(p,j));
339 pushInstruction(mb,q);
340 }
341 s= RQcall2str(mb, p);
342 pushInstruction(mb,r);
343 (void) pushStr(mb,r,s+1);
344 GDKfree(s);
345 for(j=0; j<p->retc; j++)
346 location[getArg(p,j)]= remoteSite;
347 freeInstruction(p);
348 doit++;
349 } else
350 pushInstruction(mb,p);
351 }
352 }
353 for(; i<slimit; i++)
354 if( old[i])
355 freeInstruction(old[i]);
356 GDKfree(old);
357 GDKfree(location);
358 GDKfree(dbalias);
359
360 /* Defense line against incorrect plans */
361 if( doit){
362 chkTypes(cntxt->usermodule, mb, FALSE);
363 chkFlow(mb);
364 chkDeclarations(mb);
365 }
366 /* keep all actions taken as a post block comment */
367 usec = GDKusec()- usec;
368 snprintf(buf,256,"%-20s actions=%2d time=" LLFMT " usec","remoteQueries",doit, usec);
369 newComment(mb,buf);
370 if( doit >= 0)
371 addtoMalBlkHistory(mb);
372
373 if( OPTdebug & OPTremotequeries){
374 fprintf(stderr, "#remotequeries optimizer exit\n");
375 fprintFunction(stderr, mb, 0, LIST_MAL_ALL);
376 }
377 return msg;
378}
379