1 | /* |
2 | * This Source Code Form is subject to the terms of the Mozilla Public |
3 | * License, v. 2.0. If a copy of the MPL was not distributed with this |
4 | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
5 | * |
6 | * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V. |
7 | */ |
8 | |
9 | #include "monetdb_config.h" |
10 | #include "opt_mitosis.h" |
11 | #include "mal_interpreter.h" |
12 | #include "gdk_utils.h" |
13 | |
14 | |
15 | str |
16 | OPTmitosisImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p) |
17 | { |
18 | int i, j, limit, slimit, estimate = 0, pieces = 1, mito_parts = 0, mito_size = 0, row_size = 0, mt = -1; |
19 | str schema = 0, table = 0; |
20 | BUN r = 0, rowcnt = 0; /* table should be sizeable to consider parallel execution*/ |
21 | InstrPtr q, *old, target = 0; |
22 | size_t argsize = 6 * sizeof(lng), m = 0; |
23 | /* per op: 6 = (2+1)*2 <= 2 args + 1 res, each with head & tail */ |
24 | int threads = GDKnr_threads ? GDKnr_threads : 1; |
25 | int activeClients; |
26 | char buf[256]; |
27 | lng usec = GDKusec(); |
28 | str msg = MAL_SUCCEED; |
29 | |
30 | //if ( optimizerIsApplied(mb,"mitosis") ) |
31 | //return 0; |
32 | (void) cntxt; |
33 | (void) stk; |
34 | |
35 | old = mb->stmt; |
36 | for (i = 1; i < mb->stop; i++) { |
37 | InstrPtr p = old[i]; |
38 | |
39 | if (getModuleId(p) == sqlRef && getFunctionId(p) == assertRef && |
40 | p->argc > 2 && getArgType(mb, p, 2) == TYPE_str && |
41 | isVarConstant(mb, getArg(p, 2)) && |
42 | getVarConstant(mb, getArg(p, 2)).val.sval != NULL && |
43 | (strstr(getVarConstant(mb, getArg(p, 2)).val.sval, "PRIMARY KEY constraint" ) || |
44 | strstr(getVarConstant(mb, getArg(p, 2)).val.sval, "UNIQUE constraint" ))) |
45 | goto bailout; |
46 | |
47 | /* mitosis/mergetable bailout conditions */ |
48 | |
49 | if (p->argc > 2 && getModuleId(p) == aggrRef && |
50 | getFunctionId(p) != subcountRef && |
51 | getFunctionId(p) != subminRef && |
52 | getFunctionId(p) != submaxRef && |
53 | getFunctionId(p) != subavgRef && |
54 | getFunctionId(p) != subsumRef && |
55 | getFunctionId(p) != subprodRef && |
56 | |
57 | getFunctionId(p) != countRef && |
58 | getFunctionId(p) != minRef && |
59 | getFunctionId(p) != maxRef && |
60 | getFunctionId(p) != avgRef && |
61 | getFunctionId(p) != sumRef && |
62 | getFunctionId(p) != prodRef) |
63 | goto bailout; |
64 | |
65 | /* do not split up floating point bat that is being summed */ |
66 | if (p->retc == 1 && |
67 | (((p->argc == 6 || p->argc == 7) && |
68 | getModuleId(p) == aggrRef && |
69 | getFunctionId(p) == subsumRef) || |
70 | (p->argc == 4 && |
71 | getModuleId(p) == aggrRef && |
72 | getFunctionId(p) == sumRef)) && |
73 | isaBatType(getArgType(mb, p, p->retc)) && |
74 | (getBatType(getArgType(mb, p, p->retc)) == TYPE_flt || |
75 | getBatType(getArgType(mb, p, p->retc)) == TYPE_dbl)) |
76 | goto bailout; |
77 | |
78 | if (p->argc > 2 && (getModuleId(p) == capiRef || getModuleId(p) == rapiRef || getModuleId(p) == pyapiRef || getModuleId(p) == pyapi3Ref) && |
79 | getFunctionId(p) == subeval_aggrRef) |
80 | goto bailout; |
81 | |
82 | /* Mergetable cannot handle intersect/except's for now */ |
83 | if (getModuleId(p) == algebraRef && getFunctionId(p) == groupbyRef) |
84 | goto bailout; |
85 | |
86 | /* locate the largest non-partitioned table */ |
87 | if (getModuleId(p) != sqlRef || (getFunctionId(p) != bindRef && getFunctionId(p) != bindidxRef)) |
88 | continue; |
89 | /* don't split insert BATs */ |
90 | if (getVarConstant(mb, getArg(p, 5)).val.ival == 1) |
91 | continue; |
92 | if (p->argc > 6) |
93 | continue; /* already partitioned */ |
94 | /* |
95 | * The SQL optimizer already collects the counts of the base |
96 | * table and passes them on as a row property. All pieces for a |
97 | * single subplan should ideally fit together. |
98 | */ |
99 | r = getRowCnt(mb, getArg(p, 0)); |
100 | if (r >= rowcnt) { |
101 | /* the rowsize depends on the column types, assume void-headed */ |
102 | row_size = ATOMsize(getBatType(getArgType(mb,p,0))); |
103 | rowcnt = r; |
104 | target = p; |
105 | estimate++; |
106 | r = 0; |
107 | } |
108 | } |
109 | if (target == 0) |
110 | goto bailout; |
111 | /* |
112 | * The number of pieces should be based on the footprint of the |
113 | * queryplan, such that preferrably it can be handled without |
114 | * swapping intermediates. For the time being we just go for pieces |
115 | * that fit into memory in isolation. A fictive rowcount is derived |
116 | * based on argument types, such that all pieces would fit into |
117 | * memory conveniently for processing. We attempt to use not more |
118 | * threads than strictly needed. |
119 | * Experience shows that the pieces should not be too small. |
120 | * If we should limit to |threads| is still an open issue. |
121 | * |
122 | * Take into account the number of client connections, |
123 | * because all user together are responsible for resource contentions |
124 | */ |
125 | activeClients = mb->activeClients = MCactiveClients(); |
126 | |
127 | if (cntxt->memorylimit){ |
128 | /* the new mitosis scheme uses a maximum chunck size in MB from the client context */ |
129 | m = (size_t) ((cntxt->memorylimit * 1024 *1024) / row_size); |
130 | pieces = (int) (rowcnt / m + (rowcnt - m * pieces > 0)); |
131 | } |
132 | if (cntxt->memorylimit == 0 || pieces <= 1){ |
133 | /* the old allocation scheme */ |
134 | m = GDK_mem_maxsize / argsize; |
135 | /* if data exceeds memory size, |
136 | * i.e., (rowcnt*argsize > GDK_mem_maxsize), |
137 | * i.e., (rowcnt > GDK_mem_maxsize/argsize = m) */ |
138 | assert(threads > 0); |
139 | assert(activeClients > 0); |
140 | if (rowcnt > m && m / threads / activeClients > 0) { |
141 | /* create |pieces| > |threads| partitions such that |
142 | * |threads| partitions at a time fit in memory, |
143 | * i.e., (threads*(rowcnt/pieces) <= m), |
144 | * i.e., (rowcnt/pieces <= m/threads), |
145 | * i.e., (pieces => rowcnt/(m/threads)) |
146 | * (assuming that (m > threads*MINPARTCNT)) */ |
147 | pieces = (int) (rowcnt / (m / threads / activeClients)) + 1; |
148 | } else if (rowcnt > MINPARTCNT) { |
149 | /* exploit parallelism, but ensure minimal partition size to |
150 | * limit overhead */ |
151 | pieces = (int) MIN(rowcnt / MINPARTCNT, (BUN) threads); |
152 | } |
153 | } |
154 | |
155 | /* when testing, always aim for full parallelism, but avoid |
156 | * empty pieces */ |
157 | FORCEMITODEBUG |
158 | if (pieces < threads) |
159 | pieces = (int) MIN((BUN) threads, rowcnt); |
160 | /* prevent plan explosion */ |
161 | if (pieces > MAXSLICES) |
162 | pieces = MAXSLICES; |
163 | /* to enable experimentation we introduce the option to set |
164 | * the number of parts required and/or the size of each chunk (in K) |
165 | */ |
166 | mito_parts = GDKgetenv_int("mito_parts" , 0); |
167 | if (mito_parts > 0) |
168 | pieces = mito_parts; |
169 | mito_size = GDKgetenv_int("mito_size" , 0); |
170 | if (mito_size > 0) |
171 | pieces = (int) ((rowcnt * row_size) / (mito_size * 1024)); |
172 | |
173 | if(OPTdebug & OPTmitosis){ |
174 | fprintf(stderr, "#opt_mitosis: target is %s.%s " |
175 | " with " BUNFMT " rows of size %d into %zu" |
176 | " rows/piece %d threads %d pieces" |
177 | " fixed parts %d fixed size %d chunk = " BUNFMT"\n" , |
178 | getVarConstant(mb, getArg(target, 2)).val.sval, |
179 | getVarConstant(mb, getArg(target, 3)).val.sval, |
180 | rowcnt, row_size, m, threads, pieces, mito_parts, mito_size, rowcnt / pieces * row_size); |
181 | } |
182 | |
183 | if (pieces <= 1) |
184 | goto bailout; |
185 | |
186 | /* at this stage we have identified the #chunks to be used for the largest table */ |
187 | limit = mb->stop; |
188 | slimit = mb->ssize; |
189 | if (newMalBlkStmt(mb, mb->stop + 2 * estimate) < 0) |
190 | throw(MAL,"optimizer.mitosis" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
191 | estimate = 0; |
192 | |
193 | schema = getVarConstant(mb, getArg(target, 2)).val.sval; |
194 | table = getVarConstant(mb, getArg(target, 3)).val.sval; |
195 | for (i = 0; i < limit; i++) { |
196 | int upd = 0, qtpe, rtpe = 0, qv, rv; |
197 | InstrPtr matq, matr = NULL; |
198 | p = old[i]; |
199 | |
200 | if (getModuleId(p) != sqlRef || |
201 | !(getFunctionId(p) == bindRef || |
202 | getFunctionId(p) == bindidxRef || |
203 | getFunctionId(p) == tidRef)) { |
204 | pushInstruction(mb, p); |
205 | continue; |
206 | } |
207 | /* don't split insert BATs */ |
208 | if (p->argc == 6 && getVarConstant(mb, getArg(p, 5)).val.ival == 1) { |
209 | pushInstruction(mb, p); |
210 | continue; |
211 | } |
212 | r = getRowCnt(mb, getArg(p, 0)); |
213 | if (r < rowcnt) { |
214 | pushInstruction(mb, p); |
215 | continue; |
216 | } |
217 | /* Don't split the (index) bat if we already have identified a range */ |
218 | /* This will happen if we inline separately optimized routines */ |
219 | if (p->argc > 7) { |
220 | pushInstruction(mb, p); |
221 | continue; |
222 | } |
223 | if (p->retc == 2) |
224 | upd = 1; |
225 | if( mt == -1) |
226 | mt = getMitosisPartition(p); |
227 | if (mt < 0 && (strcmp(schema, getVarConstant(mb, getArg(p, 2 + upd)).val.sval) || |
228 | strcmp(table, getVarConstant(mb, getArg(p, 3 + upd)).val.sval))) { |
229 | pushInstruction(mb, p); |
230 | continue; |
231 | } |
232 | if (mt >= 0 && getMitosisPartition(p) != mt) { |
233 | pushInstruction(mb, p); |
234 | continue; |
235 | } |
236 | /* we keep the original bind operation, because it allows for |
237 | * easy undo when the mergtable can not do something */ |
238 | pushInstruction(mb, p); |
239 | |
240 | qtpe = getVarType(mb, getArg(p, 0)); |
241 | |
242 | matq = newInstruction(NULL, matRef, newRef); |
243 | getArg(matq, 0) = getArg(p, 0); |
244 | |
245 | if (upd) { |
246 | matr = newInstruction(NULL, matRef, newRef); |
247 | getArg(matr, 0) = getArg(p, 1); |
248 | rtpe = getVarType(mb, getArg(p, 1)); |
249 | } |
250 | |
251 | for (j = 0; j < pieces; j++) { |
252 | q = copyInstruction(p); |
253 | if( q == NULL){ |
254 | for (; i<limit; i++) |
255 | if (old[i]) |
256 | pushInstruction(mb,old[i]); |
257 | GDKfree(old); |
258 | throw(MAL,"optimizer.mitosis" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
259 | } |
260 | q = pushInt(mb, q, j); |
261 | q = pushInt(mb, q, pieces); |
262 | |
263 | qv = getArg(q, 0) = newTmpVariable(mb, qtpe); |
264 | setVarUDFtype(mb, qv); |
265 | if (upd) { |
266 | rv = getArg(q, 1) = newTmpVariable(mb, rtpe); |
267 | setVarUDFtype(mb, rv); |
268 | } |
269 | pushInstruction(mb, q); |
270 | matq = pushArgument(mb, matq, qv); |
271 | if (upd) |
272 | matr = pushArgument(mb, matr, rv); |
273 | } |
274 | pushInstruction(mb, matq); |
275 | if (upd) |
276 | pushInstruction(mb, matr); |
277 | } |
278 | for (; i<slimit; i++) |
279 | if (old[i]) |
280 | freeInstruction(old[i]); |
281 | GDKfree(old); |
282 | |
283 | /* Defense line against incorrect plans */ |
284 | if( 1){ |
285 | chkTypes(cntxt->usermodule, mb, FALSE); |
286 | chkFlow(mb); |
287 | chkDeclarations(mb); |
288 | } |
289 | /* keep all actions taken as a post block comment */ |
290 | bailout: |
291 | usec = GDKusec()- usec; |
292 | snprintf(buf,256,"%-20s actions=1 time=" LLFMT " usec" ,"mitosis" , usec); |
293 | newComment(mb,buf); |
294 | addtoMalBlkHistory(mb); |
295 | |
296 | if( OPTdebug & OPTmitosis){ |
297 | fprintf(stderr, "#MITOSIS optimizer exit\n" ); |
298 | fprintFunction(stderr, mb, 0, LIST_MAL_ALL); |
299 | } |
300 | return msg; |
301 | } |
302 | |