1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9#include "monetdb_config.h"
10#include "opt_mitosis.h"
11#include "mal_interpreter.h"
12#include "gdk_utils.h"
13
14
15str
16OPTmitosisImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p)
17{
18 int i, j, limit, slimit, estimate = 0, pieces = 1, mito_parts = 0, mito_size = 0, row_size = 0, mt = -1;
19 str schema = 0, table = 0;
20 BUN r = 0, rowcnt = 0; /* table should be sizeable to consider parallel execution*/
21 InstrPtr q, *old, target = 0;
22 size_t argsize = 6 * sizeof(lng), m = 0;
23 /* per op: 6 = (2+1)*2 <= 2 args + 1 res, each with head & tail */
24 int threads = GDKnr_threads ? GDKnr_threads : 1;
25 int activeClients;
26 char buf[256];
27 lng usec = GDKusec();
28 str msg = MAL_SUCCEED;
29
30 //if ( optimizerIsApplied(mb,"mitosis") )
31 //return 0;
32 (void) cntxt;
33 (void) stk;
34
35 old = mb->stmt;
36 for (i = 1; i < mb->stop; i++) {
37 InstrPtr p = old[i];
38
39 if (getModuleId(p) == sqlRef && getFunctionId(p) == assertRef &&
40 p->argc > 2 && getArgType(mb, p, 2) == TYPE_str &&
41 isVarConstant(mb, getArg(p, 2)) &&
42 getVarConstant(mb, getArg(p, 2)).val.sval != NULL &&
43 (strstr(getVarConstant(mb, getArg(p, 2)).val.sval, "PRIMARY KEY constraint") ||
44 strstr(getVarConstant(mb, getArg(p, 2)).val.sval, "UNIQUE constraint")))
45 goto bailout;
46
47 /* mitosis/mergetable bailout conditions */
48
49 if (p->argc > 2 && getModuleId(p) == aggrRef &&
50 getFunctionId(p) != subcountRef &&
51 getFunctionId(p) != subminRef &&
52 getFunctionId(p) != submaxRef &&
53 getFunctionId(p) != subavgRef &&
54 getFunctionId(p) != subsumRef &&
55 getFunctionId(p) != subprodRef &&
56
57 getFunctionId(p) != countRef &&
58 getFunctionId(p) != minRef &&
59 getFunctionId(p) != maxRef &&
60 getFunctionId(p) != avgRef &&
61 getFunctionId(p) != sumRef &&
62 getFunctionId(p) != prodRef)
63 goto bailout;
64
65 /* do not split up floating point bat that is being summed */
66 if (p->retc == 1 &&
67 (((p->argc == 6 || p->argc == 7) &&
68 getModuleId(p) == aggrRef &&
69 getFunctionId(p) == subsumRef) ||
70 (p->argc == 4 &&
71 getModuleId(p) == aggrRef &&
72 getFunctionId(p) == sumRef)) &&
73 isaBatType(getArgType(mb, p, p->retc)) &&
74 (getBatType(getArgType(mb, p, p->retc)) == TYPE_flt ||
75 getBatType(getArgType(mb, p, p->retc)) == TYPE_dbl))
76 goto bailout;
77
78 if (p->argc > 2 && (getModuleId(p) == capiRef || getModuleId(p) == rapiRef || getModuleId(p) == pyapiRef || getModuleId(p) == pyapi3Ref) &&
79 getFunctionId(p) == subeval_aggrRef)
80 goto bailout;
81
82 /* Mergetable cannot handle intersect/except's for now */
83 if (getModuleId(p) == algebraRef && getFunctionId(p) == groupbyRef)
84 goto bailout;
85
86 /* locate the largest non-partitioned table */
87 if (getModuleId(p) != sqlRef || (getFunctionId(p) != bindRef && getFunctionId(p) != bindidxRef))
88 continue;
89 /* don't split insert BATs */
90 if (getVarConstant(mb, getArg(p, 5)).val.ival == 1)
91 continue;
92 if (p->argc > 6)
93 continue; /* already partitioned */
94 /*
95 * The SQL optimizer already collects the counts of the base
96 * table and passes them on as a row property. All pieces for a
97 * single subplan should ideally fit together.
98 */
99 r = getRowCnt(mb, getArg(p, 0));
100 if (r >= rowcnt) {
101 /* the rowsize depends on the column types, assume void-headed */
102 row_size = ATOMsize(getBatType(getArgType(mb,p,0)));
103 rowcnt = r;
104 target = p;
105 estimate++;
106 r = 0;
107 }
108 }
109 if (target == 0)
110 goto bailout;
111 /*
112 * The number of pieces should be based on the footprint of the
113 * queryplan, such that preferrably it can be handled without
114 * swapping intermediates. For the time being we just go for pieces
115 * that fit into memory in isolation. A fictive rowcount is derived
116 * based on argument types, such that all pieces would fit into
117 * memory conveniently for processing. We attempt to use not more
118 * threads than strictly needed.
119 * Experience shows that the pieces should not be too small.
120 * If we should limit to |threads| is still an open issue.
121 *
122 * Take into account the number of client connections,
123 * because all user together are responsible for resource contentions
124 */
125 activeClients = mb->activeClients = MCactiveClients();
126
127 if (cntxt->memorylimit){
128 /* the new mitosis scheme uses a maximum chunck size in MB from the client context */
129 m = (size_t) ((cntxt->memorylimit * 1024 *1024) / row_size);
130 pieces = (int) (rowcnt / m + (rowcnt - m * pieces > 0));
131 }
132 if (cntxt->memorylimit == 0 || pieces <= 1){
133 /* the old allocation scheme */
134 m = GDK_mem_maxsize / argsize;
135 /* if data exceeds memory size,
136 * i.e., (rowcnt*argsize > GDK_mem_maxsize),
137 * i.e., (rowcnt > GDK_mem_maxsize/argsize = m) */
138 assert(threads > 0);
139 assert(activeClients > 0);
140 if (rowcnt > m && m / threads / activeClients > 0) {
141 /* create |pieces| > |threads| partitions such that
142 * |threads| partitions at a time fit in memory,
143 * i.e., (threads*(rowcnt/pieces) <= m),
144 * i.e., (rowcnt/pieces <= m/threads),
145 * i.e., (pieces => rowcnt/(m/threads))
146 * (assuming that (m > threads*MINPARTCNT)) */
147 pieces = (int) (rowcnt / (m / threads / activeClients)) + 1;
148 } else if (rowcnt > MINPARTCNT) {
149 /* exploit parallelism, but ensure minimal partition size to
150 * limit overhead */
151 pieces = (int) MIN(rowcnt / MINPARTCNT, (BUN) threads);
152 }
153 }
154
155 /* when testing, always aim for full parallelism, but avoid
156 * empty pieces */
157 FORCEMITODEBUG
158 if (pieces < threads)
159 pieces = (int) MIN((BUN) threads, rowcnt);
160 /* prevent plan explosion */
161 if (pieces > MAXSLICES)
162 pieces = MAXSLICES;
163 /* to enable experimentation we introduce the option to set
164 * the number of parts required and/or the size of each chunk (in K)
165 */
166 mito_parts = GDKgetenv_int("mito_parts", 0);
167 if (mito_parts > 0)
168 pieces = mito_parts;
169 mito_size = GDKgetenv_int("mito_size", 0);
170 if (mito_size > 0)
171 pieces = (int) ((rowcnt * row_size) / (mito_size * 1024));
172
173 if(OPTdebug & OPTmitosis){
174 fprintf(stderr, "#opt_mitosis: target is %s.%s "
175 " with " BUNFMT " rows of size %d into %zu"
176 " rows/piece %d threads %d pieces"
177 " fixed parts %d fixed size %d chunk = "BUNFMT"\n",
178 getVarConstant(mb, getArg(target, 2)).val.sval,
179 getVarConstant(mb, getArg(target, 3)).val.sval,
180 rowcnt, row_size, m, threads, pieces, mito_parts, mito_size, rowcnt / pieces * row_size);
181 }
182
183 if (pieces <= 1)
184 goto bailout;
185
186 /* at this stage we have identified the #chunks to be used for the largest table */
187 limit = mb->stop;
188 slimit = mb->ssize;
189 if (newMalBlkStmt(mb, mb->stop + 2 * estimate) < 0)
190 throw(MAL,"optimizer.mitosis", SQLSTATE(HY001) MAL_MALLOC_FAIL);
191 estimate = 0;
192
193 schema = getVarConstant(mb, getArg(target, 2)).val.sval;
194 table = getVarConstant(mb, getArg(target, 3)).val.sval;
195 for (i = 0; i < limit; i++) {
196 int upd = 0, qtpe, rtpe = 0, qv, rv;
197 InstrPtr matq, matr = NULL;
198 p = old[i];
199
200 if (getModuleId(p) != sqlRef ||
201 !(getFunctionId(p) == bindRef ||
202 getFunctionId(p) == bindidxRef ||
203 getFunctionId(p) == tidRef)) {
204 pushInstruction(mb, p);
205 continue;
206 }
207 /* don't split insert BATs */
208 if (p->argc == 6 && getVarConstant(mb, getArg(p, 5)).val.ival == 1) {
209 pushInstruction(mb, p);
210 continue;
211 }
212 r = getRowCnt(mb, getArg(p, 0));
213 if (r < rowcnt) {
214 pushInstruction(mb, p);
215 continue;
216 }
217 /* Don't split the (index) bat if we already have identified a range */
218 /* This will happen if we inline separately optimized routines */
219 if (p->argc > 7) {
220 pushInstruction(mb, p);
221 continue;
222 }
223 if (p->retc == 2)
224 upd = 1;
225 if( mt == -1)
226 mt = getMitosisPartition(p);
227 if (mt < 0 && (strcmp(schema, getVarConstant(mb, getArg(p, 2 + upd)).val.sval) ||
228 strcmp(table, getVarConstant(mb, getArg(p, 3 + upd)).val.sval))) {
229 pushInstruction(mb, p);
230 continue;
231 }
232 if (mt >= 0 && getMitosisPartition(p) != mt) {
233 pushInstruction(mb, p);
234 continue;
235 }
236 /* we keep the original bind operation, because it allows for
237 * easy undo when the mergtable can not do something */
238 pushInstruction(mb, p);
239
240 qtpe = getVarType(mb, getArg(p, 0));
241
242 matq = newInstruction(NULL, matRef, newRef);
243 getArg(matq, 0) = getArg(p, 0);
244
245 if (upd) {
246 matr = newInstruction(NULL, matRef, newRef);
247 getArg(matr, 0) = getArg(p, 1);
248 rtpe = getVarType(mb, getArg(p, 1));
249 }
250
251 for (j = 0; j < pieces; j++) {
252 q = copyInstruction(p);
253 if( q == NULL){
254 for (; i<limit; i++)
255 if (old[i])
256 pushInstruction(mb,old[i]);
257 GDKfree(old);
258 throw(MAL,"optimizer.mitosis", SQLSTATE(HY001) MAL_MALLOC_FAIL);
259 }
260 q = pushInt(mb, q, j);
261 q = pushInt(mb, q, pieces);
262
263 qv = getArg(q, 0) = newTmpVariable(mb, qtpe);
264 setVarUDFtype(mb, qv);
265 if (upd) {
266 rv = getArg(q, 1) = newTmpVariable(mb, rtpe);
267 setVarUDFtype(mb, rv);
268 }
269 pushInstruction(mb, q);
270 matq = pushArgument(mb, matq, qv);
271 if (upd)
272 matr = pushArgument(mb, matr, rv);
273 }
274 pushInstruction(mb, matq);
275 if (upd)
276 pushInstruction(mb, matr);
277 }
278 for (; i<slimit; i++)
279 if (old[i])
280 freeInstruction(old[i]);
281 GDKfree(old);
282
283 /* Defense line against incorrect plans */
284 if( 1){
285 chkTypes(cntxt->usermodule, mb, FALSE);
286 chkFlow(mb);
287 chkDeclarations(mb);
288 }
289 /* keep all actions taken as a post block comment */
290bailout:
291 usec = GDKusec()- usec;
292 snprintf(buf,256,"%-20s actions=1 time=" LLFMT " usec","mitosis", usec);
293 newComment(mb,buf);
294 addtoMalBlkHistory(mb);
295
296 if( OPTdebug & OPTmitosis){
297 fprintf(stderr, "#MITOSIS optimizer exit\n");
298 fprintFunction(stderr, mb, 0, LIST_MAL_ALL);
299 }
300 return msg;
301}
302