1 | /* |
2 | * This Source Code Form is subject to the terms of the Mozilla Public |
3 | * License, v. 2.0. If a copy of the MPL was not distributed with this |
4 | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
5 | * |
6 | * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V. |
7 | */ |
8 | |
9 | /* |
10 | * (c) Martin Kersten |
11 | * This module provides an advisary lock manager for SQL transactions |
12 | * that prefer waiting over transaction failures due to OCC |
13 | * The table may only grow with lockable items |
14 | * It could be extended with a semaphore for queue management |
15 | */ |
16 | #include "monetdb_config.h" |
17 | #include "oltp.h" |
18 | #include "mtime.h" |
19 | |
20 | #define LOCKTIMEOUT (20 * 1000) |
21 | #define LOCKDELAY 20 |
22 | |
23 | typedef struct{ |
24 | Client cntxt; // user holding the write lock |
25 | lng start; // time when it started |
26 | lng retention; // time when the lock is released |
27 | lng total; // accumulated lock time |
28 | int used; // how often it used, for balancing |
29 | int locked; // writelock set or not |
30 | } OLTPlockRecord; |
31 | |
32 | static OLTPlockRecord oltp_locks[MAXOLTPLOCKS]; |
33 | static int oltp_delay; |
34 | |
35 | /* |
36 | static void |
37 | OLTPdump_(Client cntxt, str msg) |
38 | { |
39 | int i; |
40 | |
41 | mnstr_printf(cntxt->fdout,"%s",msg); |
42 | for(i=0; i< MAXOLTPLOCKS; i++) |
43 | if( oltp_locks[i].locked) |
44 | mnstr_printf(cntxt->fdout,"#[%i] %3d\n",i, (oltp_locks[i].cntxt ? oltp_locks[i].cntxt->idxx: -1)); |
45 | } |
46 | */ |
47 | |
48 | str |
49 | OLTPreset(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
50 | { int i; |
51 | (void) cntxt; |
52 | (void) mb; |
53 | (void) stk; |
54 | (void) pci; |
55 | |
56 | MT_lock_set(&mal_oltpLock); |
57 | #ifdef _DEBUG_OLTP_ |
58 | fprintf(stderr,"#OLTP %6d reset locktable\n" , GDKms()); |
59 | #endif |
60 | for( i=0; i<MAXOLTPLOCKS; i++){ |
61 | oltp_locks[i].locked = 0; |
62 | oltp_locks[i].cntxt = 0; |
63 | oltp_locks[i].start = 0; |
64 | oltp_locks[i].used = 0; |
65 | oltp_locks[i].retention = 0; |
66 | } |
67 | MT_lock_unset(&mal_oltpLock); |
68 | return MAL_SUCCEED; |
69 | } |
70 | |
71 | str |
72 | OLTPenable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
73 | { |
74 | (void) mb; |
75 | (void) stk; |
76 | (void) pci; |
77 | (void) cntxt; |
78 | #ifdef _DEBUG_OLTP_ |
79 | fprintf(stderr,"#OLTP %6d enabled\n" ,GDKms()); |
80 | #endif |
81 | oltp_delay = TRUE; |
82 | return MAL_SUCCEED; |
83 | } |
84 | |
85 | str |
86 | OLTPdisable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
87 | { |
88 | OLTPreset(cntxt, mb, stk,pci); |
89 | oltp_delay = FALSE; |
90 | #ifdef _DEBUG_OLTP_ |
91 | fprintf(stderr,"#OLTP %6d disabled\n" ,GDKms()); |
92 | #else |
93 | (void) cntxt; |
94 | #endif |
95 | return MAL_SUCCEED; |
96 | } |
97 | |
98 | str |
99 | OLTPinit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
100 | { |
101 | // nothing to be done right now |
102 | return OLTPreset(cntxt,mb,stk,pci); |
103 | } |
104 | |
105 | // The locking is based in the hash-table. |
106 | // It contains all write locks outstanding |
107 | // A transaction may proceed if no element in its read set is locked |
108 | |
109 | str |
110 | OLTPlock(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
111 | { |
112 | int i,lck; |
113 | int clk, wait= GDKms(); |
114 | lng now; |
115 | str sql,cpy; |
116 | |
117 | (void) stk; |
118 | if ( oltp_delay == FALSE ) |
119 | return MAL_SUCCEED; |
120 | |
121 | #ifdef _DEBUG_OLTP_ |
122 | fprintf(stderr,"#OLTP %6d lock request for client %d:" , GDKms(), cntxt->idx); |
123 | fprintInstruction(stderr,mb,stk,pci, LIST_MAL_ALL); |
124 | #endif |
125 | do{ |
126 | MT_lock_set(&mal_oltpLock); |
127 | clk = GDKms(); |
128 | now = GDKusec(); |
129 | // check if all write locks are available |
130 | for( i=1; i< pci->argc; i++){ |
131 | lck= getVarConstant(mb, getArg(pci,i)).val.ival; |
132 | if ( lck > 0 && (oltp_locks[lck].locked || oltp_locks[lck].retention > now )) |
133 | break; |
134 | } |
135 | |
136 | if( i == pci->argc ){ |
137 | #ifdef _DEBUG_OLTP_ |
138 | fprintf(stderr,"#OLTP %6d set lock for client %d\n" , GDKms(), cntxt->idx); |
139 | #endif |
140 | for( i=1; i< pci->argc; i++){ |
141 | lck= getVarConstant(mb, getArg(pci,i)).val.ival; |
142 | // only set the write locks |
143 | if( lck > 0){ |
144 | oltp_locks[lck].cntxt = cntxt; |
145 | oltp_locks[lck].start = now; |
146 | oltp_locks[lck].locked = 1; |
147 | oltp_locks[lck].retention = 0; |
148 | } |
149 | } |
150 | MT_lock_unset(&mal_oltpLock); |
151 | return MAL_SUCCEED; |
152 | } else { |
153 | MT_lock_unset(&mal_oltpLock); |
154 | #ifdef _DEBUG_OLTP_ |
155 | fprintf(stderr,"#OLTP %d delay imposed for client %d\n" , GDKms(), cntxt->idx); |
156 | #endif |
157 | MT_sleep_ms(LOCKDELAY); |
158 | } |
159 | } while( clk - wait < LOCKTIMEOUT); |
160 | |
161 | #ifdef _DEBUG_OLTP_ |
162 | fprintf(stderr,"#OLTP %6d proceed query for client %d\n" , GDKms(), cntxt->idx); |
163 | #endif |
164 | // if the time out is related to a copy_from query, we should not start it either. |
165 | sql = getName("sql" ); |
166 | cpy = getName("copy_from" ); |
167 | |
168 | for( i = 0; i < mb->stop; i++) |
169 | if( getFunctionId(getInstrPtr(mb,i)) == cpy && getModuleId(getInstrPtr(mb,i)) == sql ){ |
170 | #ifdef _DEBUG_OLTP_ |
171 | fprintf(stderr,"#OLTP %6d bail out a concurrent copy into %d\n" , GDKms(), cntxt->idx); |
172 | #endif |
173 | throw(SQL,"oltp.lock" ,"Conflicts with other write operations\n" ); |
174 | } |
175 | return MAL_SUCCEED; |
176 | } |
177 | |
178 | str |
179 | OLTPrelease(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
180 | { |
181 | int i,lck; |
182 | lng delay,clk; |
183 | |
184 | (void) cntxt; |
185 | (void) stk; |
186 | if ( oltp_delay == FALSE ) |
187 | return MAL_SUCCEED; |
188 | |
189 | MT_lock_set(&mal_oltpLock); |
190 | clk = GDKusec(); |
191 | #ifdef _DEBUG_OLTP_ |
192 | fprintf(stderr,"#OLTP %6d release the locks %d:" , GDKms(), cntxt->idx); |
193 | fprintInstruction(stderr,mb,stk,pci, LIST_MAL_ALL); |
194 | #endif |
195 | for( i=1; i< pci->argc; i++){ |
196 | lck= getVarConstant(mb, getArg(pci,i)).val.ival; |
197 | if( lck > 0){ |
198 | oltp_locks[lck].total += clk - oltp_locks[lck].start; |
199 | oltp_locks[lck].used ++; |
200 | oltp_locks[lck].cntxt = 0; |
201 | oltp_locks[lck].start = 0; |
202 | oltp_locks[lck].locked = 0; |
203 | delay = oltp_locks[lck].total/ oltp_locks[lck].used; |
204 | if( delay > LOCKDELAY || delay < LOCKDELAY/10) |
205 | delay = LOCKDELAY; |
206 | oltp_locks[lck].retention = clk + delay; |
207 | #ifdef _DEBUG_OLTP_ |
208 | fprintf(stderr,"#OLTP retention period for lock %d: " LLFMT"\n" , lck,delay); |
209 | #endif |
210 | } |
211 | } |
212 | MT_lock_unset(&mal_oltpLock); |
213 | return MAL_SUCCEED; |
214 | } |
215 | |
216 | str |
217 | OLTPtable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) |
218 | { |
219 | BAT *bs= NULL, *bu= NULL, *bl= NULL, *bq= NULL, *bc = NULL; |
220 | bat *started = getArgReference_bat(stk,pci,0); |
221 | bat *userid = getArgReference_bat(stk,pci,1); |
222 | bat *lockid = getArgReference_bat(stk,pci,2); |
223 | bat *used = getArgReference_bat(stk,pci,3); |
224 | int i; |
225 | str msg = MAL_SUCCEED; |
226 | timestamp tsn; |
227 | |
228 | (void) cntxt; |
229 | (void) mb; |
230 | |
231 | bs = COLnew(0, TYPE_timestamp, 0, TRANSIENT); |
232 | bu = COLnew(0, TYPE_str, 0, TRANSIENT); |
233 | bl = COLnew(0, TYPE_int, 0, TRANSIENT); |
234 | bc = COLnew(0, TYPE_int, 0, TRANSIENT); |
235 | bq = COLnew(0, TYPE_str, 0, TRANSIENT); |
236 | |
237 | if( bs == NULL || bu == NULL || bl == NULL || bq == NULL || bc == NULL){ |
238 | if( bs) BBPunfix(bs->batCacheid); |
239 | if( bl) BBPunfix(bl->batCacheid); |
240 | if( bu) BBPunfix(bu->batCacheid); |
241 | if( bc) BBPunfix(bc->batCacheid); |
242 | if( bq) BBPunfix(bq->batCacheid); |
243 | throw(MAL,"oltp.table" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
244 | } |
245 | for( i = 0; msg == MAL_SUCCEED && i < MAXOLTPLOCKS; i++) |
246 | if (oltp_locks[i].used ){ |
247 | tsn = oltp_locks[i].start ? timestamp_fromusec(oltp_locks[i].start) : timestamp_nil; |
248 | if (BUNappend(bs, &tsn, false) != GDK_SUCCEED || |
249 | BUNappend(bu, oltp_locks[i].cntxt ? oltp_locks[i].cntxt->username : str_nil, false) != GDK_SUCCEED || |
250 | BUNappend(bl, &i, false) != GDK_SUCCEED || |
251 | BUNappend(bc, &oltp_locks[i].used, false) != GDK_SUCCEED) |
252 | goto bailout; |
253 | } |
254 | //OLTPdump_(cntxt,"#lock table\n"); |
255 | BBPkeepref(*started = bs->batCacheid); |
256 | BBPkeepref(*userid = bu->batCacheid); |
257 | BBPkeepref(*lockid = bl->batCacheid); |
258 | BBPkeepref(*used = bc->batCacheid); |
259 | return msg; |
260 | bailout: |
261 | BBPunfix(bs->batCacheid); |
262 | BBPunfix(bl->batCacheid); |
263 | BBPunfix(bu->batCacheid); |
264 | BBPunfix(bc->batCacheid); |
265 | BBPunfix(bq->batCacheid); |
266 | return msg ? msg : createException(MAL, "oltp.table" , SQLSTATE(HY001) MAL_MALLOC_FAIL); |
267 | } |
268 | |
269 | str |
270 | OLTPis_enabled(int *ret) { |
271 | *ret = oltp_delay; |
272 | return MAL_SUCCEED; |
273 | } |
274 | |