1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9/*
 * The statements are all checked for being eligible for dataflow.
11 */
12#include "monetdb_config.h"
13#include "opt_dataflow.h"
14#include "mal_instruction.h"
15#include "mal_interpreter.h"
16#include "manifold.h"
17
18/*
 * Dataflow processing incurs overhead and is only
 * relevant if multiple tasks can be handled at the same time.
 * Also, simple expressions don't have to be executed in parallel.
22 *
 * The garbagesink contains variables whose end of life is within
 * a dataflow block and that are used concurrently.
 * They are garbage collected at the end of the parallel block.
26 *
27 * The dataflow analysis centers around the read/write use patterns of
28 * the variables and the occurrence of side-effect bearing functions.
29 * Any such function should break the dataflow block as it may rely
30 * on the sequential order in the plan.
31 *
32 * The following state properties can be distinguished for all variables:
33 * VARWRITE - variable assigned a value in the dataflow block
34 * VARREAD - variable is used in an argument
35 * VAR2READ - variable is read in concurrent mode
 * VARBLOCK - the variable's next use terminates the parallel (//) block; set after encountering an update
37 *
38 * Only some combinations are allowed.
39 */
40
/* Per-variable state flags; they are or-ed together in one state byte. */
#define VARFREE		0	/* no flag set */
#define VARWRITE	1	/* assigned a value in the dataflow block */
#define VARREAD		2	/* used in an argument */
#define VARBLOCK	4	/* next use terminates the block */
#define VAR2READ	8	/* read in concurrent mode */

/* Array of state bytes, indexed by variable id. */
typedef char *States;

/* State byte of the K-th argument of instruction P. */
#define getState(S,P,K) ((S)[getArg(P,K)])

/* Or the flag(s) F into the state of the K-th argument of instruction P.
 * The expansion references a variable named vlimit (the size of the state
 * array), which therefore has to be in scope wherever this macro is used. */
