| 1 | /* |
| 2 | * This Source Code Form is subject to the terms of the Mozilla Public |
| 3 | * License, v. 2.0. If a copy of the MPL was not distributed with this |
| 4 | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
| 5 | * |
| 6 | * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V. |
| 7 | */ |
| 8 | |
| 9 | #include "monetdb_config.h" |
| 10 | #include "opt_mitosis.h" |
| 11 | #include "mal_interpreter.h" |
| 12 | #include "gdk_utils.h" |
| 13 | |
| 14 | |
| 15 | str |
| 16 | OPTmitosisImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p) |
| 17 | { |
| 18 | int i, j, limit, slimit, estimate = 0, pieces = 1, mito_parts = 0, mito_size = 0, row_size = 0, mt = -1; |
| 19 | str schema = 0, table = 0; |
| 20 | BUN r = 0, rowcnt = 0; /* table should be sizeable to consider parallel execution*/ |
| 21 | InstrPtr q, *old, target = 0; |
| 22 | size_t argsize = 6 * sizeof(lng), m = 0; |
| 23 | /* per op: 6 = (2+1)*2 <= 2 args + 1 res, each with head & tail */ |
| 24 | int threads = GDKnr_threads ? GDKnr_threads : 1; |
| 25 | int activeClients; |
| 26 | char buf[256]; |
| 27 | lng usec = GDKusec(); |
| 28 | str msg = MAL_SUCCEED; |
| 29 | |
| 30 | //if ( optimizerIsApplied(mb,"mitosis") ) |
| 31 | //return 0; |
| 32 | (void) cntxt; |
| 33 | (void) stk; |
| 34 | |
| 35 | old = mb->stmt; |
| 36 | for (i = 1; i < mb->stop; i++) { |
| 37 | InstrPtr p = old[i]; |
| 38 | |
| 39 | if (getModuleId(p) == sqlRef && getFunctionId(p) == assertRef && |
| 40 | p->argc > 2 && getArgType(mb, p, 2) == TYPE_str && |
| 41 | isVarConstant(mb, getArg(p, 2)) && |
| 42 | getVarConstant(mb, getArg(p, 2)).val.sval != NULL && |
| 43 | (strstr(getVarConstant(mb, getArg(p, 2)).val.sval, "PRIMARY KEY constraint" ) || |
| 44 | strstr(getVarConstant(mb, getArg(p, 2)).val.sval, "UNIQUE constraint" ))) |
| 45 | goto bailout; |
| 46 | |
| 47 | /* mitosis/mergetable bailout conditions */ |
| 48 | |
| 49 | if (p->argc > 2 && getModuleId(p) == aggrRef && |
| 50 | getFunctionId(p) != subcountRef && |
| 51 | getFunctionId(p) != subminRef && |
| 52 | getFunctionId(p) != submaxRef && |
| 53 | getFunctionId(p) != subavgRef && |
| 54 | getFunctionId(p) != subsumRef && |
| 55 | getFunctionId(p) != subprodRef && |
| 56 | |
| 57 | getFunctionId(p) != countRef && |
| 58 | getFunctionId(p) != minRef && |
| 59 | getFunctionId(p) != maxRef && |
| 60 | getFunctionId(p) != avgRef && |
| 61 | getFunctionId(p) != sumRef && |
| 62 | getFunctionId(p) != prodRef) |
| 63 | goto bailout; |
| 64 | |
| 65 | /* do not split up floating point bat that is being summed */ |
| 66 | if (p->retc == 1 && |
| 67 | (((p->argc == 6 || p->argc == 7) && |
| 68 | getModuleId(p) == aggrRef && |
| 69 | getFunctionId(p) == subsumRef) || |
| 70 | (p->argc == 4 && |
| 71 | getModuleId(p) == aggrRef && |
| 72 | getFunctionId(p) == sumRef)) && |
| 73 | isaBatType(getArgType(mb, p, p->retc)) && |
| 74 | (getBatType(getArgType(mb, p, p->retc)) == TYPE_flt || |
| 75 | getBatType(getArgType(mb, p, p->retc)) == TYPE_dbl)) |
| 76 | goto bailout; |
| 77 | |
| 78 | if (p->argc > 2 && (getModuleId(p) == capiRef || getModuleId(p) == rapiRef || getModuleId(p) == pyapiRef || getModuleId(p) == pyapi3Ref) && |
| 79 | getFunctionId(p) == subeval_aggrRef) |
| 80 | goto bailout; |
| 81 | |
| 82 | /* Mergetable cannot handle intersect/except's for now */ |
| 83 | if (getModuleId(p) == algebraRef && getFunctionId(p) == groupbyRef) |
| 84 | goto bailout; |
| 85 | |
| 86 | /* locate the largest non-partitioned table */ |
| 87 | if (getModuleId(p) != sqlRef || (getFunctionId(p) != bindRef && getFunctionId(p) != bindidxRef)) |
| 88 | continue; |
| 89 | /* don't split insert BATs */ |
| 90 | if (getVarConstant(mb, getArg(p, 5)).val.ival == 1) |
| 91 | continue; |
| 92 | if (p->argc > 6) |
| 93 | continue; /* already partitioned */ |
| 94 | /* |
| 95 | * The SQL optimizer already collects the counts of the base |
| 96 | * table and passes them on as a row property. All pieces for a |
| 97 | * single subplan should ideally fit together. |
| 98 | */ |
| 99 | r = getRowCnt(mb, getArg(p, 0)); |
| 100 | if (r >= rowcnt) { |
| 101 | /* the rowsize depends on the column types, assume void-headed */ |
| 102 | row_size = ATOMsize(getBatType(getArgType(mb,p,0))); |
| 103 | rowcnt = r; |
| 104 | target = p; |
| 105 | estimate++; |
| 106 | r = 0; |
| 107 | } |
| 108 | } |
| 109 | if (target == 0) |
| 110 | goto bailout; |
| 111 | /* |
| 112 | * The number of pieces should be based on the footprint of the |
| 113 | * queryplan, such that preferrably it can be handled without |
| 114 | * swapping intermediates. For the time being we just go for pieces |
| 115 | * that fit into memory in isolation. A fictive rowcount is derived |
| 116 | * based on argument types, such that all pieces would fit into |
| 117 | * memory conveniently for processing. We attempt to use not more |
| 118 | * threads than strictly needed. |
| 119 | * Experience shows that the pieces should not be too small. |
| 120 | * If we should limit to |threads| is still an open issue. |
| 121 | * |
| 122 | * Take into account the number of client connections, |
| 123 | * because all user together are responsible for resource contentions |
| 124 | */ |
| 125 | activeClients = mb->activeClients = MCactiveClients(); |
| 126 | |
| 127 | if (cntxt->memorylimit){ |
| 128 | /* the new mitosis scheme uses a maximum chunck size in MB from the client context */ |
| 129 | m = (size_t) ((cntxt->memorylimit * 1024 *1024) / row_size); |
| 130 | pieces = (int) (rowcnt / m + (rowcnt - m * pieces > 0)); |
| 131 | } |
| 132 | if (cntxt->memorylimit == 0 || pieces <= 1){ |
| 133 | /* the old allocation scheme */ |
| 134 | m = GDK_mem_maxsize / argsize; |
| 135 | /* if data exceeds memory size, |
| 136 | * i.e., (rowcnt*argsize > GDK_mem_maxsize), |
| 137 | * i.e., (rowcnt > GDK_mem_maxsize/argsize = m) */ |
| 138 | assert(threads > 0); |
| 139 | assert(activeClients > 0); |
| 140 | if (rowcnt > m && m / threads / activeClients > 0) { |
| 141 | /* create |pieces| > |threads| partitions such that |
| 142 | * |threads| partitions at a time fit in memory, |
| 143 | * i.e., (threads*(rowcnt/pieces) <= m), |
| 144 | * i.e., (rowcnt/pieces <= m/threads), |
| 145 | * i.e., (pieces => rowcnt/(m/threads)) |
| 146 | * (assuming that (m > threads*MINPARTCNT)) */ |
| 147 | pieces = (int) (rowcnt / (m / threads / activeClients)) + 1; |
| 148 | } else if (rowcnt > MINPARTCNT) { |
| 149 | /* exploit parallelism, but ensure minimal partition size to |
| 150 | * limit overhead */ |
| 151 | pieces = (int) MIN(rowcnt / MINPARTCNT, (BUN) threads); |
| 152 | } |
| 153 | } |
| 154 | |
| 155 | /* when testing, always aim for full parallelism, but avoid |
| 156 | * empty pieces */ |
| 157 | FORCEMITODEBUG |
| 158 | if (pieces < threads) |
| 159 | pieces = (int) MIN((BUN) threads, rowcnt); |
| 160 | /* prevent plan explosion */ |
| 161 | if (pieces > MAXSLICES) |
| 162 | pieces = MAXSLICES; |
| 163 | /* to enable experimentation we introduce the option to set |
| 164 | * the number of parts required and/or the size of each chunk (in K) |
| 165 | */ |
| 166 | mito_parts = GDKgetenv_int("mito_parts" , 0); |
| 167 | if (mito_parts > 0) |
| 168 | pieces = mito_parts; |
| 169 | mito_size = GDKgetenv_int("mito_size" , 0); |
| 170 | if (mito_size > 0) |
| 171 | pieces = (int) ((rowcnt * row_size) / (mito_size * 1024)); |
| 172 | |
| 173 | if(OPTdebug & OPTmitosis){ |
| 174 | fprintf(stderr, "#opt_mitosis: target is %s.%s " |
| 175 | " with " BUNFMT " rows of size %d into %zu" |
| 176 | " rows/piece %d threads %d pieces" |
| 177 | " fixed parts %d fixed size %d chunk = " BUNFMT"\n" , |
| 178 | getVarConstant(mb, getArg(target, 2)).val.sval, |
| 179 | getVarConstant(mb, getArg(target, 3)).val.sval, |
| 180 | rowcnt, row_size, m, threads, pieces, mito_parts, mito_size, rowcnt / pieces * row_size); |
| 181 | } |
| 182 | |
| 183 | if (pieces <= 1) |
| 184 | goto bailout; |
| 185 | |
| 186 | /* at this stage we have identified the #chunks to be used for the largest table */ |
| 187 | limit = mb->stop; |
| 188 | slimit = mb->ssize; |
| 189 | if (newMalBlkStmt(mb, mb->stop + 2 * estimate) < 0) |
| 190 | throw(MAL,"optimizer.mitosis" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
| 191 | estimate = 0; |
| 192 | |
| 193 | schema = getVarConstant(mb, getArg(target, 2)).val.sval; |
| 194 | table = getVarConstant(mb, getArg(target, 3)).val.sval; |
| 195 | for (i = 0; i < limit; i++) { |
| 196 | int upd = 0, qtpe, rtpe = 0, qv, rv; |
| 197 | InstrPtr matq, matr = NULL; |
| 198 | p = old[i]; |
| 199 | |
| 200 | if (getModuleId(p) != sqlRef || |
| 201 | !(getFunctionId(p) == bindRef || |
| 202 | getFunctionId(p) == bindidxRef || |
| 203 | getFunctionId(p) == tidRef)) { |
| 204 | pushInstruction(mb, p); |
| 205 | continue; |
| 206 | } |
| 207 | /* don't split insert BATs */ |
| 208 | if (p->argc == 6 && getVarConstant(mb, getArg(p, 5)).val.ival == 1) { |
| 209 | pushInstruction(mb, p); |
| 210 | continue; |
| 211 | } |
| 212 | r = getRowCnt(mb, getArg(p, 0)); |
| 213 | if (r < rowcnt) { |
| 214 | pushInstruction(mb, p); |
| 215 | continue; |
| 216 | } |
| 217 | /* Don't split the (index) bat if we already have identified a range */ |
| 218 | /* This will happen if we inline separately optimized routines */ |
| 219 | if (p->argc > 7) { |
| 220 | pushInstruction(mb, p); |
| 221 | continue; |
| 222 | } |
| 223 | if (p->retc == 2) |
| 224 | upd = 1; |
| 225 | if( mt == -1) |
| 226 | mt = getMitosisPartition(p); |
| 227 | if (mt < 0 && (strcmp(schema, getVarConstant(mb, getArg(p, 2 + upd)).val.sval) || |
| 228 | strcmp(table, getVarConstant(mb, getArg(p, 3 + upd)).val.sval))) { |
| 229 | pushInstruction(mb, p); |
| 230 | continue; |
| 231 | } |
| 232 | if (mt >= 0 && getMitosisPartition(p) != mt) { |
| 233 | pushInstruction(mb, p); |
| 234 | continue; |
| 235 | } |
| 236 | /* we keep the original bind operation, because it allows for |
| 237 | * easy undo when the mergtable can not do something */ |
| 238 | pushInstruction(mb, p); |
| 239 | |
| 240 | qtpe = getVarType(mb, getArg(p, 0)); |
| 241 | |
| 242 | matq = newInstruction(NULL, matRef, newRef); |
| 243 | getArg(matq, 0) = getArg(p, 0); |
| 244 | |
| 245 | if (upd) { |
| 246 | matr = newInstruction(NULL, matRef, newRef); |
| 247 | getArg(matr, 0) = getArg(p, 1); |
| 248 | rtpe = getVarType(mb, getArg(p, 1)); |
| 249 | } |
| 250 | |
| 251 | for (j = 0; j < pieces; j++) { |
| 252 | q = copyInstruction(p); |
| 253 | if( q == NULL){ |
| 254 | for (; i<limit; i++) |
| 255 | if (old[i]) |
| 256 | pushInstruction(mb,old[i]); |
| 257 | GDKfree(old); |
| 258 | throw(MAL,"optimizer.mitosis" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
| 259 | } |
| 260 | q = pushInt(mb, q, j); |
| 261 | q = pushInt(mb, q, pieces); |
| 262 | |
| 263 | qv = getArg(q, 0) = newTmpVariable(mb, qtpe); |
| 264 | setVarUDFtype(mb, qv); |
| 265 | if (upd) { |
| 266 | rv = getArg(q, 1) = newTmpVariable(mb, rtpe); |
| 267 | setVarUDFtype(mb, rv); |
| 268 | } |
| 269 | pushInstruction(mb, q); |
| 270 | matq = pushArgument(mb, matq, qv); |
| 271 | if (upd) |
| 272 | matr = pushArgument(mb, matr, rv); |
| 273 | } |
| 274 | pushInstruction(mb, matq); |
| 275 | if (upd) |
| 276 | pushInstruction(mb, matr); |
| 277 | } |
| 278 | for (; i<slimit; i++) |
| 279 | if (old[i]) |
| 280 | freeInstruction(old[i]); |
| 281 | GDKfree(old); |
| 282 | |
| 283 | /* Defense line against incorrect plans */ |
| 284 | if( 1){ |
| 285 | chkTypes(cntxt->usermodule, mb, FALSE); |
| 286 | chkFlow(mb); |
| 287 | chkDeclarations(mb); |
| 288 | } |
| 289 | /* keep all actions taken as a post block comment */ |
| 290 | bailout: |
| 291 | usec = GDKusec()- usec; |
| 292 | snprintf(buf,256,"%-20s actions=1 time=" LLFMT " usec" ,"mitosis" , usec); |
| 293 | newComment(mb,buf); |
| 294 | addtoMalBlkHistory(mb); |
| 295 | |
| 296 | if( OPTdebug & OPTmitosis){ |
| 297 | fprintf(stderr, "#MITOSIS optimizer exit\n" ); |
| 298 | fprintFunction(stderr, mb, 0, LIST_MAL_ALL); |
| 299 | } |
| 300 | return msg; |
| 301 | } |
| 302 | |