| 1 | /* |
| 2 | * This Source Code Form is subject to the terms of the Mozilla Public |
| 3 | * License, v. 2.0. If a copy of the MPL was not distributed with this |
| 4 | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
| 5 | * |
| 6 | * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V. |
| 7 | */ |
| 8 | |
/*
 * (c) Martin Kersten
 * This module provides an advisory lock manager for SQL transactions
 * that prefer waiting over transaction failures due to OCC
 * (optimistic concurrency control).
 * The table may only grow with lockable items.
 * It could be extended with a semaphore for queue management.
 */
| 16 | #include "monetdb_config.h" |
| 17 | #include "oltp.h" |
| 18 | #include "mtime.h" |
| 19 | |
/* Upper bound on how long OLTPlock keeps retrying; it is compared against
 * a difference of GDKms() readings. */
#define LOCKTIMEOUT (20 * 1000)
/* Pause handed to MT_sleep_ms() between two lock attempts; also the cap and
 * fallback for the per-lock retention period computed in OLTPrelease. */
#define LOCKDELAY 20
| 22 | |
| 23 | typedef struct{ |
| 24 | Client cntxt; // user holding the write lock |
| 25 | lng start; // time when it started |
| 26 | lng retention; // time when the lock is released |
| 27 | lng total; // accumulated lock time |
| 28 | int used; // how often it used, for balancing |
| 29 | int locked; // writelock set or not |
| 30 | } OLTPlockRecord; |
| 31 | |
| 32 | static OLTPlockRecord oltp_locks[MAXOLTPLOCKS]; |
| 33 | static int oltp_delay; |
| 34 | |
| 35 | /* |
| 36 | static void |
| 37 | OLTPdump_(Client cntxt, str msg) |
| 38 | { |
| 39 | int i; |
| 40 | |
| 41 | mnstr_printf(cntxt->fdout,"%s",msg); |
| 42 | for(i=0; i< MAXOLTPLOCKS; i++) |
| 43 | if( oltp_locks[i].locked) |
| 44 | mnstr_printf(cntxt->fdout,"#[%i] %3d\n",i, (oltp_locks[i].cntxt ? oltp_locks[i].cntxt->idxx: -1)); |
| 45 | } |
| 46 | */ |
| 47 | |
| 48 | str |
| 49 | OLTPreset(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
| 50 | { int i; |
| 51 | (void) cntxt; |
| 52 | (void) mb; |
| 53 | (void) stk; |
| 54 | (void) pci; |
| 55 | |
| 56 | MT_lock_set(&mal_oltpLock); |
| 57 | #ifdef _DEBUG_OLTP_ |
| 58 | fprintf(stderr,"#OLTP %6d reset locktable\n" , GDKms()); |
| 59 | #endif |
| 60 | for( i=0; i<MAXOLTPLOCKS; i++){ |
| 61 | oltp_locks[i].locked = 0; |
| 62 | oltp_locks[i].cntxt = 0; |
| 63 | oltp_locks[i].start = 0; |
| 64 | oltp_locks[i].used = 0; |
| 65 | oltp_locks[i].retention = 0; |
| 66 | } |
| 67 | MT_lock_unset(&mal_oltpLock); |
| 68 | return MAL_SUCCEED; |
| 69 | } |
| 70 | |
| 71 | str |
| 72 | OLTPenable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
| 73 | { |
| 74 | (void) mb; |
| 75 | (void) stk; |
| 76 | (void) pci; |
| 77 | (void) cntxt; |
| 78 | #ifdef _DEBUG_OLTP_ |
| 79 | fprintf(stderr,"#OLTP %6d enabled\n" ,GDKms()); |
| 80 | #endif |
| 81 | oltp_delay = TRUE; |
| 82 | return MAL_SUCCEED; |
| 83 | } |
| 84 | |
| 85 | str |
| 86 | OLTPdisable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
| 87 | { |
| 88 | OLTPreset(cntxt, mb, stk,pci); |
| 89 | oltp_delay = FALSE; |
| 90 | #ifdef _DEBUG_OLTP_ |
| 91 | fprintf(stderr,"#OLTP %6d disabled\n" ,GDKms()); |
| 92 | #else |
| 93 | (void) cntxt; |
| 94 | #endif |
| 95 | return MAL_SUCCEED; |
| 96 | } |
| 97 | |
| 98 | str |
| 99 | OLTPinit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
| 100 | { |
| 101 | // nothing to be done right now |
| 102 | return OLTPreset(cntxt,mb,stk,pci); |
| 103 | } |
| 104 | |
// The locking is based on the hash table.
// It contains all outstanding write locks.
// A transaction may proceed if no element in its read set is locked.
| 108 | |
| 109 | str |
| 110 | OLTPlock(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
| 111 | { |
| 112 | int i,lck; |
| 113 | int clk, wait= GDKms(); |
| 114 | lng now; |
| 115 | str sql,cpy; |
| 116 | |
| 117 | (void) stk; |
| 118 | if ( oltp_delay == FALSE ) |
| 119 | return MAL_SUCCEED; |
| 120 | |
| 121 | #ifdef _DEBUG_OLTP_ |
| 122 | fprintf(stderr,"#OLTP %6d lock request for client %d:" , GDKms(), cntxt->idx); |
| 123 | fprintInstruction(stderr,mb,stk,pci, LIST_MAL_ALL); |
| 124 | #endif |
| 125 | do{ |
| 126 | MT_lock_set(&mal_oltpLock); |
| 127 | clk = GDKms(); |
| 128 | now = GDKusec(); |
| 129 | // check if all write locks are available |
| 130 | for( i=1; i< pci->argc; i++){ |
| 131 | lck= getVarConstant(mb, getArg(pci,i)).val.ival; |
| 132 | if ( lck > 0 && (oltp_locks[lck].locked || oltp_locks[lck].retention > now )) |
| 133 | break; |
| 134 | } |
| 135 | |
| 136 | if( i == pci->argc ){ |
| 137 | #ifdef _DEBUG_OLTP_ |
| 138 | fprintf(stderr,"#OLTP %6d set lock for client %d\n" , GDKms(), cntxt->idx); |
| 139 | #endif |
| 140 | for( i=1; i< pci->argc; i++){ |
| 141 | lck= getVarConstant(mb, getArg(pci,i)).val.ival; |
| 142 | // only set the write locks |
| 143 | if( lck > 0){ |
| 144 | oltp_locks[lck].cntxt = cntxt; |
| 145 | oltp_locks[lck].start = now; |
| 146 | oltp_locks[lck].locked = 1; |
| 147 | oltp_locks[lck].retention = 0; |
| 148 | } |
| 149 | } |
| 150 | MT_lock_unset(&mal_oltpLock); |
| 151 | return MAL_SUCCEED; |
| 152 | } else { |
| 153 | MT_lock_unset(&mal_oltpLock); |
| 154 | #ifdef _DEBUG_OLTP_ |
| 155 | fprintf(stderr,"#OLTP %d delay imposed for client %d\n" , GDKms(), cntxt->idx); |
| 156 | #endif |
| 157 | MT_sleep_ms(LOCKDELAY); |
| 158 | } |
| 159 | } while( clk - wait < LOCKTIMEOUT); |
| 160 | |
| 161 | #ifdef _DEBUG_OLTP_ |
| 162 | fprintf(stderr,"#OLTP %6d proceed query for client %d\n" , GDKms(), cntxt->idx); |
| 163 | #endif |
| 164 | // if the time out is related to a copy_from query, we should not start it either. |
| 165 | sql = getName("sql" ); |
| 166 | cpy = getName("copy_from" ); |
| 167 | |
| 168 | for( i = 0; i < mb->stop; i++) |
| 169 | if( getFunctionId(getInstrPtr(mb,i)) == cpy && getModuleId(getInstrPtr(mb,i)) == sql ){ |
| 170 | #ifdef _DEBUG_OLTP_ |
| 171 | fprintf(stderr,"#OLTP %6d bail out a concurrent copy into %d\n" , GDKms(), cntxt->idx); |
| 172 | #endif |
| 173 | throw(SQL,"oltp.lock" ,"Conflicts with other write operations\n" ); |
| 174 | } |
| 175 | return MAL_SUCCEED; |
| 176 | } |
| 177 | |
| 178 | str |
| 179 | OLTPrelease(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
| 180 | { |
| 181 | int i,lck; |
| 182 | lng delay,clk; |
| 183 | |
| 184 | (void) cntxt; |
| 185 | (void) stk; |
| 186 | if ( oltp_delay == FALSE ) |
| 187 | return MAL_SUCCEED; |
| 188 | |
| 189 | MT_lock_set(&mal_oltpLock); |
| 190 | clk = GDKusec(); |
| 191 | #ifdef _DEBUG_OLTP_ |
| 192 | fprintf(stderr,"#OLTP %6d release the locks %d:" , GDKms(), cntxt->idx); |
| 193 | fprintInstruction(stderr,mb,stk,pci, LIST_MAL_ALL); |
| 194 | #endif |
| 195 | for( i=1; i< pci->argc; i++){ |
| 196 | lck= getVarConstant(mb, getArg(pci,i)).val.ival; |
| 197 | if( lck > 0){ |
| 198 | oltp_locks[lck].total += clk - oltp_locks[lck].start; |
| 199 | oltp_locks[lck].used ++; |
| 200 | oltp_locks[lck].cntxt = 0; |
| 201 | oltp_locks[lck].start = 0; |
| 202 | oltp_locks[lck].locked = 0; |
| 203 | delay = oltp_locks[lck].total/ oltp_locks[lck].used; |
| 204 | if( delay > LOCKDELAY || delay < LOCKDELAY/10) |
| 205 | delay = LOCKDELAY; |
| 206 | oltp_locks[lck].retention = clk + delay; |
| 207 | #ifdef _DEBUG_OLTP_ |
| 208 | fprintf(stderr,"#OLTP retention period for lock %d: " LLFMT"\n" , lck,delay); |
| 209 | #endif |
| 210 | } |
| 211 | } |
| 212 | MT_lock_unset(&mal_oltpLock); |
| 213 | return MAL_SUCCEED; |
| 214 | } |
| 215 | |
| 216 | str |
| 217 | OLTPtable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
| 218 | { |
| 219 | BAT *bs= NULL, *bu= NULL, *bl= NULL, *bq= NULL, *bc = NULL; |
| 220 | bat *started = getArgReference_bat(stk,pci,0); |
| 221 | bat *userid = getArgReference_bat(stk,pci,1); |
| 222 | bat *lockid = getArgReference_bat(stk,pci,2); |
| 223 | bat *used = getArgReference_bat(stk,pci,3); |
| 224 | int i; |
| 225 | str msg = MAL_SUCCEED; |
| 226 | timestamp tsn; |
| 227 | |
| 228 | (void) cntxt; |
| 229 | (void) mb; |
| 230 | |
| 231 | bs = COLnew(0, TYPE_timestamp, 0, TRANSIENT); |
| 232 | bu = COLnew(0, TYPE_str, 0, TRANSIENT); |
| 233 | bl = COLnew(0, TYPE_int, 0, TRANSIENT); |
| 234 | bc = COLnew(0, TYPE_int, 0, TRANSIENT); |
| 235 | bq = COLnew(0, TYPE_str, 0, TRANSIENT); |
| 236 | |
| 237 | if( bs == NULL || bu == NULL || bl == NULL || bq == NULL || bc == NULL){ |
| 238 | if( bs) BBPunfix(bs->batCacheid); |
| 239 | if( bl) BBPunfix(bl->batCacheid); |
| 240 | if( bu) BBPunfix(bu->batCacheid); |
| 241 | if( bc) BBPunfix(bc->batCacheid); |
| 242 | if( bq) BBPunfix(bq->batCacheid); |
| 243 | throw(MAL,"oltp.table" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
| 244 | } |
| 245 | for( i = 0; msg == MAL_SUCCEED && i < MAXOLTPLOCKS; i++) |
| 246 | if (oltp_locks[i].used ){ |
| 247 | tsn = oltp_locks[i].start ? timestamp_fromusec(oltp_locks[i].start) : timestamp_nil; |
| 248 | if (BUNappend(bs, &tsn, false) != GDK_SUCCEED || |
| 249 | BUNappend(bu, oltp_locks[i].cntxt ? oltp_locks[i].cntxt->username : str_nil, false) != GDK_SUCCEED || |
| 250 | BUNappend(bl, &i, false) != GDK_SUCCEED || |
| 251 | BUNappend(bc, &oltp_locks[i].used, false) != GDK_SUCCEED) |
| 252 | goto bailout; |
| 253 | } |
| 254 | //OLTPdump_(cntxt,"#lock table\n"); |
| 255 | BBPkeepref(*started = bs->batCacheid); |
| 256 | BBPkeepref(*userid = bu->batCacheid); |
| 257 | BBPkeepref(*lockid = bl->batCacheid); |
| 258 | BBPkeepref(*used = bc->batCacheid); |
| 259 | return msg; |
| 260 | bailout: |
| 261 | BBPunfix(bs->batCacheid); |
| 262 | BBPunfix(bl->batCacheid); |
| 263 | BBPunfix(bu->batCacheid); |
| 264 | BBPunfix(bc->batCacheid); |
| 265 | BBPunfix(bq->batCacheid); |
| 266 | return msg ? msg : createException(MAL, "oltp.table" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
| 267 | } |
| 268 | |
| 269 | str |
| 270 | OLTPis_enabled(int *ret) { |
| 271 | *ret = oltp_delay; |
| 272 | return MAL_SUCCEED; |
| 273 | } |
| 274 | |