| 1 | /* |
| 2 | * This Source Code Form is subject to the terms of the Mozilla Public |
| 3 | * License, v. 2.0. If a copy of the MPL was not distributed with this |
| 4 | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
| 5 | * |
| 6 | * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V. |
| 7 | */ |
| 8 | |
| 9 | #include "monetdb_config.h" |
| 10 | #include "opt_remoteQueries.h" |
| 11 | #include "mal_interpreter.h" /* for showErrors() */ |
| 12 | #include "mal_builder.h" |
| 13 | |
| 14 | /* |
| 15 | * The instruction sent is produced with a variation of call2str |
| 16 | * from the debugger. |
| 17 | */ |
| 18 | static str |
| 19 | RQcall2str(MalBlkPtr mb, InstrPtr p) |
| 20 | { |
| 21 | int k; |
| 22 | size_t len=1; |
| 23 | str msg; |
| 24 | str s,cv= NULL; |
| 25 | |
| 26 | msg = (str) GDKmalloc(BUFSIZ); |
| 27 | if (msg == NULL) |
| 28 | return NULL; |
| 29 | msg[0]='#'; |
| 30 | msg[1]=0; |
| 31 | if( p->barrier) |
| 32 | strcat(msg, operatorName(p->barrier)); |
| 33 | |
| 34 | if( p->retc > 1) strcat(msg,"(" ); |
| 35 | len = strlen(msg); |
| 36 | for (k = 0; k < p->retc; k++) { |
| 37 | if( isVarUDFtype(mb, getArg(p,k)) ){ |
| 38 | str tpe = getTypeName(getVarType(mb, getArg(p, k))); |
| 39 | sprintf(msg+len, "%s:%s " , getVarName(mb, getArg(p,k)), tpe); |
| 40 | GDKfree(tpe); |
| 41 | } else |
| 42 | sprintf(msg+len, "%s" , getVarName(mb,getArg(p,k))); |
| 43 | if (k < p->retc - 1) |
| 44 | strcat(msg,"," ); |
| 45 | len = strlen(msg); |
| 46 | } |
| 47 | if( p->retc > 1) strcat(msg,")" ); |
| 48 | sprintf(msg+len,":= %s.%s(" ,getModuleId(p),getFunctionId(p)); |
| 49 | s = strchr(msg, '('); |
| 50 | if (s) { |
| 51 | s++; |
| 52 | *s = 0; |
| 53 | len = strlen(msg); |
| 54 | for (k = p->retc; k < p->argc; k++) { |
| 55 | VarPtr v = getVar(mb, getArg(p, k)); |
| 56 | if( isVarConstant(mb, getArg(p,k)) ){ |
| 57 | if( v->type == TYPE_void) { |
| 58 | sprintf(msg+len, "nil" ); |
| 59 | } else { |
| 60 | if ((cv = VALformat(&v->value)) == NULL) { |
| 61 | GDKfree(msg); |
| 62 | return NULL; |
| 63 | } |
| 64 | sprintf(msg+len,"%s:%s" ,cv, ATOMname(v->type)); |
| 65 | GDKfree(cv); |
| 66 | } |
| 67 | |
| 68 | } else |
| 69 | sprintf(msg+len, "%s" , v->id); |
| 70 | if (k < p->argc - 1) |
| 71 | strcat(msg,"," ); |
| 72 | len = strlen(msg); |
| 73 | } |
| 74 | strcat(msg,");" ); |
| 75 | } |
| 76 | /* printf("#RQcall:%s\n",msg);*/ |
| 77 | return msg; |
| 78 | } |
/*
 * The algorithm follows the common optimizer scheme: instructions are
 * taken out of the old block one by one and copied, possibly rewritten,
 * to the new block.
 *
 * A local cache of connections is maintained, because all statements
 * related to a single remote database should be executed in the same
 * stack context. The pitfall to avoid is creating multiple connections,
 * each with its own isolated runtime environment.
 */