#define setState(S,P,K,F) ( assert(getArg(P,K) < vlimit), (S)[getArg(P,K)] |= (F))
51
52static int
53simpleFlow(InstrPtr *old, int start, int last)
54{
55 int i, j, k, simple = TRUE;
56 InstrPtr p = NULL, q;
57
58 /* ignore trivial blocks */
59 if ( last - start == 1)
60 return TRUE;
61 /* skip sequence of simple arithmetic first */
62 for( ; simple && start < last; start++)
63 if ( old[start] ) {
64 p= old[start];
65 simple = getModuleId(p) == calcRef || getModuleId(p) == mtimeRef || getModuleId(p) == strRef || getModuleId(p)== mmathRef;
66 }
67 for( i = start; i < last; i++)
68 if ( old[i]) {
69 q= old[i];
70 simple = getModuleId(q) == calcRef || getModuleId(q) == mtimeRef || getModuleId(q) == strRef || getModuleId(q)== mmathRef;
71 if( !simple) {
72 /* if not arithmetic than we should consume the previous result directly */
73 for( j= q->retc; j < q->argc; j++)
74 for( k =0; k < p->retc; k++)
75 if( getArg(p,k) == getArg(q,j))
76 simple= TRUE;
77 if( !simple)
78 return 0;
79 }
80 p = q;
81 }
82 return simple;
83}
84
/* An update is permitted if it is the only update on
 * a BAT created in the context of this block.
 * As far as we know, no SQL or MAL test re-uses the
 * target BAT to insert again and subsequently calls dataflow.
 * In MAL scripts, this can still occur.
*/
91
92/* a limited set of MAL instructions may appear in the dataflow block*/
93static int
94dataflowBreakpoint(Client cntxt, MalBlkPtr mb, InstrPtr p, States states)
95{
96 int j;
97
98 if (p->token == ENDsymbol || p->barrier || isUnsafeFunction(p) ||
99 (isMultiplex(p) && MANIFOLDtypecheck(cntxt,mb,p,0) == NULL) ){
100
101 if( OPTdebug & OPTdataflow){
102 fprintf(stderr,"#breakpoint on instruction\n");
103 }
104 return TRUE;
105 }
106
107 /* flow blocks should be closed when we reach a point
108 where a variable is assigned more then once or already
109 being read.
110 */
111 for(j=0; j<p->retc; j++)
112 if ( getState(states,p,j) & (VARWRITE | VARREAD | VARBLOCK)){
113
114 if( OPTdebug & OPTdataflow){
115 fprintf(stderr,"#breakpoint on argument %s state %d\n", getVarName(mb,getArg(p,j)), getState(states,p,j));
116 }
117
118 return 1;
119 }
120
121 /* update instructions can be updated if the target variable
122 * has not been read in the block so far */
123 if ( isUpdateInstruction(p) ){
124 /* the SQL update functions change BATs that are not
125 * explicitly mentioned as arguments (and certainly not as the
126 * first argument), but that can still be available to the MAL
127 * program (see bugs.monetdb.org/6641) */
128 if (getModuleId(p) == sqlRef)
129 return 1;
130
131 if( OPTdebug & OPTdataflow){
132 if( getState(states,p,1) & (VARREAD | VARBLOCK))
133 fprintf(stderr,"#breakpoint on update %s state %d\n", getVarName(mb,getArg(p,j)), getState(states,p,j));
134 }
135 return getState(states,p,p->retc) & (VARREAD | VARBLOCK);
136 }
137
138 for(j=p->retc; j < p->argc; j++){
139 if ( getState(states,p,j) & VARBLOCK){
140 if( OPTdebug & OPTdataflow){
141 if( getState(states,p,j) & VARREAD)
142 fprintf(stderr,"#breakpoint on blocked var %s state %d\n", getVarName(mb,getArg(p,j)), getState(states,p,j));
143 }
144 return 1;
145 }
146
147 if( OPTdebug & OPTdataflow){
148 if( hasSideEffects(mb,p,FALSE))
149 fprintf(stderr,"#breakpoint on sideeffect var %s %s.%s\n", getVarName(mb,getArg(p,j)), getModuleId(p), getFunctionId(p));
150 }
151 }
152 return hasSideEffects(mb,p,FALSE);
153}
154
155/* Collect the BATs that are used concurrently to ensure that
156 * there is a single point where they can be released
157 */
158static int
159dflowGarbagesink(Client cntxt, MalBlkPtr mb, int var, InstrPtr *sink, int top)
160{
161 InstrPtr r;
162 int i;
163 for( i =0; i<top; i++)
164 if( getArg(sink[i],1) == var)
165 return top;
166 (void) cntxt;
167
168 r = newInstruction(NULL,languageRef, passRef);
169 getArg(r,0) = newTmpVariable(mb,TYPE_void);
170 r= pushArgument(mb,r, var);
171 sink[top++] = r;
172 return top;
173}
174
175/* dataflow blocks are transparent, because they are always
176 executed, either sequentially or in parallel */
177
178str
179OPTdataflowImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p)
180{
181 int i,j,k, start=1, slimit, breakpoint, actions=0, simple = TRUE;
182 int flowblock= 0;
183 InstrPtr *sink = NULL, *old = NULL, q;
184 int limit, vlimit, top = 0;
185 States states;
186 char buf[256];
187 lng usec = GDKusec();
188 str msg = MAL_SUCCEED;
189
190 /* don't use dataflow on single processor systems */
191 if (GDKnr_threads <= 1)
192 return MAL_SUCCEED;
193
194 if ( optimizerIsApplied(mb,"dataflow"))
195 return MAL_SUCCEED;
196 (void) stk;
197 /* inlined functions will get their dataflow control later */
198 if ( mb->inlineProp)
199 return MAL_SUCCEED;
200
201 if( OPTdebug & OPTdataflow){
202 fprintf(stderr,"#dataflow input\n");
203 fprintFunction(stderr, mb, 0, LIST_MAL_ALL);
204 }
205
206 vlimit = mb->vsize;
207 states = (States) GDKzalloc(vlimit * sizeof(char));
208 sink = (InstrPtr *) GDKzalloc(mb->stop * sizeof(InstrPtr));
209 if (states == NULL || sink == NULL){
210 msg= createException(MAL,"optimizer.dataflow", SQLSTATE(HY001) MAL_MALLOC_FAIL);
211 goto wrapup;
212 }
213
214 setVariableScope(mb);
215
216 limit= mb->stop;
217 slimit= mb->ssize;
218 old = mb->stmt;
219 if (newMalBlkStmt(mb, mb->ssize) < 0) {
220 msg= createException(MAL,"optimizer.dataflow", SQLSTATE(HY001) MAL_MALLOC_FAIL);
221 actions = -1;
222 goto wrapup;
223 }
224 pushInstruction(mb,old[0]);
225
226 /* inject new dataflow barriers using a single pass through the program */
227 for (i = 1; i<limit; i++) {
228 p = old[i];
229 assert(p);
230 breakpoint = dataflowBreakpoint(cntxt, mb, p ,states);
231 if ( breakpoint ){
232 /* close previous flow block */
233 simple = simpleFlow(old,start,i);
234
235 if( OPTdebug & OPTdataflow){
236 fprintf(stderr,"#breakpoint pc %d %s\n",i, (simple?"simple":"") );
237 }
238 if ( !simple){
239 flowblock = newTmpVariable(mb,TYPE_bit);
240 q= newFcnCall(mb,languageRef,dataflowRef);
241 q->barrier= BARRIERsymbol;
242 getArg(q,0)= flowblock;
243 actions++;
244 }
245 // copyblock the collected statements
246 for( j=start ; j<i; j++) {
247 q= old[j];
248 pushInstruction(mb,q);
249 // collect BAT variables garbage collected within the block
250 if( !simple)
251 for( k=q->retc; k<q->argc; k++){
252 if (getState(states,q,k) & VAR2READ && getEndScope(mb,getArg(q,k)) == j && isaBatType(getVarType(mb,getArg(q,k))) )
253 top = dflowGarbagesink(cntxt, mb, getArg(q,k), sink, top);
254 }
255 }
256 /* exit parallel block */
257 if ( ! simple){
258 // force the pending final garbage statements
259 for( j=0; j<top; j++)
260 pushInstruction(mb,sink[j]);
261 q= newAssignment(mb);
262 q->barrier= EXITsymbol;
263 getArg(q,0) = flowblock;
264 }
265 if (p->token == ENDsymbol){
266 for(; i < limit; i++)
267 if( old[i])
268 pushInstruction(mb,old[i]);
269 break;
270 }
271 // implicitly a new flow block starts unless we have a hard side-effect
272 memset((char*) states, 0, vlimit * sizeof(char));
273 top = 0;
274 if ( p->token == ENDsymbol || (hasSideEffects(mb,p,FALSE) && !blockStart(p)) || isMultiplex(p)){
275 start = i+1;
276 pushInstruction(mb,p);
277 continue;
278 }
279 start = i;
280 }
281
282 if (blockStart(p)){
283 /* barrier blocks are kept out of the dataflow */
284 /* assumes that barrier entry/exit pairs are correct. */
285 /* A refinement is parallelize within a barrier block */
286 int copy= 1;
287 pushInstruction(mb,p);
288 for ( i++; i<limit; i++) {
289 p = old[i];
290 pushInstruction(mb,p);
291
292 if (blockStart(p))
293 copy++;
294 if (blockExit(p)) {
295 copy--;
296 if ( copy == 0) break;
297 }
298 }
299 // reset admin
300 start = i+1;
301 }
302 // remember you assigned/read variables
303 for ( k = 0; k < p->retc; k++)
304 setState(states, p, k, VARWRITE);
305 if( isUpdateInstruction(p) && (getState(states,p,1) == 0 || getState(states,p,1) & VARWRITE))
306 setState(states, p,1, VARBLOCK);
307 for ( k = p->retc; k< p->argc; k++)
308 if( !isVarConstant(mb,getArg(p,k)) ){
309 if( getState(states, p, k) & VARREAD)
310 setState(states, p, k, VAR2READ);
311 else
312 if( getState(states, p, k) & VARWRITE)
313 setState(states, p ,k, VARREAD);
314 }
315
316 if( OPTdebug & OPTdataflow){
317 fprintf(stderr,"# variable states\n");
318 fprintInstruction(stderr,mb, 0, p , LIST_MAL_ALL);
319 for(k = 0; k < p->argc; k++)
320 fprintf(stderr,"#%s %d\n", getVarName(mb,getArg(p,k)), states[getArg(p,k)] );
321 }
322
323 }
324 /* take the remainder as is */
325 for (; i<slimit; i++)
326 if (old[i])
327 freeInstruction(old[i]);
328 /* Defense line against incorrect plans */
329 if( actions > 0){
330 chkTypes(cntxt->usermodule, mb, FALSE);
331 chkFlow(mb);
332 chkDeclarations(mb);
333 }
334 /* keep all actions taken as a post block comment */
335 usec = GDKusec()- usec;
336 snprintf(buf,256,"%-20s actions=%2d time=" LLFMT " usec","dataflow",actions,usec);
337 newComment(mb,buf);
338 if( actions >= 0)
339 addtoMalBlkHistory(mb);
340
341wrapup:
342 if(states) GDKfree(states);
343 if(sink) GDKfree(sink);
344 if(old) GDKfree(old);
345 if( OPTdebug & OPTdataflow){
346 fprintf(stderr, "#DATAFLOW optimizer exit\n");
347 fprintFunction(stderr, mb, 0, LIST_MAL_ALL);
348 }
349
350 return msg;
351}
352