1 | /* |
2 | * This Source Code Form is subject to the terms of the Mozilla Public |
3 | * License, v. 2.0. If a copy of the MPL was not distributed with this |
4 | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
5 | * |
6 | * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V. |
7 | */ |
8 | |
9 | /* |
 * All statements are checked for being eligible for dataflow execution.
11 | */ |
12 | #include "monetdb_config.h" |
13 | #include "opt_dataflow.h" |
14 | #include "mal_instruction.h" |
15 | #include "mal_interpreter.h" |
16 | #include "manifold.h" |
17 | |
18 | /* |
 * Dataflow processing incurs overhead and is only
 * relevant if multiple tasks can be handled at the same time.
 * Also, simple expressions don't have to be executed in parallel.
22 | * |
 * The garbage sink contains variables whose end of life is within
 * a dataflow block and that are used concurrently.
 * They are garbage collected at the end of the parallel block.
26 | * |
27 | * The dataflow analysis centers around the read/write use patterns of |
28 | * the variables and the occurrence of side-effect bearing functions. |
29 | * Any such function should break the dataflow block as it may rely |
30 | * on the sequential order in the plan. |
31 | * |
32 | * The following state properties can be distinguished for all variables: |
33 | * VARWRITE - variable assigned a value in the dataflow block |
34 | * VARREAD - variable is used in an argument |
35 | * VAR2READ - variable is read in concurrent mode |
 * VARBLOCK - the next use of the variable terminates the parallel block; set after encountering an update
37 | * |
38 | * Only some combinations are allowed. |
39 | */ |
40 | |
/* Per-variable state bits tracked while scanning a candidate dataflow block. */
enum {
	VARFREE = 0,	/* not touched in the current block */
	VARWRITE = 1,	/* assigned in the current block */
	VARREAD = 2,	/* read as an argument after being assigned in the block */
	VARBLOCK = 4,	/* its next use closes the block (set for update targets) */
	VAR2READ = 8	/* read again after VARREAD was already set */
};

/* State vector indexed by variable number: one byte of VAR* bits per variable. */
typedef char *States;

/*
 * OR flag F into the state of argument K of instruction P.
 * NOTE: the expansion refers to a local named 'vlimit' (the number of
 * entries in S), which must be in scope wherever setState is used.
 */
