1 | /* |
2 | * This Source Code Form is subject to the terms of the Mozilla Public |
3 | * License, v. 2.0. If a copy of the MPL was not distributed with this |
4 | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
5 | * |
6 | * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V. |
7 | */ |
8 | |
9 | #include "monetdb_config.h" |
10 | #include "opt_remoteQueries.h" |
11 | #include "mal_interpreter.h" /* for showErrors() */ |
12 | #include "mal_builder.h" |
13 | |
14 | /* |
15 | * The instruction sent is produced with a variation of call2str |
16 | * from the debugger. |
17 | */ |
18 | static str |
19 | RQcall2str(MalBlkPtr mb, InstrPtr p) |
20 | { |
21 | int k; |
22 | size_t len=1; |
23 | str msg; |
24 | str s,cv= NULL; |
25 | |
26 | msg = (str) GDKmalloc(BUFSIZ); |
27 | if (msg == NULL) |
28 | return NULL; |
29 | msg[0]='#'; |
30 | msg[1]=0; |
31 | if( p->barrier) |
32 | strcat(msg, operatorName(p->barrier)); |
33 | |
34 | if( p->retc > 1) strcat(msg,"(" ); |
35 | len = strlen(msg); |
36 | for (k = 0; k < p->retc; k++) { |
37 | if( isVarUDFtype(mb, getArg(p,k)) ){ |
38 | str tpe = getTypeName(getVarType(mb, getArg(p, k))); |
39 | sprintf(msg+len, "%s:%s " , getVarName(mb, getArg(p,k)), tpe); |
40 | GDKfree(tpe); |
41 | } else |
42 | sprintf(msg+len, "%s" , getVarName(mb,getArg(p,k))); |
43 | if (k < p->retc - 1) |
44 | strcat(msg,"," ); |
45 | len = strlen(msg); |
46 | } |
47 | if( p->retc > 1) strcat(msg,")" ); |
48 | sprintf(msg+len,":= %s.%s(" ,getModuleId(p),getFunctionId(p)); |
49 | s = strchr(msg, '('); |
50 | if (s) { |
51 | s++; |
52 | *s = 0; |
53 | len = strlen(msg); |
54 | for (k = p->retc; k < p->argc; k++) { |
55 | VarPtr v = getVar(mb, getArg(p, k)); |
56 | if( isVarConstant(mb, getArg(p,k)) ){ |
57 | if( v->type == TYPE_void) { |
58 | sprintf(msg+len, "nil" ); |
59 | } else { |
60 | if ((cv = VALformat(&v->value)) == NULL) { |
61 | GDKfree(msg); |
62 | return NULL; |
63 | } |
64 | sprintf(msg+len,"%s:%s" ,cv, ATOMname(v->type)); |
65 | GDKfree(cv); |
66 | } |
67 | |
68 | } else |
69 | sprintf(msg+len, "%s" , v->id); |
70 | if (k < p->argc - 1) |
71 | strcat(msg,"," ); |
72 | len = strlen(msg); |
73 | } |
74 | strcat(msg,");" ); |
75 | } |
76 | /* printf("#RQcall:%s\n",msg);*/ |
77 | return msg; |
78 | } |
/*
 * The algorithm follows the common scheme used so far: instructions are
 * taken out one-by-one and copied to the new block.
 *
 * A local cache of connections is established, because the statements
 * related to a single remote database should be executed in the same
 * stack context.  A pitfall is to create multiple connections, each with
 * its own isolated runtime environment.
 *
 * The macros below are expanded inside the instruction loop of
 * OPTremoteQueriesImplementation and work directly on its locals
 * (mb, p, q, r, j, k, s, db, location, dbalias, dbtop, doit, msg).
 * They are invoked without a trailing semicolon.
 */

/*
 * lookupServer(X): set j to the variable holding the connection handle for
 * the result of p.  A handle already recorded in location[] is reused.
 * Otherwise argument X of p selects the database.  When that argument is a
 * string constant, handles are cached by name in dbalias[] (allocated by
 * the caller with 128 entries), so that only one mapi.lookup is emitted per
 * name.  Once 127 names are cached, slot 126 is reused for every new name.
 * A non-constant or non-string argument cannot be matched against the
 * cache and always gets a fresh, uncached mapi.lookup.
 * Clobbers r, k and db.
 */
