1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9/*
10 * (c) Martin Kersten
 * This module provides an advisory lock manager for SQL transactions
 * that prefer waiting over transaction failures due to optimistic
 * concurrency control (OCC).
 * The table may only grow with lockable items.
 * It could be extended with a semaphore for queue management.
15 */
16#include "monetdb_config.h"
17#include "oltp.h"
18#include "mtime.h"
19
20#define LOCKTIMEOUT (20 * 1000)
21#define LOCKDELAY 20
22
23typedef struct{
24 Client cntxt; // user holding the write lock
25 lng start; // time when it started
26 lng retention; // time when the lock is released
27 lng total; // accumulated lock time
28 int used; // how often it used, for balancing
29 int locked; // writelock set or not
30} OLTPlockRecord;
31
32static OLTPlockRecord oltp_locks[MAXOLTPLOCKS];
33static int oltp_delay;
34
35/*
36static void
37OLTPdump_(Client cntxt, str msg)
38{
39 int i;
40
41 mnstr_printf(cntxt->fdout,"%s",msg);
42 for(i=0; i< MAXOLTPLOCKS; i++)
43 if( oltp_locks[i].locked)
44 mnstr_printf(cntxt->fdout,"#[%i] %3d\n",i, (oltp_locks[i].cntxt ? oltp_locks[i].cntxt->idxx: -1));
45}
46*/
47
48str
49OLTPreset(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
50{ int i;
51 (void) cntxt;
52 (void) mb;
53 (void) stk;
54 (void) pci;
55
56 MT_lock_set(&mal_oltpLock);
57#ifdef _DEBUG_OLTP_
58 fprintf(stderr,"#OLTP %6d reset locktable\n", GDKms());
59#endif
60 for( i=0; i<MAXOLTPLOCKS; i++){
61 oltp_locks[i].locked = 0;
62 oltp_locks[i].cntxt = 0;
63 oltp_locks[i].start = 0;
64 oltp_locks[i].used = 0;
65 oltp_locks[i].retention = 0;
66 }
67 MT_lock_unset(&mal_oltpLock);
68 return MAL_SUCCEED;
69}
70
71str
72OLTPenable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
73{
74 (void) mb;
75 (void) stk;
76 (void) pci;
77 (void) cntxt;
78#ifdef _DEBUG_OLTP_
79 fprintf(stderr,"#OLTP %6d enabled\n",GDKms());
80#endif
81 oltp_delay = TRUE;
82 return MAL_SUCCEED;
83}
84
85str
86OLTPdisable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
87{
88 OLTPreset(cntxt, mb, stk,pci);
89 oltp_delay = FALSE;
90#ifdef _DEBUG_OLTP_
91 fprintf(stderr,"#OLTP %6d disabled\n",GDKms());
92#else
93 (void) cntxt;
94#endif
95 return MAL_SUCCEED;
96}
97
98str
99OLTPinit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
100{
101 // nothing to be done right now
102 return OLTPreset(cntxt,mb,stk,pci);
103}
104
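// The locking is based on the fixed-size table oltp_locks, indexed by lock id.
// It contains all outstanding write locks (positive lock ids).
// A transaction may proceed once none of its requested write locks is held
// or still within its retention period, or when LOCKTIMEOUT has expired.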
108
109str
110OLTPlock(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
111{
112 int i,lck;
113 int clk, wait= GDKms();
114 lng now;
115 str sql,cpy;
116
117 (void) stk;
118 if ( oltp_delay == FALSE )
119 return MAL_SUCCEED;
120
121#ifdef _DEBUG_OLTP_
122 fprintf(stderr,"#OLTP %6d lock request for client %d:", GDKms(), cntxt->idx);
123 fprintInstruction(stderr,mb,stk,pci, LIST_MAL_ALL);
124#endif
125 do{
126 MT_lock_set(&mal_oltpLock);
127 clk = GDKms();
128 now = GDKusec();
129 // check if all write locks are available
130 for( i=1; i< pci->argc; i++){
131 lck= getVarConstant(mb, getArg(pci,i)).val.ival;
132 if ( lck > 0 && (oltp_locks[lck].locked || oltp_locks[lck].retention > now ))
133 break;
134 }
135
136 if( i == pci->argc ){
137#ifdef _DEBUG_OLTP_
138 fprintf(stderr,"#OLTP %6d set lock for client %d\n", GDKms(), cntxt->idx);
139#endif
140 for( i=1; i< pci->argc; i++){
141 lck= getVarConstant(mb, getArg(pci,i)).val.ival;
142 // only set the write locks
143 if( lck > 0){
144 oltp_locks[lck].cntxt = cntxt;
145 oltp_locks[lck].start = now;
146 oltp_locks[lck].locked = 1;
147 oltp_locks[lck].retention = 0;
148 }
149 }
150 MT_lock_unset(&mal_oltpLock);
151 return MAL_SUCCEED;
152 } else {
153 MT_lock_unset(&mal_oltpLock);
154#ifdef _DEBUG_OLTP_
155 fprintf(stderr,"#OLTP %d delay imposed for client %d\n", GDKms(), cntxt->idx);
156#endif
157 MT_sleep_ms(LOCKDELAY);
158 }
159 } while( clk - wait < LOCKTIMEOUT);
160
161#ifdef _DEBUG_OLTP_
162 fprintf(stderr,"#OLTP %6d proceed query for client %d\n", GDKms(), cntxt->idx);
163#endif
164 // if the time out is related to a copy_from query, we should not start it either.
165 sql = getName("sql");
166 cpy = getName("copy_from");
167
168 for( i = 0; i < mb->stop; i++)
169 if( getFunctionId(getInstrPtr(mb,i)) == cpy && getModuleId(getInstrPtr(mb,i)) == sql ){
170#ifdef _DEBUG_OLTP_
171 fprintf(stderr,"#OLTP %6d bail out a concurrent copy into %d\n", GDKms(), cntxt->idx);
172#endif
173 throw(SQL,"oltp.lock","Conflicts with other write operations\n");
174 }
175 return MAL_SUCCEED;
176}
177
178str
179OLTPrelease(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
180{
181 int i,lck;
182 lng delay,clk;
183
184 (void) cntxt;
185 (void) stk;
186 if ( oltp_delay == FALSE )
187 return MAL_SUCCEED;
188
189 MT_lock_set(&mal_oltpLock);
190 clk = GDKusec();
191#ifdef _DEBUG_OLTP_
192 fprintf(stderr,"#OLTP %6d release the locks %d:", GDKms(), cntxt->idx);
193 fprintInstruction(stderr,mb,stk,pci, LIST_MAL_ALL);
194#endif
195 for( i=1; i< pci->argc; i++){
196 lck= getVarConstant(mb, getArg(pci,i)).val.ival;
197 if( lck > 0){
198 oltp_locks[lck].total += clk - oltp_locks[lck].start;
199 oltp_locks[lck].used ++;
200 oltp_locks[lck].cntxt = 0;
201 oltp_locks[lck].start = 0;
202 oltp_locks[lck].locked = 0;
203 delay = oltp_locks[lck].total/ oltp_locks[lck].used;
204 if( delay > LOCKDELAY || delay < LOCKDELAY/10)
205 delay = LOCKDELAY;
206 oltp_locks[lck].retention = clk + delay;
207#ifdef _DEBUG_OLTP_
208 fprintf(stderr,"#OLTP retention period for lock %d: "LLFMT"\n", lck,delay);
209#endif
210 }
211 }
212 MT_lock_unset(&mal_oltpLock);
213 return MAL_SUCCEED;
214}
215
216str
217OLTPtable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
218{
219 BAT *bs= NULL, *bu= NULL, *bl= NULL, *bq= NULL, *bc = NULL;
220 bat *started = getArgReference_bat(stk,pci,0);
221 bat *userid = getArgReference_bat(stk,pci,1);
222 bat *lockid = getArgReference_bat(stk,pci,2);
223 bat *used = getArgReference_bat(stk,pci,3);
224 int i;
225 str msg = MAL_SUCCEED;
226 timestamp tsn;
227
228 (void) cntxt;
229 (void) mb;
230
231 bs = COLnew(0, TYPE_timestamp, 0, TRANSIENT);
232 bu = COLnew(0, TYPE_str, 0, TRANSIENT);
233 bl = COLnew(0, TYPE_int, 0, TRANSIENT);
234 bc = COLnew(0, TYPE_int, 0, TRANSIENT);
235 bq = COLnew(0, TYPE_str, 0, TRANSIENT);
236
237 if( bs == NULL || bu == NULL || bl == NULL || bq == NULL || bc == NULL){
238 if( bs) BBPunfix(bs->batCacheid);
239 if( bl) BBPunfix(bl->batCacheid);
240 if( bu) BBPunfix(bu->batCacheid);
241 if( bc) BBPunfix(bc->batCacheid);
242 if( bq) BBPunfix(bq->batCacheid);
243 throw(MAL,"oltp.table", SQLSTATE(HY001) MAL_MALLOC_FAIL);
244 }
245 for( i = 0; msg == MAL_SUCCEED && i < MAXOLTPLOCKS; i++)
246 if (oltp_locks[i].used ){
247 tsn = oltp_locks[i].start ? timestamp_fromusec(oltp_locks[i].start) : timestamp_nil;
248 if (BUNappend(bs, &tsn, false) != GDK_SUCCEED ||
249 BUNappend(bu, oltp_locks[i].cntxt ? oltp_locks[i].cntxt->username : str_nil, false) != GDK_SUCCEED ||
250 BUNappend(bl, &i, false) != GDK_SUCCEED ||
251 BUNappend(bc, &oltp_locks[i].used, false) != GDK_SUCCEED)
252 goto bailout;
253 }
254 //OLTPdump_(cntxt,"#lock table\n");
255 BBPkeepref(*started = bs->batCacheid);
256 BBPkeepref(*userid = bu->batCacheid);
257 BBPkeepref(*lockid = bl->batCacheid);
258 BBPkeepref(*used = bc->batCacheid);
259 return msg;
260 bailout:
261 BBPunfix(bs->batCacheid);
262 BBPunfix(bl->batCacheid);
263 BBPunfix(bu->batCacheid);
264 BBPunfix(bc->batCacheid);
265 BBPunfix(bq->batCacheid);
266 return msg ? msg : createException(MAL, "oltp.table", SQLSTATE(HY001) MAL_MALLOC_FAIL);
267}
268
269str
270OLTPis_enabled(int *ret) {
271 *ret = oltp_delay;
272 return MAL_SUCCEED;
273}
274