1/*-------------------------------------------------------------------------
2 *
3 * lwlock.c
4 * Lightweight lock manager
5 *
6 * Lightweight locks are intended primarily to provide mutual exclusion of
7 * access to shared-memory data structures. Therefore, they offer both
8 * exclusive and shared lock modes (to support read/write and read-only
 * access to a shared object). There are few other frills. User-level
10 * locking should be done with the full lock manager --- which depends on
11 * LWLocks to protect its shared state.
12 *
13 * In addition to exclusive and shared modes, lightweight locks can be used to
 * wait until a variable changes value. The variable is not reset when the
 * lock is acquired with LWLockAcquire, i.e. it keeps whatever value it had
 * when the lock was last released, and it can be updated without releasing
 * the lock by calling LWLockUpdateVar. LWLockWaitForVar
18 * waits for the variable to be updated, or until the lock is free. When
19 * releasing the lock with LWLockReleaseClearVar() the value can be set to an
20 * appropriate value for a free lock. The meaning of the variable is up to
 * the caller; the lightweight lock code just assigns and compares it.
22 *
23 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
24 * Portions Copyright (c) 1994, Regents of the University of California
25 *
26 * IDENTIFICATION
27 * src/backend/storage/lmgr/lwlock.c
28 *
29 * NOTES:
30 *
 * This used to be a pretty straightforward reader-writer lock
32 * implementation, in which the internal state was protected by a
33 * spinlock. Unfortunately the overhead of taking the spinlock proved to be
34 * too high for workloads/locks that were taken in shared mode very
35 * frequently. Often we were spinning in the (obviously exclusive) spinlock,
36 * while trying to acquire a shared lock that was actually free.
37 *
38 * Thus a new implementation was devised that provides wait-free shared lock
39 * acquisition for locks that aren't exclusively locked.
40 *
41 * The basic idea is to have a single atomic variable 'lockcount' instead of
42 * the formerly separate shared and exclusive counters and to use atomic
43 * operations to acquire the lock. That's fairly easy to do for plain
44 * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
45 * in the OS.
46 *
47 * For lock acquisition we use an atomic compare-and-exchange on the lockcount
48 * variable. For exclusive lock we swap in a sentinel value
49 * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders.
50 *
 * To release the lock we use an atomic decrement. If the
52 * new value is zero (we get that atomically), we know we can/have to release
53 * waiters.
54 *
55 * Obviously it is important that the sentinel value for exclusive locks
56 * doesn't conflict with the maximum number of possible share lockers -
57 * luckily MAX_BACKENDS makes that easily possible.
58 *
59 *
60 * The attentive reader might have noticed that naively doing the above has a
61 * glaring race condition: We try to lock using the atomic operations and
62 * notice that we have to wait. Unfortunately by the time we have finished
 * queuing, the former locker might very well have already finished its
64 * work. That's problematic because we're now stuck waiting inside the OS.
 *
 * To mitigate those races we use a multi-phase locking protocol:
67 * Phase 1: Try to do it atomically, if we succeed, nice
68 * Phase 2: Add ourselves to the waitqueue of the lock
69 * Phase 3: Try to grab the lock again, if we succeed, remove ourselves from
70 * the queue
71 * Phase 4: Sleep till wake-up, goto Phase 1
72 *
 * This protects us against the problem above: a release can no longer slip
 * by unnoticed, since after Phase 2 we are already on the wait queue and
 * Phase 3 rechecks whether the lock has meanwhile become free.
75 * -------------------------------------------------------------------------
76 */
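
/*
 * Illustrative sketch (not part of the build) of the variable-wait API
 * described above. The WAL insertion locks in xlog.c are the best-known
 * user; "progress", "known", and "cur" below are made-up names for the
 * example.
 *
 * Updater side, holding the lock in LW_EXCLUSIVE mode:
 *
 *		LWLockUpdateVar(lock, &progress, newvalue);	// wakes variable waiters
 *		...
 *		LWLockReleaseClearVar(lock, &progress, 0);	// reset value and release
 *
 * Waiter side, not holding the lock:
 *
 *		uint64	cur = known;
 *
 *		if (LWLockWaitForVar(lock, &progress, known, &cur))
 *			;	// the lock was, or became, free
 *		else
 *			;	// lock still held, but the variable changed; 'cur' is new
 */
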
77#include "postgres.h"
78
79#include "miscadmin.h"
80#include "pgstat.h"
81#include "pg_trace.h"
82#include "postmaster/postmaster.h"
83#include "replication/slot.h"
84#include "storage/ipc.h"
85#include "storage/predicate.h"
86#include "storage/proc.h"
87#include "storage/proclist.h"
88#include "storage/spin.h"
89#include "utils/memutils.h"
90
91#ifdef LWLOCK_STATS
92#include "utils/hsearch.h"
93#endif
94
95
96/* We use the ShmemLock spinlock to protect LWLockCounter */
97extern slock_t *ShmemLock;
98
99#define LW_FLAG_HAS_WAITERS ((uint32) 1 << 30)
100#define LW_FLAG_RELEASE_OK ((uint32) 1 << 29)
101#define LW_FLAG_LOCKED ((uint32) 1 << 28)
102
103#define LW_VAL_EXCLUSIVE ((uint32) 1 << 24)
104#define LW_VAL_SHARED 1
105
106#define LW_LOCK_MASK ((uint32) ((1 << 25)-1))
107/* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
108#define LW_SHARED_MASK ((uint32) ((1 << 24)-1))
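
/*
 * For reference, the resulting layout of lock->state is:
 *
 *	 bit 31			unused
 *	 bit 30			LW_FLAG_HAS_WAITERS
 *	 bit 29			LW_FLAG_RELEASE_OK
 *	 bit 28			LW_FLAG_LOCKED (wait-list lock)
 *	 bits 25-27		unused
 *	 bit 24			LW_VAL_EXCLUSIVE
 *	 bits 0-23		count of shared lockers (each adds LW_VAL_SHARED)
 */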
109
110/*
111 * This is indexed by tranche ID and stores the names of all tranches known
112 * to the current backend.
113 */
114static const char **LWLockTrancheArray = NULL;
115static int LWLockTranchesAllocated = 0;
116
117#define T_NAME(lock) \
118 (LWLockTrancheArray[(lock)->tranche])
119
120/*
121 * This points to the main array of LWLocks in shared memory. Backends inherit
122 * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
123 * where we have special measures to pass it down).
124 */
125LWLockPadded *MainLWLockArray = NULL;
126
127/*
128 * We use this structure to keep track of locked LWLocks for release
129 * during error recovery. Normally, only a few will be held at once, but
130 * occasionally the number can be much higher; for example, the pg_buffercache
131 * extension locks all buffer partitions simultaneously.
132 */
133#define MAX_SIMUL_LWLOCKS 200
134
135/* struct representing the LWLocks we're holding */
136typedef struct LWLockHandle
137{
138 LWLock *lock;
139 LWLockMode mode;
140} LWLockHandle;
141
142static int num_held_lwlocks = 0;
143static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
144
145/* struct representing the LWLock tranche request for named tranche */
146typedef struct NamedLWLockTrancheRequest
147{
148 char tranche_name[NAMEDATALEN];
149 int num_lwlocks;
150} NamedLWLockTrancheRequest;
151
152NamedLWLockTrancheRequest *NamedLWLockTrancheRequestArray = NULL;
153static int NamedLWLockTrancheRequestsAllocated = 0;
154int NamedLWLockTrancheRequests = 0;
155
156NamedLWLockTranche *NamedLWLockTrancheArray = NULL;
157
158static bool lock_named_request_allowed = true;
159
160static void InitializeLWLocks(void);
161static void RegisterLWLockTranches(void);
162
163static inline void LWLockReportWaitStart(LWLock *lock);
164static inline void LWLockReportWaitEnd(void);
165
166#ifdef LWLOCK_STATS
167typedef struct lwlock_stats_key
168{
169 int tranche;
170 void *instance;
171} lwlock_stats_key;
172
173typedef struct lwlock_stats
174{
175 lwlock_stats_key key;
176 int sh_acquire_count;
177 int ex_acquire_count;
178 int block_count;
179 int dequeue_self_count;
180 int spin_delay_count;
181} lwlock_stats;
182
183static HTAB *lwlock_stats_htab;
184static lwlock_stats lwlock_stats_dummy;
185#endif
186
187#ifdef LOCK_DEBUG
188bool Trace_lwlocks = false;
189
190inline static void
191PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
192{
193 /* hide statement & context here, otherwise the log is just too verbose */
194 if (Trace_lwlocks)
195 {
196 uint32 state = pg_atomic_read_u32(&lock->state);
197
198 ereport(LOG,
199 (errhidestmt(true),
200 errhidecontext(true),
201 errmsg_internal("%d: %s(%s %p): excl %u shared %u haswaiters %u waiters %u rOK %d",
202 MyProcPid,
203 where, T_NAME(lock), lock,
204 (state & LW_VAL_EXCLUSIVE) != 0,
205 state & LW_SHARED_MASK,
206 (state & LW_FLAG_HAS_WAITERS) != 0,
207 pg_atomic_read_u32(&lock->nwaiters),
208 (state & LW_FLAG_RELEASE_OK) != 0)));
209 }
210}
211
212inline static void
213LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
214{
215 /* hide statement & context here, otherwise the log is just too verbose */
216 if (Trace_lwlocks)
217 {
218 ereport(LOG,
219 (errhidestmt(true),
220 errhidecontext(true),
221 errmsg_internal("%s(%s %p): %s", where,
222 T_NAME(lock), lock, msg)));
223 }
224}
225
226#else /* not LOCK_DEBUG */
227#define PRINT_LWDEBUG(a,b,c) ((void)0)
228#define LOG_LWDEBUG(a,b,c) ((void)0)
229#endif /* LOCK_DEBUG */
230
231#ifdef LWLOCK_STATS
232
233static void init_lwlock_stats(void);
234static void print_lwlock_stats(int code, Datum arg);
235static lwlock_stats * get_lwlock_stats_entry(LWLock *lockid);
236
237static void
238init_lwlock_stats(void)
239{
240 HASHCTL ctl;
241 static MemoryContext lwlock_stats_cxt = NULL;
242 static bool exit_registered = false;
243
244 if (lwlock_stats_cxt != NULL)
245 MemoryContextDelete(lwlock_stats_cxt);
246
247 /*
248 * The LWLock stats will be updated within a critical section, which
249 * requires allocating new hash entries. Allocations within a critical
250 * section are normally not allowed because running out of memory would
251 * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
252 * turned on in production, so that's an acceptable risk. The hash entries
253 * are small, so the risk of running out of memory is minimal in practice.
254 */
255 lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
256 "LWLock stats",
257 ALLOCSET_DEFAULT_SIZES);
258 MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);
259
260 MemSet(&ctl, 0, sizeof(ctl));
261 ctl.keysize = sizeof(lwlock_stats_key);
262 ctl.entrysize = sizeof(lwlock_stats);
263 ctl.hcxt = lwlock_stats_cxt;
264 lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
265 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
266 if (!exit_registered)
267 {
268 on_shmem_exit(print_lwlock_stats, 0);
269 exit_registered = true;
270 }
271}
272
273static void
274print_lwlock_stats(int code, Datum arg)
275{
276 HASH_SEQ_STATUS scan;
277 lwlock_stats *lwstats;
278
279 hash_seq_init(&scan, lwlock_stats_htab);
280
281 /* Grab an LWLock to keep different backends from mixing reports */
282 LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);
283
284 while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
285 {
286 fprintf(stderr,
287 "PID %d lwlock %s %p: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
288 MyProcPid, LWLockTrancheArray[lwstats->key.tranche],
289 lwstats->key.instance, lwstats->sh_acquire_count,
290 lwstats->ex_acquire_count, lwstats->block_count,
291 lwstats->spin_delay_count, lwstats->dequeue_self_count);
292 }
293
294 LWLockRelease(&MainLWLockArray[0].lock);
295}
296
297static lwlock_stats *
298get_lwlock_stats_entry(LWLock *lock)
299{
300 lwlock_stats_key key;
301 lwlock_stats *lwstats;
302 bool found;
303
304 /*
305 * During shared memory initialization, the hash table doesn't exist yet.
306 * Stats of that phase aren't very interesting, so just collect operations
307 * on all locks in a single dummy entry.
308 */
309 if (lwlock_stats_htab == NULL)
310 return &lwlock_stats_dummy;
311
312 /* Fetch or create the entry. */
313 key.tranche = lock->tranche;
314 key.instance = lock;
315 lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
316 if (!found)
317 {
318 lwstats->sh_acquire_count = 0;
319 lwstats->ex_acquire_count = 0;
320 lwstats->block_count = 0;
321 lwstats->dequeue_self_count = 0;
322 lwstats->spin_delay_count = 0;
323 }
324 return lwstats;
325}
326#endif /* LWLOCK_STATS */
327
328
329/*
330 * Compute number of LWLocks required by named tranches. These will be
331 * allocated in the main array.
332 */
333static int
334NumLWLocksByNamedTranches(void)
335{
336 int numLocks = 0;
337 int i;
338
339 for (i = 0; i < NamedLWLockTrancheRequests; i++)
340 numLocks += NamedLWLockTrancheRequestArray[i].num_lwlocks;
341
342 return numLocks;
343}
344
345/*
346 * Compute shmem space needed for LWLocks and named tranches.
347 */
348Size
349LWLockShmemSize(void)
350{
351 Size size;
352 int i;
353 int numLocks = NUM_FIXED_LWLOCKS;
354
355 numLocks += NumLWLocksByNamedTranches();
356
357 /* Space for the LWLock array. */
358 size = mul_size(numLocks, sizeof(LWLockPadded));
359
360 /* Space for dynamic allocation counter, plus room for alignment. */
361 size = add_size(size, sizeof(int) + LWLOCK_PADDED_SIZE);
362
363 /* space for named tranches. */
364 size = add_size(size, mul_size(NamedLWLockTrancheRequests, sizeof(NamedLWLockTranche)));
365
366 /* space for name of each tranche. */
367 for (i = 0; i < NamedLWLockTrancheRequests; i++)
368 size = add_size(size, strlen(NamedLWLockTrancheRequestArray[i].tranche_name) + 1);
369
370 /* Disallow named LWLocks' requests after startup */
371 lock_named_request_allowed = false;
372
373 return size;
374}
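
/*
 * For orientation, the shared-memory chunk sized here and allocated in
 * CreateLWLocks() is laid out roughly as follows (see also
 * InitializeLWLocks()):
 *
 *		int					dynamic tranche-ID counter (LWLockCounter)
 *		<padding>			to align the array on LWLOCK_PADDED_SIZE
 *		LWLockPadded[]		individual LWLocks, buffer-mapping partitions,
 *							lock-manager partitions, predicate-lock-manager
 *							partitions, then LWLocks of named tranches
 *		NamedLWLockTranche[]	one entry per named tranche request
 *		char[]				NUL-terminated tranche name strings
 */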
375
376/*
377 * Allocate shmem space for the main LWLock array and all tranches and
378 * initialize it. We also register all the LWLock tranches here.
379 */
380void
381CreateLWLocks(void)
382{
383 StaticAssertStmt(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
384 "MAX_BACKENDS too big for lwlock.c");
385
386 StaticAssertStmt(sizeof(LWLock) <= LWLOCK_MINIMAL_SIZE &&
387 sizeof(LWLock) <= LWLOCK_PADDED_SIZE,
388 "Miscalculated LWLock padding");
389
390 if (!IsUnderPostmaster)
391 {
392 Size spaceLocks = LWLockShmemSize();
393 int *LWLockCounter;
394 char *ptr;
395
396 /* Allocate space */
397 ptr = (char *) ShmemAlloc(spaceLocks);
398
399 /* Leave room for dynamic allocation of tranches */
400 ptr += sizeof(int);
401
402 /* Ensure desired alignment of LWLock array */
403 ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
404
405 MainLWLockArray = (LWLockPadded *) ptr;
406
407 /*
408 * Initialize the dynamic-allocation counter for tranches, which is
409 * stored just before the first LWLock.
410 */
411 LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
412 *LWLockCounter = LWTRANCHE_FIRST_USER_DEFINED;
413
414 /* Initialize all LWLocks */
415 InitializeLWLocks();
416 }
417
418 /* Register all LWLock tranches */
419 RegisterLWLockTranches();
420}
421
422/*
423 * Initialize LWLocks that are fixed and those belonging to named tranches.
424 */
425static void
426InitializeLWLocks(void)
427{
428 int numNamedLocks = NumLWLocksByNamedTranches();
429 int id;
430 int i;
431 int j;
432 LWLockPadded *lock;
433
434 /* Initialize all individual LWLocks in main array */
435 for (id = 0, lock = MainLWLockArray; id < NUM_INDIVIDUAL_LWLOCKS; id++, lock++)
436 LWLockInitialize(&lock->lock, id);
437
438 /* Initialize buffer mapping LWLocks in main array */
439 lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS;
440 for (id = 0; id < NUM_BUFFER_PARTITIONS; id++, lock++)
441 LWLockInitialize(&lock->lock, LWTRANCHE_BUFFER_MAPPING);
442
443 /* Initialize lmgrs' LWLocks in main array */
444 lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS + NUM_BUFFER_PARTITIONS;
445 for (id = 0; id < NUM_LOCK_PARTITIONS; id++, lock++)
446 LWLockInitialize(&lock->lock, LWTRANCHE_LOCK_MANAGER);
447
448 /* Initialize predicate lmgrs' LWLocks in main array */
449 lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS +
450 NUM_BUFFER_PARTITIONS + NUM_LOCK_PARTITIONS;
451 for (id = 0; id < NUM_PREDICATELOCK_PARTITIONS; id++, lock++)
452 LWLockInitialize(&lock->lock, LWTRANCHE_PREDICATE_LOCK_MANAGER);
453
454 /* Initialize named tranches. */
455 if (NamedLWLockTrancheRequests > 0)
456 {
457 char *trancheNames;
458
459 NamedLWLockTrancheArray = (NamedLWLockTranche *)
460 &MainLWLockArray[NUM_FIXED_LWLOCKS + numNamedLocks];
461
462 trancheNames = (char *) NamedLWLockTrancheArray +
463 (NamedLWLockTrancheRequests * sizeof(NamedLWLockTranche));
464 lock = &MainLWLockArray[NUM_FIXED_LWLOCKS];
465
466 for (i = 0; i < NamedLWLockTrancheRequests; i++)
467 {
468 NamedLWLockTrancheRequest *request;
469 NamedLWLockTranche *tranche;
470 char *name;
471
472 request = &NamedLWLockTrancheRequestArray[i];
473 tranche = &NamedLWLockTrancheArray[i];
474
475 name = trancheNames;
476 trancheNames += strlen(request->tranche_name) + 1;
477 strcpy(name, request->tranche_name);
478 tranche->trancheId = LWLockNewTrancheId();
479 tranche->trancheName = name;
480
481 for (j = 0; j < request->num_lwlocks; j++, lock++)
482 LWLockInitialize(&lock->lock, tranche->trancheId);
483 }
484 }
485}
486
487/*
488 * Register named tranches and tranches for fixed LWLocks.
489 */
490static void
491RegisterLWLockTranches(void)
492{
493 int i;
494
495 if (LWLockTrancheArray == NULL)
496 {
497 LWLockTranchesAllocated = 128;
498 LWLockTrancheArray = (const char **)
499 MemoryContextAllocZero(TopMemoryContext,
500 LWLockTranchesAllocated * sizeof(char *));
501 Assert(LWLockTranchesAllocated >= LWTRANCHE_FIRST_USER_DEFINED);
502 }
503
504 for (i = 0; i < NUM_INDIVIDUAL_LWLOCKS; ++i)
505 LWLockRegisterTranche(i, MainLWLockNames[i]);
506
507 LWLockRegisterTranche(LWTRANCHE_BUFFER_MAPPING, "buffer_mapping");
508 LWLockRegisterTranche(LWTRANCHE_LOCK_MANAGER, "lock_manager");
509 LWLockRegisterTranche(LWTRANCHE_PREDICATE_LOCK_MANAGER,
510 "predicate_lock_manager");
511 LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA,
512 "parallel_query_dsa");
513 LWLockRegisterTranche(LWTRANCHE_SESSION_DSA,
514 "session_dsa");
515 LWLockRegisterTranche(LWTRANCHE_SESSION_RECORD_TABLE,
516 "session_record_table");
517 LWLockRegisterTranche(LWTRANCHE_SESSION_TYPMOD_TABLE,
518 "session_typmod_table");
519 LWLockRegisterTranche(LWTRANCHE_SHARED_TUPLESTORE,
520 "shared_tuplestore");
521 LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
522 LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append");
523 LWLockRegisterTranche(LWTRANCHE_PARALLEL_HASH_JOIN, "parallel_hash_join");
524 LWLockRegisterTranche(LWTRANCHE_SXACT, "serializable_xact");
525
526 /* Register named tranches. */
527 for (i = 0; i < NamedLWLockTrancheRequests; i++)
528 LWLockRegisterTranche(NamedLWLockTrancheArray[i].trancheId,
529 NamedLWLockTrancheArray[i].trancheName);
530}
531
532/*
533 * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
534 */
535void
536InitLWLockAccess(void)
537{
538#ifdef LWLOCK_STATS
539 init_lwlock_stats();
540#endif
541}
542
543/*
 * GetNamedLWLockTranche - returns the base address of the LWLocks belonging
 * to the specified tranche.
 *
 * The caller may use the requested number of LWLocks starting from the base
 * address returned by this function. This only works for tranches that were
 * requested via RequestNamedLWLockTranche().
550 */
551LWLockPadded *
552GetNamedLWLockTranche(const char *tranche_name)
553{
554 int lock_pos;
555 int i;
556
557 /*
 * Find the position in MainLWLockArray of the first LWLock belonging to the
 * requested tranche_name. LWLocks for named tranches are placed in
 * MainLWLockArray after the fixed locks.
561 */
562 lock_pos = NUM_FIXED_LWLOCKS;
563 for (i = 0; i < NamedLWLockTrancheRequests; i++)
564 {
565 if (strcmp(NamedLWLockTrancheRequestArray[i].tranche_name,
566 tranche_name) == 0)
567 return &MainLWLockArray[lock_pos];
568
569 lock_pos += NamedLWLockTrancheRequestArray[i].num_lwlocks;
570 }
571
572 if (i >= NamedLWLockTrancheRequests)
573 elog(ERROR, "requested tranche is not registered");
574
575 /* just to keep compiler quiet */
576 return NULL;
577}
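
/*
 * Illustrative sketch (not part of the build): how an extension loaded via
 * shared_preload_libraries might use a named tranche. The tranche name
 * "my_ext" and the usage site are made up for the example.
 *
 *		// in _PG_init(), before shared memory is created:
 *		RequestNamedLWLockTranche("my_ext", 4);
 *
 *		// later, once shared memory exists (e.g. from a shmem startup hook):
 *		LWLockPadded *locks = GetNamedLWLockTranche("my_ext");
 *
 *		LWLockAcquire(&locks[0].lock, LW_EXCLUSIVE);
 *		// ... touch the extension's shared state ...
 *		LWLockRelease(&locks[0].lock);
 */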
578
579/*
580 * Allocate a new tranche ID.
581 */
582int
583LWLockNewTrancheId(void)
584{
585 int result;
586 int *LWLockCounter;
587
588 LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
589 SpinLockAcquire(ShmemLock);
590 result = (*LWLockCounter)++;
591 SpinLockRelease(ShmemLock);
592
593 return result;
594}
595
596/*
597 * Register a tranche ID in the lookup table for the current process. This
598 * routine will save a pointer to the tranche name passed as an argument,
599 * so the name should be allocated in a backend-lifetime context
600 * (TopMemoryContext, static variable, or similar).
601 */
602void
603LWLockRegisterTranche(int tranche_id, const char *tranche_name)
604{
605 Assert(LWLockTrancheArray != NULL);
606
607 if (tranche_id >= LWLockTranchesAllocated)
608 {
609 int i = LWLockTranchesAllocated;
610 int j = LWLockTranchesAllocated;
611
612 while (i <= tranche_id)
613 i *= 2;
614
615 LWLockTrancheArray = (const char **)
616 repalloc(LWLockTrancheArray, i * sizeof(char *));
617 LWLockTranchesAllocated = i;
618 while (j < LWLockTranchesAllocated)
619 LWLockTrancheArray[j++] = NULL;
620 }
621
622 LWLockTrancheArray[tranche_id] = tranche_name;
623}
624
625/*
626 * RequestNamedLWLockTranche
627 * Request that extra LWLocks be allocated during postmaster
628 * startup.
629 *
630 * This is only useful for extensions if called from the _PG_init hook
631 * of a library that is loaded into the postmaster via
632 * shared_preload_libraries. Once shared memory has been allocated, calls
633 * will be ignored. (We could raise an error, but it seems better to make
634 * it a no-op, so that libraries containing such calls can be reloaded if
635 * needed.)
636 */
637void
638RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
639{
640 NamedLWLockTrancheRequest *request;
641
642 if (IsUnderPostmaster || !lock_named_request_allowed)
643 return; /* too late */
644
645 if (NamedLWLockTrancheRequestArray == NULL)
646 {
647 NamedLWLockTrancheRequestsAllocated = 16;
648 NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
649 MemoryContextAlloc(TopMemoryContext,
650 NamedLWLockTrancheRequestsAllocated
651 * sizeof(NamedLWLockTrancheRequest));
652 }
653
654 if (NamedLWLockTrancheRequests >= NamedLWLockTrancheRequestsAllocated)
655 {
656 int i = NamedLWLockTrancheRequestsAllocated;
657
658 while (i <= NamedLWLockTrancheRequests)
659 i *= 2;
660
661 NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
662 repalloc(NamedLWLockTrancheRequestArray,
663 i * sizeof(NamedLWLockTrancheRequest));
664 NamedLWLockTrancheRequestsAllocated = i;
665 }
666
667 request = &NamedLWLockTrancheRequestArray[NamedLWLockTrancheRequests];
668 Assert(strlen(tranche_name) + 1 < NAMEDATALEN);
669 StrNCpy(request->tranche_name, tranche_name, NAMEDATALEN);
670 request->num_lwlocks = num_lwlocks;
671 NamedLWLockTrancheRequests++;
672}
673
674/*
675 * LWLockInitialize - initialize a new lwlock; it's initially unlocked
676 */
677void
678LWLockInitialize(LWLock *lock, int tranche_id)
679{
680 pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
681#ifdef LOCK_DEBUG
682 pg_atomic_init_u32(&lock->nwaiters, 0);
683#endif
684 lock->tranche = tranche_id;
685 proclist_init(&lock->waiters);
686}
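
/*
 * Illustrative sketch (not part of the build): initializing an LWLock that
 * lives outside the main array, e.g. in a DSM segment. "shared" and
 * "my_tranche_id" are hypothetical names; the tranche must be registered in
 * every backend that uses the lock so that T_NAME() can resolve it.
 *
 *		// once, in the backend that sets up the shared structure:
 *		my_tranche_id = LWLockNewTrancheId();
 *		LWLockInitialize(&shared->lock, my_tranche_id);
 *
 *		// in every backend that touches the lock (name must stay allocated):
 *		LWLockRegisterTranche(my_tranche_id, "my_extension");
 */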
687
688/*
689 * Report start of wait event for light-weight locks.
690 *
 * This function is used by all the lightweight lock calls that need to
 * wait to acquire a lock. It distinguishes the wait event based on the
 * tranche and lock id.
694 */
695static inline void
696LWLockReportWaitStart(LWLock *lock)
697{
698 pgstat_report_wait_start(PG_WAIT_LWLOCK | lock->tranche);
699}
700
701/*
702 * Report end of wait event for light-weight locks.
703 */
704static inline void
705LWLockReportWaitEnd(void)
706{
707 pgstat_report_wait_end();
708}
709
710/*
711 * Return an identifier for an LWLock based on the wait class and event.
712 */
713const char *
714GetLWLockIdentifier(uint32 classId, uint16 eventId)
715{
716 Assert(classId == PG_WAIT_LWLOCK);
717
718 /*
 * It is quite possible that a user has registered a tranche in one of the
 * backends (e.g. by allocating lwlocks in dynamic shared memory) but not in
 * all of them, so we can't assume the tranche is registered here.
722 */
723 if (eventId >= LWLockTranchesAllocated ||
724 LWLockTrancheArray[eventId] == NULL)
725 return "extension";
726
727 return LWLockTrancheArray[eventId];
728}
729
730/*
731 * Internal function that tries to atomically acquire the lwlock in the passed
732 * in mode.
733 *
734 * This function will not block waiting for a lock to become free - that's the
 * caller's job.
736 *
737 * Returns true if the lock isn't free and we need to wait.
738 */
739static bool
740LWLockAttemptLock(LWLock *lock, LWLockMode mode)
741{
742 uint32 old_state;
743
744 AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);
745
746 /*
 * Read once outside the loop; later iterations will get the newer value
748 * via compare & exchange.
749 */
750 old_state = pg_atomic_read_u32(&lock->state);
751
752 /* loop until we've determined whether we could acquire the lock or not */
753 while (true)
754 {
755 uint32 desired_state;
756 bool lock_free;
757
758 desired_state = old_state;
759
760 if (mode == LW_EXCLUSIVE)
761 {
762 lock_free = (old_state & LW_LOCK_MASK) == 0;
763 if (lock_free)
764 desired_state += LW_VAL_EXCLUSIVE;
765 }
766 else
767 {
768 lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
769 if (lock_free)
770 desired_state += LW_VAL_SHARED;
771 }
772
773 /*
 * Attempt to swap in the state we are expecting. If we didn't see
 * the lock as free, that's just the old value. If we saw it as free,
 * we'll attempt to mark it acquired. The reason that we always swap
 * in the value is that this doubles as a memory barrier. We could try
 * to be smarter and only swap in values if we saw the lock as free,
 * but benchmarks haven't shown that to be beneficial so far.
780 *
781 * Retry if the value changed since we last looked at it.
782 */
783 if (pg_atomic_compare_exchange_u32(&lock->state,
784 &old_state, desired_state))
785 {
786 if (lock_free)
787 {
788 /* Great! Got the lock. */
789#ifdef LOCK_DEBUG
790 if (mode == LW_EXCLUSIVE)
791 lock->owner = MyProc;
792#endif
793 return false;
794 }
795 else
796 return true; /* somebody else has the lock */
797 }
798 }
799 pg_unreachable();
800}
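
/*
 * To make the arithmetic above concrete (ignoring the flag bits): a free
 * lock has (state & LW_LOCK_MASK) == 0; three shared holders make that 3,
 * since each one added LW_VAL_SHARED; a single exclusive holder makes it
 * LW_VAL_EXCLUSIVE (1 << 24), which blocks both shared acquisition (the
 * LW_VAL_EXCLUSIVE bit is set) and exclusive acquisition (LW_LOCK_MASK is
 * no longer zero).
 */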
801
802/*
803 * Lock the LWLock's wait list against concurrent activity.
804 *
805 * NB: even though the wait list is locked, non-conflicting lock operations
806 * may still happen concurrently.
807 *
808 * Time spent holding mutex should be short!
809 */
810static void
811LWLockWaitListLock(LWLock *lock)
812{
813 uint32 old_state;
814#ifdef LWLOCK_STATS
815 lwlock_stats *lwstats;
816 uint32 delays = 0;
817
818 lwstats = get_lwlock_stats_entry(lock);
819#endif
820
821 while (true)
822 {
823 /* always try once to acquire lock directly */
824 old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
825 if (!(old_state & LW_FLAG_LOCKED))
826 break; /* got lock */
827
828 /* and then spin without atomic operations until lock is released */
829 {
830 SpinDelayStatus delayStatus;
831
832 init_local_spin_delay(&delayStatus);
833
834 while (old_state & LW_FLAG_LOCKED)
835 {
836 perform_spin_delay(&delayStatus);
837 old_state = pg_atomic_read_u32(&lock->state);
838 }
839#ifdef LWLOCK_STATS
840 delays += delayStatus.delays;
841#endif
842 finish_spin_delay(&delayStatus);
843 }
844
845 /*
 * Retry. The lock might, of course, already have been re-acquired by
 * the time we attempt to get it again.
848 */
849 }
850
851#ifdef LWLOCK_STATS
852 lwstats->spin_delay_count += delays;
853#endif
854}
855
856/*
857 * Unlock the LWLock's wait list.
858 *
859 * Note that it can be more efficient to manipulate flags and release the
860 * locks in a single atomic operation.
861 */
862static void
863LWLockWaitListUnlock(LWLock *lock)
864{
865 uint32 old_state PG_USED_FOR_ASSERTS_ONLY;
866
867 old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);
868
869 Assert(old_state & LW_FLAG_LOCKED);
870}
871
872/*
 * Wake up all the waiters that currently have a chance to acquire the lock.
874 */
875static void
876LWLockWakeup(LWLock *lock)
877{
878 bool new_release_ok;
879 bool wokeup_somebody = false;
880 proclist_head wakeup;
881 proclist_mutable_iter iter;
882
883 proclist_init(&wakeup);
884
885 new_release_ok = true;
886
887 /* lock wait list while collecting backends to wake up */
888 LWLockWaitListLock(lock);
889
890 proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
891 {
892 PGPROC *waiter = GetPGProcByNumber(iter.cur);
893
894 if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
895 continue;
896
897 proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
898 proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
899
900 if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
901 {
902 /*
 * Prevent additional wakeups until the retrying backend gets to
 * run. Backends that are just waiting for the lock to become free
 * don't retry automatically.
906 */
907 new_release_ok = false;
908
909 /*
 * Don't wake up (further) exclusive lockers.
911 */
912 wokeup_somebody = true;
913 }
914
915 /*
 * Once we've woken up an exclusive waiter, there's no point in waking
917 * up anybody else.
918 */
919 if (waiter->lwWaitMode == LW_EXCLUSIVE)
920 break;
921 }
922
923 Assert(proclist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);
924
925 /* unset required flags, and release lock, in one fell swoop */
926 {
927 uint32 old_state;
928 uint32 desired_state;
929
930 old_state = pg_atomic_read_u32(&lock->state);
931 while (true)
932 {
933 desired_state = old_state;
934
935 /* compute desired flags */
936
937 if (new_release_ok)
938 desired_state |= LW_FLAG_RELEASE_OK;
939 else
940 desired_state &= ~LW_FLAG_RELEASE_OK;
941
942 if (proclist_is_empty(&wakeup))
943 desired_state &= ~LW_FLAG_HAS_WAITERS;
944
945 desired_state &= ~LW_FLAG_LOCKED; /* release lock */
946
947 if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
948 desired_state))
949 break;
950 }
951 }
952
953 /* Awaken any waiters I removed from the queue. */
954 proclist_foreach_modify(iter, &wakeup, lwWaitLink)
955 {
956 PGPROC *waiter = GetPGProcByNumber(iter.cur);
957
958 LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
959 proclist_delete(&wakeup, iter.cur, lwWaitLink);
960
961 /*
962 * Guarantee that lwWaiting being unset only becomes visible once the
 * unlink from the list has completed. Otherwise the target backend
 * could be woken up for some other reason and enqueue itself for a
 * new lock - if
965 * that happens before the list unlink happens, the list would end up
966 * being corrupted.
967 *
968 * The barrier pairs with the LWLockWaitListLock() when enqueuing for
969 * another lock.
970 */
971 pg_write_barrier();
972 waiter->lwWaiting = false;
973 PGSemaphoreUnlock(waiter->sem);
974 }
975}
976
977/*
978 * Add ourselves to the end of the queue.
979 *
980 * NB: Mode can be LW_WAIT_UNTIL_FREE here!
981 */
982static void
983LWLockQueueSelf(LWLock *lock, LWLockMode mode)
984{
985 /*
986 * If we don't have a PGPROC structure, there's no way to wait. This
987 * should never occur, since MyProc should only be null during shared
988 * memory initialization.
989 */
990 if (MyProc == NULL)
991 elog(PANIC, "cannot wait without a PGPROC structure");
992
993 if (MyProc->lwWaiting)
994 elog(PANIC, "queueing for lock while waiting on another one");
995
996 LWLockWaitListLock(lock);
997
998 /* setting the flag is protected by the spinlock */
999 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);
1000
1001 MyProc->lwWaiting = true;
1002 MyProc->lwWaitMode = mode;
1003
1004 /* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
1005 if (mode == LW_WAIT_UNTIL_FREE)
1006 proclist_push_head(&lock->waiters, MyProc->pgprocno, lwWaitLink);
1007 else
1008 proclist_push_tail(&lock->waiters, MyProc->pgprocno, lwWaitLink);
1009
1010 /* Can release the mutex now */
1011 LWLockWaitListUnlock(lock);
1012
1013#ifdef LOCK_DEBUG
1014 pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
1015#endif
1016
1017}
1018
1019/*
1020 * Remove ourselves from the waitlist.
1021 *
1022 * This is used if we queued ourselves because we thought we needed to sleep
1023 * but, after further checking, we discovered that we don't actually need to
1024 * do so.
1025 */
1026static void
1027LWLockDequeueSelf(LWLock *lock)
1028{
1029 bool found = false;
1030 proclist_mutable_iter iter;
1031
1032#ifdef LWLOCK_STATS
1033 lwlock_stats *lwstats;
1034
1035 lwstats = get_lwlock_stats_entry(lock);
1036
1037 lwstats->dequeue_self_count++;
1038#endif
1039
1040 LWLockWaitListLock(lock);
1041
1042 /*
 * We can't just assume we're still on the list; we need to iterate over
 * all entries, as somebody else could have dequeued us already.
1045 */
1046 proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
1047 {
1048 if (iter.cur == MyProc->pgprocno)
1049 {
1050 found = true;
1051 proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
1052 break;
1053 }
1054 }
1055
1056 if (proclist_is_empty(&lock->waiters) &&
1057 (pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
1058 {
1059 pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
1060 }
1061
1062 /* XXX: combine with fetch_and above? */
1063 LWLockWaitListUnlock(lock);
1064
1065 /* clear waiting state again, nice for debugging */
1066 if (found)
1067 MyProc->lwWaiting = false;
1068 else
1069 {
1070 int extraWaits = 0;
1071
1072 /*
1073 * Somebody else dequeued us and has or will wake us up. Deal with the
1074 * superfluous absorption of a wakeup.
1075 */
1076
1077 /*
1078 * Reset releaseOk if somebody woke us before we removed ourselves -
1079 * they'll have set it to false.
1080 */
1081 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1082
1083 /*
1084 * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
1085 * get reset at some inconvenient point later. Most of the time this
1086 * will immediately return.
1087 */
1088 for (;;)
1089 {
1090 PGSemaphoreLock(MyProc->sem);
1091 if (!MyProc->lwWaiting)
1092 break;
1093 extraWaits++;
1094 }
1095
1096 /*
1097 * Fix the process wait semaphore's count for any absorbed wakeups.
1098 */
1099 while (extraWaits-- > 0)
1100 PGSemaphoreUnlock(MyProc->sem);
1101 }
1102
1103#ifdef LOCK_DEBUG
1104 {
1105 /* not waiting anymore */
1106 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1107
1108 Assert(nwaiters < MAX_BACKENDS);
1109 }
1110#endif
1111}
1112
1113/*
1114 * LWLockAcquire - acquire a lightweight lock in the specified mode
1115 *
1116 * If the lock is not available, sleep until it is. Returns true if the lock
1117 * was available immediately, false if we had to sleep.
1118 *
1119 * Side effect: cancel/die interrupts are held off until lock release.
1120 */
1121bool
1122LWLockAcquire(LWLock *lock, LWLockMode mode)
1123{
1124 PGPROC *proc = MyProc;
1125 bool result = true;
1126 int extraWaits = 0;
1127#ifdef LWLOCK_STATS
1128 lwlock_stats *lwstats;
1129
1130 lwstats = get_lwlock_stats_entry(lock);
1131#endif
1132
1133 AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1134
1135 PRINT_LWDEBUG("LWLockAcquire", lock, mode);
1136
1137#ifdef LWLOCK_STATS
1138 /* Count lock acquisition attempts */
1139 if (mode == LW_EXCLUSIVE)
1140 lwstats->ex_acquire_count++;
1141 else
1142 lwstats->sh_acquire_count++;
1143#endif /* LWLOCK_STATS */
1144
1145 /*
1146 * We can't wait if we haven't got a PGPROC. This should only occur
1147 * during bootstrap or shared memory initialization. Put an Assert here
1148 * to catch unsafe coding practices.
1149 */
1150 Assert(!(proc == NULL && IsUnderPostmaster));
1151
1152 /* Ensure we will have room to remember the lock */
1153 if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1154 elog(ERROR, "too many LWLocks taken");
1155
1156 /*
1157 * Lock out cancel/die interrupts until we exit the code section protected
1158 * by the LWLock. This ensures that interrupts will not interfere with
1159 * manipulations of data structures in shared memory.
1160 */
1161 HOLD_INTERRUPTS();
1162
1163 /*
1164 * Loop here to try to acquire lock after each time we are signaled by
1165 * LWLockRelease.
1166 *
1167 * NOTE: it might seem better to have LWLockRelease actually grant us the
1168 * lock, rather than retrying and possibly having to go back to sleep. But
1169 * in practice that is no good because it means a process swap for every
1170 * lock acquisition when two or more processes are contending for the same
1171 * lock. Since LWLocks are normally used to protect not-very-long
1172 * sections of computation, a process needs to be able to acquire and
1173 * release the same lock many times during a single CPU time slice, even
1174 * in the presence of contention. The efficiency of being able to do that
1175 * outweighs the inefficiency of sometimes wasting a process dispatch
1176 * cycle because the lock is not free when a released waiter finally gets
1177 * to run. See pgsql-hackers archives for 29-Dec-01.
1178 */
1179 for (;;)
1180 {
1181 bool mustwait;
1182
1183 /*
 * Try to grab the lock the first time; we're not in the wait queue
1185 * yet/anymore.
1186 */
1187 mustwait = LWLockAttemptLock(lock, mode);
1188
1189 if (!mustwait)
1190 {
1191 LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
1192 break; /* got the lock */
1193 }
1194
1195 /*
 * OK, at this point we couldn't grab the lock on the first try. We
 * cannot simply queue ourselves to the end of the list and wait to be
 * woken up, because by now the lock could long since have been
 * released. Instead we add ourselves to the queue and try to grab the
 * lock again. If we succeed, we need to undo the queuing; otherwise we
 * sleep and recheck the lock once woken. If we still couldn't grab it,
 * we know that the other locker will see our queue entry when
 * releasing, since it existed before we rechecked the lock.
1204 */
1205
1206 /* add to the queue */
1207 LWLockQueueSelf(lock, mode);
1208
1209 /* we're now guaranteed to be woken up if necessary */
1210 mustwait = LWLockAttemptLock(lock, mode);
1211
1212 /* ok, grabbed the lock the second time round, need to undo queueing */
1213 if (!mustwait)
1214 {
1215 LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
1216
1217 LWLockDequeueSelf(lock);
1218 break;
1219 }
1220
1221 /*
1222 * Wait until awakened.
1223 *
1224 * Since we share the process wait semaphore with the regular lock
1225 * manager and ProcWaitForSignal, and we may need to acquire an LWLock
1226 * while one of those is pending, it is possible that we get awakened
1227 * for a reason other than being signaled by LWLockRelease. If so,
1228 * loop back and wait again. Once we've gotten the LWLock,
1229 * re-increment the sema by the number of additional signals received,
1230 * so that the lock manager or signal manager will see the received
1231 * signal when it next waits.
1232 */
1233 LOG_LWDEBUG("LWLockAcquire", lock, "waiting");
1234
1235#ifdef LWLOCK_STATS
1236 lwstats->block_count++;
1237#endif
1238
1239 LWLockReportWaitStart(lock);
1240 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
1241
1242 for (;;)
1243 {
1244 PGSemaphoreLock(proc->sem);
1245 if (!proc->lwWaiting)
1246 break;
1247 extraWaits++;
1248 }
1249
1250 /* Retrying, allow LWLockRelease to release waiters again. */
1251 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1252
1253#ifdef LOCK_DEBUG
1254 {
1255 /* not waiting anymore */
1256 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1257
1258 Assert(nwaiters < MAX_BACKENDS);
1259 }
1260#endif
1261
1262 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
1263 LWLockReportWaitEnd();
1264
1265 LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
1266
1267 /* Now loop back and try to acquire lock again. */
1268 result = false;
1269 }
1270
1271 TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);
1272
1273 /* Add lock to list of locks held by this backend */
1274 held_lwlocks[num_held_lwlocks].lock = lock;
1275 held_lwlocks[num_held_lwlocks++].mode = mode;
1276
1277 /*
1278 * Fix the process wait semaphore's count for any absorbed wakeups.
1279 */
1280 while (extraWaits-- > 0)
1281 PGSemaphoreUnlock(proc->sem);
1282
1283 return result;
1284}
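
/*
 * Illustrative sketch (not part of the build) of the usual calling pattern;
 * "MyExtLock" stands for whatever LWLock the caller has a pointer to:
 *
 *		LWLockAcquire(MyExtLock, LW_SHARED);
 *		// ... read the protected shared-memory state; keep this short ...
 *		LWLockRelease(MyExtLock);
 *
 * Cancel/die interrupts stay held off between acquire and release, and any
 * LWLock still held when an error aborts the transaction is released by
 * LWLockReleaseAll().
 */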
1285
1286/*
1287 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
1288 *
1289 * If the lock is not available, return false with no side-effects.
1290 *
1291 * If successful, cancel/die interrupts are held off until lock release.
1292 */
1293bool
1294LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
1295{
1296 bool mustwait;
1297
1298 AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1299
1300 PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);
1301
1302 /* Ensure we will have room to remember the lock */
1303 if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1304 elog(ERROR, "too many LWLocks taken");
1305
1306 /*
1307 * Lock out cancel/die interrupts until we exit the code section protected
1308 * by the LWLock. This ensures that interrupts will not interfere with
1309 * manipulations of data structures in shared memory.
1310 */
1311 HOLD_INTERRUPTS();
1312
1313 /* Check for the lock */
1314 mustwait = LWLockAttemptLock(lock, mode);
1315
1316 if (mustwait)
1317 {
1318 /* Failed to get lock, so release interrupt holdoff */
1319 RESUME_INTERRUPTS();
1320
1321 LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
1322 TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), mode);
1323 }
1324 else
1325 {
1326 /* Add lock to list of locks held by this backend */
1327 held_lwlocks[num_held_lwlocks].lock = lock;
1328 held_lwlocks[num_held_lwlocks++].mode = mode;
1329 TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), mode);
1330 }
1331 return !mustwait;
1332}
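
/*
 * Illustrative sketch (not part of the build): a caller that would rather do
 * something else than block; "MyExtLock" is hypothetical.
 *
 *		if (LWLockConditionalAcquire(MyExtLock, LW_EXCLUSIVE))
 *		{
 *			// ... update the protected state ...
 *			LWLockRelease(MyExtLock);
 *		}
 *		else
 *		{
 *			// lock is busy: skip the work, retry later, or fall back to a
 *			// blocking LWLockAcquire()
 *		}
 */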
1333
1334/*
1335 * LWLockAcquireOrWait - Acquire lock, or wait until it's free
1336 *
1337 * The semantics of this function are a bit funky. If the lock is currently
1338 * free, it is acquired in the given mode, and the function returns true. If
1339 * the lock isn't immediately free, the function waits until it is released
1340 * and returns false, but does not acquire the lock.
1341 *
1342 * This is currently used for WALWriteLock: when a backend flushes the WAL,
1343 * holding WALWriteLock, it can flush the commit records of many other
1344 * backends as a side-effect. Those other backends need to wait until the
1345 * flush finishes, but don't need to acquire the lock anymore. They can just
1346 * wake up, observe that their records have already been flushed, and return.
1347 */
1348bool
1349LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
1350{
1351 PGPROC *proc = MyProc;
1352 bool mustwait;
1353 int extraWaits = 0;
1354#ifdef LWLOCK_STATS
1355 lwlock_stats *lwstats;
1356
1357 lwstats = get_lwlock_stats_entry(lock);
1358#endif
1359
1360 Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1361
1362 PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);
1363
1364 /* Ensure we will have room to remember the lock */
1365 if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1366 elog(ERROR, "too many LWLocks taken");
1367
1368 /*
1369 * Lock out cancel/die interrupts until we exit the code section protected
1370 * by the LWLock. This ensures that interrupts will not interfere with
1371 * manipulations of data structures in shared memory.
1372 */
1373 HOLD_INTERRUPTS();
1374
1375 /*
1376 * NB: We're using nearly the same twice-in-a-row lock acquisition
1377 * protocol as LWLockAcquire(). Check its comments for details.
1378 */
1379 mustwait = LWLockAttemptLock(lock, mode);
1380
1381 if (mustwait)
1382 {
1383 LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1384
1385 mustwait = LWLockAttemptLock(lock, mode);
1386
1387 if (mustwait)
1388 {
1389 /*
1390 * Wait until awakened. Like in LWLockAcquire, be prepared for
1391 * bogus wakeups, because we share the semaphore with
1392 * ProcWaitForSignal.
1393 */
1394 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");
1395
1396#ifdef LWLOCK_STATS
1397 lwstats->block_count++;
1398#endif
1399
1400 LWLockReportWaitStart(lock);
1401 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
1402
1403 for (;;)
1404 {
1405 PGSemaphoreLock(proc->sem);
1406 if (!proc->lwWaiting)
1407 break;
1408 extraWaits++;
1409 }
1410
1411#ifdef LOCK_DEBUG
1412 {
1413 /* not waiting anymore */
1414 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1415
1416 Assert(nwaiters < MAX_BACKENDS);
1417 }
1418#endif
1419 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
1420 LWLockReportWaitEnd();
1421
1422 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
1423 }
1424 else
1425 {
1426 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");
1427
1428 /*
 * Got the lock on the second attempt, undo queueing. We need to treat
1430 * this as having successfully acquired the lock, otherwise we'd
1431 * not necessarily wake up people we've prevented from acquiring
1432 * the lock.
1433 */
1434 LWLockDequeueSelf(lock);
1435 }
1436 }
1437
1438 /*
1439 * Fix the process wait semaphore's count for any absorbed wakeups.
1440 */
1441 while (extraWaits-- > 0)
1442 PGSemaphoreUnlock(proc->sem);
1443
1444 if (mustwait)
1445 {
1446 /* Failed to get lock, so release interrupt holdoff */
1447 RESUME_INTERRUPTS();
1448 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
1449 TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), mode);
1450 }
1451 else
1452 {
1453 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
1454 /* Add lock to list of locks held by this backend */
1455 held_lwlocks[num_held_lwlocks].lock = lock;
1456 held_lwlocks[num_held_lwlocks++].mode = mode;
1457 TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), mode);
1458 }
1459
1460 return !mustwait;
1461}
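
/*
 * Illustrative sketch (not part of the build) of the group-flush pattern
 * described above. flush_wal_up_to() and already_flushed() are placeholders
 * for the caller's own logic, not real functions:
 *
 *		for (;;)
 *		{
 *			if (LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
 *			{
 *				flush_wal_up_to(target);		// got the lock: do the work
 *				LWLockRelease(WALWriteLock);
 *				break;
 *			}
 *			if (already_flushed(target))		// the lock holder did it for us
 *				break;
 *			// otherwise the holder flushed less than we need; try again
 *		}
 */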
1462
1463/*
1464 * Does the lwlock in its current state need to wait for the variable value to
1465 * change?
1466 *
1467 * If we don't need to wait, and it's because the value of the variable has
1468 * changed, store the current value in newval.
1469 *
1470 * *result is set to true if the lock was free, and false otherwise.
1471 */
1472static bool
1473LWLockConflictsWithVar(LWLock *lock,
1474 uint64 *valptr, uint64 oldval, uint64 *newval,
1475 bool *result)
1476{
1477 bool mustwait;
1478 uint64 value;
1479
1480 /*
 * Test first to see if the lock is free right now.
1482 *
1483 * XXX: the caller uses a spinlock before this, so we don't need a memory
1484 * barrier here as far as the current usage is concerned. But that might
1485 * not be safe in general.
1486 */
1487 mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
1488
1489 if (!mustwait)
1490 {
1491 *result = true;
1492 return false;
1493 }
1494
1495 *result = false;
1496
1497 /*
1498 * Read value using the lwlock's wait list lock, as we can't generally
1499 * rely on atomic 64 bit reads/stores. TODO: On platforms with a way to
1500 * do atomic 64 bit reads/writes the spinlock should be optimized away.
1501 */
1502 LWLockWaitListLock(lock);
1503 value = *valptr;
1504 LWLockWaitListUnlock(lock);
1505
1506 if (value != oldval)
1507 {
1508 mustwait = false;
1509 *newval = value;
1510 }
1511 else
1512 {
1513 mustwait = true;
1514 }
1515
1516 return mustwait;
1517}
1518
1519/*
1520 * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
1521 *
1522 * If the lock is held and *valptr equals oldval, waits until the lock is
1523 * either freed, or the lock holder updates *valptr by calling
1524 * LWLockUpdateVar. If the lock is free on exit (immediately or after
1525 * waiting), returns true. If the lock is still held, but *valptr no longer
1526 * matches oldval, returns false and sets *newval to the current value in
1527 * *valptr.
1528 *
1529 * Note: this function ignores shared lock holders; if the lock is held
1530 * in shared mode, returns 'true'.
1531 */
1532bool
1533LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
1534{
1535 PGPROC *proc = MyProc;
1536 int extraWaits = 0;
1537 bool result = false;
1538#ifdef LWLOCK_STATS
1539 lwlock_stats *lwstats;
1540
1541 lwstats = get_lwlock_stats_entry(lock);
1542#endif
1543
1544 PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
1545
1546 /*
1547 * Lock out cancel/die interrupts while we sleep on the lock. There is no
1548 * cleanup mechanism to remove us from the wait queue if we got
1549 * interrupted.
1550 */
1551 HOLD_INTERRUPTS();
1552
1553 /*
1554 * Loop here to check the lock's status after each time we are signaled.
1555 */
1556 for (;;)
1557 {
1558 bool mustwait;
1559
1560 mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1561 &result);
1562
1563 if (!mustwait)
1564 break; /* the lock was free or value didn't match */
1565
1566 /*
 * Add myself to the wait queue. Note that this is racy; somebody else
 * could wake up before we're finished queuing. NB: We're using nearly
1569 * the same twice-in-a-row lock acquisition protocol as
1570 * LWLockAcquire(). Check its comments for details. The only
1571 * difference is that we also have to check the variable's values when
1572 * checking the state of the lock.
1573 */
1574 LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1575
1576 /*
1577 * Set RELEASE_OK flag, to make sure we get woken up as soon as the
1578 * lock is released.
1579 */
1580 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1581
1582 /*
1583 * We're now guaranteed to be woken up if necessary. Recheck the lock
1584 * and variables state.
1585 */
1586 mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1587 &result);
1588
1589 /* Ok, no conflict after we queued ourselves. Undo queueing. */
1590 if (!mustwait)
1591 {
1592 LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
1593
1594 LWLockDequeueSelf(lock);
1595 break;
1596 }
1597
1598 /*
1599 * Wait until awakened.
1600 *
1601 * Since we share the process wait semaphore with the regular lock
1602 * manager and ProcWaitForSignal, and we may need to acquire an LWLock
1603 * while one of those is pending, it is possible that we get awakened
1604 * for a reason other than being signaled by LWLockRelease. If so,
1605 * loop back and wait again. Once we've gotten the LWLock,
1606 * re-increment the sema by the number of additional signals received,
1607 * so that the lock manager or signal manager will see the received
1608 * signal when it next waits.
1609 */
1610 LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");
1611
1612#ifdef LWLOCK_STATS
1613 lwstats->block_count++;
1614#endif
1615
1616 LWLockReportWaitStart(lock);
1617 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), LW_EXCLUSIVE);
1618
1619 for (;;)
1620 {
1621 PGSemaphoreLock(proc->sem);
1622 if (!proc->lwWaiting)
1623 break;
1624 extraWaits++;
1625 }
1626
1627#ifdef LOCK_DEBUG
1628 {
1629 /* not waiting anymore */
1630 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1631
1632 Assert(nwaiters < MAX_BACKENDS);
1633 }
1634#endif
1635
1636 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), LW_EXCLUSIVE);
1637 LWLockReportWaitEnd();
1638
1639 LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");
1640
1641 /* Now loop back and check the status of the lock again. */
1642 }
1643
1644 TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), LW_EXCLUSIVE);
1645
1646 /*
1647 * Fix the process wait semaphore's count for any absorbed wakeups.
1648 */
1649 while (extraWaits-- > 0)
1650 PGSemaphoreUnlock(proc->sem);
1651
1652 /*
1653 * Now okay to allow cancel/die interrupts.
1654 */
1655 RESUME_INTERRUPTS();
1656
1657 return result;
1658}
1659
1660
1661/*
1662 * LWLockUpdateVar - Update a variable and wake up waiters atomically
1663 *
1664 * Sets *valptr to 'val', and wakes up all processes waiting for us with
1665 * LWLockWaitForVar(). Setting the value and waking up the processes happen
1666 * atomically so that any process calling LWLockWaitForVar() on the same lock
1667 * is guaranteed to see the new value, and act accordingly.
1668 *
1669 * The caller must be holding the lock in exclusive mode.
1670 */
1671void
1672LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
1673{
1674 proclist_head wakeup;
1675 proclist_mutable_iter iter;
1676
1677 PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
1678
1679 proclist_init(&wakeup);
1680
1681 LWLockWaitListLock(lock);
1682
1683 Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
1684
1685 /* Update the lock's value */
1686 *valptr = val;
1687
1688 /*
1689 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
 * up. They are always at the front of the queue.
1691 */
1692 proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
1693 {
1694 PGPROC *waiter = GetPGProcByNumber(iter.cur);
1695
1696 if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
1697 break;
1698
1699 proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
1700 proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
1701 }
1702
1703 /* We are done updating shared state of the lock itself. */
1704 LWLockWaitListUnlock(lock);
1705
1706 /*
1707 * Awaken any waiters I removed from the queue.
1708 */
1709 proclist_foreach_modify(iter, &wakeup, lwWaitLink)
1710 {
1711 PGPROC *waiter = GetPGProcByNumber(iter.cur);
1712
1713 proclist_delete(&wakeup, iter.cur, lwWaitLink);
1714 /* check comment in LWLockWakeup() about this barrier */
1715 pg_write_barrier();
1716 waiter->lwWaiting = false;
1717 PGSemaphoreUnlock(waiter->sem);
1718 }
1719}
1720
1721
1722/*
1723 * LWLockRelease - release a previously acquired lock
1724 */
1725void
1726LWLockRelease(LWLock *lock)
1727{
1728 LWLockMode mode;
1729 uint32 oldstate;
1730 bool check_waiters;
1731 int i;
1732
1733 /*
1734 * Remove lock from list of locks held. Usually, but not always, it will
1735 * be the latest-acquired lock; so search array backwards.
1736 */
1737 for (i = num_held_lwlocks; --i >= 0;)
1738 if (lock == held_lwlocks[i].lock)
1739 break;
1740
1741 if (i < 0)
1742 elog(ERROR, "lock %s is not held", T_NAME(lock));
1743
1744 mode = held_lwlocks[i].mode;
1745
1746 num_held_lwlocks--;
1747 for (; i < num_held_lwlocks; i++)
1748 held_lwlocks[i] = held_lwlocks[i + 1];
1749
1750 PRINT_LWDEBUG("LWLockRelease", lock, mode);
1751
1752 /*
 * Release my hold on the lock; after that it can immediately be acquired
 * by others, even if we still have to wake up other waiters.
1755 */
1756 if (mode == LW_EXCLUSIVE)
1757 oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
1758 else
1759 oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);
1760
1761 /* nobody else can have that kind of lock */
1762 Assert(!(oldstate & LW_VAL_EXCLUSIVE));
1763
1764
1765 /*
 * If RELEASE_OK is not set, we're still waiting for previously-woken
 * backends to get scheduled; don't wake anybody up again in that case.
1768 */
1769 if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
1770 (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
1771 (oldstate & LW_LOCK_MASK) == 0)
1772 check_waiters = true;
1773 else
1774 check_waiters = false;
1775
1776 /*
1777 * As waking up waiters requires the spinlock to be acquired, only do so
1778 * if necessary.
1779 */
1780 if (check_waiters)
1781 {
1782 /* XXX: remove before commit? */
1783 LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
1784 LWLockWakeup(lock);
1785 }
1786
1787 TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));
1788
1789 /*
1790 * Now okay to allow cancel/die interrupts.
1791 */
1792 RESUME_INTERRUPTS();
1793}
1794
1795/*
1796 * LWLockReleaseClearVar - release a previously acquired lock, reset variable
1797 */
1798void
1799LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
1800{
1801 LWLockWaitListLock(lock);
1802
1803 /*
 * Set the variable's value before releasing the lock; that prevents a
 * race condition wherein a new locker acquires the lock, but hasn't yet
 * set the variable's value.
1807 */
1808 *valptr = val;
1809 LWLockWaitListUnlock(lock);
1810
1811 LWLockRelease(lock);
1812}
1813
1814
1815/*
1816 * LWLockReleaseAll - release all currently-held locks
1817 *
1818 * Used to clean up after ereport(ERROR). An important difference between this
1819 * function and retail LWLockRelease calls is that InterruptHoldoffCount is
1820 * unchanged by this operation. This is necessary since InterruptHoldoffCount
1821 * has been set to an appropriate level earlier in error recovery. We could
1822 * decrement it below zero if we allow it to drop for each released lock!
1823 */
1824void
1825LWLockReleaseAll(void)
1826{
1827 while (num_held_lwlocks > 0)
1828 {
1829 HOLD_INTERRUPTS(); /* match the upcoming RESUME_INTERRUPTS */
1830
1831 LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
1832 }
1833}
1834
1835
1836/*
1837 * LWLockHeldByMe - test whether my process holds a lock in any mode
1838 *
1839 * This is meant as debug support only.
1840 */
1841bool
1842LWLockHeldByMe(LWLock *l)
1843{
1844 int i;
1845
1846 for (i = 0; i < num_held_lwlocks; i++)
1847 {
1848 if (held_lwlocks[i].lock == l)
1849 return true;
1850 }
1851 return false;
1852}
1853
1854/*
1855 * LWLockHeldByMeInMode - test whether my process holds a lock in given mode
1856 *
1857 * This is meant as debug support only.
1858 */
1859bool
1860LWLockHeldByMeInMode(LWLock *l, LWLockMode mode)
1861{
1862 int i;
1863
1864 for (i = 0; i < num_held_lwlocks; i++)
1865 {
1866 if (held_lwlocks[i].lock == l && held_lwlocks[i].mode == mode)
1867 return true;
1868 }
1869 return false;
1870}
1871
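/*
 * These checks are typically used in assertions that guard code requiring a
 * particular lock to be held, e.g. (with a hypothetical lock pointer):
 *
 *		Assert(LWLockHeldByMeInMode(MyExtLock, LW_EXCLUSIVE));
 */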