#define lookupServer(X)\
	if (location[getArg(p,0)] == 0) {\
		db = NULL;\
		if (isVarConstant(mb, getArg(p,X)) && getArgType(mb, p, X) == TYPE_str)\
			db = getVarConstant(mb, getArg(p,X)).val.sval;\
		for (k = 0; db != NULL && k < dbtop; k++)\
			if (strcmp(db, dbalias[k].dbname) == 0)\
				break;\
		if (db == NULL || k == dbtop) {\
			r = newInstruction(mb, mapiRef, lookupRef);\
			j = getArg(r,0) = newTmpVariable(mb, TYPE_int);\
			r = pushArgument(mb, r, getArg(p,X));\
			pushInstruction(mb, r);\
			if (db != NULL) {\
				dbalias[dbtop].dbhdl = j;\
				dbalias[dbtop++].dbname = db;\
				if (dbtop == 127)\
					dbtop--;\
			}\
		} else\
			j = dbalias[k].dbhdl;\
		location[getArg(p,0)] = j;\
	} else\
		j = location[getArg(p,0)];

/*
 * prepareRemote(X): start, but do not yet push, a mapi.rpc instruction r on
 * connection j.  Its result is a fresh temporary of type X.
 */
#define prepareRemote(X)\
	r = newInstruction(mb, mapiRef, rpcRef);\
	getArg(r,0) = newTmpVariable(mb, X);\
	r = pushArgument(mb, r, j);

/*
 * putRemoteVariables(): ship every non-constant argument of p that still
 * lives locally, with one mapi.put each.  The target is the connection that
 * r was prepared with (argument 1 of r).  The constant test comes first
 * because p may refer to constants created after location[] was sized.
 * Clobbers q and j.
 */
#define putRemoteVariables()\
	for (j = p->retc; j < p->argc; j++)\
		if (!isVarConstant(mb, getArg(p,j)) && location[getArg(p,j)] == 0) {\
			q = newInstruction(0, mapiRef, putRef);\
			getArg(q,0) = newTmpVariable(mb, TYPE_void);\
			q = pushArgument(mb, q, getArg(r,1));\
			q = pushStr(mb, q, getVarName(mb, getArg(p,j)));\
			q = pushArgument(mb, q, getArg(p,j));\
			pushInstruction(mb, q);\
		}

/*
 * remoteAction(): attach the MAL text of p as the query of r, push r and
 * drop p.  If the text cannot be produced (out of memory), the macro
 * discards r, sets msg and breaks out of the caller's instruction loop.
 * p is then still in old[] and is freed by the caller's clean-up loop.
 */
#define remoteAction()\
	s = RQcall2str(mb, p);\
	if (s == NULL) {\
		freeInstruction(r);\
		msg = createException(MAL, "optimizer.remote", SQLSTATE(HY001) MAL_MALLOC_FAIL);\
		break;\
	}\
	r = pushStr(mb, r, s + 1);\
	GDKfree(s);\
	pushInstruction(mb, r);\
	freeInstruction(p);\
	doit++;