#define setState(S, P, K, F) (assert(getArg(P, K) < vlimit), (S)[getArg(P, K)] |= (F))
/* Fetch the state bits of argument K of instruction P. */
#define getState(S, P, K) ((S)[getArg(P, K)])
51 | |
52 | static int |
53 | simpleFlow(InstrPtr *old, int start, int last) |
54 | { |
55 | int i, j, k, simple = TRUE; |
56 | InstrPtr p = NULL, q; |
57 | |
58 | /* ignore trivial blocks */ |
59 | if ( last - start == 1) |
60 | return TRUE; |
61 | /* skip sequence of simple arithmetic first */ |
62 | for( ; simple && start < last; start++) |
63 | if ( old[start] ) { |
64 | p= old[start]; |
65 | simple = getModuleId(p) == calcRef || getModuleId(p) == mtimeRef || getModuleId(p) == strRef || getModuleId(p)== mmathRef; |
66 | } |
67 | for( i = start; i < last; i++) |
68 | if ( old[i]) { |
69 | q= old[i]; |
70 | simple = getModuleId(q) == calcRef || getModuleId(q) == mtimeRef || getModuleId(q) == strRef || getModuleId(q)== mmathRef; |
71 | if( !simple) { |
72 | /* if not arithmetic than we should consume the previous result directly */ |
73 | for( j= q->retc; j < q->argc; j++) |
74 | for( k =0; k < p->retc; k++) |
75 | if( getArg(p,k) == getArg(q,j)) |
76 | simple= TRUE; |
77 | if( !simple) |
78 | return 0; |
79 | } |
80 | p = q; |
81 | } |
82 | return simple; |
83 | } |
84 | |
85 | /* Updates are permitted if it is a unique update on |
86 | * a BAT created in the context of this block |
 * As far as we know, no SQL or MAL test re-uses the
 * target BAT to insert again and subsequently calls dataflow.
89 | * In MAL scripts, they still can occur. |
90 | */ |
91 | |
92 | /* a limited set of MAL instructions may appear in the dataflow block*/ |
93 | static int |
94 | dataflowBreakpoint(Client cntxt, MalBlkPtr mb, InstrPtr p, States states) |
95 | { |
96 | int j; |
97 | |
98 | if (p->token == ENDsymbol || p->barrier || isUnsafeFunction(p) || |
99 | (isMultiplex(p) && MANIFOLDtypecheck(cntxt,mb,p,0) == NULL) ){ |
100 | |
101 | if( OPTdebug & OPTdataflow){ |
102 | fprintf(stderr,"#breakpoint on instruction\n" ); |
103 | } |
104 | return TRUE; |
105 | } |
106 | |
107 | /* flow blocks should be closed when we reach a point |
108 | where a variable is assigned more then once or already |
109 | being read. |
110 | */ |
111 | for(j=0; j<p->retc; j++) |
112 | if ( getState(states,p,j) & (VARWRITE | VARREAD | VARBLOCK)){ |
113 | |
114 | if( OPTdebug & OPTdataflow){ |
115 | fprintf(stderr,"#breakpoint on argument %s state %d\n" , getVarName(mb,getArg(p,j)), getState(states,p,j)); |
116 | } |
117 | |
118 | return 1; |
119 | } |
120 | |
121 | /* update instructions can be updated if the target variable |
122 | * has not been read in the block so far */ |
123 | if ( isUpdateInstruction(p) ){ |
124 | /* the SQL update functions change BATs that are not |
125 | * explicitly mentioned as arguments (and certainly not as the |
126 | * first argument), but that can still be available to the MAL |
127 | * program (see bugs.monetdb.org/6641) */ |
128 | if (getModuleId(p) == sqlRef) |
129 | return 1; |
130 | |
131 | if( OPTdebug & OPTdataflow){ |
132 | if( getState(states,p,1) & (VARREAD | VARBLOCK)) |
133 | fprintf(stderr,"#breakpoint on update %s state %d\n" , getVarName(mb,getArg(p,j)), getState(states,p,j)); |
134 | } |
135 | return getState(states,p,p->retc) & (VARREAD | VARBLOCK); |
136 | } |
137 | |
138 | for(j=p->retc; j < p->argc; j++){ |
139 | if ( getState(states,p,j) & VARBLOCK){ |
140 | if( OPTdebug & OPTdataflow){ |
141 | if( getState(states,p,j) & VARREAD) |
142 | fprintf(stderr,"#breakpoint on blocked var %s state %d\n" , getVarName(mb,getArg(p,j)), getState(states,p,j)); |
143 | } |
144 | return 1; |
145 | } |
146 | |
147 | if( OPTdebug & OPTdataflow){ |
148 | if( hasSideEffects(mb,p,FALSE)) |
149 | fprintf(stderr,"#breakpoint on sideeffect var %s %s.%s\n" , getVarName(mb,getArg(p,j)), getModuleId(p), getFunctionId(p)); |
150 | } |
151 | } |
152 | return hasSideEffects(mb,p,FALSE); |
153 | } |
154 | |
155 | /* Collect the BATs that are used concurrently to ensure that |
156 | * there is a single point where they can be released |
157 | */ |
158 | static int |
159 | dflowGarbagesink(Client cntxt, MalBlkPtr mb, int var, InstrPtr *sink, int top) |
160 | { |
161 | InstrPtr r; |
162 | int i; |
163 | for( i =0; i<top; i++) |
164 | if( getArg(sink[i],1) == var) |
165 | return top; |
166 | (void) cntxt; |
167 | |
168 | r = newInstruction(NULL,languageRef, passRef); |
169 | getArg(r,0) = newTmpVariable(mb,TYPE_void); |
170 | r= pushArgument(mb,r, var); |
171 | sink[top++] = r; |
172 | return top; |
173 | } |
174 | |
175 | /* dataflow blocks are transparent, because they are always |
176 | executed, either sequentially or in parallel */ |
177 | |
178 | str |
179 | OPTdataflowImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p) |
180 | { |
181 | int i,j,k, start=1, slimit, breakpoint, actions=0, simple = TRUE; |
182 | int flowblock= 0; |
183 | InstrPtr *sink = NULL, *old = NULL, q; |
184 | int limit, vlimit, top = 0; |
185 | States states; |
186 | char buf[256]; |
187 | lng usec = GDKusec(); |
188 | str msg = MAL_SUCCEED; |
189 | |
190 | /* don't use dataflow on single processor systems */ |
191 | if (GDKnr_threads <= 1) |
192 | return MAL_SUCCEED; |
193 | |
194 | if ( optimizerIsApplied(mb,"dataflow" )) |
195 | return MAL_SUCCEED; |
196 | (void) stk; |
197 | /* inlined functions will get their dataflow control later */ |
198 | if ( mb->inlineProp) |
199 | return MAL_SUCCEED; |
200 | |
201 | if( OPTdebug & OPTdataflow){ |
202 | fprintf(stderr,"#dataflow input\n" ); |
203 | fprintFunction(stderr, mb, 0, LIST_MAL_ALL); |
204 | } |
205 | |
206 | vlimit = mb->vsize; |
207 | states = (States) GDKzalloc(vlimit * sizeof(char)); |
208 | sink = (InstrPtr *) GDKzalloc(mb->stop * sizeof(InstrPtr)); |
209 | if (states == NULL || sink == NULL){ |
210 | msg= createException(MAL,"optimizer.dataflow" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
211 | goto wrapup; |
212 | } |
213 | |
214 | setVariableScope(mb); |
215 | |
216 | limit= mb->stop; |
217 | slimit= mb->ssize; |
218 | old = mb->stmt; |
219 | if (newMalBlkStmt(mb, mb->ssize) < 0) { |
220 | msg= createException(MAL,"optimizer.dataflow" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
221 | actions = -1; |
222 | goto wrapup; |
223 | } |
224 | pushInstruction(mb,old[0]); |
225 | |
226 | /* inject new dataflow barriers using a single pass through the program */ |
227 | for (i = 1; i<limit; i++) { |
228 | p = old[i]; |
229 | assert(p); |
230 | breakpoint = dataflowBreakpoint(cntxt, mb, p ,states); |
231 | if ( breakpoint ){ |
232 | /* close previous flow block */ |
233 | simple = simpleFlow(old,start,i); |
234 | |
235 | if( OPTdebug & OPTdataflow){ |
236 | fprintf(stderr,"#breakpoint pc %d %s\n" ,i, (simple?"simple" :"" ) ); |
237 | } |
238 | if ( !simple){ |
239 | flowblock = newTmpVariable(mb,TYPE_bit); |
240 | q= newFcnCall(mb,languageRef,dataflowRef); |
241 | q->barrier= BARRIERsymbol; |
242 | getArg(q,0)= flowblock; |
243 | actions++; |
244 | } |
245 | // copyblock the collected statements |
246 | for( j=start ; j<i; j++) { |
247 | q= old[j]; |
248 | pushInstruction(mb,q); |
249 | // collect BAT variables garbage collected within the block |
250 | if( !simple) |
251 | for( k=q->retc; k<q->argc; k++){ |
252 | if (getState(states,q,k) & VAR2READ && getEndScope(mb,getArg(q,k)) == j && isaBatType(getVarType(mb,getArg(q,k))) ) |
253 | top = dflowGarbagesink(cntxt, mb, getArg(q,k), sink, top); |
254 | } |
255 | } |
256 | /* exit parallel block */ |
257 | if ( ! simple){ |
258 | // force the pending final garbage statements |
259 | for( j=0; j<top; j++) |
260 | pushInstruction(mb,sink[j]); |
261 | q= newAssignment(mb); |
262 | q->barrier= EXITsymbol; |
263 | getArg(q,0) = flowblock; |
264 | } |
265 | if (p->token == ENDsymbol){ |
266 | for(; i < limit; i++) |
267 | if( old[i]) |
268 | pushInstruction(mb,old[i]); |
269 | break; |
270 | } |
271 | // implicitly a new flow block starts unless we have a hard side-effect |
272 | memset((char*) states, 0, vlimit * sizeof(char)); |
273 | top = 0; |
274 | if ( p->token == ENDsymbol || (hasSideEffects(mb,p,FALSE) && !blockStart(p)) || isMultiplex(p)){ |
275 | start = i+1; |
276 | pushInstruction(mb,p); |
277 | continue; |
278 | } |
279 | start = i; |
280 | } |
281 | |
282 | if (blockStart(p)){ |
283 | /* barrier blocks are kept out of the dataflow */ |
284 | /* assumes that barrier entry/exit pairs are correct. */ |
285 | /* A refinement is parallelize within a barrier block */ |
286 | int copy= 1; |
287 | pushInstruction(mb,p); |
288 | for ( i++; i<limit; i++) { |
289 | p = old[i]; |
290 | pushInstruction(mb,p); |
291 | |
292 | if (blockStart(p)) |
293 | copy++; |
294 | if (blockExit(p)) { |
295 | copy--; |
296 | if ( copy == 0) break; |
297 | } |
298 | } |
299 | // reset admin |
300 | start = i+1; |
301 | } |
302 | // remember you assigned/read variables |
303 | for ( k = 0; k < p->retc; k++) |
304 | setState(states, p, k, VARWRITE); |
305 | if( isUpdateInstruction(p) && (getState(states,p,1) == 0 || getState(states,p,1) & VARWRITE)) |
306 | setState(states, p,1, VARBLOCK); |
307 | for ( k = p->retc; k< p->argc; k++) |
308 | if( !isVarConstant(mb,getArg(p,k)) ){ |
309 | if( getState(states, p, k) & VARREAD) |
310 | setState(states, p, k, VAR2READ); |
311 | else |
312 | if( getState(states, p, k) & VARWRITE) |
313 | setState(states, p ,k, VARREAD); |
314 | } |
315 | |
316 | if( OPTdebug & OPTdataflow){ |
317 | fprintf(stderr,"# variable states\n" ); |
318 | fprintInstruction(stderr,mb, 0, p , LIST_MAL_ALL); |
319 | for(k = 0; k < p->argc; k++) |
320 | fprintf(stderr,"#%s %d\n" , getVarName(mb,getArg(p,k)), states[getArg(p,k)] ); |
321 | } |
322 | |
323 | } |
324 | /* take the remainder as is */ |
325 | for (; i<slimit; i++) |
326 | if (old[i]) |
327 | freeInstruction(old[i]); |
328 | /* Defense line against incorrect plans */ |
329 | if( actions > 0){ |
330 | chkTypes(cntxt->usermodule, mb, FALSE); |
331 | chkFlow(mb); |
332 | chkDeclarations(mb); |
333 | } |
334 | /* keep all actions taken as a post block comment */ |
335 | usec = GDKusec()- usec; |
336 | snprintf(buf,256,"%-20s actions=%2d time=" LLFMT " usec" ,"dataflow" ,actions,usec); |
337 | newComment(mb,buf); |
338 | if( actions >= 0) |
339 | addtoMalBlkHistory(mb); |
340 | |
341 | wrapup: |
342 | if(states) GDKfree(states); |
343 | if(sink) GDKfree(sink); |
344 | if(old) GDKfree(old); |
345 | if( OPTdebug & OPTdataflow){ |
346 | fprintf(stderr, "#DATAFLOW optimizer exit\n" ); |
347 | fprintFunction(stderr, mb, 0, LIST_MAL_ALL); |
348 | } |
349 | |
350 | return msg; |
351 | } |
352 | |