/*
 * lookupServer(X): leave in j the variable holding the connection for the
 * database named by argument X of instruction p.
 *
 * If p's result already has a location, that handle is reused.  Otherwise
 * the database name is looked up in the dbalias cache.  Only a constant
 * argument X supplies a name.  On a miss, a mapi.lookup instruction is
 * emitted and its result is cached, so that all statements for one remote
 * database share the same handle.  Finally, p's result is recorded as
 * living at j.
 *
 * A non-constant argument leaves db NULL.  Such a NULL is never passed to
 * strcmp (which would be undefined behaviour); it simply always misses
 * the cache and gets a fresh lookup.
 * dbtop is capped at 126.  Once the cache is that full, new names still
 * get a lookup but are no longer found by later searches.
 *
 * Uses/clobbers the caller's p, mb, location, db, k, dbtop, dbalias, r, j.
 */
#define lookupServer(X)\
	if (location[getArg(p, 0)] == 0) {\
		db = 0;\
		if (isVarConstant(mb, getArg(p, X)))\
			db = getVarConstant(mb, getArg(p, X)).val.sval;\
		for (k = 0; k < dbtop; k++)\
			if (db && dbalias[k].dbname &&\
				strcmp(db, dbalias[k].dbname) == 0)\
				break;\
		if (k == dbtop) {\
			r = newInstruction(mb, mapiRef, lookupRef);\
			j = getArg(r, 0) = newTmpVariable(mb, TYPE_int);\
			r = pushArgument(mb, r, getArg(p, X));\
			pushInstruction(mb, r);\
			dbalias[dbtop].dbhdl = j;\
			dbalias[dbtop++].dbname = db;\
			if (dbtop == 127)\
				dbtop--;\
		} else\
			j = dbalias[k].dbhdl;\
		location[getArg(p, 0)] = j;\
	} else\
		j = location[getArg(p, 0)];
| 111 | |
/*
 * prepareRemote(TPE): start building, in r, a mapi.rpc instruction.  Its
 * result is a fresh temporary of type TPE, and its first argument is the
 * connection handle currently held in j.  The instruction is only built
 * here; pushing it onto mb is left to the caller.
 */
#define prepareRemote(TPE)\
	{\
		r = newInstruction(mb, mapiRef, rpcRef);\
		getArg(r, 0) = newTmpVariable(mb, (TPE));\
		r = pushArgument(mb, r, j);\
	}
| 116 | |
/*
 * putRemoteVariables(): ship every local, non-constant argument of p to the
 * remote site before the pending rpc call r is appended.
 *
 * Each such argument gets a mapi.put(conn, name, value).  conn is the
 * connection handle that prepareRemote pushed as argument 1 of r.
 * location[arg] cannot serve as conn: the condition selects exactly the
 * arguments whose location is 0.
 *
 * Constness is tested first.  That way location[] is never indexed with
 * arguments just replaced by new constants (defConstant), which may lie
 * beyond the table allocated when the optimizer started.
 *
 * Uses/clobbers the caller's j and q; reads p, mb, location and r.
 */
#define putRemoteVariables()\
	for (j = p->retc; j < p->argc; j++)\
		if (!isVarConstant(mb, getArg(p, j)) && location[getArg(p, j)] == 0) {\
			q = newInstruction(0, mapiRef, putRef);\
			getArg(q, 0) = newTmpVariable(mb, TYPE_void);\
			q = pushArgument(mb, q, getArg(r, 1));\
			q = pushStr(mb, q, getVarName(mb, getArg(p, j)));\
			q = pushArgument(mb, q, getArg(p, j));\
			pushInstruction(mb, q);\
		}
| 127 | |
/*
 * remoteAction(): complete the pending rpc call r with the MAL text of p
 * (without RQcall2str's leading '#').  Then append r to mb, release p and
 * count the rewrite in doit.
 *
 * RQcall2str returns NULL on failure.  In that case r and p are released,
 * nothing is appended, and the first such failure is recorded in the
 * caller's msg; the NULL result is never dereferenced.
 *
 * Uses the caller's s, r, p, mb, doit and msg.
 */