135 | |
136 | typedef struct{ |
137 | str dbname; |
138 | int dbhdl; |
139 | } DBalias; |
140 | |
141 | str |
142 | OPTremoteQueriesImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
143 | { |
144 | InstrPtr p, q, r, *old; |
145 | int i, j, cnt, limit, slimit, doit=0; |
146 | int remoteSite,collectFirst; |
147 | int *location; |
148 | DBalias *dbalias; |
149 | int dbtop,k; |
150 | char buf[BUFSIZ],*s, *db; |
151 | ValRecord cst; |
152 | lng usec = GDKusec(); |
153 | str msg = MAL_SUCCEED; |
154 | |
155 | cst.vtype= TYPE_int; |
156 | cst.val.ival= 0; |
157 | cst.len = 0; |
158 | |
159 | (void) cntxt; |
160 | (void) stk; |
161 | (void) pci; |
162 | |
163 | limit = mb->stop; |
164 | slimit = mb->ssize; |
165 | old = mb->stmt; |
166 | |
167 | location= (int*) GDKzalloc(mb->vsize * sizeof(int)); |
168 | if ( location == NULL) |
169 | throw(MAL, "optimizer.remote" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
170 | dbalias= (DBalias*) GDKzalloc(128 * sizeof(DBalias)); |
171 | if (dbalias == NULL){ |
172 | GDKfree(location); |
173 | throw(MAL, "optimizer.remote" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
174 | } |
175 | dbtop= 0; |
176 | |
177 | if ( newMalBlkStmt(mb, mb->ssize) < 0){ |
178 | GDKfree(dbalias); |
179 | GDKfree(location); |
180 | throw(MAL, "optimizer.remote" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
181 | } |
182 | |
183 | for (i = 0; i < limit; i++) { |
184 | p = old[i]; |
185 | |
186 | /* detect remote instructions */ |
187 | cnt=0; |
188 | for(j=0; j<p->argc; j++) |
189 | if (location[getArg(p,j)]) |
190 | cnt++; |
191 | |
192 | /* detect remote variable binding */ |
193 | |
194 | if( (getModuleId(p)== mapiRef && getFunctionId(p)==bindRef)){ |
195 | if( p->argc == 3 && getArgType(mb,p,1) == TYPE_int ) { |
196 | int tpe; |
197 | setVarUDFtype(mb,getArg(p,0)); |
198 | j = getArg(p,1); /* lookupServer with key */ |
199 | tpe = getArgType(mb,p,0); |
200 | /* result is remote */ |
201 | location[getArg(p,0)]= j; |
202 | |
203 | /* turn the instruction into a local one */ |
204 | /* one argument less */ |
205 | p->argc--; |
206 | /* only use the second argument (string) */ |
207 | getArg(p,1)= getArg(p,2); |
208 | |
209 | getModuleId(p) = bbpRef; |
210 | |
211 | prepareRemote(tpe) |
212 | putRemoteVariables() |
213 | remoteAction() |
214 | } else |
215 | pushInstruction(mb,p); |
216 | } else if( (getModuleId(p)== sqlRef && getFunctionId(p)==evalRef) ){ |
217 | if( p->argc == 3){ |
218 | /* a remote sql eval is needed */ |
219 | lookupServer(1) |
220 | |
221 | /* turn the instruction into a local one */ |
222 | /* one argument less */ |
223 | p->argc--; |
224 | /* only use the second argument (string) */ |
225 | getArg(p,1)= getArg(p,2); |
226 | |
227 | prepareRemote(TYPE_void) |
228 | |
229 | s= RQcall2str(mb,p); |
230 | r= pushStr(mb,r,s+1); |
231 | GDKfree(s); |
232 | pushInstruction(mb,r); |
233 | freeInstruction(p); |
234 | doit++; |
235 | } |
236 | } else if( (getModuleId(p)== sqlRef && getFunctionId(p)==bindRef) ){ |
237 | |
238 | if( p->argc == 6 && getArgType(mb,p,4) == TYPE_str ) { |
239 | int tpe; |
240 | setVarUDFtype(mb,getArg(p,0)); |
241 | j = getArg(p,1); /* lookupServer with key */ |
242 | tpe = getArgType(mb,p,0); |
243 | |
244 | lookupServer(4) |
245 | |
246 | /* turn the instruction into a local one */ |
247 | getArg(p,4)= defConstant(mb, TYPE_int, &cst); |
248 | |
249 | prepareRemote(tpe) |
250 | putRemoteVariables() |
251 | remoteAction() |
252 | } else |
253 | pushInstruction(mb,p); |
254 | } else |
255 | if(getModuleId(p)== sqlRef && getFunctionId(p)== binddbatRef) { |
256 | |
257 | if( p->argc == 5 && getArgType(mb,p,3) == TYPE_str ) { |
258 | lookupServer(3) |
259 | |
260 | /* turn the instruction into a local one */ |
261 | getArg(p,3)= defConstant(mb, TYPE_int, &cst); |
262 | |
263 | prepareRemote(TYPE_void) |
264 | putRemoteVariables() |
265 | remoteAction() |
266 | } else { |
267 | if( OPTdebug & OPTremotequeries){ |
268 | fprintf(stderr, "found remote variable %s ad %d\n" , |
269 | getVarName(mb,getArg(p,0)), location[getArg(p,0)]); |
270 | } |
271 | pushInstruction(mb,p); |
272 | } |
273 | } else |
274 | if( getModuleId(p) && strcmp(getModuleId(p),"optimizer" )==0 && |
275 | getFunctionId(p) && strcmp(getFunctionId(p),"remoteQueries" )==0 ) |
276 | freeInstruction(p); |
277 | else if (cnt == 0 || p->barrier) /* local only or flow control statement */ |
278 | pushInstruction(mb,p); |
279 | else { |
280 | /* |
281 | * The hard part is to decide what to do with instructions that |
282 | * contain a reference to a remote variable. |
283 | * In the first implementation we use the following policy. |
284 | * If there are multiple sites involved, all arguments are |
285 | * moved local for processing. Moreover, all local arguments |
286 | * to be shipped should be simple. |
287 | */ |
288 | remoteSite=0; |
289 | collectFirst= FALSE; |
290 | for(j=0; j<p->argc; j++) |
291 | if( location[getArg(p,j)]){ |
292 | if (remoteSite == 0) |
293 | remoteSite= location[getArg(p,j)]; |
294 | else if( remoteSite != location[getArg(p,j)]) |
295 | collectFirst= TRUE; |
296 | } |
297 | if( getModuleId(p)== ioRef || (getModuleId(p)== sqlRef |
298 | && (getFunctionId(p)== resultSetRef || |
299 | getFunctionId(p)== rsColumnRef))) |
300 | collectFirst= TRUE; |
301 | |
302 | /* local BATs are not shipped */ |
303 | if( remoteSite && collectFirst== FALSE) |
304 | for(j=p->retc; j<p->argc; j++) |
305 | if( location[getArg(p,j)] == 0 && |
306 | isaBatType(getVarType(mb,getArg(p,j)))) |
307 | collectFirst= TRUE; |
308 | |
309 | if (collectFirst){ |
310 | /* perform locally */ |
311 | for(j=p->retc; j<p->argc; j++) |
312 | if( location[getArg(p,j)]){ |
313 | q= newInstruction(0,mapiRef,rpcRef); |
314 | getArg(q,0)= getArg(p,j); |
315 | q= pushArgument(mb,q,location[getArg(p,j)]); |
316 | snprintf(buf,BUFSIZ,"io.print(%s);" , |
317 | getVarName(mb,getArg(p,j)) ); |
318 | q= pushStr(mb,q,buf); |
319 | pushInstruction(mb,q); |
320 | } |
321 | pushInstruction(mb,p); |
322 | /* as of now all the targets are also local */ |
323 | for(j=0; j<p->retc; j++) |
324 | location[getArg(p,j)]= 0; |
325 | doit++; |
326 | } else if (remoteSite){ |
327 | /* single remote site involved */ |
328 | r= newInstruction(mb,mapiRef,rpcRef); |
329 | getArg(r,0)= newTmpVariable(mb, TYPE_void); |
330 | r= pushArgument(mb, r, remoteSite); |
331 | |
332 | for(j=p->retc; j<p->argc; j++) |
333 | if( location[getArg(p,j)] == 0 && !isVarConstant(mb,getArg(p,j)) ){ |
334 | q= newInstruction(0,mapiRef,putRef); |
335 | getArg(q,0)= newTmpVariable(mb, TYPE_void); |
336 | q= pushArgument(mb, q, remoteSite); |
337 | q= pushStr(mb,q, getVarName(mb,getArg(p,j))); |
338 | (void) pushArgument(mb, q, getArg(p,j)); |
339 | pushInstruction(mb,q); |
340 | } |
341 | s= RQcall2str(mb, p); |
342 | pushInstruction(mb,r); |
343 | (void) pushStr(mb,r,s+1); |
344 | GDKfree(s); |
345 | for(j=0; j<p->retc; j++) |
346 | location[getArg(p,j)]= remoteSite; |
347 | freeInstruction(p); |
348 | doit++; |
349 | } else |
350 | pushInstruction(mb,p); |
351 | } |
352 | } |
353 | for(; i<slimit; i++) |
354 | if( old[i]) |
355 | freeInstruction(old[i]); |
356 | GDKfree(old); |
357 | GDKfree(location); |
358 | GDKfree(dbalias); |
359 | |
360 | /* Defense line against incorrect plans */ |
361 | if( doit){ |
362 | chkTypes(cntxt->usermodule, mb, FALSE); |
363 | chkFlow(mb); |
364 | chkDeclarations(mb); |
365 | } |
366 | /* keep all actions taken as a post block comment */ |
367 | usec = GDKusec()- usec; |
368 | snprintf(buf,256,"%-20s actions=%2d time=" LLFMT " usec" ,"remoteQueries" ,doit, usec); |
369 | newComment(mb,buf); |
370 | if( doit >= 0) |
371 | addtoMalBlkHistory(mb); |
372 | |
373 | if( OPTdebug & OPTremotequeries){ |
374 | fprintf(stderr, "#remotequeries optimizer exit\n" ); |
375 | fprintFunction(stderr, mb, 0, LIST_MAL_ALL); |
376 | } |
377 | return msg; |
378 | } |
379 | |