/* QQ: TODO - allocate everything from dynarrays !!! (benchmark) */
/* QQ: TODO instant duration locks */
/* QQ: #warning automatically place S instead of LS if possible */

/* Copyright (C) 2006 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */

/*
  Generic Lock Manager

  The lock manager handles locks on "resources"; a resource must be
  uniquely identified by a 64-bit number. The lock manager itself does
  not imply anything about the nature of a resource - it can be a row, a
  table, a database, or anything at all.

  Locks belong to "lock owners". A lock owner is uniquely identified by
  a 16-bit number. The application must provide a function, loid2lo,
  that takes such a number as an argument and returns the corresponding
  LOCK_OWNER structure.

  Lock levels are completely defined by three tables. The lock
  compatibility matrix specifies which locks can be held on a resource
  at the same time. The lock combining matrix specifies which single
  lock level behaves the same as a given pair of lock levels. The
  getlock_result matrix simplifies intention locking and lock escalation
  for an application: it defines which locks are intention locks and
  which locks are "loose" locks. It is only used to provide better
  diagnostics for the application; the lock manager itself does not
  differentiate between normal, intention, and loose locks.

  Internally the lock manager is based on a lock-free hash, see
  lf_hash.c for details. All locks are stored in a hash, with a resource
  id as a search key, so all locks for the same resource are considered
  collisions and are put in one (lock-free) linked list. The main
  lock-handling logic is in the inner loop that searches for a lock in
  such a linked list - lockfind().

  This works as follows. Locks are generally added to the end of the
  list (with one exception, see below). When scanning the list it is
  always possible to determine which locks are granted (active) and
  which locks are waiting: the first lock is obviously active, the
  second is active if it's compatible with the first, and so on - a lock
  is active if it's compatible with all previous locks and all locks
  before it are also active. To compute "compatible with all previous
  locks", all previous locks are accumulated in the prev_lock variable
  using lock_combining_matrix.

  Lock upgrades: when a thread that has a lock on a given resource
  requests a new lock on the same resource, and the old lock is not
  enough to satisfy the new lock's requirements (which is defined by
  lock_combining_matrix[old_lock][new_lock] != old_lock), a new lock is
  placed in the list. Depending on the other locks it is either
  immediately active or it waits for other locks. Here's the exception
  to the "locks are added to the end" rule - upgraded locks are added
  after the last active lock but before all waiting locks. The old lock
  (the one we upgraded from) is not removed from the list; indeed, it
  may still be needed if the new lock was taken in a savepoint that gets
  rolled back. So the old lock is marked as "ignored" (IGNORE_ME flag)
  and the new lock gets an UPGRADED flag.

  Loose locks add an important exception to the above. Loose locks do
  not always commute with other locks: in the list IX-LS both locks are
  active, while in the list LS-IX only the first lock is active. This
  creates a problem for lock upgrades. If the list was IX-LS and the
  owner of the first lock wants to place an LS lock (which can be
  immediately granted), the IX lock is upgraded to LSIX and the list
  becomes IX-LS-LSIX, which, according to the lock compatibility matrix,
  means that the last lock is waiting - of course it all happened
  because IX and LS were swapped and they don't commute. To work around
  this there's an ACTIVE flag, which is set in every lock that never
  waited (was placed active), and this flag overrides the "compatible
  with all previous locks" rule.

  When a lock is placed at the end of the list, either it's compatible
  with all locks and all locks are active - the new lock becomes active
  at once - or it conflicts with some of the locks; in the latter case a
  conflicting lock is returned in the 'blocker' variable and the calling
  thread waits on a pthread condition in the LOCK_OWNER structure of the
  owner of the conflicting lock. Or the new lock is compatible with all
  locks, but some existing locks are not compatible with each other
  (example: request IS when the list is S-IX) - that is, not all locks
  are active. In this case the first waiting lock is returned in the
  'blocker' variable, lockman_getlock() notices that the "blocker" does
  not conflict with the requested lock, and "dereferences" it to find
  the lock that it is itself waiting on. The calling thread then begins
  to wait on that same lock.

  To better support table-row relations, where one needs to lock the
  table with an intention lock before locking the row, extended
  diagnostics are provided. When an intention lock (presumably on a
  table) is granted, lockman_getlock() returns one of GOT_THE_LOCK (no
  need to lock the row, perhaps the thread already has a normal lock on
  this table), GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE (need to lock the
  row, as usual), GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE (only
  need to check whether it's possible to lock the row, but no need to
  lock it - perhaps the thread has a loose lock on this table). This is
  defined by the getlock_result[] table.
*/
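
/*
  A minimal usage sketch of the API described above. It is illustrative
  only: my_loid2lo, my_table_id and my_row_id are hypothetical names,
  and setting up a LOCK_OWNER with valid pins, mutex, and cond is
  application-specific and not shown here.

    LOCKMAN lm;
    LOCK_OWNER *lo;
    enum lockman_getlock_result res;

    lockman_init(&lm, &my_loid2lo, 1000);          -- timeout in ms
    lo= my_loid2lo(1);                             -- some live lock owner
    res= lockman_getlock(&lm, lo, my_table_id, IX);
    if (res == GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE)
      res= lockman_getlock(&lm, lo, my_row_id, X);
    if (res == DIDNT_GET_THE_LOCK)
      -- timeout: abort the transaction, which must end with
    lockman_release_locks(&lm, lo);                -- frees all lo's locks
    lockman_destroy(&lm);
*/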

#include <my_global.h>
#include <my_sys.h>
#include <my_bit.h>
#include <lf.h>
#include "lockman.h"

/*
  Lock compatibility matrix.

  It's asymmetric. Read it as "Somebody has the lock <value in the row
  label>, can I set the lock <value in the column label> ?"

  Note: though you can take an LS lock while somebody has an S lock, it
  makes no sense - it's simpler to take an S lock too.

  1  - compatible
  0  - incompatible
  -1 - "impossible", so that we can assert the impossibility.
*/
static int lock_compatibility_matrix[10][10]=
{ /* N   S   X  IS  IX  SIX LS  LX  SLX LSIX        */
  { -1,  1,  1,  1,  1,  1,  1,  1,  1,  1 }, /* N    */
  { -1,  1,  0,  1,  0,  0,  1,  0,  0,  0 }, /* S    */
  { -1,  0,  0,  0,  0,  0,  0,  0,  0,  0 }, /* X    */
  { -1,  1,  0,  1,  1,  1,  1,  1,  1,  1 }, /* IS   */
  { -1,  0,  0,  1,  1,  0,  1,  1,  0,  1 }, /* IX   */
  { -1,  0,  0,  1,  0,  0,  1,  0,  0,  0 }, /* SIX  */
  { -1,  1,  0,  1,  0,  0,  1,  0,  0,  0 }, /* LS   */
  { -1,  0,  0,  0,  0,  0,  0,  0,  0,  0 }, /* LX   */
  { -1,  0,  0,  0,  0,  0,  0,  0,  0,  0 }, /* SLX  */
  { -1,  0,  0,  1,  0,  0,  1,  0,  0,  0 }  /* LSIX */
};
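
/*
  Reading examples: lock_compatibility_matrix[S][IS] is 1 - while
  somebody holds S, an IS request can be granted - but
  lock_compatibility_matrix[S][IX] is 0, so an IX request has to wait
  for the S holder. The N row is all 1 (except the N column) because N
  means "nothing held yet" - it is the starting value of prev_lock in
  lockfind() - and the N column is -1 because an N lock is never
  actually requested.
*/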

/*
  Lock combining matrix.

  It's symmetric. Read it as "which single lock level is identical to
  the set of two locks A and B ?"

  One should never get N from it; we assert the impossibility.
*/
static enum lockman_lock_type lock_combining_matrix[10][10]=
{/*   N     S    X    IS    IX    SIX   LS    LX   SLX  LSIX         */
  {   N,    S,   X,   IS,   IX,   SIX,  S,    SLX, SLX, SIX},  /* N    */
  {   S,    S,   X,   S,    SIX,  SIX,  S,    SLX, SLX, SIX},  /* S    */
  {   X,    X,   X,   X,    X,    X,    X,    X,   X,   X},    /* X    */
  {   IS,   S,   X,   IS,   IX,   SIX,  LS,   LX,  SLX, LSIX}, /* IS   */
  {   IX,   SIX, X,   IX,   IX,   SIX,  LSIX, LX,  SLX, LSIX}, /* IX   */
  {   SIX,  SIX, X,   SIX,  SIX,  SIX,  SIX,  SLX, SLX, SIX},  /* SIX  */
  {   LS,   S,   X,   LS,   LSIX, SIX,  LS,   LX,  SLX, LSIX}, /* LS   */
  {   LX,   SLX, X,   LX,   LX,   SLX,  LX,   LX,  SLX, LX},   /* LX   */
  {   SLX,  SLX, X,   SLX,  SLX,  SLX,  SLX,  SLX, SLX, SLX},  /* SLX  */
  {   LSIX, SIX, X,   LSIX, LSIX, SIX,  LSIX, LX,  SLX, LSIX}  /* LSIX */
};
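
/*
  A sketch of a sanity check for the two matrices above. The guard macro
  LOCKMAN_MATRIX_CHECKS is hypothetical and never defined in any build;
  the block only illustrates the documented invariants for real lock
  values (N excluded): combining is symmetric and never yields N, and,
  for instance, combining IX with S gives SIX.
*/
#ifdef LOCKMAN_MATRIX_CHECKS
static void lockman_check_matrices(void)
{
  int a, b;
  DBUG_ASSERT(lock_combining_matrix[IX][S] == SIX);
  for (a= S; a <= LSIX; a++)
    for (b= S; b <= LSIX; b++)
    {
      /* symmetric: holding {A,B} is the same as holding {B,A} */
      DBUG_ASSERT(lock_combining_matrix[a][b] == lock_combining_matrix[b][a]);
      /* combining two real locks can never result in "no lock" */
      DBUG_ASSERT(lock_combining_matrix[a][b] != N);
    }
}
#endif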

#define REPEAT_ONCE_MORE           0
#define OK_TO_PLACE_THE_LOCK       1
#define OK_TO_PLACE_THE_REQUEST    2
#define ALREADY_HAVE_THE_LOCK      4
#define ALREADY_HAVE_THE_REQUEST   8
#define PLACE_NEW_DISABLE_OLD     16
#define REQUEST_NEW_DISABLE_OLD   32
#define RESOURCE_WAS_UNLOCKED     64

#define NEED_TO_WAIT (OK_TO_PLACE_THE_REQUEST | ALREADY_HAVE_THE_REQUEST | \
                      REQUEST_NEW_DISABLE_OLD)
#define ALREADY_HAVE (ALREADY_HAVE_THE_LOCK | ALREADY_HAVE_THE_REQUEST)
#define LOCK_UPGRADE (PLACE_NEW_DISABLE_OLD | REQUEST_NEW_DISABLE_OLD)


/*
  The return codes for lockman_getlock.

  It's asymmetric. Read it as "I have the lock <value in the row label>,
  what value should be returned for <value in the column label> ?"

  0 means an impossible combination (assert!)

  The defines below help to preserve the table structure.
  I/L/A values are self-explanatory.
  x means the combination is possible (the assert should not crash),
  but it cannot happen with row locks, only with table locks (S,X)
  or lock escalations (LS,LX).
*/
#define I GOT_THE_LOCK_NEED_TO_LOCK_A_SUBRESOURCE
#define L GOT_THE_LOCK_NEED_TO_INSTANT_LOCK_A_SUBRESOURCE
#define A GOT_THE_LOCK
#define x GOT_THE_LOCK
static enum lockman_getlock_result getlock_result[10][10]=
{/* N  S  X  IS IX SIX LS LX SLX LSIX        */
  { 0, 0, 0, 0, 0, 0,  0, 0, 0,  0}, /* N    */
  { 0, x, 0, A, 0, 0,  x, 0, 0,  0}, /* S    */
  { 0, x, x, A, A, 0,  x, x, 0,  0}, /* X    */
  { 0, 0, 0, I, 0, 0,  0, 0, 0,  0}, /* IS   */
  { 0, 0, 0, I, I, 0,  0, 0, 0,  0}, /* IX   */
  { 0, x, 0, A, I, 0,  x, 0, 0,  0}, /* SIX  */
  { 0, 0, 0, L, 0, 0,  x, 0, 0,  0}, /* LS   */
  { 0, 0, 0, L, L, 0,  x, x, 0,  0}, /* LX   */
  { 0, x, 0, A, L, 0,  x, x, 0,  0}, /* SLX  */
  { 0, 0, 0, L, I, 0,  x, 0, 0,  0}  /* LSIX */
};
#undef I
#undef L
#undef A
#undef x
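
/*
  Reading examples for the table above: if the owner already holds LS on
  a table and asks for IS, getlock_result[LS][IS] is L - the IS lock is
  granted, but the owner only needs to check that the row is lockable,
  not to actually lock it. If it holds S and asks for IS,
  getlock_result[S][IS] is A - the table is already locked strongly
  enough, no row lock is needed. And getlock_result[IX][IX] is I - lock
  the row as usual.
*/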

typedef struct lockman_lock {
  uint64 resource;
  struct lockman_lock *lonext;
  intptr volatile link;
  uint32 hashnr;
  /* QQ: TODO - remove hashnr from LOCK */
  uint16 loid;
  uchar lock;   /* sizeof(uchar) <= sizeof(enum) */
  uchar flags;
} LOCK;

#define IGNORE_ME   1
#define UPGRADED    2
#define ACTIVE      4

typedef struct {
  intptr volatile *prev;
  LOCK *curr, *next;
  LOCK *blocker, *upgrade_from;
} CURSOR;

#define PTR(V)     (LOCK *)((V) & (~(intptr)1))
#define DELETED(V) ((V) & 1)
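
/*
  An illustration of the tagging scheme behind PTR/DELETED (a sketch,
  not code from this file): 'link' holds a LOCK pointer with bit 0 used
  as a "logically deleted" mark. LOCK objects are aligned, so bit 0 of a
  real pointer is always zero:

    intptr link= (intptr)lock_ptr;   -- live node: DELETED(link) is false
    link|= 1;                        -- mark the node as deleted
    DELETED(link);                   -- now true
    PTR(link);                       -- still recovers lock_ptr

  lockdelete() sets this bit with an atomic CAS, and any scanner that
  sees it (in lockfind) physically unlinks and frees the node.
*/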

/*
  NOTE
  the cursor is positioned in either case;
  pins[0..3] are used, they are NOT removed on return
*/
static int lockfind(LOCK * volatile *head, LOCK *node,
                    CURSOR *cursor, LF_PINS *pins)
{
  uint32 hashnr, cur_hashnr;
  uint64 resource, cur_resource;
  intptr cur_link;
  my_bool cur_active, compatible, upgrading, prev_active;
  enum lockman_lock_type lock, prev_lock, cur_lock;
  uint16 loid, cur_loid;
  int cur_flags, flags;

  hashnr= node->hashnr;
  resource= node->resource;
  lock= node->lock;
  loid= node->loid;
  flags= node->flags;

retry:
  cursor->prev= (intptr *)head;
  prev_lock= N;
  cur_active= TRUE;
  compatible= TRUE;
  upgrading= FALSE;
  cursor->blocker= cursor->upgrade_from= 0;
  lf_unpin(pins, 3);
  /* pin the first node of the list, re-reading until the pin is stable */
  do {
    cursor->curr= PTR(*cursor->prev);
    lf_pin(pins, 1, cursor->curr);
  } while (*cursor->prev != (intptr)cursor->curr && LF_BACKOFF());
  for (;;)
  {
    if (!cursor->curr)
      break;
    /* pin the successor the same way */
    do {
      cur_link= cursor->curr->link;
      cursor->next= PTR(cur_link);
      lf_pin(pins, 0, cursor->next);
    } while (cur_link != cursor->curr->link && LF_BACKOFF());
    cur_hashnr= cursor->curr->hashnr;
    cur_resource= cursor->curr->resource;
    cur_lock= cursor->curr->lock;
    cur_loid= cursor->curr->loid;
    cur_flags= cursor->curr->flags;
    if (*cursor->prev != (intptr)cursor->curr)
    {
      (void)LF_BACKOFF();
      goto retry;
    }
    if (!DELETED(cur_link))
    {
      if (cur_hashnr > hashnr ||
          (cur_hashnr == hashnr && cur_resource >= resource))
      {
        if (cur_hashnr > hashnr || cur_resource > resource)
          break;
        /* ok, we have a lock for this resource */
        DBUG_ASSERT(lock_compatibility_matrix[prev_lock][cur_lock] >= 0);
        DBUG_ASSERT(lock_compatibility_matrix[cur_lock][lock] >= 0);
        if ((cur_flags & IGNORE_ME) && ! (flags & IGNORE_ME))
        {
          DBUG_ASSERT(cur_active);
          if (cur_loid == loid)
            cursor->upgrade_from= cursor->curr;
        }
        else
        {
          prev_active= cur_active;
          if (cur_flags & ACTIVE)
            DBUG_ASSERT(prev_active == TRUE);
          else
            cur_active&= lock_compatibility_matrix[prev_lock][cur_lock];
          if (upgrading && !cur_active /*&& !(cur_flags & UPGRADED)*/)
            break;
          if (prev_active && !cur_active)
          {
            /* this is the first waiting lock in the list */
            cursor->blocker= cursor->curr;
            lf_pin(pins, 3, cursor->curr);
          }
          if (cur_loid == loid)
          {
            /* we already have a lock on this resource */
            DBUG_ASSERT(lock_combining_matrix[cur_lock][lock] != N);
            DBUG_ASSERT(!upgrading || (flags & IGNORE_ME));
            if (lock_combining_matrix[cur_lock][lock] == cur_lock)
            {
              /* new lock is compatible */
              if (cur_active)
              {
                cursor->blocker= cursor->curr;   /* loose-locks! */
                lf_unpin(pins, 3);               /* loose-locks! */
                return ALREADY_HAVE_THE_LOCK;
              }
              else
                return ALREADY_HAVE_THE_REQUEST;
            }
            /* not compatible, upgrading */
            upgrading= TRUE;
            cursor->upgrade_from= cursor->curr;
          }
          else
          {
            if (!lock_compatibility_matrix[cur_lock][lock])
            {
              compatible= FALSE;
              cursor->blocker= cursor->curr;
              lf_pin(pins, 3, cursor->curr);
            }
          }
          prev_lock= lock_combining_matrix[prev_lock][cur_lock];
          DBUG_ASSERT(prev_lock != N);
        }
      }
      cursor->prev= &(cursor->curr->link);
      lf_pin(pins, 2, cursor->curr);
    }
    else
    {
      /*
        a logically deleted node - help to physically remove it,
        or restart the scan if somebody modified the list under us
      */
      if (my_atomic_casptr((void **)cursor->prev,
                           (void **)(char*) &cursor->curr, cursor->next))
        lf_alloc_free(pins, cursor->curr);
      else
      {
        (void)LF_BACKOFF();
        goto retry;
      }
    }
    cursor->curr= cursor->next;
    lf_pin(pins, 1, cursor->curr);
  }
  /*
    either the end of the lock list - no more locks for this resource -
    or, when upgrading, the end of the active lock list
  */
  if (upgrading)
  {
    if (compatible /*&& prev_active*/)
      return PLACE_NEW_DISABLE_OLD;
    else
      return REQUEST_NEW_DISABLE_OLD;
  }
  if (cur_active && compatible)
  {
    /*
      either no locks for this resource or all are compatible.
      ok to place the lock in any case.
    */
    return prev_lock == N ? RESOURCE_WAS_UNLOCKED
                          : OK_TO_PLACE_THE_LOCK;
  }
  /* we have a lock conflict. ok to place a lock request, and wait */
  return OK_TO_PLACE_THE_REQUEST;
}
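
/*
  A worked example of the scan above (using the matrices defined
  earlier): suppose the list for a resource is S (owner 1), IX (owner 2),
  and owner 3 calls lockfind() with an IS request. The S lock is active;
  for IX, lock_compatibility_matrix[S][IX] == 0, so cur_active becomes
  FALSE and the IX lock is waiting, with prev_lock accumulating to
  lock_combining_matrix[S][IX] == SIX. The IS request conflicts with
  neither lock individually, so 'compatible' stays TRUE, but since
  cur_active is FALSE the function returns OK_TO_PLACE_THE_REQUEST with
  cursor->blocker pointing at the waiting IX lock - exactly the
  "dereference the blocker" case that lockman_getlock() handles.
*/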

/*
  NOTE
  it uses pins[0..3]; on return pins 0..2 are removed, pin 3 (blocker) stays
*/
static int lockinsert(LOCK * volatile *head, LOCK *node, LF_PINS *pins,
                      LOCK **blocker)
{
  CURSOR cursor;
  int res;

  do
  {
    res= lockfind(head, node, &cursor, pins);
    DBUG_ASSERT(res != ALREADY_HAVE_THE_REQUEST);
    if (!(res & ALREADY_HAVE))
    {
      if (res & LOCK_UPGRADE)
      {
        node->flags|= UPGRADED;
        node->lock= lock_combining_matrix[cursor.upgrade_from->lock][node->lock];
      }
      if (!(res & NEED_TO_WAIT))
        node->flags|= ACTIVE;
      node->link= (intptr)cursor.curr;
      DBUG_ASSERT(node->link != (intptr)node);
      DBUG_ASSERT(cursor.prev != &node->link);
      if (!my_atomic_casptr((void **)cursor.prev,
                            (void **)(char*) &cursor.curr, node))
      {
        res= REPEAT_ONCE_MORE;
        node->flags&= ~ACTIVE;
      }
      if (res & LOCK_UPGRADE)
        cursor.upgrade_from->flags|= IGNORE_ME;
      /*
        QQ: is this OK ? if a reader has already read upgrade_from,
        it may find it conflicting with node :(
        - see the last test from test_lockman_simple()
      */
    }

  } while (res == REPEAT_ONCE_MORE);
  lf_unpin(pins, 0);
  lf_unpin(pins, 1);
  lf_unpin(pins, 2);
  /*
    Note that the blocker is not necessarily pinned here (when it's ==
    curr). This is ok: in such a case it's either a dummy node from
    initialize_bucket() - and dummy nodes don't need pinning - or a lock
    of the same transaction (for lockman_getlock), which cannot be
    removed by another thread.
  */
  *blocker= cursor.blocker;
  return res;
}

/*
  NOTE
  it uses pins[0..3]; on return pins 0..2 are removed, pin 3 (blocker) stays
*/
static int lockpeek(LOCK * volatile *head, LOCK *node, LF_PINS *pins,
                    LOCK **blocker)
{
  CURSOR cursor;
  int res;

  res= lockfind(head, node, &cursor, pins);

  lf_unpin(pins, 0);
  lf_unpin(pins, 1);
  lf_unpin(pins, 2);
  if (blocker)
    *blocker= cursor.blocker;
  return res;
}

/*
  NOTE
  it uses pins[0..3]; on return all pins are removed.

  One _must_ have the lock (or request) to call this
*/
static int lockdelete(LOCK * volatile *head, LOCK *node, LF_PINS *pins)
{
  CURSOR cursor;
  int res;

  do
  {
    res= lockfind(head, node, &cursor, pins);
    DBUG_ASSERT(res & ALREADY_HAVE);

    if (cursor.upgrade_from)
      cursor.upgrade_from->flags&= ~IGNORE_ME;

    /*
      XXX this does not work with savepoints, as the old lock is left
      ignored. It cannot be un-ignored, as that would basically mean
      moving the lock back in the lock chain (from its upgraded
      position), and that is not allowed - it breaks list scanning. So
      the old ignored lock must be deleted, and a new - identical - lock
      must be installed right after the lock we're deleting; only then
      can we delete. The good news is that this is only required when
      rolling back a savepoint.
    */
    if (my_atomic_casptr((void **)(char*)&(cursor.curr->link),
                         (void **)(char*)&cursor.next, 1+(char *)cursor.next))
    {
      if (my_atomic_casptr((void **)cursor.prev,
                           (void **)(char*)&cursor.curr, cursor.next))
        lf_alloc_free(pins, cursor.curr);
      else
        lockfind(head, node, &cursor, pins);
    }
    else
    {
      res= REPEAT_ONCE_MORE;
      if (cursor.upgrade_from)
        cursor.upgrade_from->flags|= IGNORE_ME;
    }
  } while (res == REPEAT_ONCE_MORE);
  lf_unpin(pins, 0);
  lf_unpin(pins, 1);
  lf_unpin(pins, 2);
  lf_unpin(pins, 3);
  return res;
}

void lockman_init(LOCKMAN *lm, loid_to_lo_func *func, uint timeout)
{
  lf_alloc_init(&lm->alloc, sizeof(LOCK), offsetof(LOCK, lonext));
  lf_dynarray_init(&lm->array, sizeof(LOCK **));
  lm->size= 1;
  lm->count= 0;
  lm->loid_to_lo= func;
  lm->lock_timeout= timeout;
}

void lockman_destroy(LOCKMAN *lm)
{
  LOCK *el= *(LOCK **)lf_dynarray_lvalue(&lm->array, 0);
  while (el)
  {
    intptr next= el->link;
    if (el->hashnr & 1)     /* a real lock, allocated from lm->alloc */
      lf_alloc_direct_free(&lm->alloc, el);
    else                    /* a dummy node, allocated with my_malloc */
      my_free((void *)el);
    el= (LOCK *)next;
  }
  lf_alloc_destroy(&lm->alloc);
  lf_dynarray_destroy(&lm->array);
}

/* TODO: optimize it */
#define MAX_LOAD 1

static void initialize_bucket(LOCKMAN *lm, LOCK * volatile *node,
                              uint bucket, LF_PINS *pins)
{
  int res;
  uint parent= my_clear_highest_bit(bucket);
  LOCK *dummy= (LOCK *)my_malloc(sizeof(LOCK), MYF(MY_WME));
  LOCK **tmp= 0, *cur;
  LOCK * volatile *el= lf_dynarray_lvalue(&lm->array, parent);

  if (*el == NULL && bucket)
    initialize_bucket(lm, el, parent, pins);
  dummy->hashnr= my_reverse_bits(bucket);
  dummy->loid= 0;
  dummy->lock= X; /* doesn't matter, in fact */
  dummy->resource= 0;
  dummy->flags= 0;
  res= lockinsert(el, dummy, pins, &cur);
  DBUG_ASSERT(res & (ALREADY_HAVE_THE_LOCK | RESOURCE_WAS_UNLOCKED));
  if (res & ALREADY_HAVE_THE_LOCK)
  {
    my_free((void *)dummy);
    dummy= cur;
  }
  my_atomic_casptr((void **)node, (void **)(char*) &tmp, dummy);
}
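
/*
  A note on the layout created above (the classic split-ordered list
  technique): dummy nodes keep hashnr == my_reverse_bits(bucket) with
  bit 0 clear, while real locks get my_reverse_bits(hash) | 1, so in the
  single ordered list every dummy sorts right before the real locks of
  its bucket. For example, with a 32-bit my_reverse_bits, bucket 6
  (binary 110) has parent my_clear_highest_bit(6) == 2 (binary 010), and
  its dummy node gets hashnr 0x60000000, linked into the list that the
  dummy of bucket 2 already heads.
*/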

static inline uint calc_hash(uint64 resource)
{
  const uchar *pos= (uchar *)&resource;
  ulong nr1= 1, nr2= 4, i;
  for (i= 0; i < sizeof(resource) ; i++, pos++)
  {
    nr1^= (ulong) ((((uint) nr1 & 63)+nr2) * ((uint)*pos)) + (nr1 << 8);
    nr2+= 3;
  }
  return nr1 & INT_MAX32;
}

/*
  RETURN
    see enum lockman_getlock_result
  NOTE
    uses pins[0..3], they're removed on return
*/
enum lockman_getlock_result lockman_getlock(LOCKMAN *lm, LOCK_OWNER *lo,
                                            uint64 resource,
                                            enum lockman_lock_type lock)
{
  int res;
  uint csize, bucket, hashnr;
  LOCK *node, * volatile *el, *blocker;
  LF_PINS *pins= lo->pins;
  enum lockman_lock_type old_lock;

  DBUG_ASSERT(lo->loid);
  node= (LOCK *)lf_alloc_new(pins);
  node->flags= 0;
  node->lock= lock;
  node->loid= lo->loid;
  node->resource= resource;
  hashnr= calc_hash(resource);
  bucket= hashnr % lm->size;
  el= lf_dynarray_lvalue(&lm->array, bucket);
  if (*el == NULL)
    initialize_bucket(lm, el, bucket, pins);
  node->hashnr= my_reverse_bits(hashnr) | 1;
  res= lockinsert(el, node, pins, &blocker);
  if (res & ALREADY_HAVE)
  {
    int r;
    old_lock= blocker->lock;
    lf_alloc_free(pins, node);
    r= getlock_result[old_lock][lock];
    DBUG_ASSERT(r);
    return r;
  }
  /* a new value was added to the hash */
  csize= lm->size;
  if ((my_atomic_add32(&lm->count, 1)+1.0) / csize > MAX_LOAD)
    my_atomic_cas32(&lm->size, (int*) &csize, csize*2);
  node->lonext= lo->all_locks;
  lo->all_locks= node;
  for ( ; res & NEED_TO_WAIT; res= lockpeek(el, node, pins, &blocker))
  {
    LOCK_OWNER *wait_for_lo;
    ulonglong deadline;
    struct timespec timeout;

    lf_assert_pin(pins, 3); /* blocker must be pinned here */
    wait_for_lo= lm->loid_to_lo(blocker->loid);

    /*
      Now, this is tricky. The blocker is not necessarily a LOCK we're
      waiting for. If it's compatible with what we want, then we're
      waiting for a lock that the blocker is itself waiting for (see the
      two places where the blocker is set in lockfind). In that case,
      let's "dereference" it.
    */
    if (lock_compatibility_matrix[blocker->lock][lock])
    {
      blocker= wait_for_lo->all_locks;
      lf_pin(pins, 3, blocker);
      if (blocker != wait_for_lo->all_locks)
        continue;
      wait_for_lo= wait_for_lo->waiting_for;
    }

    /*
      Note that the blocker transaction may have ended by now, and its
      LOCK_OWNER and short id been reused, so 'wait_for_lo' may point
      to an unrelated - albeit valid - LOCK_OWNER.
    */
    if (!wait_for_lo)
      continue;

    lo->waiting_for= wait_for_lo;

    /*
      We lock a mutex - it may belong to the wrong LOCK_OWNER, but it
      must belong to _some_ LOCK_OWNER. This means we can never free()
      a LOCK_OWNER while there are other active LOCK_OWNERs.
    */
    /* QQ: race condition here */
    pthread_mutex_lock(wait_for_lo->mutex);
    if (DELETED(blocker->link))
    {
      /*
        The blocker transaction was ended, or a savepoint that owned
        the lock was rolled back. Either way - the lock was removed.
      */
      pthread_mutex_unlock(wait_for_lo->mutex);
      continue;
    }

    /* yuck. waiting */
    deadline= my_hrtime().val*1000 + lm->lock_timeout * 1000000;
    set_timespec_time_nsec(timeout, deadline);
    do
    {
      pthread_cond_timedwait(wait_for_lo->cond, wait_for_lo->mutex, &timeout);
    } while (!DELETED(blocker->link) && my_hrtime().val < deadline/1000);
    pthread_mutex_unlock(wait_for_lo->mutex);
    if (!DELETED(blocker->link))
    {
      /*
        Timeout. Note that we _don't_ release the lock request here.
        Instead we rely on the caller to abort the transaction and
        release all locks at once - see lockman_release_locks().
      */
      lf_unpin(pins, 3);
      return DIDNT_GET_THE_LOCK;
    }
  }
  lo->waiting_for= 0;
  lf_assert_unpin(pins, 3); /* unpin should not be needed */
  return getlock_result[lock][lock];
}
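
/*
  A conflict trace through the function above (illustrative): owner 1
  holds X on a resource and owner 2 requests S. lockinsert() returns
  OK_TO_PLACE_THE_REQUEST with blocker pointing to owner 1's X lock;
  lock_compatibility_matrix[X][S] == 0, so no dereferencing happens, and
  owner 2 waits on owner 1's condition variable until owner 1 calls
  lockman_release_locks() (which marks the X lock DELETED and
  broadcasts), or until lm->lock_timeout milliseconds pass, in which
  case owner 2 gets DIDNT_GET_THE_LOCK and must abort its transaction.
*/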

/*
  RETURN
    0 - deleted
    1 - didn't (not found)
  NOTE
    see lockdelete() for pin usage notes
*/
int lockman_release_locks(LOCKMAN *lm, LOCK_OWNER *lo)
{
  LOCK * volatile *el, *node, *next;
  uint bucket;
  LF_PINS *pins= lo->pins;

  pthread_mutex_lock(lo->mutex);
  for (node= lo->all_locks; node; node= next)
  {
    next= node->lonext;
    bucket= calc_hash(node->resource) % lm->size;
    el= lf_dynarray_lvalue(&lm->array, bucket);
    if (*el == NULL)
      initialize_bucket(lm, el, bucket, pins);
    lockdelete(el, node, pins);
    my_atomic_add32(&lm->count, -1);
  }
  lo->all_locks= 0;
  /* now signal all waiters */
  pthread_cond_broadcast(lo->cond);
  pthread_mutex_unlock(lo->mutex);
  return 0;
}

#ifdef MY_LF_EXTRA_DEBUG
static const char *lock2str[]=
{ "N", "S", "X", "IS", "IX", "SIX", "LS", "LX", "SLX", "LSIX" };
/*
  NOTE
  the function below is NOT thread-safe !!!
*/
void print_lockhash(LOCKMAN *lm)
{
  LOCK *el= *(LOCK **)lf_dynarray_lvalue(&lm->array, 0);
  printf("hash: size %u count %u\n", lm->size, lm->count);
  while (el)
  {
    intptr next= el->link;
    if (el->hashnr & 1)
    {
      printf("0x%08lx { resource %lu, loid %u, lock %s",
             (long) el->hashnr, (ulong) el->resource, el->loid,
             lock2str[el->lock]);
      if (el->flags & IGNORE_ME) printf(" IGNORE_ME");
      if (el->flags & UPGRADED) printf(" UPGRADED");
      if (el->flags & ACTIVE) printf(" ACTIVE");
      if (DELETED(next)) printf(" ***DELETED***");
      printf("}\n");
    }
    else
    {
      /*printf("0x%08x { dummy }\n", el->hashnr);*/
      DBUG_ASSERT(el->resource == 0 && el->loid == 0 && el->lock == X);
    }
    el= PTR(next);
  }
}
#endif