#define remoteAction()\
	s = RQcall2str(mb, p);\
	if (s == NULL) {\
		freeInstruction(r);\
		if (msg == MAL_SUCCEED)\
			msg = createException(MAL, "optimizer.remote", SQLSTATE(HY001) MAL_MALLOC_FAIL);\
	} else {\
		r = pushStr(mb, r, s + 1);\
		GDKfree(s);\
		pushInstruction(mb, r);\
		doit++;\
	}\
	freeInstruction(p);
| 135 | |
| 136 | typedef struct{ |
| 137 | str dbname; |
| 138 | int dbhdl; |
| 139 | } DBalias; |
| 140 | |
| 141 | str |
| 142 | OPTremoteQueriesImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
| 143 | { |
| 144 | InstrPtr p, q, r, *old; |
| 145 | int i, j, cnt, limit, slimit, doit=0; |
| 146 | int remoteSite,collectFirst; |
| 147 | int *location; |
| 148 | DBalias *dbalias; |
| 149 | int dbtop,k; |
| 150 | char buf[BUFSIZ],*s, *db; |
| 151 | ValRecord cst; |
| 152 | lng usec = GDKusec(); |
| 153 | str msg = MAL_SUCCEED; |
| 154 | |
| 155 | cst.vtype= TYPE_int; |
| 156 | cst.val.ival= 0; |
| 157 | cst.len = 0; |
| 158 | |
| 159 | (void) cntxt; |
| 160 | (void) stk; |
| 161 | (void) pci; |
| 162 | |
| 163 | limit = mb->stop; |
| 164 | slimit = mb->ssize; |
| 165 | old = mb->stmt; |
| 166 | |
| 167 | location= (int*) GDKzalloc(mb->vsize * sizeof(int)); |
| 168 | if ( location == NULL) |
| 169 | throw(MAL, "optimizer.remote" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
| 170 | dbalias= (DBalias*) GDKzalloc(128 * sizeof(DBalias)); |
| 171 | if (dbalias == NULL){ |
| 172 | GDKfree(location); |
| 173 | throw(MAL, "optimizer.remote" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
| 174 | } |
| 175 | dbtop= 0; |
| 176 | |
| 177 | if ( newMalBlkStmt(mb, mb->ssize) < 0){ |
| 178 | GDKfree(dbalias); |
| 179 | GDKfree(location); |
| 180 | throw(MAL, "optimizer.remote" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
| 181 | } |
| 182 | |
| 183 | for (i = 0; i < limit; i++) { |
| 184 | p = old[i]; |
| 185 | |
| 186 | /* detect remote instructions */ |
| 187 | cnt=0; |
| 188 | for(j=0; j<p->argc; j++) |
| 189 | if (location[getArg(p,j)]) |
| 190 | cnt++; |
| 191 | |
| 192 | /* detect remote variable binding */ |
| 193 | |
| 194 | if( (getModuleId(p)== mapiRef && getFunctionId(p)==bindRef)){ |
| 195 | if( p->argc == 3 && getArgType(mb,p,1) == TYPE_int ) { |
| 196 | int tpe; |
| 197 | setVarUDFtype(mb,getArg(p,0)); |
| 198 | j = getArg(p,1); /* lookupServer with key */ |
| 199 | tpe = getArgType(mb,p,0); |
| 200 | /* result is remote */ |
| 201 | location[getArg(p,0)]= j; |
| 202 | |
| 203 | /* turn the instruction into a local one */ |
| 204 | /* one argument less */ |
| 205 | p->argc--; |
| 206 | /* only use the second argument (string) */ |
| 207 | getArg(p,1)= getArg(p,2); |
| 208 | |
| 209 | getModuleId(p) = bbpRef; |
| 210 | |
| 211 | prepareRemote(tpe) |
| 212 | putRemoteVariables() |
| 213 | remoteAction() |
| 214 | } else |
| 215 | pushInstruction(mb,p); |
| 216 | } else if( (getModuleId(p)== sqlRef && getFunctionId(p)==evalRef) ){ |
| 217 | if( p->argc == 3){ |
| 218 | /* a remote sql eval is needed */ |
| 219 | lookupServer(1) |
| 220 | |
| 221 | /* turn the instruction into a local one */ |
| 222 | /* one argument less */ |
| 223 | p->argc--; |
| 224 | /* only use the second argument (string) */ |
| 225 | getArg(p,1)= getArg(p,2); |
| 226 | |
| 227 | prepareRemote(TYPE_void) |
| 228 | |
| 229 | s= RQcall2str(mb,p); |
| 230 | r= pushStr(mb,r,s+1); |
| 231 | GDKfree(s); |
| 232 | pushInstruction(mb,r); |
| 233 | freeInstruction(p); |
| 234 | doit++; |
| 235 | } |
| 236 | } else if( (getModuleId(p)== sqlRef && getFunctionId(p)==bindRef) ){ |
| 237 | |
| 238 | if( p->argc == 6 && getArgType(mb,p,4) == TYPE_str ) { |
| 239 | int tpe; |
| 240 | setVarUDFtype(mb,getArg(p,0)); |
| 241 | j = getArg(p,1); /* lookupServer with key */ |
| 242 | tpe = getArgType(mb,p,0); |
| 243 | |
| 244 | lookupServer(4) |
| 245 | |
| 246 | /* turn the instruction into a local one */ |
| 247 | getArg(p,4)= defConstant(mb, TYPE_int, &cst); |
| 248 | |
| 249 | prepareRemote(tpe) |
| 250 | putRemoteVariables() |
| 251 | remoteAction() |
| 252 | } else |
| 253 | pushInstruction(mb,p); |
| 254 | } else |
| 255 | if(getModuleId(p)== sqlRef && getFunctionId(p)== binddbatRef) { |
| 256 | |
| 257 | if( p->argc == 5 && getArgType(mb,p,3) == TYPE_str ) { |
| 258 | lookupServer(3) |
| 259 | |
| 260 | /* turn the instruction into a local one */ |
| 261 | getArg(p,3)= defConstant(mb, TYPE_int, &cst); |
| 262 | |
| 263 | prepareRemote(TYPE_void) |
| 264 | putRemoteVariables() |
| 265 | remoteAction() |
| 266 | } else { |
| 267 | if( OPTdebug & OPTremotequeries){ |
| 268 | fprintf(stderr, "found remote variable %s ad %d\n" , |
| 269 | getVarName(mb,getArg(p,0)), location[getArg(p,0)]); |
| 270 | } |
| 271 | pushInstruction(mb,p); |
| 272 | } |
| 273 | } else |
| 274 | if( getModuleId(p) && strcmp(getModuleId(p),"optimizer" )==0 && |
| 275 | getFunctionId(p) && strcmp(getFunctionId(p),"remoteQueries" )==0 ) |
| 276 | freeInstruction(p); |
| 277 | else if (cnt == 0 || p->barrier) /* local only or flow control statement */ |
| 278 | pushInstruction(mb,p); |
| 279 | else { |
| 280 | /* |
| 281 | * The hard part is to decide what to do with instructions that |
| 282 | * contain a reference to a remote variable. |
| 283 | * In the first implementation we use the following policy. |
| 284 | * If there are multiple sites involved, all arguments are |
| 285 | * moved local for processing. Moreover, all local arguments |
| 286 | * to be shipped should be simple. |
| 287 | */ |
| 288 | remoteSite=0; |
| 289 | collectFirst= FALSE; |
| 290 | for(j=0; j<p->argc; j++) |
| 291 | if( location[getArg(p,j)]){ |
| 292 | if (remoteSite == 0) |
| 293 | remoteSite= location[getArg(p,j)]; |
| 294 | else if( remoteSite != location[getArg(p,j)]) |
| 295 | collectFirst= TRUE; |
| 296 | } |
| 297 | if( getModuleId(p)== ioRef || (getModuleId(p)== sqlRef |
| 298 | && (getFunctionId(p)== resultSetRef || |
| 299 | getFunctionId(p)== rsColumnRef))) |
| 300 | collectFirst= TRUE; |
| 301 | |
| 302 | /* local BATs are not shipped */ |
| 303 | if( remoteSite && collectFirst== FALSE) |
| 304 | for(j=p->retc; j<p->argc; j++) |
| 305 | if( location[getArg(p,j)] == 0 && |
| 306 | isaBatType(getVarType(mb,getArg(p,j)))) |
| 307 | collectFirst= TRUE; |
| 308 | |
| 309 | if (collectFirst){ |
| 310 | /* perform locally */ |
| 311 | for(j=p->retc; j<p->argc; j++) |
| 312 | if( location[getArg(p,j)]){ |
| 313 | q= newInstruction(0,mapiRef,rpcRef); |
| 314 | getArg(q,0)= getArg(p,j); |
| 315 | q= pushArgument(mb,q,location[getArg(p,j)]); |
| 316 | snprintf(buf,BUFSIZ,"io.print(%s);" , |
| 317 | getVarName(mb,getArg(p,j)) ); |
| 318 | q= pushStr(mb,q,buf); |
| 319 | pushInstruction(mb,q); |
| 320 | } |
| 321 | pushInstruction(mb,p); |
| 322 | /* as of now all the targets are also local */ |
| 323 | for(j=0; j<p->retc; j++) |
| 324 | location[getArg(p,j)]= 0; |
| 325 | doit++; |
| 326 | } else if (remoteSite){ |
| 327 | /* single remote site involved */ |
| 328 | r= newInstruction(mb,mapiRef,rpcRef); |
| 329 | getArg(r,0)= newTmpVariable(mb, TYPE_void); |
| 330 | r= pushArgument(mb, r, remoteSite); |
| 331 | |
| 332 | for(j=p->retc; j<p->argc; j++) |
| 333 | if( location[getArg(p,j)] == 0 && !isVarConstant(mb,getArg(p,j)) ){ |
| 334 | q= newInstruction(0,mapiRef,putRef); |
| 335 | getArg(q,0)= newTmpVariable(mb, TYPE_void); |
| 336 | q= pushArgument(mb, q, remoteSite); |
| 337 | q= pushStr(mb,q, getVarName(mb,getArg(p,j))); |
| 338 | (void) pushArgument(mb, q, getArg(p,j)); |
| 339 | pushInstruction(mb,q); |
| 340 | } |
| 341 | s= RQcall2str(mb, p); |
| 342 | pushInstruction(mb,r); |
| 343 | (void) pushStr(mb,r,s+1); |
| 344 | GDKfree(s); |
| 345 | for(j=0; j<p->retc; j++) |
| 346 | location[getArg(p,j)]= remoteSite; |
| 347 | freeInstruction(p); |
| 348 | doit++; |
| 349 | } else |
| 350 | pushInstruction(mb,p); |
| 351 | } |
| 352 | } |
| 353 | for(; i<slimit; i++) |
| 354 | if( old[i]) |
| 355 | freeInstruction(old[i]); |
| 356 | GDKfree(old); |
| 357 | GDKfree(location); |
| 358 | GDKfree(dbalias); |
| 359 | |
| 360 | /* Defense line against incorrect plans */ |
| 361 | if( doit){ |
| 362 | chkTypes(cntxt->usermodule, mb, FALSE); |
| 363 | chkFlow(mb); |
| 364 | chkDeclarations(mb); |
| 365 | } |
| 366 | /* keep all actions taken as a post block comment */ |
| 367 | usec = GDKusec()- usec; |
| 368 | snprintf(buf,256,"%-20s actions=%2d time=" LLFMT " usec" ,"remoteQueries" ,doit, usec); |
| 369 | newComment(mb,buf); |
| 370 | if( doit >= 0) |
| 371 | addtoMalBlkHistory(mb); |
| 372 | |
| 373 | if( OPTdebug & OPTremotequeries){ |
| 374 | fprintf(stderr, "#remotequeries optimizer exit\n" ); |
| 375 | fprintFunction(stderr, mb, 0, LIST_MAL_ALL); |
| 376 | } |
| 377 | return msg; |
| 378 | } |
| 379 | |